返回> 网站首页 

[转载]7种网络编程I/O模型代码实现实例之四

yoours2015-05-10 18:46:30 阅读 1627

简介一边听听音乐,一边写写文章。


BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    // 设置I/O类型
    pBuffer->nOperation = OP_READ;

    ::EnterCriticalSection(&pContext->Lock);
    // 设置序列号
    pBuffer->nSequenceNumber = pContext->nReadSequence;

    // 投递此重叠I/O
    DWORD dwBytes;
    DWORD dwFlags = 0;
    WSABUF buf;
    buf.buf = pBuffer->buff;
    buf.len = pBuffer->nLen;
    if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
    {
        if(::WSAGetLastError() != WSA_IO_PENDING)
        {
            printf("WSARecv出错:%d\n",WSAGetLastError());
            ::LeaveCriticalSection(&pContext->Lock);
            return FALSE;
        }
    }

    // 增加套节字上的重叠I/O计数和读序列号计数
    pContext->nOutstandingRecv ++;
    pContext->nReadSequence ++;
    ::LeaveCriticalSection(&pContext->Lock);

    return TRUE;
}

BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    // 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
    if(pContext->nOutstandingSend > m_nMaxSends)
        return FALSE;

    // 设置I/O类型,增加套节字上的重叠I/O计数
    pBuffer->nOperation = OP_WRITE;

    // 投递此重叠I/O
    DWORD dwBytes;
    DWORD dwFlags = 0;
    WSABUF buf;
    buf.buf = pBuffer->buff;
    buf.len = pBuffer->nLen;
    if(::WSASend(pContext->s,
            &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
    {
        int x;
        if((x=::WSAGetLastError()) != WSA_IO_PENDING)
        {
            printf("发送失败!错误码:%d",x);
            return FALSE;
        }
    }
    // 增加套节字上的重叠I/O计数
    ::EnterCriticalSection(&pContext->Lock);
    pContext->nOutstandingSend ++;
    ::LeaveCriticalSection(&pContext->Lock);

    if(pBuffer->nOperation == 0)
    {
        int x = 0;
    }
    return TRUE;
}

BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
            int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
{
    // 检查服务是否已经启动
    if(m_bServerStarted)
        return FALSE;

    // 保存用户参数
    m_nPort = nPort;
    m_nMaxConnections = nMaxConnections;
    m_nMaxFreeBuffers = nMaxFreeBuffers;
    m_nMaxFreeContexts = nMaxFreeContexts;
    m_nInitialReads = nInitialReads;

    // 初始化状态变量
    m_bShutDown = FALSE;
    m_bServerStarted = TRUE;

    // 创建监听套节字,绑定到本地端口,进入监听模式
    m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    SOCKADDR_IN si;
    si.sin_family = AF_INET;
    si.sin_port = ::ntohs(m_nPort);
    si.sin_addr.S_un.S_addr = INADDR_ANY;
    if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
    {
        m_bServerStarted = FALSE;
        return FALSE;
    }
    ::listen(m_sListen, 200);

    // 创建完成端口对象
    m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

    // 加载扩展函数AcceptEx
    GUID GuidAcceptEx = WSAID_ACCEPTEX;
    DWORD dwBytes;
    ::WSAIoctl(m_sListen,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &GuidAcceptEx,
        sizeof(GuidAcceptEx),
        &m_lpfnAcceptEx,
        sizeof(m_lpfnAcceptEx),
        &dwBytes,
        NULL,
        NULL);

    // 加载扩展函数GetAcceptExSockaddrs
    GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
    ::WSAIoctl(m_sListen,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &GuidGetAcceptExSockaddrs,
        sizeof(GuidGetAcceptExSockaddrs),
        &m_lpfnGetAcceptExSockaddrs,
        sizeof(m_lpfnGetAcceptExSockaddrs),
        &dwBytes,
        NULL,
        NULL
        );

    // 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
    ::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);

    // 注册FD_ACCEPT事件。
    // 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
    WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);

    // 创建监听线程
    m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);
    return TRUE;
}

void CIOCPServer::Shutdown()
{
    if(!m_bServerStarted)
        return;

    // 通知监听线程,马上停止服务
    m_bShutDown = TRUE;
    ::SetEvent(m_hAcceptEvent);
    // 等待监听线程退出
    ::WaitForSingleObject(m_hListenThread, INFINITE);
    ::CloseHandle(m_hListenThread);
    m_hListenThread = NULL;
    m_bServerStarted = FALSE;
}

DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
{
    CIOCPServer *pThis = (CIOCPServer*)lpParam;

    // 先在监听套节字上投递几个Accept I/O
    CIOCPBuffer *pBuffer;
    for(int i=0; i<pThis->m_nInitialAccepts; i++)
    {
        pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE
        if(pBuffer == NULL)
            return -1;
        pThis->InsertPendingAccept(pBuffer);
        pThis->PostAccept(pBuffer);
    }

    // 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
    HANDLE hWaitEvents[2 + MAX_THREAD];
    int nEventCount = 0;
    hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
    hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;

    // 创建指定数量的工作线程在完成端口上处理I/O
    for(int i=0; i<MAX_THREAD; i++)
    {
        hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
    }

    // 下面进入无限循环,处理事件对象数组中的事件
    while(TRUE)
    {
        int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);
        // 首先检查是否要停止服务
        if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
        {
            // 关闭所有连接
            pThis->CloseAllConnections();
            ::Sleep(0);     // 给I/O工作线程一个执行的机会
            // 关闭监听套节字
            ::closesocket(pThis->m_sListen);
            pThis->m_sListen = INVALID_SOCKET;
            ::Sleep(0);     // 给I/O工作线程一个执行的机会

            // 通知所有I/O处理线程退出
            for(int i=2; i<MAX_THREAD + 2; i++)
            {
                ::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
            }

            // 等待I/O处理线程退出
            ::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);
            for(int i=2; i<MAX_THREAD + 2; i++)
            {
                ::CloseHandle(hWaitEvents[i]);
            }

            ::CloseHandle(pThis->m_hCompletion);

            pThis->FreeBuffers();
            pThis->FreeContexts();
            ::ExitThread(0);
        }

        // 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
        if(nIndex == WSA_WAIT_TIMEOUT)
        {
            pBuffer = pThis->m_pPendingAccepts;
            while(pBuffer != NULL)
            {
                int nSeconds;
                int nLen = sizeof(nSeconds);
                // 取得连接建立的时间
                ::getsockopt(pBuffer->sClient,
                    SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);
                // 如果超过2分钟客户还不发送初始数据,就让这个客户go away
                if(nSeconds != -1 && nSeconds > /*2*60*/50)
                {
                    closesocket(pBuffer->sClient);
                    pBuffer->sClient = INVALID_SOCKET;
                }

                pBuffer = pBuffer->pNext;
            }
        }else{
            nIndex = nIndex - WAIT_OBJECT_0;
            WSANETWORKEVENTS ne;
            int nLimit=0;
            if(nIndex == 0)         // 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
            {
                ::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
                if(ne.lNetworkEvents & FD_ACCEPT)
                {
                    nLimit = 50;  // 增加的个数,这里设为50个
                }
            }else if(nIndex == 1){
// 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
                nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
            }else if(nIndex > 1){
// I/O服务线程退出,说明有错误发生,关闭服务器
                pThis->m_bShutDown = TRUE;
                continue;
            }

            // 投递nLimit个AcceptEx I/O请求
            int i = 0;
            while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
            {
                pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
                if(pBuffer != NULL)
                {
                    pThis->InsertPendingAccept(pBuffer);
                    pThis->PostAccept(pBuffer);
                }
            }
        }
    }
    return 0;
}

DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
            ::OutputDebugString("   WorkerThread 启动... \n");
#endif // _DEBUG

    CIOCPServer *pThis = (CIOCPServer*)lpParam;
    CIOCPBuffer *pBuffer = NULL;
    DWORD dwKey;
    DWORD dwTrans;
    LPOVERLAPPED lpol;

    while(TRUE)
    {
        // 在关联到此完成端口的所有套节字上等待I/O完成
        BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
                    &dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);

        if(dwTrans == -1) // 用户通知退出
        {
#ifdef _DEBUG
            ::OutputDebugString("   WorkerThread 退出 \n");
#endif // _DEBUG
            ::ExitThread(0);
        }
        if(dwTrans != -2)
            pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);
        int nError = NO_ERROR;
        if(!bOK)                        // 在此套节字上有错误发生
        {
            printf("完成端口套接字上有错误:%d\n",GetLastError());
            SOCKET s;
            if(pBuffer->nOperation == OP_ACCEPT)
            {
                s = pThis->m_sListen;
            }else{
                if(dwKey == 0)
                    break;
                s = ((CIOCPContext*)dwKey)->s;
            }
            DWORD dwFlags = 0;
            if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
            {
                nError = ::WSAGetLastError();
            }
        }
        pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
        printf("Buffer:%d     Context:%d\n",iBufferCount,iContextCount);
    }

#ifdef _DEBUG
            ::OutputDebugString("   WorkerThread 退出 \n");
#endif // _DEBUG
    return 0;
}

int g_x = 0;
void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
    CIOCPContext *pContext = (CIOCPContext *)dwKey;

#ifdef _DEBUG
            ::OutputDebugString("   HandleIO... \n");
#endif // _DEBUG

    // 1)首先减少套节字上的未决I/O计数
    if(dwTrans == -2)
    {
        CloseAConnection(pContext);
        return;
    }
    if(pContext != NULL)
    {
        ::EnterCriticalSection(&pContext->Lock);
        if(pBuffer->nOperation == OP_READ)
            pContext->nOutstandingRecv --;
        else if(pBuffer->nOperation == OP_WRITE)
            pContext->nOutstandingSend --;
        ::LeaveCriticalSection(&pContext->Lock);

        // 2)检查套节字是否已经被我们关闭
        if(pContext->bClosing)
        {
#ifdef _DEBUG
            ::OutputDebugString("   检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
            }
            // 释放已关闭套节字的未决I/O
            ReleaseBuffer(pBuffer);
            pBuffer = NULL;
            return;
        }
    }else{
        RemovePendingAccept(pBuffer);
    }

    // 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
    if(nError != NO_ERROR)
    {
        if(pBuffer->nOperation != OP_ACCEPT)
        {
            OnConnectionError(pContext, pBuffer, nError);
            CloseAConnection(pContext);
            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
            }
#ifdef _DEBUG
            ::OutputDebugString("   检查到客户套节字上发生错误 \n");
#endif // _DEBUG
        }else{
// 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
            // 客户端出错,释放I/O缓冲区
            if(pBuffer->sClient != INVALID_SOCKET)
            {
                ::closesocket(pBuffer->sClient);
                pBuffer->sClient = INVALID_SOCKET;
            }
#ifdef _DEBUG
            ::OutputDebugString("   检查到监听套节字上发生错误 \n");
#endif // _DEBUG
        }

        ReleaseBuffer(pBuffer);
        pBuffer = NULL;
        return;
    }

    // 开始处理
    if(pBuffer->nOperation == OP_ACCEPT)
    {
        if(dwTrans == 0)
        {
#ifdef _DEBUG
            ::OutputDebugString("   监听套节字上客户端关闭 \n");
#endif // _DEBUG

            if(pBuffer->sClient != INVALID_SOCKET)
            {
                ::closesocket(pBuffer->sClient);
                pBuffer->sClient = INVALID_SOCKET;
            }
        }else{
            // 为新接受的连接申请客户上下文对象
            CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
            if(pClient != NULL)
            {
                if(AddAConnection(pClient))
                {
                    // 取得客户地址
                    int nLocalLen, nRmoteLen;
                    LPSOCKADDR pLocalAddr, pRemoteAddr;
                    m_lpfnGetAcceptExSockaddrs(
                        pBuffer->buff,
                        pBuffer->nLen - (sizeof(sockaddr_in) + 16) * 2/*sizeof(cmd_header)*/,
                        sizeof(sockaddr_in) + 16,
                        sizeof(sockaddr_in) + 16,
                        (SOCKADDR **)&pLocalAddr,
                        &nLocalLen,
                        (SOCKADDR **)&pRemoteAddr,
                        &nRmoteLen);
                    memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
                    memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);

                    // 关联新连接到完成端口对象
                    ::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);

                    // 通知用户
                    pBuffer->nLen = dwTrans;
                    OnConnectionEstablished(pClient, pBuffer);

                    if(pClient->bClosing && pClient->nOutstandingRecv == 0 && pClient->nOutstandingSend == 0)
                    {
                        ReleaseContext(pClient);
                        pContext = NULL;
                    }else if(pClient->hTimer == NULL){
//接收一个客户端的同时创建一个检测I/O超时的Timer
                        pClient->hCompletion = m_hCompletion;
                        CreateTimerQueueTimer(&pClient->hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0);
                    }

                    // 向新连接投递Read请求或者Write请求,直接关闭这些空间在套节字关闭或出错时释放
//                      CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//                      if(p != NULL)
//                      {
//                          if(!PostRecv(pClient, p))
//                          {
//                              CloseAConnection(pClient);
//                          }
//                      }
                }else{
// 连接数量已满,关闭连接
                    CloseAConnection(pClient);
                    ReleaseContext(pClient);
                    pContext = NULL;
                }
            }else{
                // 资源不足,关闭与客户的连接即可
                ::closesocket(pBuffer->sClient);
                pBuffer->sClient = INVALID_SOCKET;
            }
        }

        // Accept请求完成,释放I/O缓冲区
        ReleaseBuffer(pBuffer);
        pBuffer = NULL;

        // 通知监听线程继续再投递一个Accept请求
        ::InterlockedIncrement(&m_nRepostCount);
        ::SetEvent(m_hRepostEvent);
    }else if(pBuffer->nOperation == OP_READ){
        if(dwTrans == 0)    // 对方关闭套节字
        {
            // 先通知用户
            pBuffer->nLen = 0;
            OnConnectionClosing(pContext, pBuffer);
            // 再关闭连接
            CloseAConnection(pContext);
            // 释放客户上下文和缓冲区对象
            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
            }
            ReleaseBuffer(pBuffer);
            pBuffer = NULL;
        }else{
            pBuffer->nLen = dwTrans;
            // 按照I/O投递的顺序读取接收到的数据
            CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
            while(p != NULL)
            {
                // 通知用户
                OnReadCompleted(pContext, p);
                // 增加要读的序列号的值
                ::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
                // 释放这个已完成的I/O
                ReleaseBuffer(p);
                p = GetNextReadBuffer(pContext, NULL);
            }

            if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
            }else if(pContext->hTimer != NULL){
                ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);//重置监视时间,当一个投递完成后,60s内无任何交互则断开。
            }

            // 继续投递一个新的接收请求
         //   pBuffer = AllocateBuffer(BUFFER_SIZE);
            //if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
            //{
            //  CloseAConnection(pContext);
            //}
        }
    }else if(pBuffer->nOperation == OP_WRITE){
        if(dwTrans == 0)    // 对方关闭套节字
        {
            // 先通知用户
            pBuffer->nLen = 0;
            OnConnectionClosing(pContext, pBuffer);

            // 再关闭连接
            CloseAConnection(pContext);

            // 释放客户上下文和缓冲区对象
            if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
            }
            ReleaseBuffer(pBuffer);
            pBuffer = NULL;
        }else{
            if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
                ReleaseBuffer(pBuffer);
                pBuffer = NULL;
                return;
            }else if(pContext->hTimer != NULL){
                ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);
            }

            // 写操作完成,通知用户
            if(dwTrans < pBuffer->nLen)//如果此send没有发送完全,则发送剩下的部分(此部分如果还是没发完全,这里同样进行)
            {
                printf("send未发送完全,发送:%d,总长度:%d\n",dwTrans,pBuffer->nLen);
                CIOCPBuffer* p = AllocateBuffer(pBuffer->nLen - dwTrans);
                if(p != NULL)
                    memcpy(p->buff,pBuffer->buff + dwTrans,pBuffer->nLen - dwTrans);
                if(p == NULL || !PostSend(pContext,p))
                {
                    CloseAConnection(pContext);
                    return;
                }
            }else{
                if(!PostNextWriteBuffer(pContext,pBuffer))
                {
                    CloseAConnection(pContext);
                    return;
                }
            }
            pBuffer->nLen = dwTrans;
            OnWriteCompleted(pContext, pBuffer);
            if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
            {
                ReleaseContext(pContext);
                pContext = NULL;
            }
            // 释放SendText函数申请的缓冲区
            ReleaseBuffer(pBuffer);
            pBuffer = NULL;
        }
    }
}

BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
{
    CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
    if(pBuffer != NULL)
    {
        memcpy(pBuffer->buff, pszText, nLen);
        return PostSend(pContext, pBuffer);
    }
    return FALSE;
}

//投递接收请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
//  if(!PostRecv(pContext, p))
//  {
//      CloseAConnection(pContext);
//  }
//}
//投递发送请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
//  if(!PostSendToList(pContext, p))
//  {
//      CloseAConnection(pContext);
//  }
//}
void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    //连接建立,且第一次数据接收完成。
    //接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    //一次数据接收完成。
    //接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    //一次数据发送完成。
    //接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}

void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
}

微信小程序扫码登陆

文章评论

1627人参与,0条评论