提交 a9d4ce8700fd6ae921dd0f8b1808a8a497dac08c

作者 LJH 李佳桓
1 个父辈 1aaa0760

add

正在显示 1 个修改的文件 包含 619 行增加0 行删除
  1 +/*
  2 + * Copyright 2007 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <fcntl.h>
  7 +#include <stdio.h>
  8 +#include <string.h>
  9 +#include <assert.h>
  10 +#include <errno.h>
  11 +#include <stdlib.h>
  12 +
  13 +#include "spporting.hpp"
  14 +
  15 +#include "speventcb.hpp"
  16 +#include "spexecutor.hpp"
  17 +#include "spsession.hpp"
  18 +#include "spresponse.hpp"
  19 +#include "sphandler.hpp"
  20 +#include "spbuffer.hpp"
  21 +#include "spmsgdecoder.hpp"
  22 +#include "sputils.hpp"
  23 +#include "sprequest.hpp"
  24 +#include "spmsgblock.hpp"
  25 +#include "spiochannel.hpp"
  26 +#include "spioutils.hpp"
  27 +
  28 +#include "event_msgqueue.h"
  29 +#include "event.h"
  30 +
  31 +SP_EventArg :: SP_EventArg( int timeout )
  32 +{
  33 + mEventBase = (struct event_base*)event_init();
  34 +
  35 + mResponseQueue = msgqueue_new( mEventBase, 0,
  36 + SP_EventCallback::onResponse, this );
  37 +
  38 + mInputResultQueue = new SP_BlockingQueue();
  39 +
  40 + mOutputResultQueue = new SP_BlockingQueue();
  41 +
  42 + mSessionManager = new SP_SessionManager();
  43 +
  44 + mTimeout = timeout;
  45 +}
  46 +
  47 +SP_EventArg :: ~SP_EventArg()
  48 +{
  49 + delete mInputResultQueue;
  50 + delete mOutputResultQueue;
  51 +
  52 + delete mSessionManager;
  53 +
  54 + //msgqueue_destroy( (struct event_msgqueue*)mResponseQueue );
  55 + //event_base_free( mEventBase );
  56 +}
  57 +
  58 +struct event_base * SP_EventArg :: getEventBase() const
  59 +{
  60 + return mEventBase;
  61 +}
  62 +
  63 +void * SP_EventArg :: getResponseQueue() const
  64 +{
  65 + return mResponseQueue;
  66 +}
  67 +
  68 +SP_BlockingQueue * SP_EventArg :: getInputResultQueue() const
  69 +{
  70 + return mInputResultQueue;
  71 +}
  72 +
  73 +SP_BlockingQueue * SP_EventArg :: getOutputResultQueue() const
  74 +{
  75 + return mOutputResultQueue;
  76 +}
  77 +
  78 +SP_SessionManager * SP_EventArg :: getSessionManager() const
  79 +{
  80 + return mSessionManager;
  81 +}
  82 +
  83 +void SP_EventArg :: setTimeout( int timeout )
  84 +{
  85 + mTimeout = timeout;
  86 +}
  87 +
  88 +int SP_EventArg :: getTimeout() const
  89 +{
  90 + return mTimeout;
  91 +}
  92 +
  93 +//-------------------------------------------------------------------
  94 +
  95 +void SP_EventCallback :: onAccept( int fd, short events, void * arg )
  96 +{
  97 + int clientFD;
  98 + struct sockaddr_in addr;
  99 + socklen_t addrLen = sizeof( addr );
  100 +
  101 + SP_AcceptArg_t * acceptArg = (SP_AcceptArg_t*)arg;
  102 + SP_EventArg * eventArg = acceptArg->mEventArg;
  103 +
  104 + clientFD = accept( fd, (struct sockaddr *)&addr, &addrLen );
  105 + if( -1 == clientFD ) {
  106 + sp_syslog( LOG_WARNING, "accept failed" );
  107 + return;
  108 + }
  109 +
  110 + if( SP_IOUtils::setNonblock( clientFD ) < 0 ) {
  111 + sp_syslog( LOG_WARNING, "failed to set client socket non-blocking" );
  112 + }
  113 +
  114 + SP_Sid_t sid;
  115 + sid.mKey = eventArg->getSessionManager()->allocKey( &sid.mSeq );
  116 + assert( sid.mKey > 0 );
  117 +
  118 + SP_Session * session = new SP_Session( sid );
  119 +
  120 + char strip[ 32 ] = { 0 };
  121 + SP_IOUtils::inetNtoa( &( addr.sin_addr ), strip, sizeof( strip ) );
  122 + session->getRequest()->setClientIP( strip );
  123 + session->getRequest()->setClientPort( ntohs( addr.sin_port ) );
  124 +
  125 + if( 0 == getsockname( clientFD, (struct sockaddr*)&addr, &addrLen ) ) {
  126 + SP_IOUtils::inetNtoa( &( addr.sin_addr ), strip, sizeof( strip ) );
  127 + session->getRequest()->setServerIP( strip );
  128 + }
  129 +
  130 + if( NULL != session ) {
  131 + eventArg->getSessionManager()->put( sid.mKey, sid.mSeq, session );
  132 +
  133 + session->setHandler( acceptArg->mHandlerFactory->create() );
  134 + session->setIOChannel( acceptArg->mIOChannelFactory->create() );
  135 + session->setArg( eventArg );
  136 +
  137 + event_set( session->getReadEvent(), clientFD, EV_READ, onRead, session );
  138 + event_set( session->getWriteEvent(), clientFD, EV_WRITE, onWrite, session );
  139 +
  140 + if( eventArg->getSessionManager()->getCount() > acceptArg->mMaxConnections
  141 + || eventArg->getInputResultQueue()->getLength() >= acceptArg->mReqQueueSize ) {
  142 + sp_syslog( LOG_WARNING, "System busy, session.count %d [%d], queue.length %d [%d]",
  143 + eventArg->getSessionManager()->getCount(), acceptArg->mMaxConnections,
  144 + eventArg->getInputResultQueue()->getLength(), acceptArg->mReqQueueSize );
  145 +
  146 + SP_Message * msg = new SP_Message();
  147 + msg->getMsg()->append( acceptArg->mRefusedMsg );
  148 + msg->getMsg()->append( "\r\n" );
  149 + session->getOutList()->append( msg );
  150 + session->setStatus( SP_Session::eExit );
  151 +
  152 + addEvent( session, EV_WRITE, clientFD );
  153 + } else {
  154 + SP_EventHelper::doStart( session );
  155 + }
  156 + } else {
  157 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  158 + sp_close( clientFD );
  159 + sp_syslog( LOG_WARNING, "Out of memory, cannot allocate session object!" );
  160 + }
  161 +}
  162 +
  163 +void SP_EventCallback :: onRead( int fd, short events, void * arg )
  164 +{
  165 + SP_Session * session = (SP_Session*)arg;
  166 +
  167 + session->setReading( 0 );
  168 +
  169 + SP_Sid_t sid = session->getSid();
  170 +
  171 + if( EV_READ & events ) {
  172 + int len = session->getIOChannel()->receive( session );
  173 +
  174 + if( len > 0 ) {
  175 + session->addRead( len );
  176 + if( 0 == session->getRunning() ) {
  177 + SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder();
  178 + if( SP_MsgDecoder::eOK == decoder->decode( session->getInBuffer() ) ) {
  179 + SP_EventHelper::doWork( session );
  180 + }
  181 + }
  182 + addEvent( session, EV_READ, -1 );
  183 + } else {
  184 + int saved = errno;
  185 +
  186 + if( 0 != errno ) {
  187 + sp_syslog( LOG_WARNING, "session(%d.%d) read error, errno %d, status %d",
  188 + sid.mKey, sid.mSeq, errno, session->getStatus() );
  189 + }
  190 +
  191 + if( EAGAIN != saved ) {
  192 + if( 0 == session->getRunning() ) {
  193 + SP_EventHelper::doError( session );
  194 + } else {
  195 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later",
  196 + sid.mKey, sid.mSeq );
  197 + // If this session is running, then onResponse will add write event for this session.
  198 + // It will be processed as write fail at the last. So no need to re-add event here.
  199 + }
  200 + } else {
  201 + addEvent( session, EV_READ, -1 );
  202 + }
  203 + }
  204 + } else {
  205 + if( 0 == session->getRunning() ) {
  206 + SP_EventHelper::doTimeout( session );
  207 + } else {
  208 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session timeout later",
  209 + sid.mKey, sid.mSeq );
  210 + // If this session is running, then onResponse will add write event for this session.
  211 + // It will be processed as write fail at the last. So no need to re-add event here.
  212 + }
  213 + }
  214 +}
  215 +
  216 +void SP_EventCallback :: onWrite( int fd, short events, void * arg )
  217 +{
  218 + SP_Session * session = (SP_Session*)arg;
  219 +
  220 + session->setWriting( 0 );
  221 +
  222 + SP_Sid_t sid = session->getSid();
  223 +
  224 + if( EV_WRITE & events ) {
  225 + int ret = 0;
  226 +
  227 + if( session->getOutList()->getCount() > 0 ) {
  228 + int len = session->getIOChannel()->transmit( session );
  229 + if( len > 0 ) {
  230 + session->addWrite( len );
  231 + if( session->getOutList()->getCount() > 0 ) {
  232 + // left for next write event
  233 + addEvent( session, EV_WRITE, -1 );
  234 + }
  235 + } else {
  236 + if( EAGAIN != errno ) {
  237 + ret = -1;
  238 + if( 0 == session->getRunning() ) {
  239 + sp_syslog( LOG_NOTICE, "session(%d.%d) write error, errno %d, status %d, count %d",
  240 + sid.mKey, sid.mSeq, errno, session->getStatus(), session->getOutList()->getCount() );
  241 + SP_EventHelper::doError( session );
  242 + } else {
  243 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later, errno [%d]",
  244 + sid.mKey, sid.mSeq, errno );
  245 + // If this session is running, then onResponse will add write event for this session.
  246 + // It will be processed as write fail at the last. So no need to re-add event here.
  247 + }
  248 + } else {
  249 + addEvent( session, EV_WRITE, -1 );
  250 + }
  251 + }
  252 + }
  253 +
  254 + if( 0 == ret && session->getOutList()->getCount() <= 0 ) {
  255 + if( SP_Session::eExit == session->getStatus() ) {
  256 + ret = -1;
  257 + if( 0 == session->getRunning() ) {
  258 + sp_syslog( LOG_DEBUG, "session(%d.%d) normal exit", sid.mKey, sid.mSeq );
  259 + SP_EventHelper::doClose( session );
  260 + } else {
  261 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, terminate session later",
  262 + sid.mKey, sid.mSeq );
  263 + // If this session is running, then onResponse will add write event for this session.
  264 + // It will be processed as write fail at the last. So no need to re-add event here.
  265 + }
  266 + }
  267 + }
  268 +
  269 + if( 0 == ret ) {
  270 + if( 0 == session->getRunning() ) {
  271 + SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder();
  272 + if( SP_MsgDecoder::eOK == decoder->decode( session->getInBuffer() ) ) {
  273 + SP_EventHelper::doWork( session );
  274 + }
  275 + } else {
  276 + // If this session is running, then onResponse will add write event for this session.
  277 + // So no need to add write event here.
  278 + }
  279 + }
  280 + } else {
  281 + if( 0 == session->getRunning() ) {
  282 + SP_EventHelper::doTimeout( session );
  283 + } else {
  284 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session timeout later",
  285 + sid.mKey, sid.mSeq );
  286 + // If this session is running, then onResponse will add write event for this session.
  287 + // It will be processed as write fail at the last. So no need to re-add event here.
  288 + }
  289 + }
  290 +}
  291 +
  292 +void SP_EventCallback :: onResponse( void * queueData, void * arg )
  293 +{
  294 + SP_Response * response = (SP_Response*)queueData;
  295 + SP_EventArg * eventArg = (SP_EventArg*)arg;
  296 + SP_SessionManager * manager = eventArg->getSessionManager();
  297 +
  298 + SP_Sid_t fromSid = response->getFromSid();
  299 + uint16_t seq = 0;
  300 +
  301 + if( ! SP_EventHelper::isSystemSid( &fromSid ) ) {
  302 + SP_Session * session = manager->get( fromSid.mKey, &seq );
  303 + if( seq == fromSid.mSeq && NULL != session ) {
  304 + if( SP_Session::eWouldExit == session->getStatus() ) {
  305 + session->setStatus( SP_Session::eExit );
  306 + }
  307 +
  308 + if( SP_Session::eNormal != session->getStatus() ) {
  309 + event_del( session->getReadEvent() );
  310 + }
  311 +
  312 + // always add a write event for sender,
  313 + // so the pending input can be processed in onWrite
  314 + addEvent( session, EV_WRITE, -1 );
  315 + addEvent( session, EV_READ, -1 );
  316 + } else {
  317 + sp_syslog( LOG_WARNING, "session(%d.%d) invalid, unknown FROM",
  318 + fromSid.mKey, fromSid.mSeq );
  319 + }
  320 + }
  321 +
  322 + for( SP_Message * msg = response->takeMessage();
  323 + NULL != msg; msg = response->takeMessage() ) {
  324 +
  325 + SP_SidList * sidList = msg->getToList();
  326 +
  327 + if( msg->getTotalSize() > 0 ) {
  328 + for( int i = sidList->getCount() - 1; i >= 0; i-- ) {
  329 + SP_Sid_t sid = sidList->get( i );
  330 + SP_Session * session = manager->get( sid.mKey, &seq );
  331 + if( seq == sid.mSeq && NULL != session ) {
  332 + if( 0 != memcmp( &fromSid, &sid, sizeof( sid ) )
  333 + && SP_Session::eExit == session->getStatus() ) {
  334 + sidList->take( i );
  335 + msg->getFailure()->add( sid );
  336 + sp_syslog( LOG_WARNING, "session(%d.%d) would exit, invalid TO", sid.mKey, sid.mSeq );
  337 + } else {
  338 + session->getOutList()->append( msg );
  339 + addEvent( session, EV_WRITE, -1 );
  340 + }
  341 + } else {
  342 + sidList->take( i );
  343 + msg->getFailure()->add( sid );
  344 + sp_syslog( LOG_WARNING, "session(%d.%d) invalid, unknown TO", sid.mKey, sid.mSeq );
  345 + }
  346 + }
  347 + } else {
  348 + for( ; sidList->getCount() > 0; ) {
  349 + msg->getFailure()->add( sidList->take( SP_ArrayList::LAST_INDEX ) );
  350 + }
  351 + }
  352 +
  353 + if( msg->getToList()->getCount() <= 0 ) {
  354 + SP_EventHelper::doCompletion( eventArg, msg );
  355 + }
  356 + }
  357 +
  358 + for( int i = 0; i < response->getToCloseList()->getCount(); i++ ) {
  359 + SP_Sid_t sid = response->getToCloseList()->get( i );
  360 + SP_Session * session = manager->get( sid.mKey, &seq );
  361 + if( seq == sid.mSeq && NULL != session ) {
  362 + session->setStatus( SP_Session::eExit );
  363 + addEvent( session, EV_WRITE, -1 );
  364 + } else {
  365 + sp_syslog( LOG_WARNING, "session(%d.%d) invalid, unknown CLOSE", sid.mKey, sid.mSeq );
  366 + }
  367 + }
  368 +
  369 + delete response;
  370 +}
  371 +
  372 +void SP_EventCallback :: addEvent( SP_Session * session, short events, int fd )
  373 +{
  374 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  375 +
  376 + if( ( events & EV_WRITE ) && 0 == session->getWriting() ) {
  377 + session->setWriting( 1 );
  378 +
  379 + if( fd < 0 ) fd = EVENT_FD( session->getWriteEvent() );
  380 +
  381 + event_set( session->getWriteEvent(), fd, events, onWrite, session );
  382 + event_base_set( eventArg->getEventBase(), session->getWriteEvent() );
  383 +
  384 + struct timeval timeout;
  385 + memset( &timeout, 0, sizeof( timeout ) );
  386 + timeout.tv_sec = eventArg->getTimeout();
  387 + event_add( session->getWriteEvent(), &timeout );
  388 + }
  389 +
  390 + if( events & EV_READ && 0 == session->getReading() ) {
  391 + session->setReading( 1 );
  392 +
  393 + if( fd < 0 ) fd = EVENT_FD( session->getWriteEvent() );
  394 +
  395 + event_set( session->getReadEvent(), fd, events, onRead, session );
  396 + event_base_set( eventArg->getEventBase(), session->getReadEvent() );
  397 +
  398 + struct timeval timeout;
  399 + memset( &timeout, 0, sizeof( timeout ) );
  400 + timeout.tv_sec = eventArg->getTimeout();
  401 + event_add( session->getReadEvent(), &timeout );
  402 + }
  403 +}
  404 +
  405 +//-------------------------------------------------------------------
  406 +
  407 +int SP_EventHelper :: isSystemSid( SP_Sid_t * sid )
  408 +{
  409 + return ( sid->mKey == SP_Sid_t::eTimerKey && sid->mSeq == SP_Sid_t::eTimerSeq )
  410 + || ( sid->mKey == SP_Sid_t::ePushKey && sid->mSeq == SP_Sid_t::ePushSeq );
  411 +}
  412 +
  413 +void SP_EventHelper :: doWork( SP_Session * session )
  414 +{
  415 + if( SP_Session::eNormal == session->getStatus() ) {
  416 + session->setRunning( 1 );
  417 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  418 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( worker, session, 1 ) );
  419 + } else {
  420 + SP_Sid_t sid = session->getSid();
  421 +
  422 + char buffer[ 16 ] = { 0 };
  423 + session->getInBuffer()->take( buffer, sizeof( buffer ) );
  424 + sp_syslog( LOG_WARNING, "session(%d.%d) status is %d, ignore [%s...] (%dB)",
  425 + sid.mKey, sid.mSeq, session->getStatus(), buffer, session->getInBuffer()->getSize() );
  426 + session->getInBuffer()->reset();
  427 + }
  428 +}
  429 +
  430 +void SP_EventHelper :: worker( void * arg )
  431 +{
  432 + SP_Session * session = (SP_Session*)arg;
  433 + SP_Handler * handler = session->getHandler();
  434 + SP_EventArg * eventArg = (SP_EventArg *)session->getArg();
  435 +
  436 + SP_Response * response = new SP_Response( session->getSid() );
  437 + if( 0 != handler->handle( session->getRequest(), response ) ) {
  438 + session->setStatus( SP_Session::eWouldExit );
  439 + }
  440 +
  441 + session->setRunning( 0 );
  442 +
  443 + msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
  444 +}
  445 +
  446 +void SP_EventHelper :: doError( SP_Session * session )
  447 +{
  448 + SP_EventArg * eventArg = (SP_EventArg *)session->getArg();
  449 +
  450 + event_del( session->getWriteEvent() );
  451 + event_del( session->getReadEvent() );
  452 +
  453 + SP_Sid_t sid = session->getSid();
  454 +
  455 + SP_ArrayList * outList = session->getOutList();
  456 + for( ; outList->getCount() > 0; ) {
  457 + SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX );
  458 +
  459 + int index = msg->getToList()->find( sid );
  460 + if( index >= 0 ) msg->getToList()->take( index );
  461 + msg->getFailure()->add( sid );
  462 +
  463 + if( msg->getToList()->getCount() <= 0 ) {
  464 + doCompletion( eventArg, msg );
  465 + }
  466 + }
  467 +
  468 + // remove session from SessionManager, onResponse will ignore this session
  469 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  470 +
  471 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( error, session, 1 ) );
  472 +}
  473 +
  474 +void SP_EventHelper :: error( void * arg )
  475 +{
  476 + SP_Session * session = ( SP_Session * )arg;
  477 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  478 +
  479 + SP_Sid_t sid = session->getSid();
  480 +
  481 + SP_Response * response = new SP_Response( sid );
  482 + session->getHandler()->error( response );
  483 +
  484 + msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
  485 +
  486 + sp_syslog( LOG_WARNING, "session(%d.%d) error, r %d(%d), w %d(%d), i %d, o %d, s %d(%d)",
  487 + sid.mKey, sid.mSeq, session->getTotalRead(), session->getReading(),
  488 + session->getTotalWrite(), session->getWriting(),
  489 + session->getInBuffer()->getSize(), session->getOutList()->getCount(),
  490 + eventArg->getSessionManager()->getCount(), eventArg->getSessionManager()->getFreeCount() );
  491 +
  492 + // onResponse will ignore this session, so it's safe to destroy session here
  493 + session->getHandler()->close();
  494 + sp_close( EVENT_FD( session->getWriteEvent() ) );
  495 + delete session;
  496 +}
  497 +
  498 +void SP_EventHelper :: doTimeout( SP_Session * session )
  499 +{
  500 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  501 +
  502 + event_del( session->getWriteEvent() );
  503 + event_del( session->getReadEvent() );
  504 +
  505 + SP_Sid_t sid = session->getSid();
  506 +
  507 + SP_ArrayList * outList = session->getOutList();
  508 + for( ; outList->getCount() > 0; ) {
  509 + SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX );
  510 +
  511 + int index = msg->getToList()->find( sid );
  512 + if( index >= 0 ) msg->getToList()->take( index );
  513 + msg->getFailure()->add( sid );
  514 +
  515 + if( msg->getToList()->getCount() <= 0 ) {
  516 + doCompletion( eventArg, msg );
  517 + }
  518 + }
  519 +
  520 + // remove session from SessionManager, onResponse will ignore this session
  521 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  522 +
  523 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( timeout, session, 1 ) );
  524 +}
  525 +
  526 +void SP_EventHelper :: timeout( void * arg )
  527 +{
  528 + SP_Session * session = ( SP_Session * )arg;
  529 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  530 +
  531 + SP_Sid_t sid = session->getSid();
  532 +
  533 + SP_Response * response = new SP_Response( sid );
  534 + session->getHandler()->timeout( response );
  535 + msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
  536 +
  537 + sp_syslog( LOG_WARNING, "session(%d.%d) timeout, r %d(%d), w %d(%d), i %d, o %d, s %d(%d)",
  538 + sid.mKey, sid.mSeq, session->getTotalRead(), session->getReading(),
  539 + session->getTotalWrite(), session->getWriting(),
  540 + session->getInBuffer()->getSize(), session->getOutList()->getCount(),
  541 + eventArg->getSessionManager()->getCount(), eventArg->getSessionManager()->getFreeCount() );
  542 +
  543 + // onResponse will ignore this session, so it's safe to destroy session here
  544 + session->getHandler()->close();
  545 + sp_close( EVENT_FD( session->getWriteEvent() ) );
  546 + delete session;
  547 +}
  548 +
  549 +void SP_EventHelper :: doClose( SP_Session * session )
  550 +{
  551 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  552 +
  553 + event_del( session->getWriteEvent() );
  554 + event_del( session->getReadEvent() );
  555 +
  556 + SP_Sid_t sid = session->getSid();
  557 +
  558 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  559 +
  560 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( myclose, session, 1 ) );
  561 +}
  562 +
  563 +void SP_EventHelper :: myclose( void * arg )
  564 +{
  565 + SP_Session * session = ( SP_Session * )arg;
  566 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  567 + SP_Sid_t sid = session->getSid();
  568 +
  569 + sp_syslog( LOG_DEBUG, "session(%d.%d) close, r %d(%d), w %d(%d), i %d, o %d, s %d(%d)",
  570 + sid.mKey, sid.mSeq, session->getTotalRead(), session->getReading(),
  571 + session->getTotalWrite(), session->getWriting(),
  572 + session->getInBuffer()->getSize(), session->getOutList()->getCount(),
  573 + eventArg->getSessionManager()->getCount(), eventArg->getSessionManager()->getFreeCount() );
  574 +
  575 + session->getHandler()->close();
  576 + sp_close( EVENT_FD( session->getWriteEvent() ) );
  577 + delete session;
  578 +}
  579 +
  580 +void SP_EventHelper :: doStart( SP_Session * session )
  581 +{
  582 + session->setRunning( 1 );
  583 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  584 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( start, session, 1 ) );
  585 +}
  586 +
  587 +void SP_EventHelper :: start( void * arg )
  588 +{
  589 + SP_Session * session = ( SP_Session * )arg;
  590 + SP_EventArg * eventArg = (SP_EventArg*)session->getArg();
  591 +
  592 + SP_IOChannel * ioChannel = session->getIOChannel();
  593 +
  594 + int initRet = ioChannel->init( EVENT_FD( session->getWriteEvent() ) );
  595 +
  596 + // always call SP_Handler::start
  597 + SP_Response * response = new SP_Response( session->getSid() );
  598 + int startRet = session->getHandler()->start( session->getRequest(), response );
  599 +
  600 + int status = SP_Session::eWouldExit;
  601 +
  602 + if( 0 == initRet ) {
  603 + if( 0 == startRet ) status = SP_Session::eNormal;
  604 + } else {
  605 + delete response;
  606 + // make an empty response
  607 + response = new SP_Response( session->getSid() );
  608 + }
  609 +
  610 + session->setStatus( status );
  611 + session->setRunning( 0 );
  612 + msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
  613 +}
  614 +
  615 +void SP_EventHelper :: doCompletion( SP_EventArg * eventArg, SP_Message * msg )
  616 +{
  617 + eventArg->getOutputResultQueue()->push( msg );
  618 +}
  619 +
... ...
注册登录 后发表评论