提交 cfe1508f1849463b86a3eb839c183e941e528432

作者 LJH 李佳桓
1 个父辈 95b02200

add

  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdlib.h>
  7 +#include <stdio.h>
  8 +#include <string.h>
  9 +#include <assert.h>
  10 +#include <errno.h>
  11 +#include <signal.h>
  12 +
  13 +#include "spporting.hpp"
  14 +#include "spthread.hpp"
  15 +
  16 +#include "spiocpdispatcher.hpp"
  17 +
  18 +#include "spwin32iocp.hpp"
  19 +#include "sphandler.hpp"
  20 +#include "spsession.hpp"
  21 +#include "spexecutor.hpp"
  22 +#include "sputils.hpp"
  23 +#include "spioutils.hpp"
  24 +#include "spiochannel.hpp"
  25 +#include "sprequest.hpp"
  26 +
  27 +#include "spiocpevent.hpp"
  28 +
  29 +SP_IocpDispatcher :: SP_IocpDispatcher( SP_CompletionHandler * completionHandler, int maxThreads )
  30 +{
  31 +#ifdef SIGPIPE
  32 + /* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
  33 + signal( SIGPIPE, SIG_IGN );
  34 +#endif
  35 +
  36 + mIsShutdown = 0;
  37 + mIsRunning = 0;
  38 +
  39 + mEventArg = new SP_IocpEventArg( 600 );
  40 + SP_IocpMsgQueue * msgQueue = new SP_IocpMsgQueue( mEventArg->getCompletionPort(),
  41 + SP_IocpEventCallback::eKeyMsgQueue, SP_IocpEventCallback::onResponse, mEventArg );
  42 + mEventArg->setResponseQueue( msgQueue );
  43 + // load DisconnectEx
  44 + {
  45 + int fd = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED );
  46 + mEventArg->loadDisconnectEx( fd );
  47 + closesocket( fd );
  48 + }
  49 +
  50 + mMaxThreads = maxThreads > 0 ? maxThreads : 4;
  51 +
  52 + mCompletionHandler = completionHandler;
  53 +
  54 + mPushQueue = new SP_IocpMsgQueue( mEventArg->getCompletionPort(),
  55 + SP_IocpEventCallback::eKeyMsgQueue, onPush, mEventArg );
  56 +}
  57 +
  58 +SP_IocpDispatcher :: ~SP_IocpDispatcher()
  59 +{
  60 + if( 0 == mIsRunning ) sleep( 1 );
  61 +
  62 + shutdown();
  63 +
  64 + for( ; mIsRunning; ) sleep( 1 );
  65 +
  66 + delete mPushQueue;
  67 + mPushQueue = NULL;
  68 +
  69 + delete mEventArg;
  70 + mEventArg = NULL;
  71 +}
  72 +
  73 +void SP_IocpDispatcher :: setTimeout( int timeout )
  74 +{
  75 + mEventArg->setTimeout( timeout );
  76 +}
  77 +
  78 +void SP_IocpDispatcher :: shutdown()
  79 +{
  80 + mIsShutdown = 1;
  81 +
  82 + PostQueuedCompletionStatus( mEventArg->getCompletionPort(), 0, 0, 0 );
  83 +}
  84 +
  85 +int SP_IocpDispatcher :: isRunning()
  86 +{
  87 + return mIsRunning;
  88 +}
  89 +
  90 +int SP_IocpDispatcher :: getSessionCount()
  91 +{
  92 + return mEventArg->getSessionManager()->getCount();
  93 +}
  94 +
  95 +int SP_IocpDispatcher :: getReqQueueLength()
  96 +{
  97 + return mEventArg->getInputResultQueue()->getLength();
  98 +}
  99 +
  100 +int SP_IocpDispatcher :: dispatch()
  101 +{
  102 + int ret = -1;
  103 +
  104 + sp_thread_attr_t attr;
  105 + sp_thread_attr_init( &attr );
  106 + assert( sp_thread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );
  107 + sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );
  108 +
  109 + sp_thread_t thread;
  110 + ret = sp_thread_create( &thread, &attr, eventLoop, this );
  111 + sp_thread_attr_destroy( &attr );
  112 + if( 0 == ret ) {
  113 + sp_syslog( LOG_NOTICE, "Thread #%ld has been created for dispatcher", thread );
  114 + } else {
  115 + mIsRunning = 0;
  116 + sp_syslog( LOG_WARNING, "Unable to create a thread for dispatcher, %s",
  117 + strerror( errno ) ) ;
  118 + }
  119 +
  120 + return ret;
  121 +}
  122 +
  123 +sp_thread_result_t SP_THREAD_CALL SP_IocpDispatcher :: eventLoop( void * arg )
  124 +{
  125 + SP_IocpDispatcher * dispatcher = (SP_IocpDispatcher*)arg;
  126 +
  127 + dispatcher->mIsRunning = 1;
  128 +
  129 + dispatcher->start();
  130 +
  131 + dispatcher->mIsRunning = 0;
  132 +
  133 + return 0;
  134 +}
  135 +
  136 +void SP_IocpDispatcher :: outputCompleted( void * arg )
  137 +{
  138 + SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
  139 + SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];
  140 +
  141 + handler->completionMessage( msg );
  142 +
  143 + free( arg );
  144 +}
  145 +
  146 +int SP_IocpDispatcher :: start()
  147 +{
  148 + SP_Executor workerExecutor( mMaxThreads, "work" );
  149 + SP_Executor actExecutor( 1, "act" );
  150 +
  151 + /* Start the event loop. */
  152 + while( 0 == mIsShutdown ) {
  153 + SP_IocpEventCallback::eventLoop( mEventArg, NULL );
  154 +
  155 + for( ; NULL != mEventArg->getInputResultQueue()->top(); ) {
  156 + SP_Task * task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
  157 + workerExecutor.execute( task );
  158 + }
  159 +
  160 + for( ; NULL != mEventArg->getOutputResultQueue()->top(); ) {
  161 + SP_Message * msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
  162 +
  163 + void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
  164 + arg[ 0 ] = (void*)mCompletionHandler;
  165 + arg[ 1 ] = (void*)msg;
  166 +
  167 + actExecutor.execute( outputCompleted, arg );
  168 + }
  169 + }
  170 +
  171 + sp_syslog( LOG_NOTICE, "Dispatcher is shutdown." );
  172 +
  173 + return 0;
  174 +}
  175 +
  176 +typedef struct tagSP_IocpPushArg {
  177 + int mType; // 0 : fd, 1 : timer
  178 +
  179 + // for push fd
  180 + int mFd;
  181 + SP_Handler * mHandler;
  182 + SP_IOChannel * mIOChannel;
  183 + int mNeedStart;
  184 +
  185 + // for push timer
  186 + struct timeval mTimeout;
  187 + SP_IocpEvent_t mTimerEvent;
  188 + SP_TimerHandler * mTimerHandler;
  189 + SP_IocpEventArg * mEventArg;
  190 + SP_IocpMsgQueue * mPushQueue;
  191 +} SP_IocpPushArg_t;
  192 +
  193 +void SP_IocpDispatcher :: onPush( void * queueData, void * arg )
  194 +{
  195 + SP_IocpPushArg_t * pushArg = (SP_IocpPushArg_t*)queueData;
  196 + SP_IocpEventArg * eventArg = (SP_IocpEventArg*)arg;
  197 +
  198 + if( 0 == pushArg->mType ) {
  199 + SP_Sid_t sid;
  200 + sid.mKey = eventArg->getSessionManager()->allocKey( &sid.mSeq );
  201 + assert( sid.mKey > 0 );
  202 +
  203 + SP_Session * session = new SP_Session( sid );
  204 +
  205 + char clientIP[ 32 ] = { 0 };
  206 + {
  207 + struct sockaddr_in clientAddr;
  208 + socklen_t clientLen = sizeof( clientAddr );
  209 + getpeername( pushArg->mFd, (struct sockaddr *)&clientAddr, &clientLen );
  210 + SP_IOUtils::inetNtoa( &( clientAddr.sin_addr ), clientIP, sizeof( clientIP ) );
  211 + session->getRequest()->setClientPort( ntohs( clientAddr.sin_port ) );
  212 + }
  213 + session->getRequest()->setClientIP( clientIP );
  214 +
  215 + session->setHandler( pushArg->mHandler );
  216 + session->setArg( eventArg );
  217 + session->setIOChannel( pushArg->mIOChannel );
  218 +
  219 + if( SP_IocpEventCallback::addSession( eventArg, (HANDLE)pushArg->mFd, session ) ) {
  220 + eventArg->getSessionManager()->put( sid.mKey, sid.mSeq, session );
  221 +
  222 + if( pushArg->mNeedStart ) {
  223 + SP_IocpEventHelper::doStart( session );
  224 + } else {
  225 + SP_IocpEventCallback::addRecv( session );
  226 + }
  227 + } else {
  228 + delete session;
  229 + }
  230 +
  231 + free( pushArg );
  232 + } else {
  233 + memset( &( pushArg->mTimerEvent ), 0, sizeof( SP_IocpEvent_t ) );
  234 + pushArg->mTimerEvent.mType = SP_IocpEvent_t::eEventTimer;
  235 +
  236 + struct timeval curr;
  237 + sp_gettimeofday( &curr, NULL );
  238 + struct timeval * dest = &( pushArg->mTimerEvent.mTimeout );
  239 + struct timeval * src = &( pushArg->mTimeout );
  240 +
  241 + dest->tv_sec = curr.tv_sec + src->tv_sec;
  242 + dest->tv_usec = curr.tv_usec + src->tv_usec;
  243 + if( dest->tv_usec >= 1000000 ) {
  244 + dest->tv_sec++;
  245 + dest->tv_usec -= 1000000;
  246 + }
  247 +
  248 + pushArg->mTimerEvent.mHeapIndex = -1;
  249 + pushArg->mTimerEvent.mOnTimer = onTimer;
  250 +
  251 + eventArg->getEventHeap()->push( &( pushArg->mTimerEvent ) );
  252 + }
  253 +}
  254 +
  255 +int SP_IocpDispatcher :: push( int fd, SP_Handler * handler, int needStart )
  256 +{
  257 + SP_IOChannel * ioChannel = new SP_DefaultIOChannel();
  258 + return push( fd, handler, ioChannel, needStart );
  259 +}
  260 +
  261 +int SP_IocpDispatcher :: push( int fd, SP_Handler * handler,
  262 + SP_IOChannel * ioChannel, int needStart )
  263 +{
  264 + SP_IocpPushArg_t * arg = (SP_IocpPushArg_t*)malloc( sizeof( SP_IocpPushArg_t ) );
  265 + arg->mType = 0;
  266 + arg->mFd = fd;
  267 + arg->mHandler = handler;
  268 + arg->mNeedStart = needStart;
  269 + arg->mIOChannel = ioChannel;
  270 +
  271 + SP_IOUtils::setNonblock( fd );
  272 +
  273 + return mPushQueue->push( arg );
  274 +}
  275 +
  276 +void SP_IocpDispatcher :: onTimer( void * arg )
  277 +{
  278 + SP_IocpEvent_t * event = (SP_IocpEvent_t*)arg;
  279 + SP_IocpPushArg_t * pushArg = CONTAINING_RECORD( event, SP_IocpPushArg_t, mTimerEvent );
  280 +
  281 + pushArg->mEventArg->getInputResultQueue()->push(
  282 + new SP_SimpleTask( timer, pushArg, 1 ) );
  283 +}
  284 +
  285 +void SP_IocpDispatcher :: timer( void * arg )
  286 +{
  287 + SP_IocpPushArg_t * pushArg = (SP_IocpPushArg_t*)arg;
  288 + SP_TimerHandler * handler = pushArg->mTimerHandler;
  289 + SP_IocpEventArg * eventArg = pushArg->mEventArg;
  290 +
  291 + SP_Sid_t sid;
  292 + sid.mKey = SP_Sid_t::eTimerKey;
  293 + sid.mSeq = SP_Sid_t::eTimerSeq;
  294 + SP_Response * response = new SP_Response( sid );
  295 + if( 0 == handler->handle( response, &( pushArg->mTimeout ) ) ) {
  296 + pushArg->mPushQueue->push( arg );
  297 + } else {
  298 + delete pushArg->mTimerHandler;
  299 + free( pushArg );
  300 + }
  301 +
  302 + eventArg->getResponseQueue()->push( response );
  303 +}
  304 +
  305 +int SP_IocpDispatcher :: push( const struct timeval * timeout, SP_TimerHandler * handler )
  306 +{
  307 + SP_IocpPushArg_t * arg = (SP_IocpPushArg_t*)malloc( sizeof( SP_IocpPushArg_t ) );
  308 +
  309 + arg->mType = 1;
  310 + arg->mTimeout = *timeout;
  311 + arg->mTimerHandler = handler;
  312 + arg->mEventArg = mEventArg;
  313 + arg->mPushQueue = mPushQueue;
  314 +
  315 + return mPushQueue->push( arg );
  316 +}
  317 +
  318 +int SP_IocpDispatcher :: push( SP_Response * response )
  319 +{
  320 + return mEventArg->getResponseQueue()->push( response );
  321 +}
... ...
注册登录 后发表评论