提交 79acf55ebcda317012b75b2df1b487e4f2709ec7

作者 LJH 李佳桓
1 个父辈 a9564154

add

正在显示 1 个修改的文件 包含 219 行增加0 行删除
  1 +/*
  2 + * Copyright 2007 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdio.h>
  7 +#include <stdlib.h>
  8 +#include <string.h>
  9 +#include <signal.h>
  10 +
  11 +#include "spporting.hpp"
  12 +
  13 +#include "splfserver.hpp"
  14 +
  15 +#include "speventcb.hpp"
  16 +#include "spthreadpool.hpp"
  17 +#include "sphandler.hpp"
  18 +#include "spexecutor.hpp"
  19 +#include "sputils.hpp"
  20 +#include "spioutils.hpp"
  21 +#include "spiochannel.hpp"
  22 +
  23 +#include "event_msgqueue.h"
  24 +
  25 +SP_LFServer :: SP_LFServer( const char * bindIP, int port, SP_HandlerFactory * handlerFactory )
  26 +{
  27 + snprintf( mBindIP, sizeof( mBindIP ), "%s", bindIP );
  28 + mPort = port;
  29 +
  30 + mIsShutdown = 0;
  31 + mIsRunning = 0;
  32 +
  33 + mEventArg = new SP_EventArg( 600 );
  34 + mMaxThreads = 4;
  35 +
  36 + mAcceptArg = (SP_AcceptArg_t*)malloc( sizeof( SP_AcceptArg_t ) );
  37 + memset( mAcceptArg, 0, sizeof( SP_AcceptArg_t ) );
  38 + mAcceptArg->mMaxConnections = 256;
  39 + mAcceptArg->mReqQueueSize = 128;
  40 + mAcceptArg->mRefusedMsg = strdup( "System busy, try again later." );
  41 + mAcceptArg->mHandlerFactory = handlerFactory;
  42 +
  43 + mAcceptArg->mEventArg = mEventArg;
  44 +
  45 + mThreadPool = NULL;
  46 +
  47 + mEvAccept = mEvSigTerm = mEvSigInt = NULL;
  48 +
  49 + mCompletionHandler = NULL;
  50 +
  51 + sp_thread_mutex_init( &mMutex, NULL );
  52 +}
  53 +
  54 +SP_LFServer :: ~SP_LFServer()
  55 +{
  56 + shutdown();
  57 +
  58 + if( NULL != mThreadPool ) delete mThreadPool;
  59 + mThreadPool = NULL;
  60 +
  61 + if( NULL != mCompletionHandler ) delete mCompletionHandler;
  62 + mCompletionHandler = NULL;
  63 +
  64 + event_del( mEvAccept );
  65 + free( mEvAccept );
  66 + mEvAccept = NULL;
  67 +
  68 + signal_del( mEvSigTerm );
  69 + free( mEvSigTerm );
  70 + mEvSigTerm = NULL;
  71 +
  72 + signal_del( mEvSigInt );
  73 + free( mEvSigInt );
  74 + mEvSigInt = NULL;
  75 +
  76 + delete mAcceptArg->mHandlerFactory;
  77 + free( mAcceptArg->mRefusedMsg );
  78 +
  79 + free( mAcceptArg );
  80 + mAcceptArg = NULL;
  81 +
  82 + delete mEventArg;
  83 + mEventArg = NULL;
  84 +
  85 + sp_thread_mutex_destroy( &mMutex );
  86 +}
  87 +
  88 +void SP_LFServer :: setTimeout( int timeout )
  89 +{
  90 + mEventArg->setTimeout( timeout );
  91 +}
  92 +
  93 +void SP_LFServer :: setMaxConnections( int maxConnections )
  94 +{
  95 + mAcceptArg->mMaxConnections = maxConnections > 0 ?
  96 + maxConnections : mAcceptArg->mMaxConnections;
  97 +}
  98 +
  99 +void SP_LFServer :: setMaxThreads( int maxThreads )
  100 +{
  101 + mMaxThreads = maxThreads > 0 ? maxThreads : mMaxThreads;
  102 +}
  103 +
  104 +void SP_LFServer :: setReqQueueSize( int reqQueueSize, const char * refusedMsg )
  105 +{
  106 + mAcceptArg->mReqQueueSize = reqQueueSize > 0 ?
  107 + reqQueueSize : mAcceptArg->mReqQueueSize;
  108 +
  109 + if( NULL != mAcceptArg->mRefusedMsg ) free( mAcceptArg->mRefusedMsg );
  110 + mAcceptArg->mRefusedMsg = strdup( refusedMsg );
  111 +}
  112 +
  113 +void SP_LFServer :: setIOChannelFactory( SP_IOChannelFactory * ioChannelFactory )
  114 +{
  115 + mAcceptArg->mIOChannelFactory = ioChannelFactory;
  116 +}
  117 +
  118 +void SP_LFServer :: shutdown()
  119 +{
  120 + mIsShutdown = 1;
  121 +}
  122 +
  123 +int SP_LFServer :: isRunning()
  124 +{
  125 + return mIsRunning;
  126 +}
  127 +
  128 +void SP_LFServer :: sigHandler( int, short, void * arg )
  129 +{
  130 + SP_LFServer * server = (SP_LFServer*)arg;
  131 + server->shutdown();
  132 +}
  133 +
  134 +void SP_LFServer :: lfHandler( void * arg )
  135 +{
  136 + SP_LFServer * server = (SP_LFServer*)arg;
  137 +
  138 + for( ; 0 == server->mIsShutdown; ) {
  139 + server->handleOneEvent();
  140 + }
  141 +}
  142 +
  143 +void SP_LFServer :: handleOneEvent()
  144 +{
  145 + SP_Task * task = NULL;
  146 + SP_Message * msg = NULL;
  147 +
  148 + sp_thread_mutex_lock( &mMutex );
  149 +
  150 + for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {
  151 + if( mEventArg->getInputResultQueue()->getLength() > 0 ) {
  152 + task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
  153 + } else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {
  154 + msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
  155 + }
  156 +
  157 + if( NULL == task && NULL == msg ) {
  158 + event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );
  159 + }
  160 + }
  161 +
  162 + sp_thread_mutex_unlock( &mMutex );
  163 +
  164 + if( NULL != task ) task->run();
  165 +
  166 + if( NULL != msg ) mCompletionHandler->completionMessage( msg );
  167 +}
  168 +
  169 +int SP_LFServer :: run()
  170 +{
  171 +#ifdef SIGPIPE
  172 + /* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
  173 + signal( SIGPIPE, SIG_IGN );
  174 +#endif
  175 +
  176 + int ret = 0;
  177 + int listenFD = -1;
  178 +
  179 + ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );
  180 +
  181 + if( 0 == ret ) {
  182 + // Clean close on SIGINT or SIGTERM.
  183 + mEvSigInt = (struct event*)malloc( sizeof( struct event ) );
  184 + signal_set( mEvSigInt, SIGINT, sigHandler, this );
  185 + event_base_set( mEventArg->getEventBase(), mEvSigInt );
  186 + signal_add( mEvSigInt, NULL);
  187 +
  188 + mEvSigTerm = (struct event*)malloc( sizeof( struct event ) );
  189 + signal_set( mEvSigTerm, SIGTERM, sigHandler, this );
  190 + event_base_set( mEventArg->getEventBase(), mEvSigTerm );
  191 + signal_add( mEvSigTerm, NULL);
  192 +
  193 + mEvAccept = (struct event*)malloc( sizeof( struct event ) );
  194 + event_set( mEvAccept, listenFD, EV_READ|EV_PERSIST,
  195 + SP_EventCallback::onAccept, mAcceptArg );
  196 + event_base_set( mEventArg->getEventBase(), mEvAccept );
  197 + event_add( mEvAccept, NULL );
  198 +
  199 + mCompletionHandler = mAcceptArg->mHandlerFactory->createCompletionHandler();
  200 +
  201 + if( NULL == mAcceptArg->mIOChannelFactory ) {
  202 + mAcceptArg->mIOChannelFactory = new SP_DefaultIOChannelFactory();
  203 + }
  204 +
  205 + mThreadPool = new SP_ThreadPool( mMaxThreads );
  206 + for( int i = 0; i < mMaxThreads; i++ ) {
  207 + mThreadPool->dispatch( lfHandler, this );
  208 + }
  209 + }
  210 +
  211 + return ret;
  212 +}
  213 +
  214 +void SP_LFServer :: runForever()
  215 +{
  216 + run();
  217 + pause();
  218 +}
  219 +
注册登录 后发表评论