返回> 网站首页
[转载]7种网络编程I/O模型代码实现实例之四
yoours2015-05-10 18:46:30
简介一边听听音乐,一边写写文章。
BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
// 设置I/O类型
pBuffer->nOperation = OP_READ;
::EnterCriticalSection(&pContext->Lock);
// 设置序列号
pBuffer->nSequenceNumber = pContext->nReadSequence;
// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{
printf("WSARecv出错:%d\n",WSAGetLastError());
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}
// 增加套节字上的重叠I/O计数和读序列号计数
pContext->nOutstandingRecv ++;
pContext->nReadSequence ++;
::LeaveCriticalSection(&pContext->Lock);
return TRUE;
}
BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
// 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
if(pContext->nOutstandingSend > m_nMaxSends)
return FALSE;
// 设置I/O类型,增加套节字上的重叠I/O计数
pBuffer->nOperation = OP_WRITE;
// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSASend(pContext->s,
&buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
{
int x;
if((x=::WSAGetLastError()) != WSA_IO_PENDING)
{
printf("发送失败!错误码:%d",x);
return FALSE;
}
}
// 增加套节字上的重叠I/O计数
::EnterCriticalSection(&pContext->Lock);
pContext->nOutstandingSend ++;
::LeaveCriticalSection(&pContext->Lock);
if(pBuffer->nOperation == 0)
{
int x = 0;
}
return TRUE;
}
BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
{
// 检查服务是否已经启动
if(m_bServerStarted)
return FALSE;
// 保存用户参数
m_nPort = nPort;
m_nMaxConnections = nMaxConnections;
m_nMaxFreeBuffers = nMaxFreeBuffers;
m_nMaxFreeContexts = nMaxFreeContexts;
m_nInitialReads = nInitialReads;
// 初始化状态变量
m_bShutDown = FALSE;
m_bServerStarted = TRUE;
// 创建监听套节字,绑定到本地端口,进入监听模式
m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(m_nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
{
m_bServerStarted = FALSE;
return FALSE;
}
::listen(m_sListen, 200);
// 创建完成端口对象
m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
// 加载扩展函数AcceptEx
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL);
// 加载扩展函数GetAcceptExSockaddrs
GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockaddrs,
sizeof(GuidGetAcceptExSockaddrs),
&m_lpfnGetAcceptExSockaddrs,
sizeof(m_lpfnGetAcceptExSockaddrs),
&dwBytes,
NULL,
NULL
);
// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);
// 注册FD_ACCEPT事件。
// 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);
// 创建监听线程
m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);
return TRUE;
}
void CIOCPServer::Shutdown()
{
if(!m_bServerStarted)
return;
// 通知监听线程,马上停止服务
m_bShutDown = TRUE;
::SetEvent(m_hAcceptEvent);
// 等待监听线程退出
::WaitForSingleObject(m_hListenThread, INFINITE);
::CloseHandle(m_hListenThread);
m_hListenThread = NULL;
m_bServerStarted = FALSE;
}
DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
{
CIOCPServer *pThis = (CIOCPServer*)lpParam;
// 先在监听套节字上投递几个Accept I/O
CIOCPBuffer *pBuffer;
for(int i=0; i<pThis->m_nInitialAccepts; i++)
{
pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE
if(pBuffer == NULL)
return -1;
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}
// 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
HANDLE hWaitEvents[2 + MAX_THREAD];
int nEventCount = 0;
hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;
// 创建指定数量的工作线程在完成端口上处理I/O
for(int i=0; i<MAX_THREAD; i++)
{
hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
}
// 下面进入无限循环,处理事件对象数组中的事件
while(TRUE)
{
int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);
// 首先检查是否要停止服务
if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
{
// 关闭所有连接
pThis->CloseAllConnections();
::Sleep(0); // 给I/O工作线程一个执行的机会
// 关闭监听套节字
::closesocket(pThis->m_sListen);
pThis->m_sListen = INVALID_SOCKET;
::Sleep(0); // 给I/O工作线程一个执行的机会
// 通知所有I/O处理线程退出
for(int i=2; i<MAX_THREAD + 2; i++)
{
::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
}
// 等待I/O处理线程退出
::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);
for(int i=2; i<MAX_THREAD + 2; i++)
{
::CloseHandle(hWaitEvents[i]);
}
::CloseHandle(pThis->m_hCompletion);
pThis->FreeBuffers();
pThis->FreeContexts();
::ExitThread(0);
}
// 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
if(nIndex == WSA_WAIT_TIMEOUT)
{
pBuffer = pThis->m_pPendingAccepts;
while(pBuffer != NULL)
{
int nSeconds;
int nLen = sizeof(nSeconds);
// 取得连接建立的时间
::getsockopt(pBuffer->sClient,
SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);
// 如果超过2分钟客户还不发送初始数据,就让这个客户go away
if(nSeconds != -1 && nSeconds > /*2*60*/50)
{
closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
pBuffer = pBuffer->pNext;
}
}else{
nIndex = nIndex - WAIT_OBJECT_0;
WSANETWORKEVENTS ne;
int nLimit=0;
if(nIndex == 0) // 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
{
::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
if(ne.lNetworkEvents & FD_ACCEPT)
{
nLimit = 50; // 增加的个数,这里设为50个
}
}else if(nIndex == 1){
// 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
}else if(nIndex > 1){
// I/O服务线程退出,说明有错误发生,关闭服务器
pThis->m_bShutDown = TRUE;
continue;
}
// 投递nLimit个AcceptEx I/O请求
int i = 0;
while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
{
pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
if(pBuffer != NULL)
{
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}
}
}
}
return 0;
}
DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
::OutputDebugString(" WorkerThread 启动... \n");
#endif // _DEBUG
CIOCPServer *pThis = (CIOCPServer*)lpParam;
CIOCPBuffer *pBuffer = NULL;
DWORD dwKey;
DWORD dwTrans;
LPOVERLAPPED lpol;
while(TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
&dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);
if(dwTrans == -1) // 用户通知退出
{
#ifdef _DEBUG
::OutputDebugString(" WorkerThread 退出 \n");
#endif // _DEBUG
::ExitThread(0);
}
if(dwTrans != -2)
pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);
int nError = NO_ERROR;
if(!bOK) // 在此套节字上有错误发生
{
printf("完成端口套接字上有错误:%d\n",GetLastError());
SOCKET s;
if(pBuffer->nOperation == OP_ACCEPT)
{
s = pThis->m_sListen;
}else{
if(dwKey == 0)
break;
s = ((CIOCPContext*)dwKey)->s;
}
DWORD dwFlags = 0;
if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
{
nError = ::WSAGetLastError();
}
}
pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
printf("Buffer:%d Context:%d\n",iBufferCount,iContextCount);
}
#ifdef _DEBUG
::OutputDebugString(" WorkerThread 退出 \n");
#endif // _DEBUG
return 0;
}
int g_x = 0;
void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
CIOCPContext *pContext = (CIOCPContext *)dwKey;
#ifdef _DEBUG
::OutputDebugString(" HandleIO... \n");
#endif // _DEBUG
// 1)首先减少套节字上的未决I/O计数
if(dwTrans == -2)
{
CloseAConnection(pContext);
return;
}
if(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);
if(pBuffer->nOperation == OP_READ)
pContext->nOutstandingRecv --;
else if(pBuffer->nOperation == OP_WRITE)
pContext->nOutstandingSend --;
::LeaveCriticalSection(&pContext->Lock);
// 2)检查套节字是否已经被我们关闭
if(pContext->bClosing)
{
#ifdef _DEBUG
::OutputDebugString(" 检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
// 释放已关闭套节字的未决I/O
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}
}else{
RemovePendingAccept(pBuffer);
}
// 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
if(nError != NO_ERROR)
{
if(pBuffer->nOperation != OP_ACCEPT)
{
OnConnectionError(pContext, pBuffer, nError);
CloseAConnection(pContext);
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
#ifdef _DEBUG
::OutputDebugString(" 检查到客户套节字上发生错误 \n");
#endif // _DEBUG
}else{
// 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
// 客户端出错,释放I/O缓冲区
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
#ifdef _DEBUG
::OutputDebugString(" 检查到监听套节字上发生错误 \n");
#endif // _DEBUG
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}
// 开始处理
if(pBuffer->nOperation == OP_ACCEPT)
{
if(dwTrans == 0)
{
#ifdef _DEBUG
::OutputDebugString(" 监听套节字上客户端关闭 \n");
#endif // _DEBUG
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}else{
// 为新接受的连接申请客户上下文对象
CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
if(pClient != NULL)
{
if(AddAConnection(pClient))
{
// 取得客户地址
int nLocalLen, nRmoteLen;
LPSOCKADDR pLocalAddr, pRemoteAddr;
m_lpfnGetAcceptExSockaddrs(
pBuffer->buff,
pBuffer->nLen - (sizeof(sockaddr_in) + 16) * 2/*sizeof(cmd_header)*/,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(SOCKADDR **)&pLocalAddr,
&nLocalLen,
(SOCKADDR **)&pRemoteAddr,
&nRmoteLen);
memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);
// 关联新连接到完成端口对象
::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);
// 通知用户
pBuffer->nLen = dwTrans;
OnConnectionEstablished(pClient, pBuffer);
if(pClient->bClosing && pClient->nOutstandingRecv == 0 && pClient->nOutstandingSend == 0)
{
ReleaseContext(pClient);
pContext = NULL;
}else if(pClient->hTimer == NULL){
//接收一个客户端的同时创建一个检测I/O超时的Timer
pClient->hCompletion = m_hCompletion;
CreateTimerQueueTimer(&pClient->hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0);
}
// 向新连接投递Read请求或者Write请求,直接关闭这些空间在套节字关闭或出错时释放
// CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
// if(p != NULL)
// {
// if(!PostRecv(pClient, p))
// {
// CloseAConnection(pClient);
// }
// }
}else{
// 连接数量已满,关闭连接
CloseAConnection(pClient);
ReleaseContext(pClient);
pContext = NULL;
}
}else{
// 资源不足,关闭与客户的连接即可
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
// Accept请求完成,释放I/O缓冲区
ReleaseBuffer(pBuffer);
pBuffer = NULL;
// 通知监听线程继续再投递一个Accept请求
::InterlockedIncrement(&m_nRepostCount);
::SetEvent(m_hRepostEvent);
}else if(pBuffer->nOperation == OP_READ){
if(dwTrans == 0) // 对方关闭套节字
{
// 先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext, pBuffer);
// 再关闭连接
CloseAConnection(pContext);
// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}else{
pBuffer->nLen = dwTrans;
// 按照I/O投递的顺序读取接收到的数据
CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
while(p != NULL)
{
// 通知用户
OnReadCompleted(pContext, p);
// 增加要读的序列号的值
::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
// 释放这个已完成的I/O
ReleaseBuffer(p);
p = GetNextReadBuffer(pContext, NULL);
}
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}else if(pContext->hTimer != NULL){
ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);//重置监视时间,当一个投递完成后,60s内无任何交互则断开。
}
// 继续投递一个新的接收请求
// pBuffer = AllocateBuffer(BUFFER_SIZE);
//if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
//{
// CloseAConnection(pContext);
//}
}
}else if(pBuffer->nOperation == OP_WRITE){
if(dwTrans == 0) // 对方关闭套节字
{
// 先通知用户
pBuffer->nLen = 0;
OnConnectionClosing(pContext, pBuffer);
// 再关闭连接
CloseAConnection(pContext);
// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}else{
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
ReleaseBuffer(pBuffer);
pBuffer = NULL;
return;
}else if(pContext->hTimer != NULL){
ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);
}
// 写操作完成,通知用户
if(dwTrans < pBuffer->nLen)//如果此send没有发送完全,则发送剩下的部分(此部分如果还是没发完全,这里同样进行)
{
printf("send未发送完全,发送:%d,总长度:%d\n",dwTrans,pBuffer->nLen);
CIOCPBuffer* p = AllocateBuffer(pBuffer->nLen - dwTrans);
if(p != NULL)
memcpy(p->buff,pBuffer->buff + dwTrans,pBuffer->nLen - dwTrans);
if(p == NULL || !PostSend(pContext,p))
{
CloseAConnection(pContext);
return;
}
}else{
if(!PostNextWriteBuffer(pContext,pBuffer))
{
CloseAConnection(pContext);
return;
}
}
pBuffer->nLen = dwTrans;
OnWriteCompleted(pContext, pBuffer);
if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
pContext = NULL;
}
// 释放SendText函数申请的缓冲区
ReleaseBuffer(pBuffer);
pBuffer = NULL;
}
}
}
BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
{
CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
if(pBuffer != NULL)
{
memcpy(pBuffer->buff, pszText, nLen);
return PostSend(pContext, pBuffer);
}
return FALSE;
}
//投递接收请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
// if(!PostRecv(pContext, p))
// {
// CloseAConnection(pContext);
// }
//}
//投递发送请求示例
//CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
//if(p != NULL)
//{
// if(!PostSendToList(pContext, p))
// {
// CloseAConnection(pContext);
// }
//}
void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//连接建立,且第一次数据接收完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}
void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}
void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//一次数据接收完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}
void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
//一次数据发送完成。
//接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
}
void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
}
文章评论
1823人参与,0条评论