返回> 网站首页
[转载]7种网络编程I/O模型代码实现实例之三
yoours2015-05-10 18:45:36
简介一边听听音乐,一边写写文章。
七,IOCP
大框架为书中例子,对强化了发送操作,部分异常处理,且加入了连接超时处理。
注意:当一个投递完成,且对应socket上已经没有未决的投递,必须要再投递一个请求或者关闭连接,否则socket对应的数据结构无法被释放,对应socket连接断开时也无法被
检测到。所以如果业务逻辑结束,要关闭连接。或者你需要等客户端来断开连接,那么你可以在业务逻辑结束后,再投递一个接收请求(客户端断开时,接收请求返回且接收的字节数为0,则此类中的异常处理逻辑便会将资源清理掉)。
头文件
// IOCP.h文件
#ifndef __IOCP_H__
#define __IOCP_H__
#include <winsock2.h>
#include <windows.h>
#include <Mswsock.h>
#define BUFFER_SIZE 1024*4 // I/O请求的缓冲区大小
#define MAX_THREAD 1 // I/O服务线程的数量
// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
CIOCPBuffer()
{
memset(&ol,0,sizeof(WSAOVERLAPPED));
sClient = INVALID_SOCKET;
memset(buff,0,BUFFER_SIZE);
nLen = 0;
nSequenceNumber = 0;
bIsReleased = FALSE;
nOperation = 0;
pNext = NULL;
}
WSAOVERLAPPED ol;
SOCKET sClient; // AcceptEx接收的客户方套节字
char buff[BUFFER_SIZE]; // I/O操作使用的缓冲区
int nLen; // buff缓冲区(使用的)大小
ULONG nSequenceNumber; // 此I/O的序列号
BOOL bIsReleased;
int nOperation; // 操作类型
#define OP_ACCEPT 1
#define OP_WRITE 2
#define OP_READ 3
CIOCPBuffer *pNext;
};
struct CIOCPNextToSend;
struct CIOCPTimerData;
// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
CIOCPContext()
{
s = INVALID_SOCKET;
memset(&addrLocal,0,sizeof(SOCKADDR_IN));
memset(&addrRemote,0,sizeof(SOCKADDR_IN));
bClosing = FALSE;
nOutstandingRecv = 0;
nOutstandingSend = 0;
nReadSequence = 0;
nCurrentReadSequence = 0;
nCurrentStep = 0;
pOutOfOrderReads = NULL;
pNextToSend = NULL;
bIsReleased = FALSE;
pNext = NULL;
pPreData = NULL;
strcpy(szClientName,"");
hTimer = NULL;
hCompletion = NULL;
}
CIOCPBuffer m_pBuffer;
SOCKET s; // 套节字句柄
SOCKADDR_IN addrLocal; // 连接的本地地址
SOCKADDR_IN addrRemote; // 连接的远程地址
BOOL bClosing; // 套节字是否关闭
int nOutstandingRecv; // 此套节字上抛出的重叠操作的数量
int nOutstandingSend;
ULONG nReadSequence; // 安排给接收的下一个序列号
ULONG nCurrentReadSequence; // 当前要读的序列号
CIOCPBuffer *pOutOfOrderReads; // 记录没有按顺序完成的读I/O
CIOCPNextToSend *pNextToSend; //xss,按顺序发送的下一个要发送的。
LPVOID pPreData; //xss,用于2个过程之间的数据交流。
ULONG nCurrentStep;//xss,用于记录当前处于的过程步骤数。
BOOL bIsReleased;
CRITICAL_SECTION Lock; // 保护这个结构
CIOCPContext *pNext;
char szClientName[256];//xss
HANDLE hTimer;//xss
HANDLE hCompletion;//xss
};
struct CIOCPNextToSend//xss
{
CIOCPBuffer * pBuffer;
CIOCPNextToSend * pNext;
};
struct CIOCPTimerData
{
CIOCPContext* pContext;
HANDLE hCompletion;
};
class CIOCPServer // 处理线程
{
public:
CIOCPServer();
~CIOCPServer();
// 开始服务
BOOL Start(int nPort = 3456, int nMaxConnections = 2000,
int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
// 停止服务
void Shutdown();
// 关闭一个连接和关闭所有连接
void CloseAConnection(CIOCPContext *pContext);
void CloseAllConnections();
// 取得当前的连接数量
ULONG GetCurrentConnection() { return m_nCurrentConnection; }
// 向指定客户发送文本
BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);
protected:
// 申请和释放缓冲区对象
CIOCPBuffer *AllocateBuffer(int nLen);
void ReleaseBuffer(CIOCPBuffer *pBuffer);
// 申请和释放套节字上下文
CIOCPContext *AllocateContext(SOCKET s);
void ReleaseContext(CIOCPContext *pContext);
// 释放空闲缓冲区对象列表和空闲上下文对象列表
void FreeBuffers();
void FreeContexts();
// 向连接列表中添加一个连接
BOOL AddAConnection(CIOCPContext *pContext);
// 插入和移除未决的接受请求
BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);
//xss,把要发送的数据加入队列,按顺序发送
BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
//xss,发送下一个需要发送的
BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 取得下一个要读取的
CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
// 投递接受I/O、发送I/O、接收I/O
BOOL PostAccept(CIOCPBuffer *pBuffer);
BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);
// 事件通知函数
// 建立了一个新的连接
virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接关闭
virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 在一个连接上发生了错误
virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
// 一个连接上的读操作完成
virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接上的写操作完成
virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
protected:
// 记录空闲结构信息
CIOCPBuffer *m_pFreeBufferList;
CIOCPContext *m_pFreeContextList;
int m_nFreeBufferCount;
int m_nFreeContextCount;
CRITICAL_SECTION m_FreeBufferListLock;
CRITICAL_SECTION m_FreeContextListLock;
CRITICAL_SECTION m_HeapLock;
CRITICAL_SECTION m_RepostLock;
// 记录抛出的Accept请求
CIOCPBuffer *m_pPendingAccepts; // 抛出请求列表。
long m_nPendingAcceptCount;
CRITICAL_SECTION m_PendingAcceptsLock;
// 记录连接列表
CIOCPContext *m_pConnectionList;
int m_nCurrentConnection;
CRITICAL_SECTION m_ConnectionListLock;
// 用于投递Accept请求
HANDLE m_hAcceptEvent;
HANDLE m_hRepostEvent;
LONG m_nRepostCount;
int m_nPort; // 服务器监听的端口
int m_nInitialAccepts;
int m_nInitialReads;
int m_nMaxAccepts;
int m_nMaxSends;
int m_nMaxFreeBuffers;
int m_nMaxFreeContexts;
int m_nMaxConnections;
HANDLE m_hListenThread; // 监听线程
HANDLE m_hCompletion; // 完成端口句柄
SOCKET m_sListen; // 监听套节字句柄
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数地址
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址
BOOL m_bShutDown; // 用于通知监听线程退出
BOOL m_bServerStarted; // 记录服务是否启动
HANDLE m_hTimerQueue;//xss
private: // 线程函数
static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
};
#endif // __IOCP_H__
cpp文件
//////////////////////////////////////////////////
// IOCP.cpp文件
#define _WIN32_WINNT 0x0500 //xss
#include "iocp.h"
#pragma comment(lib, "WS2_32.lib")
#include <stdio.h>
#include "httpFun.h"
static int iBufferCount = 0;
static int iContextCount = 0;
CIOCPServer::CIOCPServer()
{
// 列表
m_pFreeBufferList = NULL;
m_pFreeContextList = NULL;
m_pPendingAccepts = NULL;
m_pConnectionList = NULL;
m_nFreeBufferCount = 0;
m_nFreeContextCount = 0;
m_nPendingAcceptCount = 0;
m_nCurrentConnection = 0;
::InitializeCriticalSection(&m_FreeBufferListLock);
::InitializeCriticalSection(&m_FreeContextListLock);
::InitializeCriticalSection(&m_PendingAcceptsLock);
::InitializeCriticalSection(&m_ConnectionListLock);
::InitializeCriticalSection(&m_HeapLock);
::InitializeCriticalSection(&m_RepostLock);
// Accept请求
m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
m_nRepostCount = 0;
m_nPort = 8888;
m_nInitialAccepts = 10;
m_nInitialReads = 4;
m_nMaxAccepts = 100;
m_nMaxSends = 20;
m_nMaxFreeBuffers = 200;
m_nMaxFreeContexts = 100;
m_nMaxConnections = 2000;
m_hListenThread = NULL;
m_hCompletion = NULL;
m_sListen = INVALID_SOCKET;
m_lpfnAcceptEx = NULL;
m_lpfnGetAcceptExSockaddrs = NULL;
m_bShutDown = FALSE;
m_bServerStarted = FALSE;
m_hTimerQueue = ::CreateTimerQueue();
// 初始化WS2_32.dll
WSADATA wsaData;
WORD sockVersion = MAKEWORD(2, 2);
::WSAStartup(sockVersion, &wsaData);
}
CIOCPServer::~CIOCPServer()
{
Shutdown();
if(m_sListen != INVALID_SOCKET)
::closesocket(m_sListen);
if(m_hListenThread != NULL)
::CloseHandle(m_hListenThread);
::CloseHandle(m_hRepostEvent);
::CloseHandle(m_hAcceptEvent);
::DeleteCriticalSection(&m_FreeBufferListLock);
::DeleteCriticalSection(&m_FreeContextListLock);
::DeleteCriticalSection(&m_PendingAcceptsLock);
::DeleteCriticalSection(&m_ConnectionListLock);
::DeleteCriticalSection(&m_HeapLock);
::DeleteCriticalSection(&m_RepostLock);
::DeleteTimerQueue(m_hTimerQueue);//xss
::WSACleanup();
}
///////////////////////////////////////
static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired)
{
CIOCPContext* pContext = (CIOCPContext*)lpParam;
if(pContext != NULL && pContext->bClosing == FALSE)
{
EnterCriticalSection(&pContext->Lock);
if(pContext->hCompletion != NULL)
{
PostQueuedCompletionStatus(pContext->hCompletion,-2,(ULONG_PTR)pContext,NULL);
}
LeaveCriticalSection(&pContext->Lock);
}
}
///////////////////////////////////
// 自定义帮助函数
CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
CIOCPBuffer *pBuffer = NULL;
if(nLen > BUFFER_SIZE)
return NULL;
// 为缓冲区对象申请内存
::EnterCriticalSection(&m_FreeBufferListLock);
if(m_pFreeBufferList == NULL) // 内存池为空,申请新的内存
{
// pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(),
// HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
pBuffer = new CIOCPBuffer();
}
else // 从内存池中取一块来使用
{
pBuffer = m_pFreeBufferList;
m_pFreeBufferList = m_pFreeBufferList->pNext;
pBuffer->pNext = NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeBufferListLock);
EnterCriticalSection(&m_HeapLock);
iBufferCount++;
LeaveCriticalSection(&m_HeapLock);
// 初始化新的缓冲区对象
if(pBuffer != NULL)
{
//pBuffer->buff = (char*)(pBuffer + sizeof(CIOCPBuffer)/*1*/);//xss,个人以为应该+sizeof(CIOCPBuffer);
pBuffer->nLen = nLen;
pBuffer->bIsReleased = FALSE;
}
return pBuffer;
}
void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
if(pBuffer == NULL || pBuffer->bIsReleased)
return;
::EnterCriticalSection(&m_FreeBufferListLock);
if(m_nFreeBufferCount <= m_nMaxFreeBuffers) // 将要释放的内存添加到空闲列表中
{
memset(pBuffer, 0, sizeof(CIOCPBuffer) /*+ BUFFER_SIZE*/);
pBuffer->pNext = m_pFreeBufferList;
m_pFreeBufferList = pBuffer;
m_nFreeBufferCount ++ ;
pBuffer->bIsReleased = TRUE;
}else{
// 已经达到最大值,真正的释放内存
//::HeapFree(::GetProcessHeap(), 0, pBuffer);
delete pBuffer;
}
::LeaveCriticalSection(&m_FreeBufferListLock);
EnterCriticalSection(&m_HeapLock);
iBufferCount--;
LeaveCriticalSection(&m_HeapLock);
}
CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
CIOCPContext *pContext;
// 申请一个CIOCPContext对象
::EnterCriticalSection(&m_FreeContextListLock);
if(m_pFreeContextList == NULL)
{
//pContext = (CIOCPContext *)::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext));
pContext = new CIOCPContext();
::InitializeCriticalSection(&pContext->Lock);
}else{
// 在空闲列表中申请
pContext = m_pFreeContextList;
m_pFreeContextList = m_pFreeContextList->pNext;
pContext->pNext = NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeContextListLock);
EnterCriticalSection(&m_HeapLock);
iContextCount++;
LeaveCriticalSection(&m_HeapLock);
// 初始化对象成员
if(pContext != NULL)
{
pContext->s = s;
pContext->bIsReleased = FALSE;
}
return pContext;
}
void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
if(pContext == NULL || pContext->bIsReleased)
return;
printf("\n%s释放了Context\n\n",pContext->szClientName);
if(pContext->s != INVALID_SOCKET)
::closesocket(pContext->s);
// 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
CIOCPBuffer *pNext;
while(pContext->pOutOfOrderReads != NULL)
{
pNext = pContext->pOutOfOrderReads->pNext;
ReleaseBuffer(pContext->pOutOfOrderReads);
pContext->pOutOfOrderReads = pNext;
}
//xss,再释放(如果有的话)此套接字上未完成的写I/O缓冲区
CIOCPNextToSend* pSend = NULL;
while(pContext->pNextToSend != NULL)
{
pSend = pContext->pNextToSend->pNext;
if(pContext->pNextToSend->pBuffer != NULL && pContext->pNextToSend->pBuffer->bIsReleased == FALSE)
{
ReleaseBuffer(pContext->pNextToSend->pBuffer);
}
delete pContext->pNextToSend;
pContext->pNextToSend = pSend;
}
if(pContext->hTimer != NULL)
{
DeleteTimerQueueTimer(m_hTimerQueue,pContext->hTimer,NULL);
pContext->hTimer = NULL;
}
::EnterCriticalSection(&m_FreeContextListLock);
if(m_nFreeContextCount <= m_nMaxFreeContexts) // 添加到空闲列表
{
// 先将关键代码段变量保存到一个临时变量中
CRITICAL_SECTION cstmp = pContext->Lock;
// 将要释放的上下文对象初始化为0
memset(pContext, 0, sizeof(CIOCPContext));
// 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
pContext->Lock = cstmp;
pContext->pNext = m_pFreeContextList;
m_pFreeContextList = pContext;
// 更新计数
m_nFreeContextCount ++;
pContext->bIsReleased = TRUE;
}else{
::DeleteCriticalSection(&pContext->Lock);
//::HeapFree(::GetProcessHeap(), 0, pContext);
delete pContext;
}
::LeaveCriticalSection(&m_FreeContextListLock);
EnterCriticalSection(&m_HeapLock);
iContextCount--;
LeaveCriticalSection(&m_HeapLock);
}
void CIOCPServer::FreeBuffers()
{
// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeBufferListLock);
CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
CIOCPBuffer *pNextBuffer;
while(pFreeBuffer != NULL)
{
pNextBuffer = pFreeBuffer->pNext;
delete pFreeBuffer;
// if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
// {
// #ifdef _DEBUG
// ::OutputDebugString(" FreeBuffers释放内存出错!");
// #endif // _DEBUG
// break;
// }
pFreeBuffer = pNextBuffer;
}
m_pFreeBufferList = NULL;
m_nFreeBufferCount = 0;
::LeaveCriticalSection(&m_FreeBufferListLock);
}
void CIOCPServer::FreeContexts()
{
// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeContextListLock);
CIOCPContext *pFreeContext = m_pFreeContextList;
CIOCPContext *pNextContext;
while(pFreeContext != NULL)
{
pNextContext = pFreeContext->pNext;
::DeleteCriticalSection(&pFreeContext->Lock);
delete pFreeContext;
// if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
// {
// #ifdef _DEBUG
// ::OutputDebugString(" FreeBuffers释放内存出错!");
// #endif // _DEBUG
// break;
// }
pFreeContext = pNextContext;
}
m_pFreeContextList = NULL;
m_nFreeContextCount = 0;
::LeaveCriticalSection(&m_FreeContextListLock);
}
BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
// 向客户连接列表添加一个CIOCPContext对象
::EnterCriticalSection(&m_ConnectionListLock);
if(m_nCurrentConnection <= m_nMaxConnections)
{
// 添加到表头
pContext->pNext = m_pConnectionList;
m_pConnectionList = pContext;
// 更新计数
m_nCurrentConnection ++;
::LeaveCriticalSection(&m_ConnectionListLock);
return TRUE;
}
::LeaveCriticalSection(&m_ConnectionListLock);
return FALSE;
}
void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
{
if(pContext == NULL || pContext->bClosing == TRUE)
return;
// 首先从列表中移除要关闭的连接
::EnterCriticalSection(&m_ConnectionListLock);
CIOCPContext* pTest = m_pConnectionList;
if(pTest == pContext)
{
m_pConnectionList = pContext->pNext;
m_nCurrentConnection --;
}else{
while(pTest != NULL && pTest->pNext != pContext)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pContext->pNext;
m_nCurrentConnection --;
}
}
::LeaveCriticalSection(&m_ConnectionListLock);
// 然后关闭客户套节字
::EnterCriticalSection(&pContext->Lock);
if(pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}
pContext->bClosing = TRUE;
::LeaveCriticalSection(&pContext->Lock);
}
void CIOCPServer::CloseAllConnections()
{
// 遍历整个连接列表,关闭所有的客户套节字
::EnterCriticalSection(&m_ConnectionListLock);
CIOCPContext *pContext = m_pConnectionList;
while(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);
if(pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}
pContext->bClosing = TRUE;
::LeaveCriticalSection(&pContext->Lock);
pContext = pContext->pNext;
}
m_pConnectionList = NULL;
m_nCurrentConnection = 0;
::LeaveCriticalSection(&m_ConnectionListLock);
}
BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
// 将一个I/O缓冲区对象插入到m_pPendingAccepts表中
::EnterCriticalSection(&m_PendingAcceptsLock);
if(m_pPendingAccepts == NULL)
m_pPendingAccepts = pBuffer;
else{
pBuffer->pNext = m_pPendingAccepts;
m_pPendingAccepts = pBuffer;
}
m_nPendingAcceptCount ++;
::LeaveCriticalSection(&m_PendingAcceptsLock);
return TRUE;
}
BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
BOOL bResult = FALSE;
// 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
::EnterCriticalSection(&m_PendingAcceptsLock);
CIOCPBuffer *pTest = m_pPendingAccepts;
if(pTest == pBuffer) // 如果是表头元素
{
m_pPendingAccepts = pBuffer->pNext;
bResult = TRUE;
}else{
// 不是表头元素的话,就要遍历这个表来查找了
while(pTest != NULL && pTest->pNext != pBuffer)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pBuffer->pNext;
bResult = TRUE;
}
}
// 更新计数
if(bResult)
m_nPendingAcceptCount --;
::LeaveCriticalSection(&m_PendingAcceptsLock);
return bResult;
}
void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
CloseAConnection(pContext);
}
BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
::EnterCriticalSection(&pContext->Lock);
CIOCPNextToSend *ptr = pContext->pNextToSend;
CIOCPNextToSend * pSend = new CIOCPNextToSend();
pSend->pBuffer = pBuffer;
pSend->pNext = NULL;
if(ptr == NULL)
{
printf("数据:%10.10s ...,被直接发送。\n",pBuffer->buff);
//::EnterCriticalSection(&pContext->Lock);
pContext->pNextToSend = pSend;
//::LeaveCriticalSection(&pContext->Lock);
if(!PostSend(pContext,pBuffer))//如果没有需要等待的send就直接发送
{
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}else{
printf("数据:%10.10s ...,被放入链表结尾。\n",pBuffer->buff);
while(ptr->pNext != NULL)
{
ptr = ptr->pNext;
}
ptr->pNext = pSend;//新的发送请求放在链表结尾
}
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}
BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
{
::EnterCriticalSection(&pContext->Lock);
CIOCPNextToSend* pSend = pContext->pNextToSend;
CIOCPNextToSend* pNextSend = NULL;
if(pSend != NULL && pSend->pNext != NULL)//发送成功的pBuffer是队列的第一个,发送下一个,pNextToSend指向下一个,pBuffer由外面释放。
{
pNextSend = pSend->pNext;
if(pNextSend->pBuffer != NULL)
{
printf("数据:%10.10s ...从链表中弹出被发送。\n",pNextSend->pBuffer->buff);
if(!PostSend(pContext,pNextSend->pBuffer))
{
delete pSend;
pContext->pNextToSend = pNextSend;
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}
}
if(pSend != NULL)
{
pNextSend = pSend->pNext;
delete pSend;
pContext->pNextToSend = pNextSend;
}
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}
CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
if(pBuffer != NULL)
{
// 如果与要读的下一个序列号相等,则读这块缓冲区
if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
{
return pBuffer;
}
// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中
// 列表中的缓冲区是按照其序列号从小到大的顺序排列的
pBuffer->pNext = NULL;
CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
CIOCPBuffer *pPre = NULL;
while(ptr != NULL)
{
if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)
break;
pPre = ptr;
ptr = ptr->pNext;
}
if(pPre == NULL) // 应该插入到表头
{
pBuffer->pNext = pContext->pOutOfOrderReads;
pContext->pOutOfOrderReads = pBuffer;
}else{
// 应该插入到表的中间
pBuffer->pNext = pPre->pNext;
pPre->pNext = pBuffer/*->pNext*/;//xss,个人觉得应该是pPre->pNext = pBuffer;
}
}
// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))
{
pContext->pOutOfOrderReads = ptr->pNext;
return ptr;
}
return NULL;
}
BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer) // 在监听套节字上投递Accept请求
{
// 设置I/O类型
pBuffer->nOperation = OP_ACCEPT;
// 投递此重叠I/O
DWORD dwBytes;
pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
BOOL b = m_lpfnAcceptEx(m_sListen,
pBuffer->sClient,
pBuffer->buff,
pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),//xss,第一次都是收一个cmd_header
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
&dwBytes,
&pBuffer->ol);
if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
{
return FALSE;
}
if(pBuffer->nOperation == 0)
{
int x = 0;
}
return TRUE;
};
文章评论
1867人参与,0条评论