提交 2ec5f42d7d5aeed4d65df6f1a5d9d410a9f09f19

作者 LJH 李佳桓
1 个父辈 1ed1b94b

add

正在显示 1 个修改的文件 包含 295 行增加0 行删除
  1 +/*
  2 + * Copyright 2007 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdlib.h>
  7 +#include <stdio.h>
  8 +#include <string.h>
  9 +#include <assert.h>
  10 +#include <errno.h>
  11 +#include <signal.h>
  12 +
  13 +#include "spporting.hpp"
  14 +#include "spthread.hpp"
  15 +
  16 +#include "spdispatcher.hpp"
  17 +
  18 +#include "speventcb.hpp"
  19 +#include "sphandler.hpp"
  20 +#include "spsession.hpp"
  21 +#include "spexecutor.hpp"
  22 +#include "sputils.hpp"
  23 +#include "spiochannel.hpp"
  24 +#include "spioutils.hpp"
  25 +#include "sprequest.hpp"
  26 +
  27 +#include "event_msgqueue.h"
  28 +
  29 +SP_Dispatcher :: SP_Dispatcher( SP_CompletionHandler * completionHandler, int maxThreads )
  30 +{
  31 +#ifdef SIGPIPE
  32 + /* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
  33 + signal( SIGPIPE, SIG_IGN );
  34 +#endif
  35 +
  36 + mIsShutdown = 0;
  37 + mIsRunning = 0;
  38 +
  39 + mEventArg = new SP_EventArg( 600 );
  40 +
  41 + mMaxThreads = maxThreads > 0 ? maxThreads : 4;
  42 +
  43 + mCompletionHandler = completionHandler;
  44 +
  45 + mPushQueue = msgqueue_new( mEventArg->getEventBase(), 0, onPush, mEventArg );
  46 +}
  47 +
  48 +SP_Dispatcher :: ~SP_Dispatcher()
  49 +{
  50 + if( 0 == mIsRunning ) sleep( 1 );
  51 +
  52 + shutdown();
  53 +
  54 + for( ; mIsRunning; ) sleep( 1 );
  55 +
  56 + //msgqueue_destroy( (struct event_msgqueue*)mPushQueue );
  57 +
  58 + delete mEventArg;
  59 + mEventArg = NULL;
  60 +}
  61 +
  62 +void SP_Dispatcher :: setTimeout( int timeout )
  63 +{
  64 + mEventArg->setTimeout( timeout );
  65 +}
  66 +
  67 +void SP_Dispatcher :: shutdown()
  68 +{
  69 + mIsShutdown = 1;
  70 +}
  71 +
  72 +int SP_Dispatcher :: isRunning()
  73 +{
  74 + return mIsRunning;
  75 +}
  76 +
  77 +int SP_Dispatcher :: getSessionCount()
  78 +{
  79 + return mEventArg->getSessionManager()->getCount();
  80 +}
  81 +
  82 +int SP_Dispatcher :: getReqQueueLength()
  83 +{
  84 + return mEventArg->getInputResultQueue()->getLength();
  85 +}
  86 +
  87 +int SP_Dispatcher :: dispatch()
  88 +{
  89 + int ret = -1;
  90 +
  91 + sp_thread_attr_t attr;
  92 + sp_thread_attr_init( &attr );
  93 + assert( sp_thread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );
  94 + sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );
  95 +
  96 + sp_thread_t thread;
  97 + ret = sp_thread_create( &thread, &attr, eventLoop, this );
  98 + sp_thread_attr_destroy( &attr );
  99 + if( 0 == ret ) {
  100 + sp_syslog( LOG_NOTICE, "Thread #%ld has been created for dispatcher", thread );
  101 + } else {
  102 + mIsRunning = 0;
  103 + sp_syslog( LOG_WARNING, "Unable to create a thread for dispatcher, %s",
  104 + strerror( errno ) ) ;
  105 + }
  106 +
  107 + return ret;
  108 +}
  109 +
  110 +sp_thread_result_t SP_THREAD_CALL SP_Dispatcher :: eventLoop( void * arg )
  111 +{
  112 + SP_Dispatcher * dispatcher = (SP_Dispatcher*)arg;
  113 +
  114 + dispatcher->mIsRunning = 1;
  115 +
  116 + dispatcher->start();
  117 +
  118 + dispatcher->mIsRunning = 0;
  119 +
  120 + return 0;
  121 +}
  122 +
  123 +void SP_Dispatcher :: outputCompleted( void * arg )
  124 +{
  125 + SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
  126 + SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];
  127 +
  128 + handler->completionMessage( msg );
  129 +
  130 + free( arg );
  131 +}
  132 +
  133 +int SP_Dispatcher :: start()
  134 +{
  135 + SP_Executor workerExecutor( mMaxThreads, "work" );
  136 + SP_Executor actExecutor( 1, "act" );
  137 +
  138 + /* Start the event loop. */
  139 + while( 0 == mIsShutdown ) {
  140 + event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );
  141 +
  142 + for( ; NULL != mEventArg->getInputResultQueue()->top(); ) {
  143 + SP_Task * task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
  144 + workerExecutor.execute( task );
  145 + }
  146 +
  147 + for( ; NULL != mEventArg->getOutputResultQueue()->top(); ) {
  148 + SP_Message * msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
  149 +
  150 + void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
  151 + arg[ 0 ] = (void*)mCompletionHandler;
  152 + arg[ 1 ] = (void*)msg;
  153 +
  154 + actExecutor.execute( outputCompleted, arg );
  155 + }
  156 + }
  157 +
  158 + sp_syslog( LOG_NOTICE, "Dispatcher is shutdown." );
  159 +
  160 + return 0;
  161 +}
  162 +
  163 +typedef struct tagSP_PushArg {
  164 + int mType; // 0 : fd, 1 : timer
  165 +
  166 + // for push fd
  167 + int mFd;
  168 + SP_Handler * mHandler;
  169 + SP_IOChannel * mIOChannel;
  170 + int mNeedStart;
  171 +
  172 + // for push timer
  173 + struct timeval mTimeout;
  174 + struct event mTimerEvent;
  175 + SP_TimerHandler * mTimerHandler;
  176 + SP_EventArg * mEventArg;
  177 + void * mPushQueue;
  178 +} SP_PushArg_t;
  179 +
  180 +void SP_Dispatcher :: onPush( void * queueData, void * arg )
  181 +{
  182 + SP_PushArg_t * pushArg = (SP_PushArg_t*)queueData;
  183 + SP_EventArg * eventArg = (SP_EventArg*)arg;
  184 +
  185 + if( 0 == pushArg->mType ) {
  186 + SP_Sid_t sid;
  187 + sid.mKey = eventArg->getSessionManager()->allocKey( &sid.mSeq );
  188 + assert( sid.mKey > 0 );
  189 +
  190 + SP_Session * session = new SP_Session( sid );
  191 +
  192 + char clientIP[ 32 ] = { 0 };
  193 + {
  194 + struct sockaddr_in clientAddr;
  195 + socklen_t clientLen = sizeof( clientAddr );
  196 + getpeername( pushArg->mFd, (struct sockaddr *)&clientAddr, &clientLen );
  197 + SP_IOUtils::inetNtoa( &( clientAddr.sin_addr ), clientIP, sizeof( clientIP ) );
  198 + session->getRequest()->setClientPort( ntohs( clientAddr.sin_port ) );
  199 + }
  200 + session->getRequest()->setClientIP( clientIP );
  201 +
  202 +
  203 + eventArg->getSessionManager()->put( sid.mKey, sid.mSeq, session );
  204 +
  205 + session->setHandler( pushArg->mHandler );
  206 + session->setIOChannel( pushArg->mIOChannel );
  207 + session->setArg( eventArg );
  208 +
  209 + event_set( session->getReadEvent(), pushArg->mFd, EV_READ,
  210 + SP_EventCallback::onRead, session );
  211 + event_set( session->getWriteEvent(), pushArg->mFd, EV_WRITE,
  212 + SP_EventCallback::onWrite, session );
  213 +
  214 + if( pushArg->mNeedStart ) {
  215 + SP_EventHelper::doStart( session );
  216 + } else {
  217 + SP_EventCallback::addEvent( session, EV_WRITE, pushArg->mFd );
  218 + SP_EventCallback::addEvent( session, EV_READ, pushArg->mFd );
  219 + }
  220 +
  221 + free( pushArg );
  222 + } else {
  223 + event_set( &( pushArg->mTimerEvent ), -1, 0, onTimer, pushArg );
  224 + event_base_set( eventArg->getEventBase(), &( pushArg->mTimerEvent ) );
  225 + event_add( &( pushArg->mTimerEvent ), &( pushArg->mTimeout ) );
  226 + }
  227 +}
  228 +
  229 +int SP_Dispatcher :: push( int fd, SP_Handler * handler, int needStart )
  230 +{
  231 + SP_IOChannel * ioChannel = new SP_DefaultIOChannel();
  232 + return push( fd, handler, ioChannel, needStart );
  233 +}
  234 +
  235 +int SP_Dispatcher :: push( int fd, SP_Handler * handler,
  236 + SP_IOChannel * ioChannel, int needStart )
  237 +{
  238 + SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );
  239 + arg->mType = 0;
  240 + arg->mFd = fd;
  241 + arg->mHandler = handler;
  242 + arg->mIOChannel = ioChannel;
  243 + arg->mNeedStart = needStart;
  244 +
  245 + SP_IOUtils::setNonblock( fd );
  246 +
  247 + return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );
  248 +}
  249 +
  250 +void SP_Dispatcher :: onTimer( int, short, void * arg )
  251 +{
  252 + SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;
  253 +
  254 + pushArg->mEventArg->getInputResultQueue()->push(
  255 + new SP_SimpleTask( timer, pushArg, 1 ) );
  256 +}
  257 +
  258 +void SP_Dispatcher :: timer( void * arg )
  259 +{
  260 + SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;
  261 + SP_TimerHandler * handler = pushArg->mTimerHandler;
  262 + SP_EventArg * eventArg = pushArg->mEventArg;
  263 +
  264 + SP_Sid_t sid;
  265 + sid.mKey = SP_Sid_t::eTimerKey;
  266 + sid.mSeq = SP_Sid_t::eTimerSeq;
  267 + SP_Response * response = new SP_Response( sid );
  268 + if( 0 == handler->handle( response, &( pushArg->mTimeout ) ) ) {
  269 + msgqueue_push( (struct event_msgqueue*)pushArg->mPushQueue, arg );
  270 + } else {
  271 + delete pushArg->mTimerHandler;
  272 + free( pushArg );
  273 + }
  274 +
  275 + msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
  276 +}
  277 +
  278 +int SP_Dispatcher :: push( const struct timeval * timeout, SP_TimerHandler * handler )
  279 +{
  280 + SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );
  281 +
  282 + arg->mType = 1;
  283 + arg->mTimeout = *timeout;
  284 + arg->mTimerHandler = handler;
  285 + arg->mEventArg = mEventArg;
  286 + arg->mPushQueue = mPushQueue;
  287 +
  288 + return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );
  289 +}
  290 +
  291 +int SP_Dispatcher :: push( SP_Response * response )
  292 +{
  293 + return msgqueue_push( (struct event_msgqueue*)mEventArg->getResponseQueue(), response );
  294 +}
  295 +
... ...
注册登录 后发表评论