提交 96c3fd6bac43d0b8eac6bd2a2c4c484610cb0d4f

作者 LJH 李佳桓
1 个父辈 9baf9036

add

正在显示 1 个修改的文件 包含 904 行增加0 行删除
  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <assert.h>
  7 +#include <time.h>
  8 +
  9 +#include "spwin32iocp.hpp"
  10 +
  11 +#include "spsession.hpp"
  12 +#include "spbuffer.hpp"
  13 +#include "spmsgdecoder.hpp"
  14 +#include "sprequest.hpp"
  15 +#include "sputils.hpp"
  16 +#include "sphandler.hpp"
  17 +#include "spexecutor.hpp"
  18 +#include "spioutils.hpp"
  19 +#include "spmsgblock.hpp"
  20 +#include "spwin32buffer.hpp"
  21 +#include "spiochannel.hpp"
  22 +
  23 +BOOL SP_IocpEventCallback :: addSession( SP_IocpEventArg * eventArg, HANDLE client, SP_Session * session )
  24 +{
  25 + BOOL ret = TRUE;
  26 +
  27 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)malloc( sizeof( SP_IocpSession_t ) );
  28 + if( NULL == iocpSession ) {
  29 + sp_syslog( LOG_ERR, "malloc fail, errno %d", GetLastError() );
  30 + ret = FALSE;
  31 + }
  32 +
  33 + DWORD completionKey = 0;
  34 + SP_Sid_t sid = session->getSid();
  35 + assert( sizeof( completionKey ) == sizeof( SP_Sid_t ) );
  36 + memcpy( &completionKey, &sid, sizeof( completionKey ) );
  37 +
  38 + if( ret ) {
  39 + memset( iocpSession, 0, sizeof( SP_IocpSession_t ) );
  40 + iocpSession->mRecvEvent.mHeapIndex = -1;
  41 + iocpSession->mSendEvent.mHeapIndex = -1;
  42 + iocpSession->mRecvEvent.mType = SP_IocpEvent_t::eEventRecv;
  43 + iocpSession->mSendEvent.mType = SP_IocpEvent_t::eEventSend;
  44 +
  45 + iocpSession->mHandle = client;
  46 + iocpSession->mSession = session;
  47 + iocpSession->mEventArg = eventArg;
  48 + session->setArg( iocpSession );
  49 +
  50 + if( NULL == CreateIoCompletionPort( client, eventArg->getCompletionPort(), completionKey, 0 ) ) {
  51 + sp_syslog( LOG_ERR, "CreateIoCompletionPort fail, errno %d", WSAGetLastError() );
  52 + ret = FALSE;
  53 + }
  54 + }
  55 +
  56 + if( ! ret ) {
  57 + sp_close( (SOCKET)client );
  58 +
  59 + if( NULL != iocpSession ) free( iocpSession );
  60 + session->setArg( NULL );
  61 + }
  62 +
  63 + return ret;
  64 +
  65 +}
  66 +
  67 +BOOL SP_IocpEventCallback :: addRecv( SP_Session * session )
  68 +{
  69 + BOOL ret = TRUE;
  70 +
  71 + if( 0 == session->getReading() && SP_Session::eNormal == session->getStatus() ) {
  72 + SP_Sid_t sid = session->getSid();
  73 +
  74 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  75 + SP_IocpEvent_t * recvEvent = &( iocpSession->mRecvEvent );
  76 +
  77 + const int SP_MAX_RETRY = 5;
  78 +
  79 + for( int retry = 0; retry < SP_MAX_RETRY; retry++ ) {
  80 + memset( &( recvEvent->mOverlapped ), 0, sizeof( OVERLAPPED ) );
  81 + recvEvent->mType = SP_IocpEvent_t::eEventRecv;
  82 + recvEvent->mWsaBuf.buf = NULL;
  83 + recvEvent->mWsaBuf.len = 0;
  84 +
  85 + DWORD recvBytes = 0, flags = 0;
  86 + if( SOCKET_ERROR == WSARecv( (SOCKET)iocpSession->mHandle, &(recvEvent->mWsaBuf), 1,
  87 + &recvBytes, &flags, &( recvEvent->mOverlapped ), NULL ) ) {
  88 + int lastError = WSAGetLastError();
  89 + if( ERROR_IO_PENDING != lastError ) {
  90 + sp_syslog( LOG_ERR, "session(%d.%d) WSARecv fail, errno %d, retry %d",
  91 + sid.mKey, sid.mSeq, lastError, retry );
  92 + }
  93 +
  94 + if( WSAENOBUFS == lastError && retry < SP_MAX_RETRY - 1 ) {
  95 + Sleep( 50 * retry );
  96 + continue;
  97 + } else {
  98 + if( ERROR_IO_PENDING != lastError ) ret = FALSE;
  99 + break;
  100 + }
  101 + } else {
  102 + break;
  103 + }
  104 + }
  105 +
  106 + if( ret ) {
  107 + iocpSession->mSession->setReading( 1 );
  108 +
  109 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  110 +
  111 + if( eventArg->getTimeout() > 0 ) {
  112 + sp_gettimeofday( &( recvEvent->mTimeout ), NULL );
  113 + recvEvent->mTimeout.tv_sec += eventArg->getTimeout();
  114 + eventArg->getEventHeap()->push( recvEvent );
  115 + }
  116 + }
  117 + }
  118 +
  119 + return ret;
  120 +}
  121 +
  122 +void SP_IocpEventCallback :: onRecv( SP_IocpSession_t * iocpSession )
  123 +{
  124 + SP_IocpEvent_t * recvEvent = &( iocpSession->mRecvEvent );
  125 + SP_Session * session = iocpSession->mSession;
  126 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  127 +
  128 + SP_Sid_t sid = session->getSid();
  129 +
  130 + eventArg->getEventHeap()->erase( recvEvent );
  131 +
  132 + session->setReading( 0 );
  133 +
  134 + int len = session->getIOChannel()->receive( session );
  135 +
  136 + if( len > 0 ) {
  137 + session->addRead( len );
  138 + if( 0 == session->getRunning() ) {
  139 + SP_IocpEventHelper::doDecodeForWork( session );
  140 + }
  141 + if( ! addRecv( session ) ) {
  142 + if( 0 == session->getRunning() ) {
  143 + SP_IocpEventHelper::doError( session );
  144 + } else {
  145 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later",
  146 + sid.mKey, sid.mSeq );
  147 + }
  148 + }
  149 + } else if( 0 == len ) {
  150 + if( 0 == session->getRunning() ) {
  151 + SP_IocpEventHelper::doClose( session );
  152 + } else {
  153 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session close later",
  154 + sid.mKey, sid.mSeq );
  155 + }
  156 + } else {
  157 + int ret = -1, lastError = WSAGetLastError();
  158 +
  159 + if( WSAEWOULDBLOCK == lastError && addRecv( session ) ) ret = 0;
  160 +
  161 + if( 0 != ret ) {
  162 + if( 0 == session->getRunning() ) {
  163 + sp_syslog( LOG_NOTICE, "session(%d.%d) read error, errno %d, status %d",
  164 + sid.mKey, sid.mSeq, lastError, session->getStatus() );
  165 + SP_IocpEventHelper::doError( session );
  166 + } else {
  167 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later",
  168 + sid.mKey, sid.mSeq );
  169 + }
  170 + }
  171 + }
  172 +}
  173 +
  174 +BOOL SP_IocpEventCallback :: addSend( SP_Session * session )
  175 +{
  176 + BOOL ret = TRUE;
  177 +
  178 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  179 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  180 + SP_IocpEvent_t * sendEvent = &( iocpSession->mSendEvent );
  181 + SP_Sid_t sid = session->getSid();
  182 +
  183 + if( 0 == session->getRunning() ) {
  184 + SP_IocpEventHelper::doDecodeForWork( session );
  185 + }
  186 +
  187 + if( 0 == session->getWriting() ) {
  188 +
  189 + const int SP_MAX_RETRY = 5;
  190 +
  191 + for( int retry = 0; retry < SP_MAX_RETRY; retry++ ) {
  192 + memset( &( sendEvent->mOverlapped ), 0, sizeof( OVERLAPPED ) );
  193 + sendEvent->mType = SP_IocpEvent_t::eEventSend;
  194 + sendEvent->mWsaBuf.buf = NULL;
  195 + sendEvent->mWsaBuf.len = 0;
  196 +
  197 + DWORD sendBytes = 0;
  198 +
  199 + if( SOCKET_ERROR == WSASend( (SOCKET)iocpSession->mHandle, &( sendEvent->mWsaBuf ), 1,
  200 + &sendBytes, 0, &( sendEvent->mOverlapped ), NULL ) ) {
  201 + int lastError = WSAGetLastError();
  202 + if( ERROR_IO_PENDING != lastError ) {
  203 + sp_syslog( LOG_ERR, "session(%d.%d) WSASend fail, errno %d, retry %d",
  204 + sid.mKey, sid.mSeq, lastError, retry );
  205 + }
  206 +
  207 + if( WSAENOBUFS == lastError && retry < SP_MAX_RETRY - 1 ) {
  208 + Sleep( 50 * retry );
  209 + continue;
  210 + } else {
  211 + if( ERROR_IO_PENDING != lastError ) ret = FALSE;
  212 + break;
  213 + }
  214 + } else {
  215 + break;
  216 + }
  217 + }
  218 +
  219 + if( ret ) {
  220 + if( eventArg->getTimeout() > 0 ) {
  221 + sp_gettimeofday( &( sendEvent->mTimeout ), NULL );
  222 + sendEvent->mTimeout.tv_sec += eventArg->getTimeout();
  223 + eventArg->getEventHeap()->push( sendEvent );
  224 + }
  225 +
  226 + session->setWriting( 1 );
  227 + }
  228 + }
  229 +
  230 + return ret;
  231 +}
  232 +
  233 +void SP_IocpEventCallback :: onSend( SP_IocpSession_t * iocpSession )
  234 +{
  235 + SP_IocpEvent_t * recvEvent = &( iocpSession->mRecvEvent );
  236 + SP_Session * session = iocpSession->mSession;
  237 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  238 +
  239 + session->setWriting( 0 );
  240 +
  241 + SP_Sid_t sid = session->getSid();
  242 +
  243 + int ret = 0;
  244 +
  245 + if( session->getOutList()->getCount() > 0 ) {
  246 + int len = session->getIOChannel()->transmit( session );
  247 + if( len > 0 ) {
  248 + session->addWrite( len );
  249 + if( session->getOutList()->getCount() > 0 ) {
  250 + if( ! addSend( session ) ) {
  251 + if( 0 == session->getRunning() ) {
  252 + ret = -1;
  253 + SP_IocpEventHelper::doError( session );
  254 + }
  255 + }
  256 + }
  257 + } else {
  258 + ret = -1;
  259 +
  260 + int lastError = WSAGetLastError();
  261 +
  262 + if( WSAENOBUFS == lastError && addSend( session ) ) ret = 0;
  263 +
  264 + if( WSAEWOULDBLOCK == lastError && addSend( session ) ) ret = 0;
  265 +
  266 + if( 0 != ret ) {
  267 + if( 0 == session->getRunning() ) {
  268 + sp_syslog( LOG_NOTICE, "session(%d.%d) write error, errno %d, status %d, count %d",
  269 + sid.mKey, sid.mSeq, lastError, session->getStatus(), session->getOutList()->getCount() );
  270 + SP_IocpEventHelper::doError( session );
  271 + } else {
  272 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, process session error later, errno [%d]",
  273 + sid.mKey, sid.mSeq, errno );
  274 + }
  275 + }
  276 + }
  277 + }
  278 +
  279 + if( 0 == ret && session->getOutList()->getCount() <= 0 ) {
  280 + if( SP_Session::eExit == session->getStatus() ) {
  281 + ret = -1;
  282 + if( 0 == session->getRunning() ) {
  283 + //sp_syslog( LOG_NOTICE, "session(%d.%d) normal exit", sid.mKey, sid.mSeq );
  284 + SP_IocpEventHelper::doClose( session );
  285 + } else {
  286 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, terminate session later",
  287 + sid.mKey, sid.mSeq );
  288 + }
  289 + }
  290 + }
  291 +
  292 + if( 0 == ret && 0 == session->getRunning() ) {
  293 + SP_IocpEventHelper::doDecodeForWork( session );
  294 + }
  295 +}
  296 +
  297 +BOOL SP_IocpEventCallback :: onAccept( SP_IocpAcceptArg_t * acceptArg )
  298 +{
  299 + SP_IocpEventArg * eventArg = acceptArg->mEventArg;
  300 +
  301 + SP_Sid_t sid;
  302 + sid.mKey = eventArg->getSessionManager()->allocKey( &sid.mSeq );
  303 + assert( sid.mKey > 0 );
  304 +
  305 + SP_Session * session = new SP_Session( sid );
  306 +
  307 + int localLen = 0, remoteLen = 0;
  308 + struct sockaddr_in * localAddr = NULL, * remoteAddr = NULL;
  309 +
  310 + GetAcceptExSockaddrs( acceptArg->mBuffer, 0,
  311 + sizeof( sockaddr_in ) + 16, sizeof( sockaddr_in ) + 16,
  312 + (SOCKADDR**)&localAddr, &localLen, (SOCKADDR**)&remoteAddr, &remoteLen );
  313 +
  314 + struct sockaddr_in clientAddr;
  315 + memcpy( &clientAddr, remoteAddr, sizeof( clientAddr ) );
  316 +
  317 + char clientIP[ 32 ] = { 0 };
  318 + SP_IOUtils::inetNtoa( &( clientAddr.sin_addr ), clientIP, sizeof( clientIP ) );
  319 + session->getRequest()->setClientIP( clientIP );
  320 + session->getRequest()->setClientPort( ntohs( clientAddr.sin_port ) );
  321 +
  322 + session->setHandler( acceptArg->mHandlerFactory->create() );
  323 + session->setIOChannel( acceptArg->mIOChannelFactory->create() );
  324 +
  325 + if( addSession( eventArg, acceptArg->mClientSocket, session ) ) {
  326 + eventArg->getSessionManager()->put( sid.mKey, sid.mSeq, session );
  327 +
  328 + if( eventArg->getSessionManager()->getCount() > acceptArg->mMaxConnections
  329 + || eventArg->getInputResultQueue()->getLength() >= acceptArg->mReqQueueSize ) {
  330 +
  331 + sp_syslog( LOG_WARNING, "System busy, session.count %d [%d], queue.length %d [%d]",
  332 + eventArg->getSessionManager()->getCount(), acceptArg->mMaxConnections,
  333 + eventArg->getInputResultQueue()->getLength(), acceptArg->mReqQueueSize );
  334 +
  335 + SP_Message * msg = new SP_Message();
  336 + msg->getMsg()->append( acceptArg->mRefusedMsg );
  337 + msg->getMsg()->append( "\r\n" );
  338 + session->getOutList()->append( msg );
  339 + session->setStatus( SP_Session::eExit );
  340 +
  341 + addSend( session );
  342 + } else {
  343 + SP_IocpEventHelper::doStart( session );
  344 + }
  345 + } else {
  346 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  347 + delete session;
  348 + }
  349 +
  350 + // signal SP_IocpServer::acceptThread to post another AcceptEx
  351 + SetEvent( acceptArg->mAcceptEvent );
  352 +
  353 + return TRUE;
  354 +}
  355 +
  356 +void SP_IocpEventCallback :: onResponse( void * queueData, void * arg )
  357 +{
  358 + SP_Response * response = (SP_Response*)queueData;
  359 + SP_IocpEventArg * eventArg = (SP_IocpEventArg*)arg;
  360 + SP_SessionManager * manager = eventArg->getSessionManager();
  361 +
  362 + SP_Sid_t fromSid = response->getFromSid();
  363 + uint16_t seq = 0;
  364 +
  365 + if( ! SP_IocpEventHelper::isSystemSid( &fromSid ) ) {
  366 + SP_Session * session = manager->get( fromSid.mKey, &seq );
  367 + if( seq == fromSid.mSeq && NULL != session ) {
  368 + if( SP_Session::eWouldExit == session->getStatus() ) {
  369 + session->setStatus( SP_Session::eExit );
  370 + }
  371 +
  372 + if( SP_Session::eNormal == session->getStatus() ) {
  373 + if( addRecv( session ) ) {
  374 + if( 0 == session->getRunning() ) {
  375 + SP_IocpEventHelper::doDecodeForWork( session );
  376 + }
  377 + } else {
  378 + if( 0 == session->getRunning() ) {
  379 + SP_IocpEventHelper::doError( session );
  380 + }
  381 + }
  382 + }
  383 + } else {
  384 + sp_syslog( LOG_WARNING, "session(%d.%d) invalid, unknown FROM",
  385 + fromSid.mKey, fromSid.mSeq );
  386 + }
  387 + }
  388 +
  389 + for( SP_Message * msg = response->takeMessage();
  390 + NULL != msg; msg = response->takeMessage() ) {
  391 +
  392 + SP_SidList * sidList = msg->getToList();
  393 +
  394 + if( msg->getTotalSize() > 0 ) {
  395 + for( int i = sidList->getCount() - 1; i >= 0; i-- ) {
  396 + SP_Sid_t sid = sidList->get( i );
  397 + SP_Session * session = manager->get( sid.mKey, &seq );
  398 + if( seq == sid.mSeq && NULL != session ) {
  399 + if( 0 != memcmp( &fromSid, &sid, sizeof( sid ) )
  400 + && SP_Session::eExit == session->getStatus() ) {
  401 + sidList->take( i );
  402 + msg->getFailure()->add( sid );
  403 + sp_syslog( LOG_WARNING, "session(%d.%d) would exit, invalid TO", sid.mKey, sid.mSeq );
  404 + } else {
  405 + if( addSend( session ) ) {
  406 + session->getOutList()->append( msg );
  407 + } else {
  408 + if( 0 == session->getRunning() ) {
  409 + SP_IocpEventHelper::doError( session );
  410 + }
  411 + }
  412 + }
  413 + } else {
  414 + sidList->take( i );
  415 + msg->getFailure()->add( sid );
  416 + sp_syslog( LOG_WARNING, "session(%d.%d) invalid, unknown TO", sid.mKey, sid.mSeq );
  417 + }
  418 + }
  419 + } else {
  420 + for( ; sidList->getCount() > 0; ) {
  421 + msg->getFailure()->add( sidList->take( SP_ArrayList::LAST_INDEX ) );
  422 + }
  423 + }
  424 +
  425 + if( msg->getToList()->getCount() <= 0 ) {
  426 + SP_IocpEventHelper::doCompletion( eventArg, msg );
  427 + }
  428 + }
  429 +
  430 + if( ! SP_IocpEventHelper::isSystemSid( &fromSid ) ) {
  431 + SP_Session * session = manager->get( fromSid.mKey, &seq );
  432 + if( seq == fromSid.mSeq && NULL != session ) {
  433 + if( session->getOutList()->getCount() <= 0 && SP_Session::eExit == session->getStatus() ) {
  434 + if( 0 == session->getRunning() ) {
  435 + SP_IocpEventHelper::doClose( session );
  436 + } else {
  437 + sp_syslog( LOG_NOTICE, "session(%d.%d) busy, terminate session later",
  438 + fromSid.mKey, fromSid.mSeq );
  439 + }
  440 + }
  441 + }
  442 + }
  443 +
  444 + for( int i = 0; i < response->getToCloseList()->getCount(); i++ ) {
  445 + SP_Sid_t sid = response->getToCloseList()->get( i );
  446 + SP_Session * session = manager->get( sid.mKey, &seq );
  447 + if( seq == sid.mSeq && NULL != session ) {
  448 + session->setStatus( SP_Session::eExit );
  449 + if( !addSend( session ) ) {
  450 + if( 0 == session->getRunning() ) {
  451 + SP_IocpEventHelper::doError( session );
  452 + }
  453 + }
  454 + } else {
  455 + sp_syslog( LOG_WARNING, "session(%d.%d) invalid, unknown CLOSE", sid.mKey, sid.mSeq );
  456 + }
  457 + }
  458 +
  459 + delete response;
  460 +}
  461 +
  462 +void SP_IocpEventCallback :: onTimeout( SP_IocpEventArg * eventArg )
  463 +{
  464 + SP_IocpEventHeap * eventHeap = eventArg->getEventHeap();
  465 +
  466 + if( NULL == eventHeap->top() ) return;
  467 +
  468 + struct timeval curr;
  469 + sp_gettimeofday( &curr, NULL );
  470 +
  471 + for( ; NULL != eventHeap->top(); ) {
  472 + SP_IocpEvent_t * event = eventHeap->top();
  473 + struct timeval * first = &( event->mTimeout );
  474 +
  475 + if( ( curr.tv_sec == first->tv_sec && curr.tv_usec >= first->tv_usec )
  476 + ||( curr.tv_sec > first->tv_sec ) ) {
  477 + event = eventHeap->pop();
  478 +
  479 + if( SP_IocpEvent_t::eEventTimer == event->mType ) {
  480 + event->mOnTimer( event );
  481 + } else {
  482 + SP_IocpSession_t * iocpSession = NULL;
  483 +
  484 + if( SP_IocpEvent_t::eEventRecv == event->mType ) {
  485 + iocpSession = CONTAINING_RECORD( event, SP_IocpSession_t, mRecvEvent );
  486 + } else if( SP_IocpEvent_t::eEventSend == event->mType ) {
  487 + iocpSession = CONTAINING_RECORD( event, SP_IocpSession_t, mSendEvent );
  488 + }
  489 +
  490 + assert( NULL != iocpSession );
  491 +
  492 + if( 0 == iocpSession->mSession->getRunning() ) {
  493 + SP_IocpEventHelper::doTimeout( iocpSession->mSession );
  494 + }
  495 + }
  496 + } else {
  497 + break;
  498 + }
  499 + }
  500 +}
  501 +
  502 +BOOL SP_IocpEventCallback :: eventLoop( SP_IocpEventArg * eventArg, SP_IocpAcceptArg_t * acceptArg )
  503 +{
  504 + DWORD bytesTransferred = 0;
  505 + DWORD completionKey = 0;
  506 + OVERLAPPED * overlapped = NULL;
  507 + HANDLE completionPort = eventArg->getCompletionPort();
  508 + DWORD timeout = SP_IocpEventHelper::timeoutNext( eventArg->getEventHeap() );
  509 +
  510 + BOOL isSuccess = GetQueuedCompletionStatus( completionPort, &bytesTransferred,
  511 + &completionKey, &overlapped, timeout );
  512 + DWORD lastError = WSAGetLastError();
  513 +
  514 + SP_Sid_t sid;
  515 + memcpy( &sid, &completionKey, sizeof( completionKey ) );
  516 +
  517 + SP_IocpSession_t * iocpSession = NULL;
  518 + if( completionKey > 0 ) {
  519 + uint16_t seq = 0;
  520 + SP_Session * session = eventArg->getSessionManager()->get( sid.mKey, &seq );
  521 + if( NULL != session && sid.mSeq == seq ) {
  522 + iocpSession = (SP_IocpSession_t*)session->getArg();
  523 + }
  524 + }
  525 +
  526 + if( ! isSuccess ) {
  527 + if( eKeyAccept == completionKey ) {
  528 + sp_syslog( LOG_ERR, "accept(%d) fail, errno %d", acceptArg->mClientSocket, lastError );
  529 + sp_close( (SOCKET)acceptArg->mClientSocket );
  530 + // signal SP_IocpServer::acceptThread to post another AcceptEx
  531 + SetEvent( acceptArg->mAcceptEvent );
  532 + return TRUE;
  533 + }
  534 +
  535 + if( NULL != overlapped ) {
  536 + // process a failed completed I/O request
  537 + // lastError continas the reason for failure
  538 +
  539 + if( NULL != iocpSession ) {
  540 + if( 0 == iocpSession->mSession->getRunning() ) {
  541 + SP_IocpEventHelper::doClose( iocpSession->mSession );
  542 + }
  543 + }
  544 +
  545 + if( ERROR_NETNAME_DELETED == lastError // client abort
  546 + || ERROR_OPERATION_ABORTED == lastError ) {
  547 + return TRUE;
  548 + } else {
  549 + char errmsg[ 512 ] = { 0 };
  550 + spwin32_strerror( lastError, errmsg, sizeof( errmsg ) );
  551 + sp_syslog( LOG_ERR, "GetQueuedCompletionStatus fail, errno %d, %s",
  552 + lastError, errmsg );
  553 + return FALSE;
  554 + }
  555 + } else {
  556 + if( lastError == WAIT_TIMEOUT ) {
  557 + // time-out while waiting for completed I/O request
  558 + onTimeout( eventArg );
  559 + } else {
  560 + // bad call to GQCS, lastError contains the reason for the bad call
  561 + }
  562 + }
  563 +
  564 + return FALSE;
  565 + }
  566 +
  567 + if( eKeyAccept == completionKey ) {
  568 + return onAccept( acceptArg );
  569 + } else if( eKeyMsgQueue == completionKey ) {
  570 + SP_IocpMsgQueue * msgQueue = (SP_IocpMsgQueue*)overlapped;
  571 + msgQueue->process();
  572 + return TRUE;
  573 + } else if( eKeyFree == completionKey ) {
  574 + assert( NULL == iocpSession );
  575 + iocpSession = CONTAINING_RECORD( overlapped, SP_IocpSession_t, mFreeEvent );
  576 + delete iocpSession->mSession;
  577 + free( iocpSession );
  578 + return TRUE;
  579 + } else {
  580 + if( NULL == iocpSession ) return TRUE;
  581 +
  582 + SP_IocpEvent_t * iocpEvent =
  583 + CONTAINING_RECORD( overlapped, SP_IocpEvent_t, mOverlapped );
  584 +
  585 + eventArg->getEventHeap()->erase( iocpEvent );
  586 +
  587 + if( SP_IocpEvent_t::eEventRecv == iocpEvent->mType ) {
  588 + onRecv( iocpSession );
  589 + return TRUE;
  590 + }
  591 +
  592 + if( SP_IocpEvent_t::eEventSend == iocpEvent->mType ) {
  593 + onSend( iocpSession );
  594 + return TRUE;
  595 + }
  596 + }
  597 +
  598 + return TRUE;
  599 +}
  600 +
  601 +//===================================================================
  602 +
  603 +int SP_IocpEventHelper :: isSystemSid( SP_Sid_t * sid )
  604 +{
  605 + return sid->mKey == SP_Sid_t::eTimerKey && sid->mSeq == SP_Sid_t::eTimerSeq;
  606 +}
  607 +
  608 +DWORD SP_IocpEventHelper :: timeoutNext( SP_IocpEventHeap * eventHeap )
  609 +{
  610 + SP_IocpEvent_t * event = eventHeap->top();
  611 +
  612 + if( NULL == event ) return INFINITE;
  613 +
  614 + struct timeval curr;
  615 + sp_gettimeofday( &curr, NULL );
  616 +
  617 + struct timeval * first = &( event->mTimeout );
  618 +
  619 + DWORD ret = ( first->tv_sec - curr.tv_sec ) * 1000
  620 + + ( first->tv_usec - curr.tv_usec ) / 1000;
  621 +
  622 + if( ret < 0 ) ret = 0;
  623 +
  624 + return ret;
  625 +}
  626 +
  627 +void SP_IocpEventHelper :: doDecodeForWork( SP_Session * session )
  628 +{
  629 + SP_MsgDecoder * decoder = session->getRequest()->getMsgDecoder();
  630 + int ret = decoder->decode( session->getInBuffer() );
  631 + if( SP_MsgDecoder::eOK == ret ) {
  632 + doWork( session );
  633 + } else if( SP_MsgDecoder::eMoreData != ret ) {
  634 + doError( session );
  635 + } else {
  636 + assert( ret == SP_MsgDecoder::eMoreData );
  637 + }
  638 +}
  639 +
  640 +void SP_IocpEventHelper :: doWork( SP_Session * session )
  641 +{
  642 + if( SP_Session::eNormal == session->getStatus() ) {
  643 + session->setRunning( 1 );
  644 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  645 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  646 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( worker, session, 1 ) );
  647 + } else {
  648 + SP_Sid_t sid = session->getSid();
  649 +
  650 + char buffer[ 16 ] = { 0 };
  651 + session->getInBuffer()->take( buffer, sizeof( buffer ) );
  652 + sp_syslog( LOG_WARNING, "session(%d.%d) status is %d, ignore [%s...] (%dB)",
  653 + sid.mKey, sid.mSeq, session->getStatus(), buffer, session->getInBuffer()->getSize() );
  654 + session->getInBuffer()->reset();
  655 + }
  656 +}
  657 +
  658 +void SP_IocpEventHelper :: worker( void * arg )
  659 +{
  660 + SP_Session * session = (SP_Session*)arg;
  661 + SP_Handler * handler = session->getHandler();
  662 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  663 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  664 +
  665 + SP_Response * response = new SP_Response( session->getSid() );
  666 + if( 0 != handler->handle( session->getRequest(), response ) ) {
  667 + session->setStatus( SP_Session::eWouldExit );
  668 + }
  669 +
  670 + session->setRunning( 0 );
  671 +
  672 + eventArg->getResponseQueue()->push( response );
  673 +}
  674 +
  675 +void SP_IocpEventHelper :: doClose( SP_Session * session )
  676 +{
  677 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  678 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  679 +
  680 + SP_Sid_t sid = session->getSid();
  681 +
  682 + session->setRunning( 1 );
  683 +
  684 + // remove session from SessionManager, the other threads will ignore this session
  685 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  686 +
  687 + eventArg->getEventHeap()->erase( &( iocpSession->mRecvEvent ) );
  688 + eventArg->getEventHeap()->erase( &( iocpSession->mSendEvent ) );
  689 +
  690 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( close, session, 1 ) );
  691 +}
  692 +
  693 +void SP_IocpEventHelper :: close( void * arg )
  694 +{
  695 + SP_Session * session = (SP_Session*)arg;
  696 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  697 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  698 +
  699 + SP_Sid_t sid = session->getSid();
  700 +
  701 + //sp_syslog( LOG_NOTICE, "session(%d.%d) close, disconnect", sid.mKey, sid.mSeq );
  702 +
  703 + session->getHandler()->close();
  704 +
  705 + sp_syslog( LOG_NOTICE, "session(%d.%d) close, r %d(%d), w %d(%d), i %d, o %d, s %d(%d), t %d",
  706 + sid.mKey, sid.mSeq, session->getTotalRead(), session->getReading(),
  707 + session->getTotalWrite(), session->getWriting(),
  708 + session->getInBuffer()->getSize(), session->getOutList()->getCount(),
  709 + eventArg->getSessionManager()->getCount(), eventArg->getSessionManager()->getFreeCount(),
  710 + eventArg->getEventHeap()->getCount() );
  711 +
  712 + if( ! eventArg->disconnectEx( (SOCKET)iocpSession->mHandle, NULL, 0, 0 ) ) {
  713 + if( ERROR_IO_PENDING != WSAGetLastError () ) {
  714 + sp_syslog( LOG_ERR, "DisconnectEx(%d) fail, errno %d", sid.mKey, WSAGetLastError() );
  715 + }
  716 + }
  717 +
  718 + if( 0 != sp_close( (SOCKET)iocpSession->mHandle ) ) {
  719 + sp_syslog( LOG_ERR, "close(%d) fail, errno %d", sid.mKey, WSAGetLastError() );
  720 + }
  721 +
  722 + memset( &( iocpSession->mFreeEvent ), 0, sizeof( OVERLAPPED ) );
  723 + PostQueuedCompletionStatus( eventArg->getCompletionPort(), 0,
  724 + SP_IocpEventCallback::eKeyFree, &( iocpSession->mFreeEvent ) );
  725 +}
  726 +
  727 +void SP_IocpEventHelper :: doError( SP_Session * session )
  728 +{
  729 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  730 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  731 + SP_Sid_t sid = session->getSid();
  732 +
  733 + sp_syslog( LOG_WARNING, "session(%d.%d) error, r %d(%d), w %d(%d), i %d, o %d, s %d(%d), t %d",
  734 + sid.mKey, sid.mSeq, session->getTotalRead(), session->getReading(),
  735 + session->getTotalWrite(), session->getWriting(),
  736 + session->getInBuffer()->getSize(), session->getOutList()->getCount(),
  737 + eventArg->getSessionManager()->getCount(), eventArg->getSessionManager()->getFreeCount(),
  738 + eventArg->getEventHeap()->getCount() );
  739 +
  740 + session->setRunning( 1 );
  741 +
  742 + SP_ArrayList * outList = session->getOutList();
  743 + for( ; outList->getCount() > 0; ) {
  744 + SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX );
  745 +
  746 + int index = msg->getToList()->find( sid );
  747 + if( index >= 0 ) msg->getToList()->take( index );
  748 + msg->getFailure()->add( sid );
  749 +
  750 + if( msg->getToList()->getCount() <= 0 ) {
  751 + doCompletion( eventArg, msg );
  752 + }
  753 + }
  754 +
  755 + // remove session from SessionManager, so the other threads will ignore this session
  756 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  757 +
  758 + eventArg->getEventHeap()->erase( &( iocpSession->mRecvEvent ) );
  759 + eventArg->getEventHeap()->erase( &( iocpSession->mSendEvent ) );
  760 +
  761 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( error, session, 1 ) );
  762 +}
  763 +
  764 +void SP_IocpEventHelper :: error( void * arg )
  765 +{
  766 + SP_Session * session = ( SP_Session * )arg;
  767 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  768 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  769 +
  770 + SP_Sid_t sid = session->getSid();
  771 +
  772 + SP_Response * response = new SP_Response( sid );
  773 + session->getHandler()->error( response );
  774 +
  775 + eventArg->getResponseQueue()->push( response );
  776 +
  777 + // the other threads will ignore this session, so it's safe to destroy session here
  778 + session->getHandler()->close();
  779 +
  780 + if( ! eventArg->disconnectEx( (SOCKET)iocpSession->mHandle, NULL, 0, 0 ) ) {
  781 + if( ERROR_IO_PENDING != WSAGetLastError () ) {
  782 + sp_syslog( LOG_ERR, "DisconnectEx(%d) fail, errno %d", sid.mKey, WSAGetLastError() );
  783 + }
  784 + }
  785 +
  786 + if( 0 != sp_close( (SOCKET)iocpSession->mHandle ) ) {
  787 + sp_syslog( LOG_ERR, "close(%d) fail, errno %d", sid.mKey, WSAGetLastError() );
  788 + }
  789 +
  790 + memset( &( iocpSession->mFreeEvent ), 0, sizeof( OVERLAPPED ) );
  791 + PostQueuedCompletionStatus( eventArg->getCompletionPort(), 0,
  792 + SP_IocpEventCallback::eKeyFree, &( iocpSession->mFreeEvent ) );
  793 +}
  794 +
  795 +void SP_IocpEventHelper :: doTimeout( SP_Session * session )
  796 +{
  797 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  798 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  799 + SP_Sid_t sid = session->getSid();
  800 +
  801 + sp_syslog( LOG_WARNING, "session(%d.%d) timeout, r %d(%d), w %d(%d), i %d, o %d, s %d(%d), t %d",
  802 + sid.mKey, sid.mSeq, session->getTotalRead(), session->getReading(),
  803 + session->getTotalWrite(), session->getWriting(),
  804 + session->getInBuffer()->getSize(), session->getOutList()->getCount(),
  805 + eventArg->getSessionManager()->getCount(), eventArg->getSessionManager()->getFreeCount(),
  806 + eventArg->getEventHeap()->getCount() );
  807 +
  808 + session->setRunning( 1 );
  809 +
  810 + SP_ArrayList * outList = session->getOutList();
  811 + for( ; outList->getCount() > 0; ) {
  812 + SP_Message * msg = ( SP_Message * ) outList->takeItem( SP_ArrayList::LAST_INDEX );
  813 +
  814 + int index = msg->getToList()->find( sid );
  815 + if( index >= 0 ) msg->getToList()->take( index );
  816 + msg->getFailure()->add( sid );
  817 +
  818 + if( msg->getToList()->getCount() <= 0 ) {
  819 + doCompletion( eventArg, msg );
  820 + }
  821 + }
  822 +
  823 + // remove session from SessionManager, the other threads will ignore this session
  824 + eventArg->getSessionManager()->remove( sid.mKey, sid.mSeq );
  825 +
  826 + eventArg->getEventHeap()->erase( &( iocpSession->mRecvEvent ) );
  827 + eventArg->getEventHeap()->erase( &( iocpSession->mSendEvent ) );
  828 +
  829 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( timeout, session, 1 ) );
  830 +}
  831 +
  832 +void SP_IocpEventHelper :: timeout( void * arg )
  833 +{
  834 + SP_Session * session = ( SP_Session * )arg;
  835 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  836 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  837 +
  838 + SP_Sid_t sid = session->getSid();
  839 +
  840 + SP_Response * response = new SP_Response( sid );
  841 + session->getHandler()->timeout( response );
  842 +
  843 + eventArg->getResponseQueue()->push( response );
  844 +
  845 + // the other threads will ignore this session, so it's safe to destroy session here
  846 + session->getHandler()->close();
  847 +
  848 + if( ! eventArg->disconnectEx( (SOCKET)iocpSession->mHandle, NULL, 0, 0 ) ) {
  849 + if( ERROR_IO_PENDING != WSAGetLastError () ) {
  850 + sp_syslog( LOG_ERR, "DisconnectEx(%d) fail, errno %d", sid.mKey, WSAGetLastError() );
  851 + }
  852 + }
  853 +
  854 + if( 0 != sp_close( (SOCKET)iocpSession->mHandle ) ) {
  855 + sp_syslog( LOG_ERR, "close(%d) fail, errno %d", sid.mKey, WSAGetLastError() );
  856 + }
  857 +
  858 + memset( &( iocpSession->mFreeEvent ), 0, sizeof( OVERLAPPED ) );
  859 + PostQueuedCompletionStatus( eventArg->getCompletionPort(), 0,
  860 + SP_IocpEventCallback::eKeyFree, &( iocpSession->mFreeEvent ) );
  861 +}
  862 +
  863 +void SP_IocpEventHelper :: doStart( SP_Session * session )
  864 +{
  865 + session->setRunning( 1 );
  866 +
  867 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  868 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  869 + eventArg->getInputResultQueue()->push( new SP_SimpleTask( start, session, 1 ) );
  870 +}
  871 +
  872 +void SP_IocpEventHelper :: start( void * arg )
  873 +{
  874 + SP_Session * session = ( SP_Session * )arg;
  875 + SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
  876 + SP_IocpEventArg * eventArg = iocpSession->mEventArg;
  877 +
  878 + SP_IOChannel * ioChannel = session->getIOChannel();
  879 +
  880 + int initRet = ioChannel->init( (int)iocpSession->mHandle );
  881 +
  882 + SP_Response * response = new SP_Response( session->getSid() );
  883 + int startRet = session->getHandler()->start( session->getRequest(), response );
  884 +
  885 + int status = SP_Session::eWouldExit;
  886 +
  887 + if( 0 == initRet ) {
  888 + if( 0 == startRet ) status = SP_Session::eNormal;
  889 + } else {
  890 + delete response;
  891 + // make an empty response
  892 + response = new SP_Response( session->getSid() );
  893 + }
  894 +
  895 + session->setStatus( status );
  896 + session->setRunning( 0 );
  897 +
  898 + eventArg->getResponseQueue()->push( response );
  899 +}
  900 +
  901 +void SP_IocpEventHelper :: doCompletion( SP_IocpEventArg * eventArg, SP_Message * msg )
  902 +{
  903 + eventArg->getOutputResultQueue()->push( msg );
  904 +}
... ...
注册登录 后发表评论