返回> 网站首页 

[转载]7种网络编程I/O模型代码实现实例之一

yoours2015-05-10 18:40:18 阅读 1853

简介一边听听音乐,一边写写文章。

阻塞模式下,send会发生阻塞(非阻塞模式下send返回WSAEWOULDBLOCK错误,重叠I/O下表现为投递的发送请求一直无法完成)的情况一般可以分为3种 :
1,  服务器虽然发送了大量数据,但客户端并未调用recv函数去接。
2,网络状况不佳,发送缓冲区中的数据一直发不出去。
3,发送数据量很大,如下载功能,协议发送数据的速度比不上send函数将数据拷贝到发送缓冲区的速度。

对于1,2情况,我们似乎可以直接关闭套接字,让客户端重新请求。但对于3,却不行。而且实际操作过程中,我们无法区分是1,2,还是3,我们能做的是尽量去保证发送的正确性。当然防止1情况或者2情况中长时间网络不畅,可以设定超时。若socket一直处于不可写状态超过1分钟,那么就关闭套接字。在最后的IOCP模型中就加入了这种超时机制。其他模型若要加入,可参考它来做。

一,基本的阻塞模型
#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#pragma comment(lib,"Ws2_32.lib")

DWORD WINAPI WorkThread(void* param)
{
    SOCKET* psClient = (SOCKET*)param;
    char buf[4096];
    while(true)
    {
        int len = recv(*psClient,buf,4096,0);
        if(len <= 0)
        {
            printf("recv失败!%d\n",WSAGetLastError());
            Sleep(5000);
            break;
        }
        buf[len] = '\0';
        printf("收到数据:%s\n",buf);
    }
    closesocket(*psClient);
    delete psClient;
    return 0;
}

int main()
{
    WSAData wsaData;
    if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
    {
        printf("WSAStartup失败!\n",WSAGetLastError());
        Sleep(5000);
        return 0;
    }
    USHORT nPort = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nPort);
    sin.sin_addr.S_un.S_addr = INADDR_ANY;

    if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))
    {
        printf("bind失败!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }

    ::listen(sListen,5);

    while(true)
    {
        sockaddr_in addrRemote;
        int nAddrLen = sizeof(addrRemote);
        SOCKET *psClient = new SOCKET;
        *psClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
        HANDLE hThread = CreateThread(NULL,0,WorkThread,psClient,0,NULL);
        CloseHandle(hThread);
    }
    closesocket(sListen);
    WSACleanup();
}

二,无任何优化的非阻塞模型
#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")

CRITICAL_SECTION g_cs;
HANDLE           g_StartEvent;
vector<SOCKET> g_vecClients;
int g_iVecSize = 0;
DWORD WINAPI WorkThread(void* param)
{
    char buf[4096];
    while(1)
    {
        if(g_vecClients.empty())
        {
            ResetEvent(g_StartEvent);
            WaitForSingleObject(g_StartEvent,INFINITE);
        }

        EnterCriticalSection(&g_cs);
        for(vector<SOCKET>::iterator it = g_vecClients.begin();it != g_vecClients.end();)
        {
            int len = recv(*it,buf,4096,0);
            if(len == SOCKET_ERROR)
            {
                if(WSAEWOULDBLOCK != WSAGetLastError())
                {
                    printf("recv Error:%d\n",WSAGetLastError());
                    closesocket(*it);
                    it = g_vecClients.erase(it);
                }else{
                    printf("%d.",*it);
                    ++it;
                }
            }else{
                buf[len] = 0;
                printf("收到数据: %s\n",buf);
                ++it;
            }
        }
        LeaveCriticalSection(&g_cs);
        Sleep(100);

    }
    return 0;
}

int main()
{
    InitializeCriticalSectionAndSpinCount(&g_cs,4000);
    g_StartEvent = CreateEvent(NULL,FALSE,FALSE,NULL);

    WSAData wsaDate;
    WSAStartup(MAKEWORD(2,2),&wsaDate);
    USHORT nport = 3456;
    u_long ul = 1;
    SOCKET s = socket(AF_INET,SOCK_STREAM,0);
    ioctlsocket(s,FIONBIO,&ul);
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;

    if(SOCKET_ERROR == ::bind(s,(sockaddr*)&sin,sizeof(sin)))
    {
        return -1;
    }

    ::listen(s,5);

    HANDLE hThread = CreateThread(NULL,0,WorkThread,NULL,0,NULL);
    CloseHandle(hThread);

    while(true)
    {
        sockaddr_in addrRemote;
        int nAddrLen = sizeof(addrRemote);
        SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
        if(sClient != SOCKET_ERROR)
        {
            EnterCriticalSection(&g_cs);
            g_vecClients.push_back(sClient);
            LeaveCriticalSection(&g_cs);
            if(g_vecClients.size() == 1)
                SetEvent(g_StartEvent);
        }else if(WSAEWOULDBLOCK == WSAGetLastError()){
            printf(".");
            Sleep(100);
        }else{
            printf("accept failed! %d\n",WSAGetLastError());
        }
    }
    closesocket(s);
    WSACleanup();
    CloseHandle(g_StartEvent);
    DeleteCriticalSection(&g_cs);
}

三,select模型

#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>
#include <map>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib,"Mswsock.lib")

struct ThreadObj{
    OVERLAPPED *pOl;
    HANDLE s;
};

int g_iIndex = 0;
map<SOCKET,char*> g_map;

int main()
{
    WSAData wsaData;
    if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
    {
        printf("初始化失败!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }
    USHORT nport = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    u_long ul = 1;
    ioctlsocket(sListen,FIONBIO,&ul);
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;

    if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
    {
        printf("bind failed!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }

    listen(sListen,5);

    //1)初始化一个套接字集合fdSocket,并将监听套接字放入
    fd_set fdSocket;
    FD_ZERO(&fdSocket);
    FD_SET(sListen,&fdSocket);
    TIMEVAL time={1,0};
    char buf[4096];
    fd_set fdWrite;
    FD_ZERO(&fdWrite);
    while(true)
    {
        //2)将fdSocket的一个拷贝fdRead传给select函数
        fd_set fdRead = fdSocket;
        fd_set fdTmp = fdWrite;
        int nRetAll = 0;
        if(fdTmp.fd_count > 0)
            nRetAll = select(0,&fdRead,&fdTmp,NULL,NULL/*&time*/);//若不设置超时则select为阻塞
        else
            nRetAll = select(0,&fdRead,NULL,NULL,NULL/*&time*/);
        if(nRetAll > 0)
        {
            //3)通过将原来的fdSocket和被select处理过的fdRead进行比较,决定由哪些socket有数据可以读取
            for(int i=0;i<fdSocket.fd_count;i++)
            {
                if(FD_ISSET(fdSocket.fd_array[i],&fdRead))
                {
                    if(fdSocket.fd_array[i] == sListen)
                    {
                        if(fdSocket.fd_count < FD_SETSIZE)
                        {
                            sockaddr_in addrRemote;
                            int nAddrLen = sizeof(addrRemote);
                            SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
                            FD_SET(sClient,&fdSocket);
                            printf("接收到连接:(%s)\n",inet_ntoa(addrRemote.sin_addr));
                        }else{
                            printf("连接数量已达上限!\n");
                            continue;
                        }
                    }else{
                        int nRecv = recv(fdSocket.fd_array[i],buf,4096,0);
                        if(nRecv > 0)
                        {
                            buf[nRecv] = 0;
                            printf("收到数据:%s\n",buf);
                            int nRet = send(fdSocket.fd_array[i],buf,nRecv,0);
                            if(nRet <= 0)
                            {
                                SOCKET s = fdSocket.fd_array[i];
                                if(GetLastError() == WSAEWOULDBLOCK)
                                {
                                    if(g_map.find(s) == g_map.end())
                                    {
                                        char* szTmp = new char[nRecv + 1];
                                        strncpy(szTmp,buf,nRecv);
                                        szTmp[nRecv] = 0;
                                        g_map[s] = szTmp;
                                    }else{
                                        char* szOld = g_map[s];
                                        char* szTmp2 = new char[strlen(szOld) + nRecv + 1];
                                        strncpy(szTmp2,szOld,strlen(szOld));
                                        strncpy(szTmp2 + strlen(szOld),buf,nRecv);
                                        szTmp2[strlen(szOld) + nRecv] = 0;
                                        delete [] szOld;
                                        g_map[s] = szTmp2;
                                    }
                                    FD_SET(fdSocket.fd_array[i],&fdWrite);
                                }else{
                                    closesocket(fdSocket.fd_array[i]);
                                    if(g_map.find(s) != g_map.end())
                                    {
                                        if(g_map[s] != NULL)
                                            delete [] g_map[s];
                                        g_map.erase(s);
                                    }
                                    FD_CLR(fdSocket.fd_array[i],&fdSocket);
                                }
                            }
                            printf("发送了%d\n",nRet);
                        }else{
                            printf("1个Client已断开\n");
                            closesocket(fdSocket.fd_array[i]);
                            FD_CLR(fdSocket.fd_array[i],&fdSocket);
                        }
                    }
                }
                if(FD_ISSET(fdSocket.fd_array[i],&fdTmp))
                {
                    SOCKET s = fdSocket.fd_array[i];
                    if(g_map.find(s) != g_map.end())
                    {
                        char* szToSend = g_map[s];
                        int nToSend = strlen(szToSend);
                        int nRet = send(fdSocket.fd_array[i],szToSend,nToSend,0);
                        if(nRet <= 0)
                        {
                            if(GetLastError() == WSAEWOULDBLOCK)
                            {
                                //do nothing
                            }else{
                                closesocket(fdSocket.fd_array[i]);
                                if(g_map.find(s) != g_map.end())
                                {
                                    if(g_map[s] != NULL)
                                        delete [] g_map[s];
                                    g_map.erase(s);
                                }
                                FD_CLR(fdSocket.fd_array[i],&fdSocket);
                            }
                        }else if(nRet < nToSend){
                            printf("发送了%d/%d\n",nRet,nToSend);
                            nToSend -= nRet;
                            char* szTmp = new char[nToSend + 1];
                            strncpy(szTmp,szToSend + nRet,nToSend);
                            szTmp[nToSend] = 0;
                            delete [] szToSend;
                            g_map[s] = szTmp;
                        }else{
                            if(g_map[s] != NULL)
                                delete [] g_map[s];
                            g_map.erase(s);
                            FD_CLR(fdSocket.fd_array[i],&fdWrite);
                        }
                        printf("============================================发送了%d\n",nRet);
                    }
                }
            }
        }else if(nRetAll == 0){
            printf("time out!\n");
        }else{
            printf("select error!%d\n",WSAGetLastError());
            Sleep(5000);
            break;
        }
    }
    closesocket(sListen);
    WSACleanup();
}

四,异步选择模型
注意:收到FD_Write消息有2种情况:1,在socket第一次和窗口句柄绑定后。2,socket从不可写状态变成可写状态。下面的事件选择模型也是同理。

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <map>
using namespace std;

#pragma comment(lib,"Ws2_32.lib")
#define WM_SOCKET (WM_USER + 100)

map<SOCKET,char*> g_map;
LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam)
{
    switch(uMsg)
    {
    case WM_SOCKET:
        {
            SOCKET s = wParam;
            if(WSAGETSELECTERROR(lParam))
            {
                printf("消息错误!\n");
                closesocket(s);
                return 0;
            }

            switch(WSAGETSELECTEVENT(lParam))
            {
            case FD_ACCEPT:
                {
                    sockaddr_in addrRemote;
                    int nAddrLen = sizeof(addrRemote);
                    SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
                    WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE);
                }break;
            case FD_WRITE:
                {
                    printf("write====================\n");
                    if(!g_map.empty())
                    {
                        char* buf = g_map[s];
                        int nLenth = strlen(buf);
                        while(nLenth > 0)
                        {
                            int nRet = send(s,buf,nLenth,0);
                            if(nRet > 0)
                            {
                                buf += nRet;
                                nLenth -= nRet;
                            }else if(10035 == GetLastError()){
                                char* newBuf = new char[nLenth + 1];
                                strncpy(newBuf,buf,nLenth);
                                newBuf[nLenth] = 0;
                                delete [] g_map[s];
                                g_map[s] = newBuf;
                                break;
                            }else{
                                delete [] g_map[s];
                                g_map.erase(s);
                                closesocket(s);
                            }
                        }
                        if(nLenth == 0)
                        {
                            g_map.erase(s);
                        }
                    }
                }break;
            case FD_READ:
                {
                    char buf[4096];
                    int nRet = recv(s,buf,4096,0);
                    if(nRet > 0)
                    {
                        buf[nRet] = 0;
                        //printf("收到数据:%s\n",buf);
                        int x = send(s,buf,nRet,0);
                        printf("已发送字节数:%d , 线程号:%d\n",x,GetCurrentThreadId());
                        if(x < 0)
                        {
                            int iError = GetLastError();
                            printf("数据:%s ,错误:%d\n",buf,iError);
                            if(10035 == iError)
                            {
                                if(g_map.end() != g_map.find(s))
                                {
                                    int newLength = strlen(g_map[s]) + strlen(buf);
                                    char* newBuf = new char[newLength + 1];
                                    strncpy(newBuf,g_map[s],strlen(g_map[s]));
                                    strncpy(newBuf+strlen(g_map[s]),buf,strlen(buf));
                                    newBuf[newLength] = 0;
                                    delete [] g_map[s];
                                    g_map[s] = newBuf;
                                }else{
                                    char* newBuf = new char[strlen(buf) + 1];
                                    strncpy(newBuf,buf,strlen(buf));
                                    newBuf[strlen(buf)] = 0;
                                    g_map[s] = newBuf;
                                }
                            }else{
                                if(g_map.end() != g_map.find(s))
                                {
                                    delete [] g_map[s];
                                    g_map.erase(s);
                                }
                                closesocket(s);
                            }
                        }
                    }else{
                        printf("1个Client已经断开1111!\n");
                        if(g_map.end() != g_map.find(s))
                        {
                            delete [] g_map[s];
                            g_map.erase(s);
                        }
                        closesocket(s);
                    }
                }break;
            case FD_CLOSE:
                {
                    printf("1个Client已经断开222!\n");
                    if(g_map.end() != g_map.find(s))
                    {
                        delete [] g_map[s];
                        g_map.erase(s);
                    }
                    closesocket(s);
                }break;
            }
        }break;
    case WM_DESTROY:
        {
            printf("窗口已关闭!\n");
            PostQuitMessage(0);
        }
    }
    return DefWindowProc(hwnd,uMsg,wParam,lParam);
}

int main()
{
    char szClassName[] = "WSAAsyncSelect Test";
    static WNDCLASSEX wndClass;
    wndClass.cbSize = sizeof(wndClass);
    wndClass.style = CS_HREDRAW | CS_VREDRAW;
    wndClass.lpfnWndProc = WindowProc;
    wndClass.cbClsExtra = 0;
    wndClass.cbWndExtra = 0;
    wndClass.hInstance = GetModuleHandle(0);
    wndClass.hIcon = LoadIcon(NULL,IDI_APPLICATION);
    wndClass.hCursor = LoadCursor(NULL,IDC_ARROW);
    wndClass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH);
    wndClass.lpszMenuName = NULL;
    wndClass.lpszClassName = szClassName;
    wndClass.hIconSm = NULL;

    ATOM atom = RegisterClassEx(&wndClass);
    if(0 == atom)
    {
        char error[256];
        sprintf(error,"RegisterClassEx错误!%d",GetLastError());
        MessageBox(NULL,error,"error",MB_OK);
        return -1;
    }
    HWND hwnd = CreateWindowEx(0,(char *)atom,"",WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT,
        CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL);
    if(hwnd == NULL)
    {
        char error[256];
        sprintf(error,"创建窗口错误!%d",GetLastError());
        MessageBox(NULL,error,"error",MB_OK);
        return -1;
    }

    WSAData wsaData;
    if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
    {
        printf("初始化失败!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }
    USHORT nport = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;

    if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
    {
        printf("bind failed!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }

    WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE);
    listen(sListen,5);

    MSG msg;
    while(GetMessage(&msg,NULL,0,0))
    {
        TranslateMessage(&msg);
        DispatchMessage(&msg);
    }
    closesocket(sListen);
    WSACleanup();
    return msg.wParam;
}

五,事件选择模型
事件选择模型主要难点是对线程池的使用,send操作可以参考异步选择模型。

#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;
#pragma comment(lib,"Ws2_32.lib")

typedef struct _THREAD_OBJ
{
    HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
    SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];
    int nSocksUsed;
    CRITICAL_SECTION cs;
    _THREAD_OBJ *pNext;
}THREAD_OBJ,*PTHREAD_OBJ;

PTHREAD_OBJ g_pThreadList = NULL;
CRITICAL_SECTION g_cs;
BOOL g_bServerRunning = FALSE;
HANDLE g_hThreads[1000] = {0};
int g_nThreadsCount = 0;

PTHREAD_OBJ CreateThreadObj()
{
    PTHREAD_OBJ pThread = new THREAD_OBJ();
    if(pThread != NULL)
    {
        InitializeCriticalSectionAndSpinCount(&pThread->cs,4000);
        pThread->events[0] = WSACreateEvent();
        pThread->nSocksUsed = 1;
        EnterCriticalSection(&g_cs);
        pThread->pNext = g_pThreadList;
        g_pThreadList = pThread;
        LeaveCriticalSection(&g_cs);
    }
    return pThread;
}

void FreeThreadObj(PTHREAD_OBJ pThread)
{
    if(pThread == NULL)
        return;
    EnterCriticalSection(&g_cs);
    PTHREAD_OBJ p = g_pThreadList;
    if(p == pThread)
    {
        g_pThreadList = p->pNext;
    }else{
        while(p != NULL && p->pNext != pThread)
        {
            p = p->pNext;
        }
        if(p != NULL)
        {
            p->pNext = pThread->pNext;
        }
    }
    LeaveCriticalSection(&g_cs);

    DeleteCriticalSection(&pThread->cs);
    WSACloseEvent(pThread->events[0]);
    delete pThread;
}

LONG g_nTotalConnections;
LONG g_nCurrentConnections;

BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)
{
    if(pThread == NULL || s == INVALID_SOCKET)
        return FALSE;

    BOOL bRet = FALSE;
    EnterCriticalSection(&pThread->cs);
    if(pThread->nSocksUsed < WSA_MAXIMUM_WAIT_EVENTS)
    {
        pThread->events[pThread->nSocksUsed] = WSACreateEvent();
        pThread->sockets[pThread->nSocksUsed] = s;
        WSAEventSelect(s,pThread->events[pThread->nSocksUsed],FD_READ | FD_CLOSE | FD_WRITE);
        pThread->nSocksUsed++;
        bRet = TRUE;
        WSASetEvent(pThread->events[0]);//通知线程,有新的事件加入了,需要重新调用WSAWaitFormultipleEvents
    }
    LeaveCriticalSection(&pThread->cs);

    if(bRet)
    {
        InterlockedIncrement(&g_nTotalConnections);
        InterlockedIncrement(&g_nCurrentConnections);
    }
    return bRet;
}

void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)
{
    if(pThread == NULL || s == INVALID_SOCKET)
        return;
    EnterCriticalSection(&pThread->cs);
    for(int i=1;i<pThread->nSocksUsed;i++)
    {
        if(pThread->sockets[i] == s)
        {
            WSACloseEvent(pThread->events[i]);
            closesocket(s);
            for(int j=i;j<pThread->nSocksUsed - 1;j++)
            {
                pThread->events[j] = pThread->events[j+1];
                pThread->sockets[j] = pThread->sockets[j+1];
            }
            pThread->nSocksUsed--;
            break;
        }
    }
    LeaveCriticalSection(&pThread->cs);
    InterlockedDecrement(&g_nCurrentConnections);
}

BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)
{
    WSANETWORKEVENTS event;
    SOCKET s = pThread->sockets[nIndex];
    HANDLE sEvent = pThread->events[nIndex];
    if(0 != WSAEnumNetworkEvents(s,sEvent,&event))
    {
        printf("socket error!\n");
        RemoveSocket(pThread,s);
        return FALSE;
    }

    do
    {
        if(event.lNetworkEvents & FD_READ)
        {
            if(event.iErrorCode[FD_READ_BIT] == 0)
            {
                char szText[256];
                int nRecv = recv(s,szText,strlen(szText),0);
                if(nRecv > 0)
                {
                    szText[nRecv] = '\0';
                    printf("接收到数据:%s\n",szText);
                }else{
                    break;
                }
            }else
                break;
        }else if(event.lNetworkEvents & FD_CLOSE){
            break;
        }else if(event.lNetworkEvents & FD_WRITE){
            printf("FD_WRITE==========================\n");
        }
        return TRUE;
    } while (FALSE);
    printf("socket error2!\n");
    RemoveSocket(pThread,s);
    return FALSE;
}

DWORD WINAPI ServerThread(LPVOID lpParam)
{
    PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
    while(TRUE)
    {
        int nIndex = WSAWaitForMultipleEvents(
            pThread->nSocksUsed,pThread->events,FALSE,WSA_INFINITE,FALSE);
        nIndex = nIndex - WSA_WAIT_EVENT_0;

        if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT)
        {
            printf("WSAWaitForMultipleEvents error!\n");
            continue;
        }else if(nIndex == 0){
            ResetEvent(pThread->events[0]);
        }else{
            HandleIo(pThread,nIndex);
        }
        if(!g_bServerRunning && pThread->nSocksUsed == 1)
            break;
    }
    FreeThreadObj(pThread);
    return 0;
}

BOOL AssignToFreeThread(SOCKET s)
{
    if(s == INVALID_SOCKET)
        return FALSE;
    BOOL bAllSucceed = TRUE;
    EnterCriticalSection(&g_cs);
    PTHREAD_OBJ pThread = g_pThreadList;
    while(pThread != NULL)
    {
        if(InsertSocket(pThread,s))
        {
            break;
        }
        pThread = pThread->pNext;
    }

    if(pThread == NULL)
    {
        if(g_nThreadsCount < 1000)
        {
            pThread = CreateThreadObj();
            HANDLE hThread = CreateThread(NULL,0,ServerThread,pThread,0,NULL);
            if(!hThread)
            {
                bAllSucceed = FALSE;
                FreeThreadObj(pThread);
            }else{
                g_hThreads[g_nThreadsCount++] = hThread;
                InsertSocket(pThread,s);
            }
        }else
            bAllSucceed = FALSE;
    }
    LeaveCriticalSection(&g_cs);
    return bAllSucceed;
}

DWORD WINAPI ControlThread(LPVOID lpParma)
{
    HANDLE wsaEvent = (HANDLE)lpParma;
    char cmd[128];
    while(scanf("%s",cmd))
    {
        if(cmd[0] == 's')
        {
            g_bServerRunning = FALSE;
            EnterCriticalSection(&g_cs);
            PTHREAD_OBJ pThread = g_pThreadList;
            while(pThread != NULL)
            {
                EnterCriticalSection(&pThread->cs);
                for(int i=0;i<pThread->nSocksUsed;i++)
                {
                    closesocket(pThread->sockets[i]);
                }
                WSASetEvent(pThread->events[0]);
                LeaveCriticalSection(&pThread->cs);
                pThread = pThread->pNext;
            }
            LeaveCriticalSection(&g_cs);
            WSASetEvent(wsaEvent);
            break;
        }
    }
    return 0;
}

int main()
{
    WSAData wsaData;
    if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
    {
        printf("初始化失败!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }
    USHORT nport = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;

    if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
    {
        printf("bind failed!%d\n",WSAGetLastError());
        Sleep(5000);
        return -1;
    }

    listen(sListen,200);

    WSAEVENT wsaEvent = WSACreateEvent();
    WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE);
    InitializeCriticalSectionAndSpinCount(&g_cs,4000);
    g_bServerRunning = TRUE;
    HANDLE hThread = CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);
    CloseHandle(hThread);
    while(TRUE)
    {
        int nRet = WaitForSingleObject(wsaEvent,5*1000);
        if(!g_bServerRunning)
        {
            closesocket(sListen);
            WSACloseEvent(wsaEvent);
            WSAWaitForMultipleEvents(g_nThreadsCount,g_hThreads,TRUE,INFINITE,FALSE);
            for(int i=0;i<g_nThreadsCount;i++)
            {
                CloseHandle(g_hThreads[i]);
            }
            break;
        }
        if(nRet == WAIT_FAILED)
        {
            printf("WaitForSingleObject Failed!\n");
            break;
        }else if(nRet == WAIT_TIMEOUT){
            printf("\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n",
                g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);
            continue;
        }else{
            ResetEvent(wsaEvent);
            while(TRUE)
            {
                sockaddr_in addrRemote;
                int nLen = sizeof(addrRemote);
                SOCKET sNew = accept(sListen,(sockaddr*)&addrRemote,&nLen);
                if(sNew == SOCKET_ERROR)
                    break;

                if(!AssignToFreeThread(sNew))
                {
                    closesocket(sNew);
                    printf("AssignToFreeThread Failed!\n");
                }
            }
        }
    }
    DeleteCriticalSection(&g_cs);
    return 0;
}
微信小程序扫码登陆

文章评论

1853人参与,0条评论