提交 314c90068e94acaf57fbf9be77f5fcd4be551c61

作者 LJH 李佳桓
1 个父辈 5142db00

add

正在显示 1 个修改的文件 包含 390 行增加0 行删除
  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdio.h>
  7 +#include <stdlib.h>
  8 +#include <string.h>
  9 +#include <signal.h>
  10 +#include <sys/types.h>
  11 +#include <assert.h>
  12 +
  13 +#include "spthread.hpp"
  14 +
  15 +#include "spmsgdecoder.hpp"
  16 +#include "spbuffer.hpp"
  17 +
  18 +#include "spdispatcher.hpp"
  19 +#include "sphandler.hpp"
  20 +#include "spresponse.hpp"
  21 +#include "sprequest.hpp"
  22 +#include "spioutils.hpp"
  23 +#include "sputils.hpp"
  24 +
  25 +#ifdef WIN32
  26 +#include "spgetopt.h"
  27 +#endif
  28 +
  29 +typedef struct tagSP_OnlineInfo {
  30 + SP_Sid_t mSid;
  31 + int mChatID;
  32 +} SP_OnlineInfo_t;
  33 +
  34 +class SP_OnlineManager {
  35 +public:
  36 + SP_OnlineManager();
  37 + ~SP_OnlineManager();
  38 +
  39 + void copy( SP_SidList * outList, SP_Sid_t * ignoreSid = NULL );
  40 + void remove( SP_Sid_t sid );
  41 + void add( SP_OnlineInfo_t * info );
  42 + int getChatID( SP_Sid_t sid );
  43 +
  44 + int getCount();
  45 +
  46 +private:
  47 + SP_ArrayList mList;
  48 + sp_thread_mutex_t mMutex;
  49 +};
  50 +
  51 +SP_OnlineManager :: SP_OnlineManager()
  52 +{
  53 + sp_thread_mutex_init( &mMutex, NULL );
  54 +}
  55 +
  56 +SP_OnlineManager :: ~SP_OnlineManager()
  57 +{
  58 + sp_thread_mutex_destroy( &mMutex );
  59 +}
  60 +
  61 +void SP_OnlineManager :: copy( SP_SidList * outList, SP_Sid_t * ignoreSid )
  62 +{
  63 + sp_thread_mutex_lock( &mMutex );
  64 +
  65 + for( int i = 0; i < mList.getCount(); i++ ) {
  66 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t*)mList.getItem( i );
  67 +
  68 + if( NULL != ignoreSid ) {
  69 + SP_Sid_t theSid = info->mSid;
  70 + if( theSid.mKey == ignoreSid->mKey && theSid.mSeq == ignoreSid->mSeq ) {
  71 + continue;
  72 + }
  73 + }
  74 +
  75 + outList->add( info->mSid );
  76 + }
  77 +
  78 + sp_thread_mutex_unlock( &mMutex );
  79 +}
  80 +
  81 +void SP_OnlineManager :: remove( SP_Sid_t sid )
  82 +{
  83 + sp_thread_mutex_lock( &mMutex );
  84 +
  85 + for( int i = 0; i < mList.getCount(); i++ ) {
  86 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t*)mList.getItem( i );
  87 + SP_Sid_t theSid = info->mSid;
  88 + if( theSid.mKey == sid.mKey && theSid.mSeq == sid.mSeq ) {
  89 + mList.takeItem( i );
  90 + free( info );
  91 + break;
  92 + }
  93 + }
  94 +
  95 + sp_thread_mutex_unlock( &mMutex );
  96 +}
  97 +
  98 +void SP_OnlineManager :: add( SP_OnlineInfo_t * info )
  99 +{
  100 + sp_thread_mutex_lock( &mMutex );
  101 +
  102 + mList.append( info );
  103 +
  104 + sp_thread_mutex_unlock( &mMutex );
  105 +}
  106 +
  107 +int SP_OnlineManager :: getCount()
  108 +{
  109 + int count = 0;
  110 +
  111 + sp_thread_mutex_lock( &mMutex );
  112 +
  113 + count = mList.getCount();
  114 +
  115 + sp_thread_mutex_unlock( &mMutex );
  116 +
  117 + return count;
  118 +}
  119 +
  120 +int SP_OnlineManager :: getChatID( SP_Sid_t sid )
  121 +{
  122 + int chatID = -1;
  123 +
  124 + sp_thread_mutex_lock( &mMutex );
  125 +
  126 + for( int i = 0; i < mList.getCount(); i++ ) {
  127 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t*)mList.getItem( i );
  128 + SP_Sid_t theSid = info->mSid;
  129 + if( theSid.mKey == sid.mKey && theSid.mSeq == sid.mSeq ) {
  130 + chatID = info->mChatID;
  131 + break;
  132 + }
  133 + }
  134 +
  135 + sp_thread_mutex_unlock( &mMutex );
  136 +
  137 + return chatID;
  138 +}
  139 +
  140 +//---------------------------------------------------------
  141 +
  142 +class SP_ChatHandler : public SP_Handler {
  143 +public:
  144 + SP_ChatHandler( SP_OnlineManager * onlineManager, int chatID );
  145 + virtual ~SP_ChatHandler();
  146 +
  147 + virtual int start( SP_Request * request, SP_Response * response );
  148 +
  149 + // return -1 : terminate session, 0 : continue
  150 + virtual int handle( SP_Request * request, SP_Response * response );
  151 +
  152 + virtual void error( SP_Response * response );
  153 +
  154 + virtual void timeout( SP_Response * response );
  155 +
  156 + virtual void close();
  157 +
  158 +private:
  159 + SP_Sid_t mSid;
  160 + int mChatID;
  161 +
  162 + SP_OnlineManager * mOnlineManager;
  163 +
  164 + static int mMsgSeq;
  165 +
  166 + void broadcast( SP_Response * response, const char * buffer, SP_Sid_t * ignoreSid = 0 );
  167 +};
  168 +
  169 +int SP_ChatHandler :: mMsgSeq = 0;
  170 +
  171 +SP_ChatHandler :: SP_ChatHandler( SP_OnlineManager * onlineManager, int chatID )
  172 +{
  173 + memset( &mSid, 0, sizeof( mSid ) );
  174 + mChatID = chatID;
  175 +
  176 + mOnlineManager = onlineManager;
  177 +}
  178 +
  179 +SP_ChatHandler :: ~SP_ChatHandler()
  180 +{
  181 +}
  182 +
  183 +void SP_ChatHandler :: broadcast( SP_Response * response, const char * buffer, SP_Sid_t * ignoreSid )
  184 +{
  185 + if( mOnlineManager->getCount() > 0 ) {
  186 + SP_Message * msg = new SP_Message();
  187 + mOnlineManager->copy( msg->getToList(), ignoreSid );
  188 + msg->setCompletionKey( ++mMsgSeq );
  189 +
  190 + msg->getMsg()->append( buffer );
  191 + response->addMessage( msg );
  192 + }
  193 +}
  194 +
  195 +int SP_ChatHandler :: start( SP_Request * request, SP_Response * response )
  196 +{
  197 + request->setMsgDecoder( new SP_LineMsgDecoder() );
  198 +
  199 + mSid = response->getFromSid();
  200 +
  201 + char buffer[ 128 ] = { 0 };
  202 + snprintf( buffer, sizeof( buffer ),
  203 + "Welcome %d to chat server, enter 'quit' to quit.\r\n", mChatID );
  204 + response->getReply()->getMsg()->append( buffer );
  205 + response->getReply()->setCompletionKey( ++mMsgSeq );
  206 +
  207 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t *)malloc( sizeof( SP_OnlineInfo_t ) );
  208 + info->mSid = mSid;
  209 + info->mChatID = mChatID;
  210 + mOnlineManager->add( info );
  211 +
  212 + return 0;
  213 +}
  214 +
  215 +int SP_ChatHandler :: handle( SP_Request * request, SP_Response * response )
  216 +{
  217 + SP_LineMsgDecoder * decoder = (SP_LineMsgDecoder*)request->getMsgDecoder();
  218 +
  219 + char buffer[ 256 ] = { 0 };
  220 +
  221 + if( 0 != strcasecmp( (char*)decoder->getMsg(), "quit" ) ) {
  222 + snprintf( buffer, sizeof( buffer ), "%d say: %s\r\n", mChatID, (char*)decoder->getMsg() );
  223 + broadcast( response, buffer );
  224 +
  225 + return 0;
  226 + } else {
  227 + snprintf( buffer, sizeof( buffer ), "SYS : %d normal offline\r\n", mChatID );
  228 + broadcast( response, buffer, &mSid );
  229 +
  230 + response->getReply()->getMsg()->append( "SYS : Byebye\r\n" );
  231 + response->getReply()->setCompletionKey( ++mMsgSeq );
  232 +
  233 + return -1;
  234 + }
  235 +}
  236 +
  237 +void SP_ChatHandler :: error( SP_Response * response )
  238 +{
  239 + char buffer[ 64 ] = { 0 };
  240 + snprintf( buffer, sizeof( buffer ), "SYS : %d error offline\r\n", mChatID );
  241 +
  242 + broadcast( response, buffer, &mSid );
  243 +}
  244 +
  245 +void SP_ChatHandler :: timeout( SP_Response * response )
  246 +{
  247 + char buffer[ 64 ] = { 0 };
  248 + snprintf( buffer, sizeof( buffer ), "SYS : %d timeout offline\r\n", mChatID );
  249 +
  250 + broadcast( response, buffer, &mSid );
  251 +}
  252 +
  253 +void SP_ChatHandler :: close()
  254 +{
  255 + mOnlineManager->remove( mSid );
  256 +}
  257 +
  258 +//---------------------------------------------------------
  259 +
  260 +class SP_ChatCompletionHandler : public SP_CompletionHandler {
  261 +public:
  262 + SP_ChatCompletionHandler();
  263 + ~SP_ChatCompletionHandler();
  264 + virtual void completionMessage( SP_Message * msg );
  265 +};
  266 +
  267 +SP_ChatCompletionHandler :: SP_ChatCompletionHandler()
  268 +{
  269 +}
  270 +
  271 +SP_ChatCompletionHandler :: ~SP_ChatCompletionHandler()
  272 +{
  273 +}
  274 +
  275 +void SP_ChatCompletionHandler :: completionMessage( SP_Message * msg )
  276 +{
  277 +#if 0
  278 + printf( "message completed { completion key : %d }\n", msg->getCompletionKey() );
  279 +
  280 + printf( "\tsuccess {" );
  281 + for( int i = 0; i < msg->getSuccess()->getCount(); i++ ) {
  282 + printf( " %d", msg->getSuccess()->get( i ).mKey );
  283 + }
  284 + printf( "}\n" );
  285 +
  286 + printf( "\tfailure {" );
  287 + for( int i = 0; i < msg->getFailure()->getCount(); i++ ) {
  288 + printf( " %d", msg->getFailure()->get( i ).mKey );
  289 + }
  290 + printf( "}\n" );
  291 +#endif
  292 +
  293 + delete msg;
  294 +}
  295 +
  296 +//---------------------------------------------------------
  297 +
  298 +int main( int argc, char * argv[] )
  299 +{
  300 + int port = 5555, maxThreads = 10;
  301 +
  302 + extern char *optarg ;
  303 + int c ;
  304 +
  305 + while( ( c = getopt ( argc, argv, "p:t:v" )) != EOF ) {
  306 + switch ( c ) {
  307 + case 'p' :
  308 + port = atoi( optarg );
  309 + break;
  310 + case 't':
  311 + maxThreads = atoi( optarg );
  312 + break;
  313 + case '?' :
  314 + case 'v' :
  315 + printf( "Usage: %s [-p <port>] [-t <threads>]\n", argv[0] );
  316 + exit( 0 );
  317 + }
  318 + }
  319 +
  320 + sp_openlog( "testchat_d", LOG_CONS | LOG_PID | LOG_PERROR, LOG_USER );
  321 +
  322 + assert( 0 == sp_initsock() );
  323 +
  324 + SP_OnlineManager onlineManager;
  325 + int chatID = 0;
  326 +
  327 + int maxConnections = 100, reqQueueSize = 10;
  328 + const char * refusedMsg = "System busy, try again later.";
  329 +
  330 + int listenFd = -1;
  331 + if( 0 == SP_IOUtils::tcpListen( "", port, &listenFd ) ) {
  332 + SP_Dispatcher dispatcher( new SP_ChatCompletionHandler(), maxThreads );
  333 + dispatcher.dispatch();
  334 +
  335 + for( ; ; ) {
  336 + struct sockaddr_in addr;
  337 + socklen_t socklen = sizeof( addr );
  338 + int fd = accept( listenFd, (struct sockaddr*)&addr, &socklen );
  339 +
  340 + if( fd > 0 ) {
  341 + if( dispatcher.getSessionCount() >= maxConnections
  342 + || dispatcher.getReqQueueLength() >= reqQueueSize ) {
  343 + send( fd, refusedMsg, strlen( refusedMsg ), 0 );
  344 + sp_close( fd );
  345 + } else {
  346 +
  347 + char buffer[ 256 ] = { 0 };
  348 + snprintf( buffer, sizeof( buffer ), "SYS : %d online\r\n", ++chatID );
  349 +
  350 + SP_Message * msg = new SP_Message();
  351 + onlineManager.copy( msg->getToList(), NULL );
  352 + msg->getMsg()->append( buffer );
  353 +
  354 + SP_Sid_t sid = { SP_Sid_t::ePushKey, SP_Sid_t::ePushSeq };
  355 + SP_Response * response = new SP_Response( sid );
  356 + response->addMessage( msg );
  357 +
  358 + /* close a random session */
  359 + if( onlineManager.getCount() > 0 && ( 0 == rand() % 3 ) ) {
  360 + sid = msg->getToList()->get( rand() % msg->getToList()->getCount() );
  361 + response->getToCloseList()->add( sid );
  362 +
  363 + msg = new SP_Message();
  364 + snprintf( buffer, sizeof( buffer ), "SYS : %d force to offline\r\n",
  365 + onlineManager.getChatID( sid ) );
  366 + msg->getMsg()->append( buffer );
  367 + onlineManager.copy( msg->getToList(), NULL );
  368 + response->addMessage( msg );
  369 +
  370 + msg = new SP_Message();
  371 + msg->getMsg()->append( "SYS : Force to close\r\n" );
  372 + msg->getToList()->add( sid );
  373 + response->addMessage( msg );
  374 + }
  375 +
  376 + dispatcher.push( response );
  377 +
  378 + dispatcher.push( fd, new SP_ChatHandler( &onlineManager, chatID ) );
  379 + }
  380 + } else {
  381 + break;
  382 + }
  383 + }
  384 + }
  385 +
  386 + sp_closelog();
  387 +
  388 + return 0;
  389 +}
  390 +
... ...
注册登录 后发表评论