提交 b8f7da1dfa8bf8552b614d4b2a36c996b85937d3

作者 LJH 李佳桓
1 个父辈 135e14e6

add

正在显示 1 个修改的文件 包含 232 行增加0 行删除
  1 +/*
  2 + * Copyright 2007 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdio.h>
  7 +#include <string.h>
  8 +#include <stdlib.h>
  9 +#include <assert.h>
  10 +#include <errno.h>
  11 +#include <signal.h>
  12 +
  13 +
  14 +#include "spserver.hpp"
  15 +#include "speventcb.hpp"
  16 +#include "sphandler.hpp"
  17 +#include "spsession.hpp"
  18 +#include "spexecutor.hpp"
  19 +#include "sputils.hpp"
  20 +#include "spiochannel.hpp"
  21 +#include "spioutils.hpp"
  22 +
  23 +#include "event_msgqueue.h"
  24 +
  25 +SP_Server :: SP_Server( const char * bindIP, int port,
  26 + SP_HandlerFactory * handlerFactory )
  27 +{
  28 + snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP );
  29 + mPort = port;
  30 + mIsShutdown = 0;
  31 + mIsRunning = 0;
  32 +
  33 + mHandlerFactory = handlerFactory;
  34 + mIOChannelFactory = NULL;
  35 +
  36 + mTimeout = 600;
  37 + mMaxThreads = 4;
  38 + mReqQueueSize = 128;
  39 + mMaxConnections = 256;
  40 + mRefusedMsg = strdup( "System busy, try again later." );
  41 +}
  42 +
  43 +SP_Server :: ~SP_Server()
  44 +{
  45 + if( NULL != mHandlerFactory ) delete mHandlerFactory;
  46 + mHandlerFactory = NULL;
  47 +
  48 + if( NULL != mIOChannelFactory ) delete mIOChannelFactory;
  49 + mIOChannelFactory = NULL;
  50 +
  51 + if( NULL != mRefusedMsg ) free( mRefusedMsg );
  52 + mRefusedMsg = NULL;
  53 +}
  54 +
  55 +void SP_Server :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory )
  56 +{
  57 + mIOChannelFactory = ioChannelFactory;
  58 +}
  59 +
  60 +void SP_Server :: setTimeout( int timeout )
  61 +{
  62 + mTimeout = timeout > 0 ? timeout : mTimeout;
  63 +}
  64 +
  65 +void SP_Server :: setMaxThreads( int maxThreads )
  66 +{
  67 + mMaxThreads = maxThreads > 0 ? maxThreads : mMaxThreads;
  68 +}
  69 +
  70 +void SP_Server :: setMaxConnections( int maxConnections )
  71 +{
  72 + mMaxConnections = maxConnections > 0 ? maxConnections : mMaxConnections;
  73 +}
  74 +
  75 +void SP_Server :: setReqQueueSize( int reqQueueSize, const char * refusedMsg )
  76 +{
  77 + mReqQueueSize = reqQueueSize > 0 ? reqQueueSize : mReqQueueSize;
  78 +
  79 + if( NULL != mRefusedMsg ) free( mRefusedMsg );
  80 + mRefusedMsg = strdup( refusedMsg );
  81 +}
  82 +
  83 +void SP_Server :: shutdown()
  84 +{
  85 + mIsShutdown = 1;
  86 +}
  87 +
  88 +int SP_Server :: isRunning()
  89 +{
  90 + return mIsRunning;
  91 +}
  92 +
  93 +int SP_Server :: run()
  94 +{
  95 + int ret = -1;
  96 +
  97 + sp_thread_attr_t attr;
  98 + sp_thread_attr_init( &attr );
  99 + assert( sp_thread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );
  100 + sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );
  101 +
  102 + sp_thread_t thread;
  103 + ret = sp_thread_create( &thread, &attr, eventLoop, this );
  104 + sp_thread_attr_destroy( &attr );
  105 + if( 0 == ret ) {
  106 + sp_syslog( LOG_NOTICE, "Thread #%ld has been created to listen on port [%d]", thread, mPort );
  107 + } else {
  108 + mIsRunning = 0;
  109 + sp_syslog( LOG_WARNING, "Unable to create a thread for TCP server on port [%d], %s",
  110 + mPort, strerror( errno ) ) ;
  111 + }
  112 +
  113 + return ret;
  114 +}
  115 +
  116 +void SP_Server :: runForever()
  117 +{
  118 + eventLoop( this );
  119 +}
  120 +
  121 +sp_thread_result_t SP_THREAD_CALL SP_Server :: eventLoop( void * arg )
  122 +{
  123 + SP_Server * server = (SP_Server*)arg;
  124 +
  125 + server->mIsRunning = 1;
  126 +
  127 + server->start();
  128 +
  129 + server->mIsRunning = 0;
  130 +
  131 + return 0;
  132 +}
  133 +
  134 +void SP_Server :: sigHandler( int, short, void * arg )
  135 +{
  136 + SP_Server * server = (SP_Server*)arg;
  137 + server->shutdown();
  138 +}
  139 +
  140 +void SP_Server :: outputCompleted( void * arg )
  141 +{
  142 + SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
  143 + SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];
  144 +
  145 + handler->completionMessage( msg );
  146 +
  147 + free( arg );
  148 +}
  149 +
  150 +int SP_Server :: start()
  151 +{
  152 +#ifdef SIGPIPE
  153 + /* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
  154 + signal( SIGPIPE, SIG_IGN );
  155 +#endif
  156 +
  157 + int ret = 0;
  158 + int listenFD = -1;
  159 +
  160 + ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );
  161 +
  162 + if( 0 == ret ) {
  163 +
  164 + SP_EventArg eventArg( mTimeout );
  165 +
  166 + // Clean close on SIGINT or SIGTERM.
  167 + struct event evSigInt, evSigTerm;
  168 + signal_set( &evSigInt, SIGINT, sigHandler, this );
  169 + event_base_set( eventArg.getEventBase(), &evSigInt );
  170 + signal_add( &evSigInt, NULL);
  171 + signal_set( &evSigTerm, SIGTERM, sigHandler, this );
  172 + event_base_set( eventArg.getEventBase(), &evSigTerm );
  173 + signal_add( &evSigTerm, NULL);
  174 +
  175 + SP_AcceptArg_t acceptArg;
  176 + memset( &acceptArg, 0, sizeof( SP_AcceptArg_t ) );
  177 +
  178 + if( NULL == mIOChannelFactory ) {
  179 + mIOChannelFactory = new SP_DefaultIOChannelFactory();
  180 + }
  181 + acceptArg.mEventArg = &eventArg;
  182 + acceptArg.mHandlerFactory = mHandlerFactory;
  183 + acceptArg.mIOChannelFactory = mIOChannelFactory;
  184 + acceptArg.mReqQueueSize = mReqQueueSize;
  185 + acceptArg.mMaxConnections = mMaxConnections;
  186 + acceptArg.mRefusedMsg = mRefusedMsg;
  187 +
  188 + struct event evAccept;
  189 + event_set( &evAccept, listenFD, EV_READ|EV_PERSIST,
  190 + SP_EventCallback::onAccept, &acceptArg );
  191 + event_base_set( eventArg.getEventBase(), &evAccept );
  192 + event_add( &evAccept, NULL );
  193 +
  194 + SP_Executor workerExecutor( mMaxThreads, "work" );
  195 + SP_Executor actExecutor( 1, "act" );
  196 + SP_CompletionHandler * completionHandler = mHandlerFactory->createCompletionHandler();
  197 +
  198 + /* Start the event loop. */
  199 + while( 0 == mIsShutdown ) {
  200 + event_base_loop( eventArg.getEventBase(), EVLOOP_ONCE );
  201 +
  202 + for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
  203 + SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
  204 + workerExecutor.execute( task );
  205 + }
  206 +
  207 + for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
  208 + SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();
  209 +
  210 + void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
  211 + arg[ 0 ] = (void*)completionHandler;
  212 + arg[ 1 ] = (void*)msg;
  213 +
  214 + actExecutor.execute( outputCompleted, arg );
  215 + }
  216 + }
  217 +
  218 + delete completionHandler;
  219 +
  220 + sp_syslog( LOG_NOTICE, "Server is shutdown." );
  221 +
  222 + event_del( &evAccept );
  223 +
  224 + signal_del( &evSigTerm );
  225 + signal_del( &evSigInt );
  226 +
  227 + sp_close( listenFD );
  228 + }
  229 +
  230 + return ret;
  231 +}
  232 +
... ...
注册登录 后发表评论