返回> 网站首页 

线程池的简单例程

yoours2011-03-04 14:00:37 阅读 1255

简介一边听听音乐,一边写写文章。


一、 调用方法: TestSimpleThreadPool(false, 3);
        a. 线程池线程:用于管理多个工作线程
        b. 工作线程: 用于处理执行用户的任务
        c. 初始化线程池 -> 创建指定数量的线程 -> 为空闲的线程分配任务
        d. 工作线程 -> 完成任务 -> 等待分配新任务
        e. 退出信号量有信号则退出

二、程序代码
//
//  Doubly-linked list manipulation routines.
//

#define InitializeListHead32(ListHead) ((ListHead)->Flink = (ListHead)->Blink = PtrToUlong((ListHead)))
#define RTL_STATIC_LIST_HEAD(x) LIST_ENTRY x = { &x, &x }

FORCEINLINE VOID InitializeListHead(__out PLIST_ENTRY ListHead)
{
    ListHead->Flink = ListHead->Blink = ListHead;
}

__checkReturn BOOLEAN FORCEINLINE IsListEmpty(__in const LIST_ENTRY * ListHead)
{
    return (BOOLEAN)(ListHead->Flink == ListHead);
}

FORCEINLINE BOOLEAN RemoveEntryList(__in PLIST_ENTRY Entry)
{
    PLIST_ENTRY Blink;
    PLIST_ENTRY Flink;

    Flink = Entry->Flink;
    Blink = Entry->Blink;
    Blink->Flink = Flink;
    Flink->Blink = Blink;
    return (BOOLEAN)(Flink == Blink);
}

FORCEINLINE PLIST_ENTRY RemoveHeadList(__inout PLIST_ENTRY ListHead)
{
    PLIST_ENTRY Flink;
    PLIST_ENTRY Entry;

    Entry = ListHead->Flink;
    Flink = Entry->Flink;
    ListHead->Flink = Flink;
    Flink->Blink = ListHead;
    return Entry;
}

FORCEINLINE PLIST_ENTRY RemoveTailList(__inout PLIST_ENTRY ListHead)
{
    PLIST_ENTRY Blink;
    PLIST_ENTRY Entry;

    Entry = ListHead->Blink;
    Blink = Entry->Blink;
    ListHead->Blink = Blink;
    Blink->Flink = ListHead;
    return Entry;
}

FORCEINLINE VOID InsertTailList(__inout PLIST_ENTRY ListHead, __inout PLIST_ENTRY Entry)
{
    PLIST_ENTRY Blink;

    Blink = ListHead->Blink;
    Entry->Flink = ListHead;
    Entry->Blink = Blink;
    Blink->Flink = Entry;
    ListHead->Blink = Entry;
}

FORCEINLINE VOID InsertHeadList(__inout PLIST_ENTRY ListHead, __inout PLIST_ENTRY Entry)
{
    PLIST_ENTRY Flink;

    Flink = ListHead->Flink;
    Entry->Flink = Flink;
    Entry->Blink = ListHead;
    Flink->Blink = Entry;
    ListHead->Flink = Entry;
}

FORCEINLINE VOID AppendTailList(__inout PLIST_ENTRY ListHead, __inout PLIST_ENTRY ListToAppend)
{
    PLIST_ENTRY ListEnd = ListHead->Blink;

    ListHead->Blink->Flink = ListToAppend;
    ListHead->Blink = ListToAppend->Blink;
    ListToAppend->Blink->Flink = ListHead;
    ListToAppend->Blink = ListEnd;
}

FORCEINLINE PSINGLE_LIST_ENTRY PopEntryList(__inout PSINGLE_LIST_ENTRY ListHead)
{
    PSINGLE_LIST_ENTRY FirstEntry;
    FirstEntry = ListHead->Next;
    if (FirstEntry != NULL)
{
        ListHead->Next = FirstEntry->Next;
    }

    return FirstEntry;
}

FORCEINLINE VOID PushEntryList(__inout PSINGLE_LIST_ENTRY ListHead, __inout PSINGLE_LIST_ENTRY Entry)
{
    Entry->Next = ListHead->Next;
    ListHead->Next = Entry;
}

// Test Our own thread pool
typedef struct _THREAD_POOL
{
HANDLE QuitEvent;
HANDLE WorkItemSemaphore;

LONG WorkItemCount;
LIST_ENTRY WorkItemHeader;
CRITICAL_SECTION WorkItemLock;

LONG ThreadNum;
HANDLE *ThreadsArray;
}THREAD_POOL, *PTHREAD_POOL;
// 用户线程函数指针
typedef VOID (*WORK_ITEM_PROC)(PVOID Param);

typedef struct _WORK_ITEM
{
LIST_ENTRY List;

WORK_ITEM_PROC UserProc;
PVOID UserParam;

}WORK_ITEM, *PWORK_ITEM;

HANDLE CompleteEvent;
// 线程池的线程 - 处理多个线程工作的
DWORD WINAPI WorkerThread(PVOID pParam)
{
PTHREAD_POOL pThreadPool = (PTHREAD_POOL)pParam;
HANDLE Events[2];

Events[0] = pThreadPool->QuitEvent;
Events[1] = pThreadPool->WorkItemSemaphore;

for(;;)
{
DWORD dwRet = WaitForMultipleObjects(2, Events, FALSE, INFINITE);
if(dwRet == WAIT_OBJECT_0)
{
//索引为0的信号量(WAIT_OBJECT_0 + 0)QuitEvent有信号,退出
break;
}else if(dwRet == WAIT_OBJECT_0 + 1){
//索引为1的信号量(WAIT_OBJECT_0 + 1)WorkItemSemaphore有信号,工作
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;

EnterCriticalSection(&pThreadPool->WorkItemLock);
_ASSERT(!IsListEmpty(&pThreadPool->WorkItemHeader));
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
LeaveCriticalSection(&pThreadPool->WorkItemLock);

pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
pWorkItem->UserProc(pWorkItem->UserParam);

// 函数减少(减少1)指定变量的值,并且检查结果。这个函数阻止多于一个线程同时使用变量
// 工作线程数量-1计数
if (InterlockedDecrement(&pThreadPool->WorkItemCount) == 0)
{
//正在使用的线程数为0
}

free(pWorkItem);
}else{
_ASSERT(0);
break;
}
}

return 0;
}

// 初始化线程池
BOOL InitializeThreadPool(PTHREAD_POOL pThreadPool, LONG ThreadNum)
{
pThreadPool->QuitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// 创建信号量 第一个参数:使用默认安全 第二个参数:初始信号量的数量 第三个参数:最大的信号量数量 第四个参数:信号量字符串名称
pThreadPool->WorkItemSemaphore = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
pThreadPool->WorkItemCount = 0;

InitializeListHead(&pThreadPool->WorkItemHeader);
InitializeCriticalSection(&pThreadPool->WorkItemLock);

// 线程池中包含的工作线程数量
pThreadPool->ThreadNum = ThreadNum;
pThreadPool->ThreadsArray = (HANDLE*)malloc(sizeof(HANDLE) * ThreadNum);

// 创建指定数量的工作线程
for(int i=0; i<ThreadNum; i++)
{
pThreadPool->ThreadsArray[i] = CreateThread(NULL, 0, WorkerThread, pThreadPool, 0, NULL);
}

return TRUE;
}

// 销毁线程池
VOID DestroyThreadPool(PTHREAD_POOL pThreadPool)
{
// 销毁线程池中的线程
SetEvent(pThreadPool->QuitEvent);
for(int i=0; i<pThreadPool->ThreadNum; i++)
{
WaitForSingleObject(pThreadPool->ThreadsArray[i], INFINITE);
CloseHandle(pThreadPool->ThreadsArray[i]);
}

free(pThreadPool->ThreadsArray);

CloseHandle(pThreadPool->QuitEvent);
CloseHandle(pThreadPool->WorkItemSemaphore);
DeleteCriticalSection(&pThreadPool->WorkItemLock);

while(!IsListEmpty(&pThreadPool->WorkItemHeader))
{
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;

pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);

free(pWorkItem);
}
}

BOOL PostWorkItem(PTHREAD_POOL pThreadPool, WORK_ITEM_PROC UserProc, PVOID UserParam)
{
PWORK_ITEM pWorkItem = (PWORK_ITEM)malloc(sizeof(WORK_ITEM));
if(pWorkItem == NULL)
return FALSE;

//工作线程函数指针和参数
pWorkItem->UserProc = UserProc;
pWorkItem->UserParam = UserParam;

EnterCriticalSection(&pThreadPool->WorkItemLock);
InsertTailList(&pThreadPool->WorkItemHeader, &pWorkItem->List);
LeaveCriticalSection(&pThreadPool->WorkItemLock);

// 工作线程数量+1计数
InterlockedIncrement(&pThreadPool->WorkItemCount);
ReleaseSemaphore(pThreadPool->WorkItemSemaphore, 1, NULL);

return TRUE;
}

//用户的工作内容
VOID UserProc1(PVOID dwParam)
{
//WorkItem(dwParam);
}

// 将工作任务(数量)分配给ThreadNum个线程去处理
void TestSimpleThreadPool(BOOL bWaitMode, LONG ThreadNum)
{
THREAD_POOL ThreadPool;    
InitializeThreadPool(&ThreadPool, ThreadNum);
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);

int ItemCount = 20;//工作任务数量
for(int i = 0; i < ItemCount; i++)
{
PostWorkItem(&ThreadPool, UserProc1, (PVOID)bWaitMode);
}

//等待退出信号 //退出SetEvent(CompleteEvent);
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);

DestroyThreadPool(&ThreadPool);
}
微信小程序扫码登陆

文章评论

1255人参与,0条评论