提交 b8af812008f149c9bf666e415e0a4ef9d92489ab

作者 LJH 李佳桓
1 个父辈 1f50a4b4

add

正在显示 1 个修改的文件 包含 288 行增加0 行删除
  1 +/*
  2 + * Copyright 2008 Stephen Liu
  3 + * For license terms, see the file COPYING along with this library.
  4 + */
  5 +
  6 +#include <stdlib.h>
  7 +
  8 +#include "spiocpevent.hpp"
  9 +#include "sputils.hpp"
  10 +#include "spsession.hpp"
  11 +
  12 +SP_IocpEventHeap :: SP_IocpEventHeap()
  13 +{
  14 + mEntries = NULL;
  15 + mMaxCount = mCount = 0;
  16 +}
  17 +
  18 +SP_IocpEventHeap :: ~SP_IocpEventHeap()
  19 +{
  20 + if( NULL != mEntries ) free( mEntries );
  21 + mEntries = NULL;
  22 + mMaxCount = mCount = 0;
  23 +}
  24 +
  25 +int SP_IocpEventHeap :: getCount()
  26 +{
  27 + return mCount;
  28 +}
  29 +
  30 +int SP_IocpEventHeap :: push( SP_IocpEvent_t * item )
  31 +{
  32 + if( 0 != reserve( mCount + 1 ) ) return -1;
  33 +
  34 + shiftUp( mCount++, item );
  35 +
  36 + return 0;
  37 +}
  38 +
  39 +SP_IocpEvent_t * SP_IocpEventHeap :: top()
  40 +{
  41 + return mCount ? mEntries[ 0 ] : NULL;
  42 +}
  43 +
  44 +SP_IocpEvent_t * SP_IocpEventHeap :: pop()
  45 +{
  46 + if( mCount ) {
  47 + SP_IocpEvent_t * ret = mEntries[ 0 ];
  48 + shiftDown( 0, mEntries[ --mCount ] );
  49 + ret->mHeapIndex = -1;
  50 +
  51 + return ret;
  52 + }
  53 +
  54 + return NULL;
  55 +}
  56 +
  57 +int SP_IocpEventHeap :: erase( SP_IocpEvent_t * item )
  58 +{
  59 + if( -1 != item->mHeapIndex ) {
  60 + shiftDown( item->mHeapIndex, mEntries[ -- mCount ] );
  61 + item->mHeapIndex = -1;
  62 +
  63 + return 0;
  64 + }
  65 +
  66 + return -1;
  67 +}
  68 +
  69 +int SP_IocpEventHeap :: reserve( int count )
  70 +{
  71 + if( mMaxCount < count ) {
  72 + int maxCount = mMaxCount ? mMaxCount * 2 : 8;
  73 + if( maxCount < count ) maxCount = count;
  74 +
  75 + SP_IocpEvent_t ** p = (SP_IocpEvent_t**)realloc( mEntries, maxCount * sizeof( SP_IocpEvent_t ) );
  76 + if( NULL == p ) return -1;
  77 +
  78 + mEntries = p;
  79 + mMaxCount = maxCount;
  80 + }
  81 +
  82 + return 0;
  83 +
  84 +}
  85 +
  86 +int SP_IocpEventHeap :: isGreater( SP_IocpEvent_t * item1, SP_IocpEvent_t * item2 )
  87 +{
  88 + if( item1->mTimeout.tv_sec == item2->mTimeout.tv_sec ) {
  89 + return item1->mTimeout.tv_usec > item2->mTimeout.tv_usec;
  90 + } else {
  91 + return item1->mTimeout.tv_sec > item2->mTimeout.tv_sec;
  92 + }
  93 +}
  94 +
  95 +void SP_IocpEventHeap :: shiftUp( int index, SP_IocpEvent_t * item )
  96 +{
  97 + int parent = ( index - 1 ) / 2;
  98 +
  99 + for( ; index && isGreater( mEntries[ parent ], item ); ) {
  100 + mEntries[ index ] = mEntries[ parent ];
  101 + mEntries[ index ]->mHeapIndex = index;
  102 + index = parent;
  103 + parent = ( index - 1 ) / 2;
  104 + }
  105 + mEntries[ index ] = item;
  106 + item->mHeapIndex = index;
  107 +}
  108 +
  109 +void SP_IocpEventHeap :: shiftDown( int index, SP_IocpEvent_t * item )
  110 +{
  111 + int minChild = 2 * ( index + 1 );
  112 + for( ; minChild <= mCount; ) {
  113 + minChild -= ( minChild == mCount || isGreater( mEntries[ minChild ], mEntries[ minChild - 1 ] ) );
  114 + if( ! isGreater( item, mEntries[ minChild ] ) ) break;
  115 +
  116 + mEntries[ index ] = mEntries[ minChild ];
  117 + mEntries[ index ]->mHeapIndex = index;
  118 + index = minChild;
  119 + minChild = 2 * ( index + 1 );
  120 + }
  121 + shiftUp( index, item );
  122 +}
  123 +
  124 +//===================================================================
  125 +
  126 +SP_IocpMsgQueue :: SP_IocpMsgQueue( HANDLE completionPort,
  127 + DWORD completionKey, QueueFunc_t func, void * arg )
  128 +{
  129 + mCompletionPort = completionPort;
  130 + mCompletionKey = completionKey;
  131 + mFunc = func;
  132 + mArg = arg;
  133 +
  134 + mMutex = CreateMutex( NULL, FALSE, NULL );
  135 + mQueue = new SP_CircleQueue();
  136 +}
  137 +
  138 +SP_IocpMsgQueue :: ~SP_IocpMsgQueue()
  139 +{
  140 + CloseHandle( mMutex );
  141 + delete mQueue;
  142 + mQueue = NULL;
  143 +}
  144 +
  145 +int SP_IocpMsgQueue :: push( void * queueData )
  146 +{
  147 + WaitForSingleObject( mMutex, INFINITE );
  148 +
  149 + mQueue->push( queueData );
  150 + if( 1 == mQueue->getLength() ) {
  151 + PostQueuedCompletionStatus( mCompletionPort, 0, mCompletionKey, (OVERLAPPED*)this );
  152 + }
  153 +
  154 + ReleaseMutex( mMutex );
  155 +
  156 + return 0;
  157 +}
  158 +
  159 +int SP_IocpMsgQueue :: process()
  160 +{
  161 + WaitForSingleObject( mMutex, INFINITE );
  162 +
  163 + for( ; mQueue->getLength() > 0; ) {
  164 + void * queueData = mQueue->pop();
  165 +
  166 + ReleaseMutex( mMutex );
  167 +
  168 + mFunc( queueData, mArg );
  169 +
  170 + WaitForSingleObject( mMutex, INFINITE );
  171 + }
  172 +
  173 + ReleaseMutex( mMutex );
  174 +
  175 + return 0;
  176 +}
  177 +
  178 +//===================================================================
  179 +
  180 +SP_IocpEventArg :: SP_IocpEventArg( int timeout )
  181 +{
  182 + mInputResultQueue = new SP_BlockingQueue();
  183 + mOutputResultQueue = new SP_BlockingQueue();
  184 +
  185 + mResponseQueue = NULL;
  186 +
  187 + mSessionManager = new SP_SessionManager();
  188 +
  189 + mEventHeap = new SP_IocpEventHeap();
  190 +
  191 + mTimeout = timeout;
  192 +
  193 + mCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
  194 + if( NULL == mCompletionPort ) {
  195 + sp_syslog( LOG_ERR, "CreateIoCompletionPort failed, errno %d", WSAGetLastError() );
  196 + }
  197 +
  198 + mDisconnectExFunc = NULL;
  199 +}
  200 +
  201 +SP_IocpEventArg :: ~SP_IocpEventArg()
  202 +{
  203 + if( NULL != mInputResultQueue ) delete mInputResultQueue;
  204 + mInputResultQueue = NULL;
  205 +
  206 + if( NULL != mOutputResultQueue ) delete mOutputResultQueue;
  207 + mOutputResultQueue = NULL;
  208 +
  209 + if( NULL != mEventHeap ) delete mEventHeap;
  210 + mEventHeap = NULL;
  211 +
  212 + if( NULL != mResponseQueue ) delete mResponseQueue;
  213 + mResponseQueue = NULL;
  214 +
  215 + if( NULL != mSessionManager ) delete mSessionManager;
  216 + mSessionManager = NULL;
  217 +}
  218 +
  219 +HANDLE SP_IocpEventArg :: getCompletionPort()
  220 +{
  221 + return mCompletionPort;
  222 +}
  223 +
  224 +SP_BlockingQueue * SP_IocpEventArg :: getInputResultQueue()
  225 +{
  226 + return mInputResultQueue;
  227 +}
  228 +
  229 +SP_BlockingQueue * SP_IocpEventArg :: getOutputResultQueue()
  230 +{
  231 + return mOutputResultQueue;
  232 +}
  233 +
  234 +void SP_IocpEventArg :: setResponseQueue( SP_IocpMsgQueue * responseQueue )
  235 +{
  236 + mResponseQueue = responseQueue;
  237 +}
  238 +
  239 +SP_IocpMsgQueue * SP_IocpEventArg :: getResponseQueue()
  240 +{
  241 + return mResponseQueue;
  242 +}
  243 +
  244 +SP_SessionManager * SP_IocpEventArg :: getSessionManager()
  245 +{
  246 + return mSessionManager;
  247 +}
  248 +
  249 +SP_IocpEventHeap * SP_IocpEventArg :: getEventHeap()
  250 +{
  251 + return mEventHeap;
  252 +}
  253 +
  254 +void SP_IocpEventArg :: setTimeout( int timeout )
  255 +{
  256 + mTimeout = timeout;
  257 +}
  258 +
  259 +int SP_IocpEventArg :: getTimeout()
  260 +{
  261 + return mTimeout;
  262 +}
  263 +
  264 +int SP_IocpEventArg :: loadDisconnectEx( SOCKET fd )
  265 +{
  266 + LPFN_DISCONNECTEX fnDisConnectEx = NULL;
  267 + GUID guidDisConnectEx = WSAID_DISCONNECTEX;
  268 + DWORD dwByte;
  269 + ::WSAIoctl( fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
  270 + &guidDisConnectEx, sizeof(guidDisConnectEx),
  271 + &fnDisConnectEx, sizeof(fnDisConnectEx),
  272 + &dwByte, NULL, NULL);
  273 +
  274 + mDisconnectExFunc = fnDisConnectEx;
  275 +
  276 + return NULL != mDisconnectExFunc ? 0 : -1;
  277 +}
  278 +
  279 +BOOL SP_IocpEventArg :: disconnectEx( SOCKET fd, LPOVERLAPPED lpOverlapped,
  280 + DWORD dwFlags, DWORD reserved )
  281 +{
  282 + LPFN_DISCONNECTEX fnDisConnectEx = (LPFN_DISCONNECTEX)mDisconnectExFunc;
  283 + if( NULL != fnDisConnectEx ) {
  284 + return fnDisConnectEx( fd, lpOverlapped, dwFlags, reserved );
  285 + }
  286 +
  287 + return FALSE;
  288 +}
... ...
注册登录 后发表评论