提交 ec4e72bba4c50589ed2c94c9b38e53f9907e2b47

作者 LJH 李佳桓
1 个父辈 d605d7a5

add

  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdio.h>
  7 +#include <stdlib.h>
  8 +#include <string.h>
  9 +#include <signal.h>
  10 +#include <sys/types.h>
  11 +#include <assert.h>
  12 +
  13 +#include "spthread.hpp"
  14 +
  15 +#include "spmsgdecoder.hpp"
  16 +#include "spbuffer.hpp"
  17 +
  18 +#include "spiocpdispatcher.hpp"
  19 +#include "sphandler.hpp"
  20 +#include "spresponse.hpp"
  21 +#include "sprequest.hpp"
  22 +#include "spioutils.hpp"
  23 +#include "sputils.hpp"
  24 +
  25 +#ifdef WIN32
  26 +#include "spgetopt.h"
  27 +#endif
  28 +
  29 +#pragma comment(lib,"ws2_32")
  30 +#pragma comment(lib,"mswsock")
  31 +
  32 +typedef struct tagSP_OnlineInfo {
  33 + SP_Sid_t mSid;
  34 + int mChatID;
  35 +} SP_OnlineInfo_t;
  36 +
  37 +class SP_OnlineManager {
  38 +public:
  39 + SP_OnlineManager();
  40 + ~SP_OnlineManager();
  41 +
  42 + void copy( SP_SidList * outList, SP_Sid_t * ignoreSid = NULL );
  43 + void remove( SP_Sid_t sid );
  44 + void add( SP_OnlineInfo_t * info );
  45 + int getChatID( SP_Sid_t sid );
  46 +
  47 + int getCount();
  48 +
  49 +private:
  50 + SP_ArrayList mList;
  51 + sp_thread_mutex_t mMutex;
  52 +};
  53 +
  54 +SP_OnlineManager :: SP_OnlineManager()
  55 +{
  56 + sp_thread_mutex_init( &mMutex, NULL );
  57 +}
  58 +
  59 +SP_OnlineManager :: ~SP_OnlineManager()
  60 +{
  61 + sp_thread_mutex_destroy( &mMutex );
  62 +}
  63 +
  64 +void SP_OnlineManager :: copy( SP_SidList * outList, SP_Sid_t * ignoreSid )
  65 +{
  66 + sp_thread_mutex_lock( &mMutex );
  67 +
  68 + for( int i = 0; i < mList.getCount(); i++ ) {
  69 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t*)mList.getItem( i );
  70 +
  71 + if( NULL != ignoreSid ) {
  72 + SP_Sid_t theSid = info->mSid;
  73 + if( theSid.mKey == ignoreSid->mKey && theSid.mSeq == ignoreSid->mSeq ) {
  74 + continue;
  75 + }
  76 + }
  77 +
  78 + outList->add( info->mSid );
  79 + }
  80 +
  81 + sp_thread_mutex_unlock( &mMutex );
  82 +}
  83 +
  84 +void SP_OnlineManager :: remove( SP_Sid_t sid )
  85 +{
  86 + sp_thread_mutex_lock( &mMutex );
  87 +
  88 + for( int i = 0; i < mList.getCount(); i++ ) {
  89 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t*)mList.getItem( i );
  90 + SP_Sid_t theSid = info->mSid;
  91 + if( theSid.mKey == sid.mKey && theSid.mSeq == sid.mSeq ) {
  92 + mList.takeItem( i );
  93 + free( info );
  94 + break;
  95 + }
  96 + }
  97 +
  98 + sp_thread_mutex_unlock( &mMutex );
  99 +}
  100 +
  101 +void SP_OnlineManager :: add( SP_OnlineInfo_t * info )
  102 +{
  103 + sp_thread_mutex_lock( &mMutex );
  104 +
  105 + mList.append( info );
  106 +
  107 + sp_thread_mutex_unlock( &mMutex );
  108 +}
  109 +
  110 +int SP_OnlineManager :: getCount()
  111 +{
  112 + int count = 0;
  113 +
  114 + sp_thread_mutex_lock( &mMutex );
  115 +
  116 + count = mList.getCount();
  117 +
  118 + sp_thread_mutex_unlock( &mMutex );
  119 +
  120 + return count;
  121 +}
  122 +
  123 +int SP_OnlineManager :: getChatID( SP_Sid_t sid )
  124 +{
  125 + int chatID = -1;
  126 +
  127 + sp_thread_mutex_lock( &mMutex );
  128 +
  129 + for( int i = 0; i < mList.getCount(); i++ ) {
  130 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t*)mList.getItem( i );
  131 + SP_Sid_t theSid = info->mSid;
  132 + if( theSid.mKey == sid.mKey && theSid.mSeq == sid.mSeq ) {
  133 + chatID = info->mChatID;
  134 + break;
  135 + }
  136 + }
  137 +
  138 + sp_thread_mutex_unlock( &mMutex );
  139 +
  140 + return chatID;
  141 +}
  142 +
  143 +//---------------------------------------------------------
  144 +
  145 +class SP_ChatHandler : public SP_Handler {
  146 +public:
  147 + SP_ChatHandler( SP_OnlineManager * onlineManager, int chatID );
  148 + virtual ~SP_ChatHandler();
  149 +
  150 + virtual int start( SP_Request * request, SP_Response * response );
  151 +
  152 + // return -1 : terminate session, 0 : continue
  153 + virtual int handle( SP_Request * request, SP_Response * response );
  154 +
  155 + virtual void error( SP_Response * response );
  156 +
  157 + virtual void timeout( SP_Response * response );
  158 +
  159 + virtual void close();
  160 +
  161 +private:
  162 + SP_Sid_t mSid;
  163 + int mChatID;
  164 +
  165 + SP_OnlineManager * mOnlineManager;
  166 +
  167 + static int mMsgSeq;
  168 +
  169 + void broadcast( SP_Response * response, const char * buffer, SP_Sid_t * ignoreSid = 0 );
  170 +};
  171 +
  172 +int SP_ChatHandler :: mMsgSeq = 0;
  173 +
  174 +SP_ChatHandler :: SP_ChatHandler( SP_OnlineManager * onlineManager, int chatID )
  175 +{
  176 + memset( &mSid, 0, sizeof( mSid ) );
  177 + mChatID = chatID;
  178 +
  179 + mOnlineManager = onlineManager;
  180 +}
  181 +
  182 +SP_ChatHandler :: ~SP_ChatHandler()
  183 +{
  184 +}
  185 +
  186 +void SP_ChatHandler :: broadcast( SP_Response * response, const char * buffer, SP_Sid_t * ignoreSid )
  187 +{
  188 + if( mOnlineManager->getCount() > 0 ) {
  189 + SP_Message * msg = new SP_Message();
  190 + mOnlineManager->copy( msg->getToList(), ignoreSid );
  191 + msg->setCompletionKey( ++mMsgSeq );
  192 +
  193 + msg->getMsg()->append( buffer );
  194 + response->addMessage( msg );
  195 + }
  196 +}
  197 +
  198 +int SP_ChatHandler :: start( SP_Request * request, SP_Response * response )
  199 +{
  200 + request->setMsgDecoder( new SP_LineMsgDecoder() );
  201 +
  202 + mSid = response->getFromSid();
  203 +
  204 + char buffer[ 128 ] = { 0 };
  205 + snprintf( buffer, sizeof( buffer ),
  206 + "Welcome %d to chat server, enter 'quit' to quit.\r\n", mChatID );
  207 + response->getReply()->getMsg()->append( buffer );
  208 + response->getReply()->setCompletionKey( ++mMsgSeq );
  209 +
  210 + SP_OnlineInfo_t * info = (SP_OnlineInfo_t *)malloc( sizeof( SP_OnlineInfo_t ) );
  211 + info->mSid = mSid;
  212 + info->mChatID = mChatID;
  213 + mOnlineManager->add( info );
  214 +
  215 + return 0;
  216 +}
  217 +
  218 +int SP_ChatHandler :: handle( SP_Request * request, SP_Response * response )
  219 +{
  220 + SP_LineMsgDecoder * decoder = (SP_LineMsgDecoder*)request->getMsgDecoder();
  221 +
  222 + char buffer[ 256 ] = { 0 };
  223 +
  224 + if( 0 != strcasecmp( (char*)decoder->getMsg(), "quit" ) ) {
  225 + snprintf( buffer, sizeof( buffer ), "%d say: %s\r\n", mChatID, (char*)decoder->getMsg() );
  226 + broadcast( response, buffer );
  227 +
  228 + return 0;
  229 + } else {
  230 + snprintf( buffer, sizeof( buffer ), "SYS : %d normal offline\r\n", mChatID );
  231 + broadcast( response, buffer, &mSid );
  232 +
  233 + response->getReply()->getMsg()->append( "SYS : Byebye\r\n" );
  234 + response->getReply()->setCompletionKey( ++mMsgSeq );
  235 +
  236 + return -1;
  237 + }
  238 +}
  239 +
  240 +void SP_ChatHandler :: error( SP_Response * response )
  241 +{
  242 + char buffer[ 64 ] = { 0 };
  243 + snprintf( buffer, sizeof( buffer ), "SYS : %d error offline\r\n", mChatID );
  244 +
  245 + broadcast( response, buffer, &mSid );
  246 +}
  247 +
  248 +void SP_ChatHandler :: timeout( SP_Response * response )
  249 +{
  250 + char buffer[ 64 ] = { 0 };
  251 + snprintf( buffer, sizeof( buffer ), "SYS : %d timeout offline\r\n", mChatID );
  252 +
  253 + broadcast( response, buffer, &mSid );
  254 +}
  255 +
  256 +void SP_ChatHandler :: close()
  257 +{
  258 + mOnlineManager->remove( mSid );
  259 +}
  260 +
  261 +//---------------------------------------------------------
  262 +
  263 +class SP_ChatCompletionHandler : public SP_CompletionHandler {
  264 +public:
  265 + SP_ChatCompletionHandler();
  266 + ~SP_ChatCompletionHandler();
  267 + virtual void completionMessage( SP_Message * msg );
  268 +};
  269 +
  270 +SP_ChatCompletionHandler :: SP_ChatCompletionHandler()
  271 +{
  272 +}
  273 +
  274 +SP_ChatCompletionHandler :: ~SP_ChatCompletionHandler()
  275 +{
  276 +}
  277 +
  278 +void SP_ChatCompletionHandler :: completionMessage( SP_Message * msg )
  279 +{
  280 +#if 0
  281 + printf( "message completed { completion key : %d }\n", msg->getCompletionKey() );
  282 +
  283 + printf( "\tsuccess {" );
  284 + for( int i = 0; i < msg->getSuccess()->getCount(); i++ ) {
  285 + printf( " %d", msg->getSuccess()->get( i ).mKey );
  286 + }
  287 + printf( "}\n" );
  288 +
  289 + printf( "\tfailure {" );
  290 + for( int i = 0; i < msg->getFailure()->getCount(); i++ ) {
  291 + printf( " %d", msg->getFailure()->get( i ).mKey );
  292 + }
  293 + printf( "}\n" );
  294 +#endif
  295 +
  296 + delete msg;
  297 +}
  298 +
  299 +//---------------------------------------------------------
  300 +
  301 +int main( int argc, char * argv[] )
  302 +{
  303 + int port = 5555, maxThreads = 10;
  304 +
  305 + extern char *optarg ;
  306 + int c ;
  307 +
  308 + while( ( c = getopt ( argc, argv, "p:t:v" )) != EOF ) {
  309 + switch ( c ) {
  310 + case 'p' :
  311 + port = atoi( optarg );
  312 + break;
  313 + case 't':
  314 + maxThreads = atoi( optarg );
  315 + break;
  316 + case '?' :
  317 + case 'v' :
  318 + printf( "Usage: %s [-p <port>] [-t <threads>]\n", argv[0] );
  319 + exit( 0 );
  320 + }
  321 + }
  322 +
  323 + sp_openlog( "testiocpdispatcher", LOG_CONS | LOG_PID | LOG_PERROR, LOG_USER );
  324 +
  325 + assert( 0 == sp_initsock() );
  326 +
  327 + SP_OnlineManager onlineManager;
  328 + int chatID = 0;
  329 +
  330 + int maxConnections = 100, reqQueueSize = 10;
  331 + const char * refusedMsg = "System busy, try again later.";
  332 +
  333 + int listenFd = -1;
  334 + if( 0 == SP_IOUtils::tcpListen( "", port, &listenFd ) ) {
  335 + SP_IocpDispatcher dispatcher( new SP_ChatCompletionHandler(), maxThreads );
  336 + dispatcher.dispatch();
  337 +
  338 + for( ; ; ) {
  339 + struct sockaddr_in addr;
  340 + socklen_t socklen = sizeof( addr );
  341 + int fd = accept( listenFd, (struct sockaddr*)&addr, &socklen );
  342 +
  343 + if( fd > 0 ) {
  344 + if( dispatcher.getSessionCount() >= maxConnections
  345 + || dispatcher.getReqQueueLength() >= reqQueueSize ) {
  346 + send( fd, refusedMsg, strlen( refusedMsg ), 0 );
  347 + sp_close( fd );
  348 + } else {
  349 +
  350 + char buffer[ 256 ] = { 0 };
  351 + snprintf( buffer, sizeof( buffer ), "SYS : %d online\r\n", ++chatID );
  352 +
  353 + SP_Message * msg = new SP_Message();
  354 + onlineManager.copy( msg->getToList(), NULL );
  355 + msg->getMsg()->append( buffer );
  356 +
  357 + SP_Sid_t sid = { SP_Sid_t::ePushKey, SP_Sid_t::ePushSeq };
  358 + SP_Response * response = new SP_Response( sid );
  359 + response->addMessage( msg );
  360 +
  361 + /* close a random session */
  362 + if( onlineManager.getCount() > 0 && ( 0 == rand() % 3 ) ) {
  363 + sid = msg->getToList()->get( rand() % msg->getToList()->getCount() );
  364 + response->getToCloseList()->add( sid );
  365 +
  366 + msg = new SP_Message();
  367 + snprintf( buffer, sizeof( buffer ), "SYS : %d force to offline\r\n",
  368 + onlineManager.getChatID( sid ) );
  369 + msg->getMsg()->append( buffer );
  370 + onlineManager.copy( msg->getToList(), NULL );
  371 + response->addMessage( msg );
  372 +
  373 + msg = new SP_Message();
  374 + msg->getMsg()->append( "SYS : Force to close\r\n" );
  375 + msg->getToList()->add( sid );
  376 + response->addMessage( msg );
  377 + }
  378 +
  379 + dispatcher.push( response );
  380 +
  381 + dispatcher.push( fd, new SP_ChatHandler( &onlineManager, chatID ) );
  382 + }
  383 + } else {
  384 + break;
  385 + }
  386 + }
  387 + }
  388 +
  389 + sp_closelog();
  390 +
  391 + return 0;
  392 +}
  393 +
... ...
注册登录 后发表评论