spdispatcher.cpp 7.2 KB
/*
 * Copyright 2007 Stephen Liu
 * For license terms, see the file COPYING along with this library.
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <signal.h>

#include "spporting.hpp"
#include "spthread.hpp"

#include "spdispatcher.hpp"

#include "speventcb.hpp"
#include "sphandler.hpp"
#include "spsession.hpp"
#include "spexecutor.hpp"
#include "sputils.hpp"
#include "spiochannel.hpp"
#include "spioutils.hpp"
#include "sprequest.hpp"

#include "event_msgqueue.h"

SP_Dispatcher :: SP_Dispatcher( SP_CompletionHandler * completionHandler, int maxThreads )
{
#ifdef SIGPIPE
	/* Don't die with SIGPIPE on remote read shutdown. That's dumb. */
	signal( SIGPIPE, SIG_IGN );
#endif

	mIsShutdown = 0;
	mIsRunning = 0;

	mEventArg = new SP_EventArg( 600 );

	mMaxThreads = maxThreads > 0 ? maxThreads : 4;

	mCompletionHandler = completionHandler;

	mPushQueue = msgqueue_new( mEventArg->getEventBase(), 0, onPush, mEventArg );
}

SP_Dispatcher :: ~SP_Dispatcher()
{
	if( 0 == mIsRunning ) sleep( 1 );

	shutdown();

	for( ; mIsRunning; ) sleep( 1 );

	//msgqueue_destroy( (struct event_msgqueue*)mPushQueue );

	delete mEventArg;
	mEventArg = NULL;
}

void SP_Dispatcher :: setTimeout( int timeout )
{
	mEventArg->setTimeout( timeout );
}

void SP_Dispatcher :: shutdown()
{
	mIsShutdown = 1;
}

int SP_Dispatcher :: isRunning()
{
	return mIsRunning;
}

int SP_Dispatcher :: getSessionCount()
{
	return mEventArg->getSessionManager()->getCount();
}

int SP_Dispatcher :: getReqQueueLength()
{
	return mEventArg->getInputResultQueue()->getLength();
}

int SP_Dispatcher :: dispatch()
{
	int ret = -1;

	sp_thread_attr_t attr;
	sp_thread_attr_init( &attr );
	assert( sp_thread_attr_setstacksize( &attr, 1024 * 1024 ) == 0 );
	sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );

	sp_thread_t thread;
	ret = sp_thread_create( &thread, &attr, eventLoop, this );
	sp_thread_attr_destroy( &attr );
	if( 0 == ret ) {
		sp_syslog( LOG_NOTICE, "Thread #%ld has been created for dispatcher", thread );
	} else {
		mIsRunning = 0;
		sp_syslog( LOG_WARNING, "Unable to create a thread for dispatcher, %s",
			strerror( errno ) ) ;
	}

	return ret;
}

sp_thread_result_t SP_THREAD_CALL SP_Dispatcher :: eventLoop( void * arg )
{
	SP_Dispatcher * dispatcher = (SP_Dispatcher*)arg;

	dispatcher->mIsRunning = 1;

	dispatcher->start();

	dispatcher->mIsRunning = 0;

	return 0;
}

void SP_Dispatcher :: outputCompleted( void * arg )
{
	SP_CompletionHandler * handler = ( SP_CompletionHandler * ) ((void**)arg)[0];
	SP_Message * msg = ( SP_Message * ) ((void**)arg)[ 1 ];

	handler->completionMessage( msg );

	free( arg );
}

int SP_Dispatcher :: start()
{
	SP_Executor workerExecutor( mMaxThreads, "work" );
	SP_Executor actExecutor( 1, "act" );

	/* Start the event loop. */
	while( 0 == mIsShutdown ) {
		event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );

		for( ; NULL != mEventArg->getInputResultQueue()->top(); ) {
			SP_Task * task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
			workerExecutor.execute( task );
		}

		for( ; NULL != mEventArg->getOutputResultQueue()->top(); ) {
			SP_Message * msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();

			void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
			arg[ 0 ] = (void*)mCompletionHandler;
			arg[ 1 ] = (void*)msg;

			actExecutor.execute( outputCompleted, arg );
		}
	}

	sp_syslog( LOG_NOTICE, "Dispatcher is shutdown." );

	return 0;
}

typedef struct tagSP_PushArg {
	int mType;      // 0 : fd, 1 : timer

	// for push fd
	int mFd;
	SP_Handler * mHandler;
	SP_IOChannel * mIOChannel;
	int mNeedStart;

	// for push timer
	struct timeval mTimeout;
	struct event mTimerEvent;
	SP_TimerHandler * mTimerHandler;
	SP_EventArg * mEventArg;
	void * mPushQueue;
} SP_PushArg_t;

void SP_Dispatcher :: onPush( void * queueData, void * arg )
{
	SP_PushArg_t * pushArg = (SP_PushArg_t*)queueData;
	SP_EventArg * eventArg = (SP_EventArg*)arg;

	if( 0 == pushArg->mType ) {
		SP_Sid_t sid;
		sid.mKey = eventArg->getSessionManager()->allocKey( &sid.mSeq );
		assert( sid.mKey > 0 );

		SP_Session * session = new SP_Session( sid );

		char clientIP[ 32 ] = { 0 };
		{
			struct sockaddr_in clientAddr;
			socklen_t clientLen = sizeof( clientAddr );
			getpeername( pushArg->mFd, (struct sockaddr *)&clientAddr, &clientLen );
			SP_IOUtils::inetNtoa( &( clientAddr.sin_addr ), clientIP, sizeof( clientIP ) );
			session->getRequest()->setClientPort( ntohs( clientAddr.sin_port ) );
		}
		session->getRequest()->setClientIP( clientIP );


		eventArg->getSessionManager()->put( sid.mKey, sid.mSeq, session );

		session->setHandler( pushArg->mHandler );
		session->setIOChannel( pushArg->mIOChannel );
		session->setArg( eventArg );

		event_set( session->getReadEvent(), pushArg->mFd, EV_READ,
				SP_EventCallback::onRead, session );
		event_set( session->getWriteEvent(), pushArg->mFd, EV_WRITE,
				SP_EventCallback::onWrite, session );

		if( pushArg->mNeedStart ) {
			SP_EventHelper::doStart( session );
		} else {
			SP_EventCallback::addEvent( session, EV_WRITE, pushArg->mFd );
			SP_EventCallback::addEvent( session, EV_READ, pushArg->mFd );
		}

		free( pushArg );
	} else {
		event_set( &( pushArg->mTimerEvent ), -1, 0, onTimer, pushArg );
		event_base_set( eventArg->getEventBase(), &( pushArg->mTimerEvent ) );
		event_add( &( pushArg->mTimerEvent ), &( pushArg->mTimeout ) );
	}
}

int SP_Dispatcher :: push( int fd, SP_Handler * handler, int needStart )
{
	SP_IOChannel * ioChannel = new SP_DefaultIOChannel();
	return push( fd, handler, ioChannel, needStart );
}

int SP_Dispatcher :: push( int fd, SP_Handler * handler,
		SP_IOChannel * ioChannel, int needStart )
{
	SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );
	arg->mType = 0;
	arg->mFd = fd;
	arg->mHandler = handler;
	arg->mIOChannel = ioChannel;
	arg->mNeedStart = needStart;

	SP_IOUtils::setNonblock( fd );

	return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );
}

void SP_Dispatcher :: onTimer( int, short, void * arg )
{
	SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;

	pushArg->mEventArg->getInputResultQueue()->push(
		new SP_SimpleTask( timer, pushArg, 1 ) );
}

void SP_Dispatcher :: timer( void * arg )
{
	SP_PushArg_t * pushArg = (SP_PushArg_t*)arg;
	SP_TimerHandler * handler = pushArg->mTimerHandler;
	SP_EventArg * eventArg = pushArg->mEventArg;

	SP_Sid_t sid;
	sid.mKey = SP_Sid_t::eTimerKey;
	sid.mSeq = SP_Sid_t::eTimerSeq;
	SP_Response * response = new SP_Response( sid );
	if( 0 == handler->handle( response, &( pushArg->mTimeout ) ) ) {
		msgqueue_push( (struct event_msgqueue*)pushArg->mPushQueue, arg );
	} else {
		delete pushArg->mTimerHandler;
		free( pushArg );
	}

	msgqueue_push( (struct event_msgqueue*)eventArg->getResponseQueue(), response );
}

int SP_Dispatcher :: push( const struct timeval * timeout, SP_TimerHandler * handler )
{
	SP_PushArg_t * arg = (SP_PushArg_t*)malloc( sizeof( SP_PushArg_t ) );

	arg->mType = 1;
	arg->mTimeout = *timeout;
	arg->mTimerHandler = handler;
	arg->mEventArg = mEventArg;
	arg->mPushQueue = mPushQueue;

	return msgqueue_push( (struct event_msgqueue*)mPushQueue, arg );
}

int SP_Dispatcher :: push( SP_Response * response )
{
	return msgqueue_push( (struct event_msgqueue*)mEventArg->getResponseQueue(), response );
}