提交 4d8227c0c9b8dad2407538ad5c13f8351a27d423

作者 LJH 李佳桓
1 个父辈 7fe2d536

add

  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdio.h>
  7 +#include <stdlib.h>
  8 +#include <string.h>
  9 +#include <signal.h>
  10 +
  11 +#include "spporting.hpp"
  12 +
  13 +#include "spiocplfserver.hpp"
  14 +
  15 +#include "spiocpevent.hpp"
  16 +#include "spwin32iocp.hpp"
  17 +#include "spthreadpool.hpp"
  18 +#include "sphandler.hpp"
  19 +#include "spexecutor.hpp"
  20 +#include "sputils.hpp"
  21 +#include "spioutils.hpp"
  22 +#include "spiochannel.hpp"
  23 +
  24 +SP_IocpLFServer :: SP_IocpLFServer( const char * bindIP, int port, SP_HandlerFactory * handlerFactory )
  25 +{
  26 + snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP );
  27 + mPort = port;
  28 +
  29 + mIsShutdown = 0;
  30 + mIsRunning = 0;
  31 +
  32 + mEventArg = new SP_IocpEventArg( 600 );
  33 +
  34 + mMaxThreads = 4;
  35 +
  36 + mAcceptArg = (SP_IocpAcceptArg_t*)malloc( sizeof( SP_IocpAcceptArg_t ) );
  37 + memset( mAcceptArg, 0, sizeof( SP_IocpAcceptArg_t ) );
  38 + mAcceptArg->mMaxConnections = 256;
  39 + mAcceptArg->mReqQueueSize = 128;
  40 + mAcceptArg->mRefusedMsg = strdup( "System busy, try again later." );
  41 + mAcceptArg->mHandlerFactory = handlerFactory;
  42 +
  43 + mAcceptArg->mEventArg = mEventArg;
  44 +
  45 + mThreadPool = NULL;
  46 +
  47 + mCompletionHandler = NULL;
  48 +
  49 + sp_thread_mutex_init( &mMutex, NULL );
  50 +
  51 + mCompletionPort = mEventArg->getCompletionPort();
  52 +}
  53 +
  54 +SP_IocpLFServer :: ~SP_IocpLFServer()
  55 +{
  56 + shutdown();
  57 +
  58 + for( ; mIsRunning; ) sleep( 1 );
  59 +
  60 + if( NULL != mThreadPool ) delete mThreadPool;
  61 + mThreadPool = NULL;
  62 +
  63 + if( NULL != mCompletionHandler ) delete mCompletionHandler;
  64 + mCompletionHandler = NULL;
  65 +
  66 + delete mAcceptArg->mHandlerFactory;
  67 + free( mAcceptArg->mRefusedMsg );
  68 +
  69 + free( mAcceptArg );
  70 + mAcceptArg = NULL;
  71 +
  72 + delete mEventArg;
  73 + mEventArg = NULL;
  74 +
  75 + sp_thread_mutex_destroy( &mMutex );
  76 +
  77 + mCompletionPort = NULL;
  78 +}
  79 +
  80 +void SP_IocpLFServer :: setTimeout( int timeout )
  81 +{
  82 + mEventArg->setTimeout( timeout );
  83 +}
  84 +
  85 +void SP_IocpLFServer :: setMaxConnections( int maxConnections )
  86 +{
  87 + mAcceptArg->mMaxConnections = maxConnections > 0 ?
  88 + maxConnections : mAcceptArg->mMaxConnections;
  89 +}
  90 +
  91 +void SP_IocpLFServer :: setMaxThreads( int maxThreads )
  92 +{
  93 + mMaxThreads = maxThreads > 0 ? maxThreads : mMaxThreads;
  94 +}
  95 +
  96 +void SP_IocpLFServer :: setReqQueueSize( int reqQueueSize, const char * refusedMsg )
  97 +{
  98 + mAcceptArg->mReqQueueSize = reqQueueSize > 0 ?
  99 + reqQueueSize : mAcceptArg->mReqQueueSize;
  100 +
  101 + if( NULL != mAcceptArg->mRefusedMsg ) free( mAcceptArg->mRefusedMsg );
  102 + mAcceptArg->mRefusedMsg = strdup( refusedMsg );
  103 +}
  104 +
  105 +void SP_IocpLFServer :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory )
  106 +{
  107 + mAcceptArg->mIOChannelFactory = ioChannelFactory;
  108 +}
  109 +
  110 +void SP_IocpLFServer :: shutdown()
  111 +{
  112 + mIsShutdown = 1;
  113 +
  114 + if( NULL != mCompletionPort ) {
  115 + PostQueuedCompletionStatus( mCompletionPort, 0, 0, 0 );
  116 + }
  117 +}
  118 +
  119 +int SP_IocpLFServer :: isRunning()
  120 +{
  121 + return mIsRunning;
  122 +}
  123 +
  124 +void SP_IocpLFServer :: sigHandler( int, short, void * arg )
  125 +{
  126 + SP_IocpLFServer * server = (SP_IocpLFServer*)arg;
  127 + server->shutdown();
  128 +}
  129 +
  130 +void SP_IocpLFServer :: lfHandler( void * arg )
  131 +{
  132 + SP_IocpLFServer * server = (SP_IocpLFServer*)arg;
  133 +
  134 + for( ; 0 == server->mIsShutdown; ) {
  135 + server->handleOneEvent();
  136 + }
  137 +
  138 + server->mIsRunning = 0;
  139 +}
  140 +
  141 +void SP_IocpLFServer :: handleOneEvent()
  142 +{
  143 + SP_Task * task = NULL;
  144 + SP_Message * msg = NULL;
  145 +
  146 + sp_thread_mutex_lock( &mMutex );
  147 +
  148 + for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {
  149 + if( mEventArg->getInputResultQueue()->getLength() > 0 ) {
  150 + task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
  151 + } else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {
  152 + msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
  153 + }
  154 +
  155 + if( NULL == task && NULL == msg ) {
  156 + SP_IocpEventCallback::eventLoop( mEventArg, mAcceptArg );
  157 + }
  158 + }
  159 +
  160 + sp_thread_mutex_unlock( &mMutex );
  161 +
  162 + if( NULL != task ) task->run();
  163 +
  164 + if( NULL != msg ) mCompletionHandler->completionMessage( msg );
  165 +}
  166 +
  167 +sp_thread_result_t SP_THREAD_CALL SP_IocpLFServer :: acceptThread( void * arg )
  168 +{
  169 + DWORD recvBytes = 0;
  170 +
  171 + SP_IocpAcceptArg_t * acceptArg = (SP_IocpAcceptArg_t*)arg;
  172 +
  173 + for( ; ; ) {
  174 + acceptArg->mClientSocket = (HANDLE)WSASocket( AF_INET, SOCK_STREAM,
  175 + IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED );
  176 + if( INVALID_SOCKET == (int)acceptArg->mClientSocket ) {
  177 + sp_syslog( LOG_ERR, "WSASocket fail, errno %d", WSAGetLastError() );
  178 + Sleep( 50 );
  179 + continue;
  180 + }
  181 +
  182 + SP_IOUtils::setNonblock( (int)acceptArg->mClientSocket );
  183 + memset( &( acceptArg->mOverlapped ), 0, sizeof( OVERLAPPED ) );
  184 +
  185 + BOOL ret = AcceptEx( (SOCKET)acceptArg->mListenSocket, (SOCKET)acceptArg->mClientSocket,
  186 + acceptArg->mBuffer, 0, sizeof(struct sockaddr_in) + 16, sizeof(struct sockaddr_in) + 16,
  187 + &recvBytes, &( acceptArg->mOverlapped ) );
  188 +
  189 + int lastError = WSAGetLastError();
  190 + if( FALSE == ret && (ERROR_IO_PENDING != lastError) ) {
  191 + sp_syslog( LOG_ERR, "AcceptEx() fail, errno %d", lastError );
  192 + closesocket( (int)acceptArg->mClientSocket );
  193 + if( WSAENOBUFS == lastError ) Sleep( 50 );
  194 + } else {
  195 + WaitForSingleObject( acceptArg->mAcceptEvent, INFINITE );
  196 + ResetEvent( acceptArg->mAcceptEvent );
  197 + }
  198 + }
  199 +
  200 + return 0;
  201 +}
  202 +
  203 +int SP_IocpLFServer :: run()
  204 +{
  205 + int ret = 0;
  206 + int listenFD = -1;
  207 +
  208 + ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );
  209 +
  210 + if( 0 == ret ) {
  211 + mCompletionHandler = mAcceptArg->mHandlerFactory->createCompletionHandler();
  212 +
  213 + SP_IocpMsgQueue * msgQueue = new SP_IocpMsgQueue( mEventArg->getCompletionPort(),
  214 + SP_IocpEventCallback::eKeyMsgQueue, SP_IocpEventCallback::onResponse, mEventArg );
  215 + mEventArg->setResponseQueue( msgQueue );
  216 + mEventArg->loadDisconnectEx( listenFD );
  217 +
  218 + mAcceptArg->mAcceptEvent = CreateEvent( NULL, TRUE, FALSE, NULL );
  219 + mAcceptArg->mListenSocket = (HANDLE)listenFD;
  220 +
  221 + if( NULL == CreateIoCompletionPort( mAcceptArg->mListenSocket,
  222 + mEventArg->getCompletionPort(), SP_IocpEventCallback::eKeyAccept, 0 ) ) {
  223 + sp_syslog( LOG_ERR, "CreateIoCompletionPort fail, errno %d", WSAGetLastError() );
  224 + return -1;
  225 + }
  226 +
  227 + if( NULL == mAcceptArg->mIOChannelFactory ) {
  228 + mAcceptArg->mIOChannelFactory = new SP_DefaultIOChannelFactory();
  229 + }
  230 +
  231 + sp_thread_t thread;
  232 + ret = sp_thread_create( &thread, NULL, acceptThread, mAcceptArg );
  233 + if( 0 == ret ) {
  234 + sp_syslog( LOG_NOTICE, "Thread #%ld has been created to accept socket", thread );
  235 + } else {
  236 + sp_syslog( LOG_WARNING, "Unable to create a thread to accept socket, %s", strerror( errno ) );
  237 + return -1;
  238 + }
  239 +
  240 + mIsRunning = 1;
  241 +
  242 + mThreadPool = new SP_ThreadPool( mMaxThreads );
  243 + for( int i = 0; i < mMaxThreads; i++ ) {
  244 + mThreadPool->dispatch( lfHandler, this );
  245 + }
  246 + }
  247 +
  248 + return ret;
  249 +}
  250 +
  251 +void SP_IocpLFServer :: runForever()
  252 +{
  253 + run();
  254 + pause();
  255 +}
  256 +
... ...
注册登录 后发表评论