提交 dfbffc31f9da31e248dfee0f4a5bac1638bd0ab2

作者 LJH 李佳桓
1 个父辈 a7ede63f

add

正在显示 1 个修改的文件 包含 284 行增加0 行删除
  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdio.h>
  7 +#include <string.h>
  8 +#include <stdlib.h>
  9 +#include <assert.h>
  10 +#include <errno.h>
  11 +#include <signal.h>
  12 +
  13 +#include "spiocpserver.hpp"
  14 +#include "spwin32iocp.hpp"
  15 +#include "sphandler.hpp"
  16 +#include "spsession.hpp"
  17 +#include "spexecutor.hpp"
  18 +#include "sputils.hpp"
  19 +#include "spioutils.hpp"
  20 +#include "spiochannel.hpp"
  21 +
  22 +SP_IocpServer :: SP_IocpServer( const char * bindIP, int port,
  23 + SP_HandlerFactory * handlerFactory )
  24 +{
  25 + snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP );
  26 + mPort = port;
  27 + mIsShutdown = 0;
  28 + mIsRunning = 0;
  29 +
  30 + mHandlerFactory = handlerFactory;
  31 + mIOChannelFactory = NULL;
  32 +
  33 + mTimeout = 600;
  34 + mMaxThreads = 4;
  35 + mReqQueueSize = 128;
  36 + mMaxConnections = 256;
  37 + mRefusedMsg = strdup( "System busy, try again later." );
  38 +
  39 + mCompletionPort = NULL;
  40 +}
  41 +
  42 +SP_IocpServer :: ~SP_IocpServer()
  43 +{
  44 + shutdown();
  45 +
  46 + for( ; mIsRunning; ) {
  47 + shutdown();
  48 + sleep( 1 );
  49 + }
  50 +
  51 + if( NULL != mHandlerFactory ) delete mHandlerFactory;
  52 + mHandlerFactory = NULL;
  53 +
  54 + if( NULL != mRefusedMsg ) free( mRefusedMsg );
  55 + mRefusedMsg = NULL;
  56 +
  57 + if( NULL != mIOChannelFactory ) delete mIOChannelFactory;
  58 + mIOChannelFactory = NULL;
  59 +
  60 + mCompletionPort = NULL;
  61 +}
  62 +
  63 +void SP_IocpServer :: setTimeout( int timeout )
  64 +{
  65 + mTimeout = timeout;
  66 +}
  67 +
  68 +void SP_IocpServer :: setMaxThreads( int maxThreads )
  69 +{
  70 + mMaxThreads = maxThreads > 0 ? maxThreads : mMaxThreads;
  71 +}
  72 +
  73 +void SP_IocpServer :: setMaxConnections( int maxConnections )
  74 +{
  75 + mMaxConnections = maxConnections > 0 ? maxConnections : mMaxConnections;
  76 +}
  77 +
  78 +void SP_IocpServer :: setReqQueueSize( int reqQueueSize, const char * refusedMsg )
  79 +{
  80 + mReqQueueSize = reqQueueSize > 0 ? reqQueueSize : mReqQueueSize;
  81 +
  82 + if( NULL != mRefusedMsg ) free( mRefusedMsg );
  83 + mRefusedMsg = strdup( refusedMsg );
  84 +}
  85 +
  86 +void SP_IocpServer :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory )
  87 +{
  88 + mIOChannelFactory = ioChannelFactory;
  89 +}
  90 +
  91 +void SP_IocpServer :: shutdown()
  92 +{
  93 + mIsShutdown = 1;
  94 +
  95 + if( NULL != mCompletionPort ) {
  96 + PostQueuedCompletionStatus( mCompletionPort, 0, 0, 0 );
  97 + }
  98 +}
  99 +
  100 +int SP_IocpServer :: isRunning()
  101 +{
  102 + return mIsRunning;
  103 +}
  104 +
  105 +int SP_IocpServer :: run()
  106 +{
  107 + int ret = -1;
  108 +
  109 + sp_thread_attr_t attr;
  110 + sp_thread_attr_init( &attr );
  111 + assert( sp_thread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );
  112 + sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );
  113 +
  114 + sp_thread_t thread;
  115 + ret = sp_thread_create( &thread, &attr, eventLoop, this );
  116 + sp_thread_attr_destroy( &attr );
  117 + if( 0 == ret ) {
  118 + sp_syslog( LOG_NOTICE, "Thread #%ld has been created to listen on port [%d]", thread, mPort );
  119 + } else {
  120 + mIsRunning = 0;
  121 + sp_syslog( LOG_WARNING, "Unable to create a thread for TCP server on port [%d], %s",
  122 + mPort, strerror( errno ) ) ;
  123 + }
  124 +
  125 + return ret;
  126 +}
  127 +
  128 +void SP_IocpServer :: runForever()
  129 +{
  130 + eventLoop( this );
  131 +}
  132 +
  133 +sp_thread_result_t SP_THREAD_CALL SP_IocpServer :: eventLoop( void * arg )
  134 +{
  135 + SP_IocpServer * server = (SP_IocpServer*)arg;
  136 +
  137 + server->mIsRunning = 1;
  138 +
  139 + server->start();
  140 +
  141 + server->mIsRunning = 0;
  142 +
  143 + return NULL;
  144 +}
  145 +
  146 +void SP_IocpServer :: sigHandler( int, short, void * arg )
  147 +{
  148 + SP_IocpServer * server = (SP_IocpServer*)arg;
  149 + server->shutdown();
  150 +}
  151 +
  152 +void SP_IocpServer :: outputCompleted( void * arg )
  153 +{
  154 + SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
  155 + SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];
  156 +
  157 + handler->completionMessage( msg );
  158 +
  159 + free( arg );
  160 +}
  161 +
  162 +sp_thread_result_t SP_THREAD_CALL SP_IocpServer :: acceptThread( void * arg )
  163 +{
  164 + DWORD recvBytes = 0;
  165 +
  166 + SP_IocpAcceptArg_t * acceptArg = (SP_IocpAcceptArg_t*)arg;
  167 +
  168 + for( ; ; ) {
  169 + acceptArg->mClientSocket = (HANDLE)WSASocket( AF_INET, SOCK_STREAM,
  170 + IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED );
  171 + if( INVALID_SOCKET == (int)acceptArg->mClientSocket ) {
  172 + sp_syslog( LOG_ERR, "WSASocket fail, errno %d", WSAGetLastError() );
  173 + Sleep( 50 );
  174 + continue;
  175 + }
  176 +
  177 + SP_IOUtils::setNonblock( (int)acceptArg->mClientSocket );
  178 + memset( &( acceptArg->mOverlapped ), 0, sizeof( OVERLAPPED ) );
  179 +
  180 + BOOL ret = AcceptEx( (SOCKET)acceptArg->mListenSocket, (SOCKET)acceptArg->mClientSocket,
  181 + acceptArg->mBuffer, 0, sizeof(struct sockaddr_in) + 16, sizeof(struct sockaddr_in) + 16,
  182 + &recvBytes, &( acceptArg->mOverlapped ) );
  183 +
  184 + int lastError = WSAGetLastError();
  185 + if( FALSE == ret && (ERROR_IO_PENDING != lastError) ) {
  186 + sp_syslog( LOG_ERR, "AcceptEx() fail, errno %d", lastError );
  187 + closesocket( (int)acceptArg->mClientSocket );
  188 + if( WSAENOBUFS == lastError ) Sleep( 50 );
  189 + } else {
  190 + WaitForSingleObject( acceptArg->mAcceptEvent, INFINITE );
  191 + ResetEvent( acceptArg->mAcceptEvent );
  192 + }
  193 + }
  194 +
  195 + return 0;
  196 +}
  197 +
  198 +int SP_IocpServer :: start()
  199 +{
  200 +#ifdef SIGPIPE
  201 + /* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
  202 + signal( SIGPIPE, SIG_IGN );
  203 +#endif
  204 +
  205 + int ret = 0;
  206 + int listenFD = -1;
  207 +
  208 + ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );
  209 +
  210 + if( 0 == ret ) {
  211 +
  212 + SP_IocpEventArg eventArg( mTimeout );
  213 + eventArg.loadDisconnectEx( listenFD );
  214 + SP_IocpMsgQueue * msgQueue = new SP_IocpMsgQueue( eventArg.getCompletionPort(),
  215 + SP_IocpEventCallback::eKeyMsgQueue, SP_IocpEventCallback::onResponse, &eventArg );
  216 + eventArg.setResponseQueue( msgQueue );
  217 + mCompletionPort = eventArg.getCompletionPort();
  218 +
  219 + if( NULL == mIOChannelFactory ) {
  220 + mIOChannelFactory = new SP_DefaultIOChannelFactory();
  221 + }
  222 +
  223 + SP_IocpAcceptArg_t acceptArg;
  224 + memset( &acceptArg, 0, sizeof( acceptArg ) );
  225 +
  226 + acceptArg.mHandlerFactory = mHandlerFactory;
  227 + acceptArg.mIOChannelFactory = mIOChannelFactory;
  228 + acceptArg.mReqQueueSize = mReqQueueSize;
  229 + acceptArg.mMaxConnections = mMaxConnections;
  230 + acceptArg.mRefusedMsg = mRefusedMsg;
  231 + acceptArg.mAcceptEvent = CreateEvent( NULL, TRUE, FALSE, NULL );
  232 +
  233 + acceptArg.mEventArg = &eventArg;
  234 + acceptArg.mListenSocket = (HANDLE)listenFD;
  235 +
  236 + if( NULL == CreateIoCompletionPort( acceptArg.mListenSocket,
  237 + eventArg.getCompletionPort(), SP_IocpEventCallback::eKeyAccept, 0 ) ) {
  238 + sp_syslog( LOG_ERR, "CreateIoCompletionPort fail, errno %d", WSAGetLastError() );
  239 + return -1;
  240 + }
  241 +
  242 + sp_thread_t thread;
  243 + ret = sp_thread_create( &thread, NULL, acceptThread, &acceptArg );
  244 + if( 0 == ret ) {
  245 + sp_syslog( LOG_NOTICE, "Thread #%ld has been created to accept socket", thread );
  246 + } else {
  247 + sp_syslog( LOG_WARNING, "Unable to create a thread to accept socket, %s", strerror( errno ) );
  248 + return -1;
  249 + }
  250 +
  251 + SP_Executor actExecutor( 1, "act" );
  252 + SP_Executor workerExecutor( mMaxThreads, "work" );
  253 + SP_CompletionHandler * completionHandler = mHandlerFactory->createCompletionHandler();
  254 +
  255 + /* Start the event loop. */
  256 + while( 0 == mIsShutdown ) {
  257 + SP_IocpEventCallback::eventLoop( &eventArg, &acceptArg );
  258 +
  259 + for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
  260 + SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
  261 + workerExecutor.execute( task );
  262 + }
  263 +
  264 + for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
  265 + SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();
  266 +
  267 + void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
  268 + arg[ 0 ] = (void*)completionHandler;
  269 + arg[ 1 ] = (void*)msg;
  270 +
  271 + actExecutor.execute( outputCompleted, arg );
  272 + }
  273 + }
  274 +
  275 + delete completionHandler;
  276 +
  277 + sp_syslog( LOG_NOTICE, "Server is shutdown." );
  278 +
  279 + sp_close( listenFD );
  280 + }
  281 +
  282 + return ret;
  283 +}
  284 +
... ...
注册登录 后发表评论