返回> 网站首页
[转载]7种网络编程I/O模型代码实现实例之二
yoours2015-05-10 18:43:16
简介一边听听音乐,一边写写文章。
六,重叠I/O模型
若需要建线程池,可参考事件选择模型。若纠结于send,可参考下面的IOCP。
#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>
#pragma comment(lib,"Ws2_32.lib")
#define BUFFER_SIZE 4096
typedef struct _SOCKET_OBJ
{
SOCKET s;
int nOutstandingOps;
LPFN_ACCEPTEX lpfnAcceptEx;
}SOCKET_OBJ,*PSOCKET_OBJ;
PSOCKET_OBJ CreateSocketObj(SOCKET s)
{
PSOCKET_OBJ pSocket = new SOCKET_OBJ();
if(pSocket != NULL)
pSocket->s = s;
return pSocket;
}
void FreeSocketObj(PSOCKET_OBJ pSocket)
{
if(pSocket == NULL)
return;
if(pSocket->s != INVALID_SOCKET)
closesocket(pSocket->s);
delete pSocket;
}
typedef struct _BUFFER_OBJ
{
OVERLAPPED ol;
char* buff;
int nLen;
PSOCKET_OBJ pSocket;
int nOperation;
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3
SOCKET sAccept;
_BUFFER_OBJ* pNext;
}BUFFER_OBJ,*PBUFFER_OBJ;
HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];
int g_nBufferCount;
PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
BOOL g_bServerRunning;
CRITICAL_SECTION g_cs;
PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
{
if(g_nBufferCount > WSA_MAXIMUM_WAIT_EVENTS - 1)
return NULL;
PBUFFER_OBJ pBuffer = new BUFFER_OBJ();
if(pBuffer != NULL)
{
pBuffer->buff = new char[nLen];
pBuffer->nLen = nLen;
pBuffer->ol.hEvent = WSACreateEvent();
pBuffer->pSocket = pSocket;
pBuffer->sAccept = INVALID_SOCKET;
pBuffer->pNext = NULL;
EnterCriticalSection(&g_cs);
if(g_pBufferHeader == NULL)
{
g_pBufferHeader = g_pBufferTail = pBuffer;
}else{
g_pBufferTail->pNext = pBuffer;
g_pBufferTail = pBuffer;
}
LeaveCriticalSection(&g_cs);
g_events[++g_nBufferCount] = pBuffer->ol.hEvent;
}
return pBuffer;
}
void FreeBufferObj(PBUFFER_OBJ pBuffer)
{
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pTest = g_pBufferHeader;
BOOL bFind = FALSE;
if(pTest == pBuffer)
{
if(g_pBufferHeader == g_pBufferTail)
g_pBufferHeader = g_pBufferTail = NULL;
else
g_pBufferHeader = g_pBufferHeader->pNext;
bFind = TRUE;
}else{
while(pTest != NULL && pTest->pNext != pBuffer)
pTest = pTest->pNext;
if(pTest != NULL)
{
pTest->pNext = pBuffer->pNext;
if(pTest->pNext == NULL)
g_pBufferTail = pTest;
bFind = TRUE;
}
}
if(bFind)
{
g_nBufferCount--;
WSACloseEvent(pBuffer->ol.hEvent);
delete [] pBuffer->buff;
delete pBuffer;
}
LeaveCriticalSection(&g_cs);
}
PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
{
if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)
return NULL;
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pTest = g_pBufferHeader;
while(pTest != NULL && pTest->ol.hEvent != hEvent)
pTest = pTest->pNext;
LeaveCriticalSection(&g_cs);
return pTest;
}
void RebuildArray()
{
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pBuffer = g_pBufferHeader;
int i=1;
while(pBuffer != NULL)
{
g_events[i++] = pBuffer->ol.hEvent;
pBuffer = pBuffer->pNext;
}
LeaveCriticalSection(&g_cs);
}
BOOL PostAccept(PBUFFER_OBJ pBuffer)
{
PSOCKET_OBJ pSocket = pBuffer->pSocket;
if(pSocket->lpfnAcceptEx != NULL)
{
pBuffer->nOperation = OP_ACCEPT;
pSocket->nOutstandingOps++;
DWORD dwBytes;
pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
BOOL b = pSocket->lpfnAcceptEx(pSocket->s,
pBuffer->sAccept,pBuffer->buff,BUFFER_SIZE - ((sizeof(sockaddr_in) + 16)*2),
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16,&dwBytes,&pBuffer->ol);
if(!b)
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
return FALSE;
}
BOOL PostRecv(PBUFFER_OBJ pBuffer)
{
pBuffer->nOperation = OP_READ;
pBuffer->pSocket->nOutstandingOps++;
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL))
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
BOOL PostSend(PBUFFER_OBJ pBuffer)
{
pBuffer->nOperation = OP_WRITE;
pBuffer->pSocket->nOutstandingOps++;
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL))
{
if(WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
BOOL HandleIo(PBUFFER_OBJ pBuffer)
{
if(pBuffer == NULL)
return FALSE;
PSOCKET_OBJ pSocket = pBuffer->pSocket;
pSocket->nOutstandingOps--;
DWORD dwTrans;
DWORD dwFlags;
BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);
if(!bRet)
{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pBuffer->nOperation == OP_ACCEPT && pBuffer->sAccept != INVALID_SOCKET)
{
closesocket(pBuffer->sAccept);
pBuffer->sAccept = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
return FALSE;
}
switch(pBuffer->nOperation)
{
case OP_ACCEPT:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("Accept收到数据:%s\n",pBuffer->buff);
PSOCKET_OBJ pClient = CreateSocketObj(pBuffer->sAccept);
PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);
if(pRecv == NULL)
{
printf("Too much connections!\n");
FreeSocketObj(pClient);
return FALSE;
}
RebuildArray();
if(!PostRecv(pRecv))
{
FreeSocketObj(pClient);
FreeBufferObj(pBuffer);
return FALSE;
}
}else{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pBuffer->sAccept != INVALID_SOCKET)
{
closesocket(pBuffer->sAccept);
pBuffer->sAccept = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
// PBUFFER_OBJ pSend = CreateBufferObj(pClient,BUFFER_SIZE);
//if(pSend == NULL)
//{
// printf("Too much connections!\n");
// FreeSocketObj(pClient);
// return FALSE;
//}
//RebuildArray();
//pSend->nLen = dwTrans;
//memcpy(pSend->buff,pBuffer->buff,dwTrans);
//if(!PostSend(pSend))
//{
// FreeSocketObj(pSocket);
// FreeBufferObj(pBuffer);
// return FALSE;
//}
PostAccept(pBuffer);
}break;
case OP_READ:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("Recv收到数据:%s\n",pBuffer->buff);
PostRecv(pBuffer);
}else{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
case OP_WRITE:
{
if(dwTrans > 0)
{
pBuffer->buff[dwTrans] = 0;
printf("发送数据: %s 成功!\n",pBuffer->buff);
FreeBufferObj(pBuffer);
}else{
if(pSocket->s != INVALID_SOCKET)
{
closesocket(pSocket->s);
pSocket->s = INVALID_SOCKET;
}
if(pSocket->nOutstandingOps == 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
}
}
DWORD WINAPI ControlThread(LPVOID lpParma)
{
char cmd[128];
while(scanf("%s",cmd))
{
if(cmd[0] == 's')
{
g_bServerRunning = FALSE;
EnterCriticalSection(&g_cs);
PBUFFER_OBJ pBuffer = g_pBufferHeader;
while(pBuffer != NULL)
{
if(pBuffer->pSocket != NULL && pBuffer->pSocket->s != INVALID_SOCKET)
closesocket(pBuffer->pSocket->s);
pBuffer = pBuffer->pNext;
}
LeaveCriticalSection(&g_cs);
break;
}
}
return 0;
}
int main()
{
InitializeCriticalSectionAndSpinCount(&g_cs,4000);
WSAData wsaData;
if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
{
printf("初始化失败!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport = 3456;
SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nport);
sin.sin_addr.S_un.S_addr = ADDR_ANY;
if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
{
printf("bind failed!%d\n",WSAGetLastError());
Sleep(5000);
return -1;
}
listen(sListen,200);
g_bServerRunning = TRUE;
PSOCKET_OBJ pListen = CreateSocketObj(sListen);
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
WSAIoctl(pListen->s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&pListen->lpfnAcceptEx,
sizeof(pListen->lpfnAcceptEx),
&dwBytes,
NULL,
NULL);
g_events[0] = WSACreateEvent();
for(int i=0;i<5;++i)
{
PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));
}
HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);
while(TRUE)
{
int nIndex = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);
if(nIndex == WSA_WAIT_FAILED)
{
printf("WSAWaitForMultipleEvents Failed!\n");
break;
}
nIndex = nIndex - WSA_WAIT_EVENT_0;
for(int i=nIndex;i<= g_nBufferCount;i++)
{
int nRet = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);
if(nRet == WSA_WAIT_TIMEOUT)
continue;
if(i == 0)
{
RebuildArray();
continue;
}
PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);
if(pBuffer != NULL)
{
if(!HandleIo(pBuffer))
RebuildArray();
}
}
if(!g_bServerRunning && g_nBufferCount == 0)
break;
}
WSACloseEvent(g_events[0]);
WaitForSingleObject(hThread,INFINITE);
CloseHandle(hThread);
closesocket(sListen);
WSACleanup();
DeleteCriticalSection(&g_cs);
return 0;
}
文章评论
2019人参与,0条评论