返回> 网站首页 

[转载]7种网络编程I/O模型代码实现实例之三

yoours2015-05-10 18:45:36 阅读 1656

简介一边听听音乐,一边写写文章。

七,IOCP
大框架为书中例子,对强化了发送操作,部分异常处理,且加入了连接超时处理。
注意:当一个投递完成,且对应socket上已经没有未决的投递,必须要再投递一个请求或者关闭连接,否则socket对应的数据结构无法被释放,对应socket连接断开时也无法被
检测到。所以如果业务逻辑结束,要关闭连接。或者你需要等客户端来断开连接,那么你可以在业务逻辑结束后,再投递一个接收请求(客户端断开时,接收请求返回且接收的字节数为0,则此类中的异常处理逻辑便会将资源清理掉)。
头文件

// IOCP.h文件

#ifndef __IOCP_H__
#define __IOCP_H__

#include <winsock2.h>
#include <windows.h>
#include <Mswsock.h>

#define BUFFER_SIZE 1024*4      // I/O请求的缓冲区大小
#define MAX_THREAD  1           // I/O服务线程的数量

// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
    CIOCPBuffer()
    {
        memset(&ol,0,sizeof(WSAOVERLAPPED));
        sClient = INVALID_SOCKET;
        memset(buff,0,BUFFER_SIZE);
        nLen = 0;
        nSequenceNumber = 0;
        bIsReleased = FALSE;
        nOperation = 0;
        pNext = NULL;
    }
    WSAOVERLAPPED ol;

    SOCKET sClient;         // AcceptEx接收的客户方套节字
    char buff[BUFFER_SIZE];             // I/O操作使用的缓冲区
    int nLen;               // buff缓冲区(使用的)大小
    ULONG nSequenceNumber;  // 此I/O的序列号
    BOOL  bIsReleased;
    int nOperation;         // 操作类型

#define OP_ACCEPT   1
#define OP_WRITE    2
#define OP_READ     3

    CIOCPBuffer *pNext;
};
struct CIOCPNextToSend;
struct CIOCPTimerData;
// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
    CIOCPContext()
    {
        s = INVALID_SOCKET;
        memset(&addrLocal,0,sizeof(SOCKADDR_IN));
        memset(&addrRemote,0,sizeof(SOCKADDR_IN));
        bClosing = FALSE;
        nOutstandingRecv = 0;
        nOutstandingSend = 0;
        nReadSequence = 0;
        nCurrentReadSequence = 0;
        nCurrentStep = 0;
        pOutOfOrderReads = NULL;
        pNextToSend = NULL;
        bIsReleased = FALSE;
        pNext = NULL;
        pPreData = NULL;
        strcpy(szClientName,"");
        hTimer = NULL;
        hCompletion = NULL;
    }
    CIOCPBuffer m_pBuffer;
    SOCKET s;                       // 套节字句柄

    SOCKADDR_IN addrLocal;          // 连接的本地地址
    SOCKADDR_IN addrRemote;         // 连接的远程地址

    BOOL bClosing;                  // 套节字是否关闭

    int nOutstandingRecv;           // 此套节字上抛出的重叠操作的数量
    int nOutstandingSend;

    ULONG nReadSequence;            // 安排给接收的下一个序列号
    ULONG nCurrentReadSequence;     // 当前要读的序列号

    CIOCPBuffer *pOutOfOrderReads;  // 记录没有按顺序完成的读I/O
    CIOCPNextToSend *pNextToSend;       //xss,按顺序发送的下一个要发送的。

    LPVOID pPreData; //xss,用于2个过程之间的数据交流。
    ULONG  nCurrentStep;//xss,用于记录当前处于的过程步骤数。
    BOOL   bIsReleased;

    CRITICAL_SECTION Lock;          // 保护这个结构
    CIOCPContext *pNext;

    char szClientName[256];//xss
    HANDLE hTimer;//xss
    HANDLE hCompletion;//xss
};

struct CIOCPNextToSend//xss
{
    CIOCPBuffer * pBuffer;
    CIOCPNextToSend * pNext;
};

struct CIOCPTimerData
{
    CIOCPContext* pContext;
    HANDLE hCompletion;
};

class CIOCPServer   // 处理线程
{
public:
    CIOCPServer();
    ~CIOCPServer();

    // 开始服务
    BOOL Start(int nPort = 3456, int nMaxConnections = 2000,
            int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
    // 停止服务
    void Shutdown();

    // 关闭一个连接和关闭所有连接
    void CloseAConnection(CIOCPContext *pContext);
    void CloseAllConnections();

    // 取得当前的连接数量
    ULONG GetCurrentConnection() { return m_nCurrentConnection; }

    // 向指定客户发送文本
    BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);

protected:
    // 申请和释放缓冲区对象
    CIOCPBuffer *AllocateBuffer(int nLen);
    void ReleaseBuffer(CIOCPBuffer *pBuffer);

    // 申请和释放套节字上下文
    CIOCPContext *AllocateContext(SOCKET s);
    void ReleaseContext(CIOCPContext *pContext);

    // 释放空闲缓冲区对象列表和空闲上下文对象列表
    void FreeBuffers();
    void FreeContexts();

    // 向连接列表中添加一个连接
    BOOL AddAConnection(CIOCPContext *pContext);

    // 插入和移除未决的接受请求
    BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
    BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);

    //xss,把要发送的数据加入队列,按顺序发送
    BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    //xss,发送下一个需要发送的
    BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    // 取得下一个要读取的
    CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

    void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
    // 投递接受I/O、发送I/O、接收I/O
    BOOL PostAccept(CIOCPBuffer *pBuffer);
    BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

    void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);

    // 事件通知函数
    // 建立了一个新的连接
    virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    // 一个连接关闭
    virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    // 在一个连接上发生了错误
    virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
    // 一个连接上的读操作完成
    virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
    // 一个连接上的写操作完成
    virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

protected:
    // 记录空闲结构信息
    CIOCPBuffer *m_pFreeBufferList;
    CIOCPContext *m_pFreeContextList;
    int m_nFreeBufferCount;
    int m_nFreeContextCount;
    CRITICAL_SECTION m_FreeBufferListLock;
    CRITICAL_SECTION m_FreeContextListLock;

    CRITICAL_SECTION m_HeapLock;
    CRITICAL_SECTION m_RepostLock;

    // 记录抛出的Accept请求
    CIOCPBuffer *m_pPendingAccepts;   // 抛出请求列表。
    long m_nPendingAcceptCount;
    CRITICAL_SECTION m_PendingAcceptsLock;

    // 记录连接列表
    CIOCPContext *m_pConnectionList;
    int m_nCurrentConnection;
    CRITICAL_SECTION m_ConnectionListLock;

    // 用于投递Accept请求
    HANDLE m_hAcceptEvent;
    HANDLE m_hRepostEvent;
    LONG m_nRepostCount;

    int m_nPort;                // 服务器监听的端口
    int m_nInitialAccepts;
    int m_nInitialReads;
    int m_nMaxAccepts;
    int m_nMaxSends;
    int m_nMaxFreeBuffers;
    int m_nMaxFreeContexts;
    int m_nMaxConnections;

    HANDLE m_hListenThread;         // 监听线程
    HANDLE m_hCompletion;           // 完成端口句柄
    SOCKET m_sListen;               // 监听套节字句柄
    LPFN_ACCEPTEX m_lpfnAcceptEx;   // AcceptEx函数地址
    LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址

    BOOL m_bShutDown;       // 用于通知监听线程退出
    BOOL m_bServerStarted;  // 记录服务是否启动
    HANDLE m_hTimerQueue;//xss

private:    // 线程函数
    static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
    static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
};
#endif // __IOCP_H__

cpp文件
//////////////////////////////////////////////////
// IOCP.cpp文件
#define _WIN32_WINNT 0x0500 //xss
#include "iocp.h"
#pragma comment(lib, "WS2_32.lib")
#include <stdio.h>
#include "httpFun.h"

static int iBufferCount = 0;
static int iContextCount = 0;
CIOCPServer::CIOCPServer()
{
    // 列表
    m_pFreeBufferList = NULL;
    m_pFreeContextList = NULL;
    m_pPendingAccepts = NULL;
    m_pConnectionList = NULL;

    m_nFreeBufferCount = 0;
    m_nFreeContextCount = 0;
    m_nPendingAcceptCount = 0;
    m_nCurrentConnection = 0;

    ::InitializeCriticalSection(&m_FreeBufferListLock);
    ::InitializeCriticalSection(&m_FreeContextListLock);
    ::InitializeCriticalSection(&m_PendingAcceptsLock);
    ::InitializeCriticalSection(&m_ConnectionListLock);

    ::InitializeCriticalSection(&m_HeapLock);
    ::InitializeCriticalSection(&m_RepostLock);

    // Accept请求
    m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_nRepostCount = 0;

    m_nPort = 8888;

    m_nInitialAccepts = 10;
    m_nInitialReads = 4;
    m_nMaxAccepts = 100;
    m_nMaxSends = 20;
    m_nMaxFreeBuffers = 200;
    m_nMaxFreeContexts = 100;
    m_nMaxConnections = 2000;

    m_hListenThread = NULL;
    m_hCompletion = NULL;
    m_sListen = INVALID_SOCKET;
    m_lpfnAcceptEx = NULL;
    m_lpfnGetAcceptExSockaddrs = NULL;

    m_bShutDown = FALSE;
    m_bServerStarted = FALSE;

    m_hTimerQueue = ::CreateTimerQueue();

    // 初始化WS2_32.dll
    WSADATA wsaData;
    WORD sockVersion = MAKEWORD(2, 2);
    ::WSAStartup(sockVersion, &wsaData);
}

CIOCPServer::~CIOCPServer()
{
    Shutdown();

    if(m_sListen != INVALID_SOCKET)
        ::closesocket(m_sListen);
    if(m_hListenThread != NULL)
        ::CloseHandle(m_hListenThread);

    ::CloseHandle(m_hRepostEvent);
    ::CloseHandle(m_hAcceptEvent);

    ::DeleteCriticalSection(&m_FreeBufferListLock);
    ::DeleteCriticalSection(&m_FreeContextListLock);
    ::DeleteCriticalSection(&m_PendingAcceptsLock);
    ::DeleteCriticalSection(&m_ConnectionListLock);

    ::DeleteCriticalSection(&m_HeapLock);
    ::DeleteCriticalSection(&m_RepostLock);

    ::DeleteTimerQueue(m_hTimerQueue);//xss
    ::WSACleanup();
}

///////////////////////////////////////
static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired)
{
    CIOCPContext* pContext = (CIOCPContext*)lpParam;
    if(pContext != NULL && pContext->bClosing == FALSE)
    {
        EnterCriticalSection(&pContext->Lock);
        if(pContext->hCompletion != NULL)
        {
            PostQueuedCompletionStatus(pContext->hCompletion,-2,(ULONG_PTR)pContext,NULL);
        }
        LeaveCriticalSection(&pContext->Lock);
    }
}

///////////////////////////////////
// 自定义帮助函数
CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
    CIOCPBuffer *pBuffer = NULL;
    if(nLen > BUFFER_SIZE)
        return NULL;

    // 为缓冲区对象申请内存
    ::EnterCriticalSection(&m_FreeBufferListLock);
    if(m_pFreeBufferList == NULL)  // 内存池为空,申请新的内存
    {
//      pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(),
//                      HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
        pBuffer = new CIOCPBuffer();
    }
    else    // 从内存池中取一块来使用
    {
        pBuffer = m_pFreeBufferList;
        m_pFreeBufferList = m_pFreeBufferList->pNext;
        pBuffer->pNext = NULL;
        m_nFreeBufferCount --;
    }
    ::LeaveCriticalSection(&m_FreeBufferListLock);

    EnterCriticalSection(&m_HeapLock);
    iBufferCount++;
    LeaveCriticalSection(&m_HeapLock);

    // 初始化新的缓冲区对象
    if(pBuffer != NULL)
    {
        //pBuffer->buff = (char*)(pBuffer + sizeof(CIOCPBuffer)/*1*/);//xss,个人以为应该+sizeof(CIOCPBuffer);
        pBuffer->nLen = nLen;
        pBuffer->bIsReleased = FALSE;
    }
    return pBuffer;
}

void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
    if(pBuffer == NULL || pBuffer->bIsReleased)
        return;

    ::EnterCriticalSection(&m_FreeBufferListLock);
    if(m_nFreeBufferCount <= m_nMaxFreeBuffers)  // 将要释放的内存添加到空闲列表中
    {
        memset(pBuffer, 0, sizeof(CIOCPBuffer) /*+ BUFFER_SIZE*/);
        pBuffer->pNext = m_pFreeBufferList;
        m_pFreeBufferList = pBuffer;
        m_nFreeBufferCount ++ ;
        pBuffer->bIsReleased = TRUE;
    }else{
    // 已经达到最大值,真正的释放内存
        //::HeapFree(::GetProcessHeap(), 0, pBuffer);
        delete pBuffer;
    }

    ::LeaveCriticalSection(&m_FreeBufferListLock);

    EnterCriticalSection(&m_HeapLock);
    iBufferCount--;
    LeaveCriticalSection(&m_HeapLock);
}

CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
    CIOCPContext *pContext;

    // 申请一个CIOCPContext对象
    ::EnterCriticalSection(&m_FreeContextListLock);
    if(m_pFreeContextList == NULL)
    {
        //pContext = (CIOCPContext *)::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext));
        pContext = new CIOCPContext();
        ::InitializeCriticalSection(&pContext->Lock);
    }else{
        // 在空闲列表中申请
        pContext = m_pFreeContextList;
        m_pFreeContextList = m_pFreeContextList->pNext;
        pContext->pNext = NULL;
        m_nFreeBufferCount --;
    }
    ::LeaveCriticalSection(&m_FreeContextListLock);

    EnterCriticalSection(&m_HeapLock);
    iContextCount++;
    LeaveCriticalSection(&m_HeapLock);

    // 初始化对象成员
    if(pContext != NULL)
    {
        pContext->s = s;
        pContext->bIsReleased = FALSE;
    }
    return pContext;
}

void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
    if(pContext == NULL || pContext->bIsReleased)
        return;

    printf("\n%s释放了Context\n\n",pContext->szClientName);
    if(pContext->s != INVALID_SOCKET)
        ::closesocket(pContext->s);

    // 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
    CIOCPBuffer *pNext;
    while(pContext->pOutOfOrderReads != NULL)
    {
        pNext = pContext->pOutOfOrderReads->pNext;
        ReleaseBuffer(pContext->pOutOfOrderReads);
        pContext->pOutOfOrderReads = pNext;
    }

    //xss,再释放(如果有的话)此套接字上未完成的写I/O缓冲区
    CIOCPNextToSend* pSend = NULL;
    while(pContext->pNextToSend != NULL)
    {
        pSend = pContext->pNextToSend->pNext;
        if(pContext->pNextToSend->pBuffer != NULL && pContext->pNextToSend->pBuffer->bIsReleased == FALSE)
        {
            ReleaseBuffer(pContext->pNextToSend->pBuffer);
        }
        delete pContext->pNextToSend;
        pContext->pNextToSend = pSend;
    }

    if(pContext->hTimer != NULL)
    {
        DeleteTimerQueueTimer(m_hTimerQueue,pContext->hTimer,NULL);
        pContext->hTimer = NULL;
    }

    ::EnterCriticalSection(&m_FreeContextListLock);
    if(m_nFreeContextCount <= m_nMaxFreeContexts) // 添加到空闲列表
    {
        // 先将关键代码段变量保存到一个临时变量中
        CRITICAL_SECTION cstmp = pContext->Lock;
        // 将要释放的上下文对象初始化为0
        memset(pContext, 0, sizeof(CIOCPContext));

        // 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
        pContext->Lock = cstmp;
        pContext->pNext = m_pFreeContextList;
        m_pFreeContextList = pContext;

        // 更新计数
        m_nFreeContextCount ++;
        pContext->bIsReleased = TRUE;
    }else{
        ::DeleteCriticalSection(&pContext->Lock);
        //::HeapFree(::GetProcessHeap(), 0, pContext);
        delete pContext;
    }
    ::LeaveCriticalSection(&m_FreeContextListLock);

    EnterCriticalSection(&m_HeapLock);
    iContextCount--;
    LeaveCriticalSection(&m_HeapLock);
}

void CIOCPServer::FreeBuffers()
{
    // 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
    ::EnterCriticalSection(&m_FreeBufferListLock);
    CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
    CIOCPBuffer *pNextBuffer;
    while(pFreeBuffer != NULL)
    {
        pNextBuffer = pFreeBuffer->pNext;
        delete pFreeBuffer;
//      if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
//      {
// #ifdef _DEBUG
//          ::OutputDebugString("  FreeBuffers释放内存出错!");
// #endif // _DEBUG
//          break;
//      }
        pFreeBuffer = pNextBuffer;
    }
    m_pFreeBufferList = NULL;
    m_nFreeBufferCount = 0;
    ::LeaveCriticalSection(&m_FreeBufferListLock);
}

void CIOCPServer::FreeContexts()
{
    // 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
    ::EnterCriticalSection(&m_FreeContextListLock);
    CIOCPContext *pFreeContext = m_pFreeContextList;
    CIOCPContext *pNextContext;
    while(pFreeContext != NULL)
    {
        pNextContext = pFreeContext->pNext;
        ::DeleteCriticalSection(&pFreeContext->Lock);
        delete pFreeContext;
//      if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
//      {
// #ifdef _DEBUG
//          ::OutputDebugString("  FreeBuffers释放内存出错!");
// #endif // _DEBUG
//          break;
//      }
        pFreeContext = pNextContext;
    }
    m_pFreeContextList = NULL;
    m_nFreeContextCount = 0;
    ::LeaveCriticalSection(&m_FreeContextListLock);
}

BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
    // 向客户连接列表添加一个CIOCPContext对象
    ::EnterCriticalSection(&m_ConnectionListLock);
    if(m_nCurrentConnection <= m_nMaxConnections)
    {
        // 添加到表头
        pContext->pNext = m_pConnectionList;
        m_pConnectionList = pContext;
        // 更新计数
        m_nCurrentConnection ++;
        ::LeaveCriticalSection(&m_ConnectionListLock);
        return TRUE;
    }
    ::LeaveCriticalSection(&m_ConnectionListLock);

    return FALSE;
}

void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
{
    if(pContext == NULL || pContext->bClosing == TRUE)
        return;

    // 首先从列表中移除要关闭的连接
    ::EnterCriticalSection(&m_ConnectionListLock);
    CIOCPContext* pTest = m_pConnectionList;
    if(pTest == pContext)
    {
        m_pConnectionList =  pContext->pNext;
        m_nCurrentConnection --;
    }else{
        while(pTest != NULL && pTest->pNext !=  pContext)
            pTest = pTest->pNext;
        if(pTest != NULL)
        {
            pTest->pNext =  pContext->pNext;
            m_nCurrentConnection --;
        }
    }
    ::LeaveCriticalSection(&m_ConnectionListLock);

    // 然后关闭客户套节字
    ::EnterCriticalSection(&pContext->Lock);
    if(pContext->s != INVALID_SOCKET)
    {
        ::closesocket(pContext->s);
        pContext->s = INVALID_SOCKET;
    }
    pContext->bClosing = TRUE;
    ::LeaveCriticalSection(&pContext->Lock);
}

void CIOCPServer::CloseAllConnections()
{
    // 遍历整个连接列表,关闭所有的客户套节字
    ::EnterCriticalSection(&m_ConnectionListLock);
    CIOCPContext *pContext = m_pConnectionList;
    while(pContext != NULL)
    {
        ::EnterCriticalSection(&pContext->Lock);
        if(pContext->s != INVALID_SOCKET)
        {
            ::closesocket(pContext->s);
            pContext->s = INVALID_SOCKET;
        }

        pContext->bClosing = TRUE;
        ::LeaveCriticalSection(&pContext->Lock);
        pContext = pContext->pNext;
    }

    m_pConnectionList = NULL;
    m_nCurrentConnection = 0;
    ::LeaveCriticalSection(&m_ConnectionListLock);
}

BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
    // 将一个I/O缓冲区对象插入到m_pPendingAccepts表中
    ::EnterCriticalSection(&m_PendingAcceptsLock);
    if(m_pPendingAccepts == NULL)
        m_pPendingAccepts = pBuffer;
    else{
        pBuffer->pNext = m_pPendingAccepts;
        m_pPendingAccepts = pBuffer;
    }
    m_nPendingAcceptCount ++;
    ::LeaveCriticalSection(&m_PendingAcceptsLock);
    return TRUE;
}

BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
    BOOL bResult = FALSE;
    // 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
    ::EnterCriticalSection(&m_PendingAcceptsLock);
    CIOCPBuffer *pTest = m_pPendingAccepts;
    if(pTest == pBuffer)    // 如果是表头元素
    {
        m_pPendingAccepts = pBuffer->pNext;
        bResult = TRUE;
    }else{
        // 不是表头元素的话,就要遍历这个表来查找了
        while(pTest != NULL && pTest->pNext != pBuffer)
            pTest = pTest->pNext;
        if(pTest != NULL)
        {
            pTest->pNext = pBuffer->pNext;
             bResult = TRUE;
        }
    }
    // 更新计数
    if(bResult)
        m_nPendingAcceptCount --;
    ::LeaveCriticalSection(&m_PendingAcceptsLock);
    return  bResult;
}

void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    CloseAConnection(pContext);
}

BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
    ::EnterCriticalSection(&pContext->Lock);
    CIOCPNextToSend *ptr = pContext->pNextToSend;
    CIOCPNextToSend * pSend = new CIOCPNextToSend();
    pSend->pBuffer = pBuffer;
    pSend->pNext = NULL;
    if(ptr == NULL)
    {
        printf("数据:%10.10s ...,被直接发送。\n",pBuffer->buff);
        //::EnterCriticalSection(&pContext->Lock);
        pContext->pNextToSend = pSend;
        //::LeaveCriticalSection(&pContext->Lock);
        if(!PostSend(pContext,pBuffer))//如果没有需要等待的send就直接发送
        {
            ::LeaveCriticalSection(&pContext->Lock);
            return FALSE;
        }
    }else{
        printf("数据:%10.10s ...,被放入链表结尾。\n",pBuffer->buff);
        while(ptr->pNext != NULL)
        {
            ptr = ptr->pNext;
        }
        ptr->pNext = pSend;//新的发送请求放在链表结尾
    }
    ::LeaveCriticalSection(&pContext->Lock);
    return TRUE;
}

BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
    ::EnterCriticalSection(&pContext->Lock);
    CIOCPNextToSend* pSend = pContext->pNextToSend;
    CIOCPNextToSend* pNextSend = NULL;
    if(pSend != NULL && pSend->pNext != NULL)//发送成功的pBuffer是队列的第一个,发送下一个,pNextToSend指向下一个,pBuffer由外面释放。
    {
        pNextSend = pSend->pNext;
        if(pNextSend->pBuffer != NULL)
        {
            printf("数据:%10.10s ...从链表中弹出被发送。\n",pNextSend->pBuffer->buff);
            if(!PostSend(pContext,pNextSend->pBuffer))
            {
                delete pSend;
                pContext->pNextToSend = pNextSend;
                ::LeaveCriticalSection(&pContext->Lock);
                return FALSE;
            }
        }
    }
    if(pSend != NULL)
    {
        pNextSend = pSend->pNext;
        delete pSend;
        pContext->pNextToSend = pNextSend;
    }
    ::LeaveCriticalSection(&pContext->Lock);
    return TRUE;
}

CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    if(pBuffer != NULL)
    {
        // 如果与要读的下一个序列号相等,则读这块缓冲区
        if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
        {
            return pBuffer;
        }

        // 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中
        // 列表中的缓冲区是按照其序列号从小到大的顺序排列的
        pBuffer->pNext = NULL;

        CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
        CIOCPBuffer *pPre = NULL;
        while(ptr != NULL)
        {
            if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)
                break;

            pPre = ptr;
            ptr = ptr->pNext;
        }

        if(pPre == NULL) // 应该插入到表头
        {
            pBuffer->pNext = pContext->pOutOfOrderReads;
            pContext->pOutOfOrderReads = pBuffer;
        }else{
// 应该插入到表的中间
            pBuffer->pNext = pPre->pNext;
            pPre->pNext = pBuffer/*->pNext*/;//xss,个人觉得应该是pPre->pNext = pBuffer;
        }
    }

    // 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
    CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
    if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))
    {
        pContext->pOutOfOrderReads = ptr->pNext;
        return ptr;
    }
    return NULL;
}

BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)  // 在监听套节字上投递Accept请求
{
        // 设置I/O类型
        pBuffer->nOperation = OP_ACCEPT;

        // 投递此重叠I/O
        DWORD dwBytes;
        pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        BOOL b = m_lpfnAcceptEx(m_sListen,
            pBuffer->sClient,
            pBuffer->buff,
            pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),//xss,第一次都是收一个cmd_header
            sizeof(sockaddr_in) + 16,
            sizeof(sockaddr_in) + 16,
            &dwBytes,
            &pBuffer->ol);
        if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
        {
            return FALSE;
        }
        if(pBuffer->nOperation == 0)
        {
            int x = 0;
        }
        return TRUE;
};

微信小程序扫码登陆

文章评论

1656人参与,0条评论