返回> 网站首页 

[转载]7种网络编程I/O模型代码实现实例之二

yoours2015-05-10 18:43:16 阅读 2019

简介一边听听音乐,一边写写文章。

六,重叠I/O模型
若需要建线程池,可参考事件选择模型。若纠结于send,可参考下面的IOCP。

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>

#pragma comment(lib,"Ws2_32.lib")
#define BUFFER_SIZE 4096

typedef struct _SOCKET_OBJ
{
    SOCKET s;
    int nOutstandingOps;
    LPFN_ACCEPTEX lpfnAcceptEx;
}SOCKET_OBJ,*PSOCKET_OBJ;

PSOCKET_OBJ CreateSocketObj(SOCKET s)
{
    PSOCKET_OBJ pSocket = new SOCKET_OBJ();
    if(pSocket != NULL)
        pSocket->s = s;
    return pSocket;
}

void FreeSocketObj(PSOCKET_OBJ pSocket)
{
    if(pSocket == NULL)
        return;
    if(pSocket->s != INVALID_SOCKET)
        closesocket(pSocket->s);
    delete pSocket;
}

typedef struct _BUFFER_OBJ
{
    OVERLAPPED ol;
    char* buff;
    int nLen;
    PSOCKET_OBJ pSocket;
    int nOperation;
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3
    SOCKET sAccept;
    _BUFFER_OBJ* pNext;
}BUFFER_OBJ,*PBUFFER_OBJ;

HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];
int g_nBufferCount;
PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
BOOL g_bServerRunning;
CRITICAL_SECTION g_cs;

PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
{
    if(g_nBufferCount > WSA_MAXIMUM_WAIT_EVENTS - 1)
        return NULL;
    PBUFFER_OBJ pBuffer = new BUFFER_OBJ();
    if(pBuffer != NULL)
    {
        pBuffer->buff = new char[nLen];
        pBuffer->nLen = nLen;
        pBuffer->ol.hEvent = WSACreateEvent();
        pBuffer->pSocket = pSocket;
        pBuffer->sAccept = INVALID_SOCKET;
        pBuffer->pNext = NULL;
        EnterCriticalSection(&g_cs);
        if(g_pBufferHeader == NULL)
        {
            g_pBufferHeader = g_pBufferTail = pBuffer;
        }else{
            g_pBufferTail->pNext = pBuffer;
            g_pBufferTail = pBuffer;
        }
        LeaveCriticalSection(&g_cs);
        g_events[++g_nBufferCount] = pBuffer->ol.hEvent;
    }
    return pBuffer;
}

void FreeBufferObj(PBUFFER_OBJ pBuffer)
{
    EnterCriticalSection(&g_cs);
    PBUFFER_OBJ pTest = g_pBufferHeader;
    BOOL bFind = FALSE;
    if(pTest == pBuffer)
    {
        if(g_pBufferHeader == g_pBufferTail)
            g_pBufferHeader = g_pBufferTail = NULL;
        else
            g_pBufferHeader = g_pBufferHeader->pNext;
        bFind = TRUE;
    }else{
        while(pTest != NULL && pTest->pNext != pBuffer)
            pTest = pTest->pNext;
        if(pTest != NULL)
        {
            pTest->pNext = pBuffer->pNext;
            if(pTest->pNext == NULL)
                g_pBufferTail = pTest;
            bFind = TRUE;
        }
    }

    if(bFind)
    {
        g_nBufferCount--;
        WSACloseEvent(pBuffer->ol.hEvent);
        delete [] pBuffer->buff;
        delete pBuffer;
    }
    LeaveCriticalSection(&g_cs);
}

PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
{
    if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)
        return NULL;
    EnterCriticalSection(&g_cs);
    PBUFFER_OBJ pTest = g_pBufferHeader;
    while(pTest != NULL && pTest->ol.hEvent != hEvent)
        pTest = pTest->pNext;
    LeaveCriticalSection(&g_cs);
    return pTest;
}

void RebuildArray()
{
    EnterCriticalSection(&g_cs);
    PBUFFER_OBJ pBuffer = g_pBufferHeader;
    int i=1;
    while(pBuffer != NULL)
    {
        g_events[i++] = pBuffer->ol.hEvent;
        pBuffer = pBuffer->pNext;
    }
    LeaveCriticalSection(&g_cs);
}

BOOL PostAccept(PBUFFER_OBJ pBuffer)
{
    PSOCKET_OBJ pSocket = pBuffer->pSocket;
    if(pSocket->lpfnAcceptEx != NULL)
    {
        pBuffer->nOperation = OP_ACCEPT;
        pSocket->nOutstandingOps++;

        DWORD dwBytes;
        pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
        BOOL b = pSocket->lpfnAcceptEx(pSocket->s,
            pBuffer->sAccept,pBuffer->buff,BUFFER_SIZE - ((sizeof(sockaddr_in) + 16)*2),
            sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16,&dwBytes,&pBuffer->ol);
        if(!b)
        {
            if(WSAGetLastError() != WSA_IO_PENDING)
                return FALSE;
        }
        return TRUE;
    }
    return FALSE;
}

BOOL PostRecv(PBUFFER_OBJ pBuffer)
{
    pBuffer->nOperation = OP_READ;
    pBuffer->pSocket->nOutstandingOps++;

    DWORD dwBytes;
    DWORD dwFlags = 0;
    WSABUF buf;
    buf.buf = pBuffer->buff;
    buf.len = pBuffer->nLen;
    if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL))
    {
        if(WSAGetLastError() != WSA_IO_PENDING)
            return FALSE;
    }
    return TRUE;
}

BOOL PostSend(PBUFFER_OBJ pBuffer)
{
    pBuffer->nOperation = OP_WRITE;
    pBuffer->pSocket->nOutstandingOps++;
    DWORD dwBytes;
    DWORD dwFlags = 0;
    WSABUF buf;
    buf.buf = pBuffer->buff;
    buf.len = pBuffer->nLen;
    if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL))
    {
        if(WSAGetLastError() != WSA_IO_PENDING)
            return FALSE;
    }
    return TRUE;
}

BOOL HandleIo(PBUFFER_OBJ pBuffer)
{
    if(pBuffer == NULL)
        return FALSE;

    PSOCKET_OBJ pSocket = pBuffer->pSocket;
    pSocket->nOutstandingOps--;

    DWORD dwTrans;
    DWORD dwFlags;
    BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);
    if(!bRet)
    {
        if(pSocket->s != INVALID_SOCKET)
        {
            closesocket(pSocket->s);
            pSocket->s = INVALID_SOCKET;
        }
        if(pBuffer->nOperation == OP_ACCEPT && pBuffer->sAccept != INVALID_SOCKET)
        {
            closesocket(pBuffer->sAccept);
            pBuffer->sAccept = INVALID_SOCKET;
        }
        if(pSocket->nOutstandingOps == 0)
        {
            FreeSocketObj(pSocket);
        }
        FreeBufferObj(pBuffer);
        return FALSE;
    }

    switch(pBuffer->nOperation)
    {
    case OP_ACCEPT:
        {
            if(dwTrans > 0)
            {
                pBuffer->buff[dwTrans] = 0;
                printf("Accept收到数据:%s\n",pBuffer->buff);

                PSOCKET_OBJ pClient = CreateSocketObj(pBuffer->sAccept);
                PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);
                if(pRecv == NULL)
                {
                    printf("Too much connections!\n");
                    FreeSocketObj(pClient);
                    return FALSE;
                }
                RebuildArray();
                if(!PostRecv(pRecv))
                {
                    FreeSocketObj(pClient);
                    FreeBufferObj(pBuffer);
                    return FALSE;
                }
            }else{
                if(pSocket->s != INVALID_SOCKET)
                {
                    closesocket(pSocket->s);
                    pSocket->s = INVALID_SOCKET;
                }
                if(pBuffer->sAccept != INVALID_SOCKET)
                {
                    closesocket(pBuffer->sAccept);
                    pBuffer->sAccept = INVALID_SOCKET;
                }
                if(pSocket->nOutstandingOps == 0)
                {
                    FreeSocketObj(pSocket);
                }
                FreeBufferObj(pBuffer);
            }
//          PBUFFER_OBJ pSend = CreateBufferObj(pClient,BUFFER_SIZE);
            //if(pSend == NULL)
            //{
            //  printf("Too much connections!\n");
            //  FreeSocketObj(pClient);
            //  return FALSE;
            //}
            //RebuildArray();
            //pSend->nLen = dwTrans;
            //memcpy(pSend->buff,pBuffer->buff,dwTrans);

            //if(!PostSend(pSend))
            //{
            //  FreeSocketObj(pSocket);
            //  FreeBufferObj(pBuffer);
            //  return FALSE;
            //}

            PostAccept(pBuffer);
        }break;
    case OP_READ:
        {
            if(dwTrans > 0)
            {
                pBuffer->buff[dwTrans] = 0;
                printf("Recv收到数据:%s\n",pBuffer->buff);
                PostRecv(pBuffer);
            }else{
                if(pSocket->s != INVALID_SOCKET)
                {
                    closesocket(pSocket->s);
                    pSocket->s = INVALID_SOCKET;
                }
                if(pSocket->nOutstandingOps == 0)
                {
                    FreeSocketObj(pSocket);
                }
                FreeBufferObj(pBuffer);
            }
        }break;
    case OP_WRITE:
        {
            if(dwTrans > 0)
            {
                pBuffer->buff[dwTrans] = 0;
                printf("发送数据: %s 成功!\n",pBuffer->buff);
                FreeBufferObj(pBuffer);
            }else{
                if(pSocket->s != INVALID_SOCKET)
                {
                    closesocket(pSocket->s);
                    pSocket->s = INVALID_SOCKET;
                }
                if(pSocket->nOutstandingOps == 0)
                {
                    FreeSocketObj(pSocket);
                }
                FreeBufferObj(pBuffer);
            }
        }break;
    }
}

DWORD WINAPI ControlThread(LPVOID lpParma)
{
    char cmd[128];
    while(scanf("%s",cmd))
    {
        if(cmd[0] == 's')
        {
            g_bServerRunning = FALSE;
            EnterCriticalSection(&g_cs);
            PBUFFER_OBJ pBuffer = g_pBufferHeader;
            while(pBuffer != NULL)
            {
                if(pBuffer->pSocket != NULL && pBuffer->pSocket->s != INVALID_SOCKET)
                    closesocket(pBuffer->pSocket->s);
                pBuffer = pBuffer->pNext;
            }
            LeaveCriticalSection(&g_cs);
            break;
        }
    }
    return 0;
}

int main()
{
    InitializeCriticalSectionAndSpinCount(&g_cs,4000);
    WSAData wsaData;
    if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
    {
        printf("初始化失败!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }
    USHORT nport = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;

    if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
    {
        printf("bind failed!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }

    listen(sListen,200);

    g_bServerRunning = TRUE;
    PSOCKET_OBJ pListen = CreateSocketObj(sListen);
    GUID GuidAcceptEx = WSAID_ACCEPTEX;
    DWORD dwBytes;
    WSAIoctl(pListen->s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &GuidAcceptEx,
        sizeof(GuidAcceptEx),
        &pListen->lpfnAcceptEx,
        sizeof(pListen->lpfnAcceptEx),
        &dwBytes,
        NULL,
        NULL);
    g_events[0] = WSACreateEvent();

    for(int i=0;i<5;++i)
    {
        PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));
    }

    HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);
    while(TRUE)
    {
        int nIndex = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);
        if(nIndex == WSA_WAIT_FAILED)
        {
            printf("WSAWaitForMultipleEvents Failed!\n");
            break;
        }
        nIndex = nIndex - WSA_WAIT_EVENT_0;
        for(int i=nIndex;i<= g_nBufferCount;i++)
        {
            int nRet = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);
            if(nRet == WSA_WAIT_TIMEOUT)
                continue;

            if(i == 0)
            {
                RebuildArray();
                continue;
            }

            PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);
            if(pBuffer != NULL)
            {
                if(!HandleIo(pBuffer))
                    RebuildArray();
            }
        }
        if(!g_bServerRunning && g_nBufferCount == 0)
            break;
    }
    WSACloseEvent(g_events[0]);
    WaitForSingleObject(hThread,INFINITE);
    CloseHandle(hThread);
    closesocket(sListen);
    WSACleanup();
    DeleteCriticalSection(&g_cs);
    return 0;
}
微信小程序扫码登陆

文章评论

2019人参与,0条评论