返回> 网站首页
线程池的简单例程
yoours2011-03-04 14:00:37
简介一边听听音乐,一边写写文章。
一、 调用方法: TestSimpleThreadPool(false, 3);
a. 线程池线程:用于管理多个工作线程
b. 工作线程: 用于处理执行用户的任务
c. 初始化线程池 -> 创建指定数量的线程 -> 为空闲的线程分配任务
d. 工作线程 -> 完成任务 -> 等待分配新任务
e. 退出信号量有信号则退出
二、程序代码
//
// Doubly-linked list manipulation routines.
//
#define InitializeListHead32(ListHead) ((ListHead)->Flink = (ListHead)->Blink = PtrToUlong((ListHead)))
#define RTL_STATIC_LIST_HEAD(x) LIST_ENTRY x = { &x, &x }
FORCEINLINE VOID InitializeListHead(__out PLIST_ENTRY ListHead)
{
ListHead->Flink = ListHead->Blink = ListHead;
}
__checkReturn BOOLEAN FORCEINLINE IsListEmpty(__in const LIST_ENTRY * ListHead)
{
return (BOOLEAN)(ListHead->Flink == ListHead);
}
FORCEINLINE BOOLEAN RemoveEntryList(__in PLIST_ENTRY Entry)
{
PLIST_ENTRY Blink;
PLIST_ENTRY Flink;
Flink = Entry->Flink;
Blink = Entry->Blink;
Blink->Flink = Flink;
Flink->Blink = Blink;
return (BOOLEAN)(Flink == Blink);
}
FORCEINLINE PLIST_ENTRY RemoveHeadList(__inout PLIST_ENTRY ListHead)
{
PLIST_ENTRY Flink;
PLIST_ENTRY Entry;
Entry = ListHead->Flink;
Flink = Entry->Flink;
ListHead->Flink = Flink;
Flink->Blink = ListHead;
return Entry;
}
FORCEINLINE PLIST_ENTRY RemoveTailList(__inout PLIST_ENTRY ListHead)
{
PLIST_ENTRY Blink;
PLIST_ENTRY Entry;
Entry = ListHead->Blink;
Blink = Entry->Blink;
ListHead->Blink = Blink;
Blink->Flink = ListHead;
return Entry;
}
FORCEINLINE VOID InsertTailList(__inout PLIST_ENTRY ListHead, __inout PLIST_ENTRY Entry)
{
PLIST_ENTRY Blink;
Blink = ListHead->Blink;
Entry->Flink = ListHead;
Entry->Blink = Blink;
Blink->Flink = Entry;
ListHead->Blink = Entry;
}
FORCEINLINE VOID InsertHeadList(__inout PLIST_ENTRY ListHead, __inout PLIST_ENTRY Entry)
{
PLIST_ENTRY Flink;
Flink = ListHead->Flink;
Entry->Flink = Flink;
Entry->Blink = ListHead;
Flink->Blink = Entry;
ListHead->Flink = Entry;
}
FORCEINLINE VOID AppendTailList(__inout PLIST_ENTRY ListHead, __inout PLIST_ENTRY ListToAppend)
{
PLIST_ENTRY ListEnd = ListHead->Blink;
ListHead->Blink->Flink = ListToAppend;
ListHead->Blink = ListToAppend->Blink;
ListToAppend->Blink->Flink = ListHead;
ListToAppend->Blink = ListEnd;
}
FORCEINLINE PSINGLE_LIST_ENTRY PopEntryList(__inout PSINGLE_LIST_ENTRY ListHead)
{
PSINGLE_LIST_ENTRY FirstEntry;
FirstEntry = ListHead->Next;
if (FirstEntry != NULL)
{
ListHead->Next = FirstEntry->Next;
}
return FirstEntry;
}
FORCEINLINE VOID PushEntryList(__inout PSINGLE_LIST_ENTRY ListHead, __inout PSINGLE_LIST_ENTRY Entry)
{
Entry->Next = ListHead->Next;
ListHead->Next = Entry;
}
// Test Our own thread pool
typedef struct _THREAD_POOL
{
HANDLE QuitEvent;
HANDLE WorkItemSemaphore;
LONG WorkItemCount;
LIST_ENTRY WorkItemHeader;
CRITICAL_SECTION WorkItemLock;
LONG ThreadNum;
HANDLE *ThreadsArray;
}THREAD_POOL, *PTHREAD_POOL;
// 用户线程函数指针
typedef VOID (*WORK_ITEM_PROC)(PVOID Param);
typedef struct _WORK_ITEM
{
LIST_ENTRY List;
WORK_ITEM_PROC UserProc;
PVOID UserParam;
}WORK_ITEM, *PWORK_ITEM;
HANDLE CompleteEvent;
// 线程池的线程 - 处理多个线程工作的
DWORD WINAPI WorkerThread(PVOID pParam)
{
PTHREAD_POOL pThreadPool = (PTHREAD_POOL)pParam;
HANDLE Events[2];
Events[0] = pThreadPool->QuitEvent;
Events[1] = pThreadPool->WorkItemSemaphore;
for(;;)
{
DWORD dwRet = WaitForMultipleObjects(2, Events, FALSE, INFINITE);
if(dwRet == WAIT_OBJECT_0)
{
//索引为0的信号量(WAIT_OBJECT_0 + 0)QuitEvent有信号,退出
break;
}else if(dwRet == WAIT_OBJECT_0 + 1){
//索引为1的信号量(WAIT_OBJECT_0 + 1)WorkItemSemaphore有信号,工作
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;
EnterCriticalSection(&pThreadPool->WorkItemLock);
_ASSERT(!IsListEmpty(&pThreadPool->WorkItemHeader));
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
LeaveCriticalSection(&pThreadPool->WorkItemLock);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
pWorkItem->UserProc(pWorkItem->UserParam);
// 函数减少(减少1)指定变量的值,并且检查结果。这个函数阻止多于一个线程同时使用变量
// 工作线程数量-1计数
if (InterlockedDecrement(&pThreadPool->WorkItemCount) == 0)
{
//正在使用的线程数为0
}
free(pWorkItem);
}else{
_ASSERT(0);
break;
}
}
return 0;
}
// 初始化线程池
BOOL InitializeThreadPool(PTHREAD_POOL pThreadPool, LONG ThreadNum)
{
pThreadPool->QuitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// 创建信号量 第一个参数:使用默认安全 第二个参数:初始信号量的数量 第三个参数:最大的信号量数量 第四个参数:信号量字符串名称
pThreadPool->WorkItemSemaphore = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
pThreadPool->WorkItemCount = 0;
InitializeListHead(&pThreadPool->WorkItemHeader);
InitializeCriticalSection(&pThreadPool->WorkItemLock);
// 线程池中包含的工作线程数量
pThreadPool->ThreadNum = ThreadNum;
pThreadPool->ThreadsArray = (HANDLE*)malloc(sizeof(HANDLE) * ThreadNum);
// 创建指定数量的工作线程
for(int i=0; i<ThreadNum; i++)
{
pThreadPool->ThreadsArray[i] = CreateThread(NULL, 0, WorkerThread, pThreadPool, 0, NULL);
}
return TRUE;
}
// 销毁线程池
VOID DestroyThreadPool(PTHREAD_POOL pThreadPool)
{
// 销毁线程池中的线程
SetEvent(pThreadPool->QuitEvent);
for(int i=0; i<pThreadPool->ThreadNum; i++)
{
WaitForSingleObject(pThreadPool->ThreadsArray[i], INFINITE);
CloseHandle(pThreadPool->ThreadsArray[i]);
}
free(pThreadPool->ThreadsArray);
CloseHandle(pThreadPool->QuitEvent);
CloseHandle(pThreadPool->WorkItemSemaphore);
DeleteCriticalSection(&pThreadPool->WorkItemLock);
while(!IsListEmpty(&pThreadPool->WorkItemHeader))
{
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
free(pWorkItem);
}
}
BOOL PostWorkItem(PTHREAD_POOL pThreadPool, WORK_ITEM_PROC UserProc, PVOID UserParam)
{
PWORK_ITEM pWorkItem = (PWORK_ITEM)malloc(sizeof(WORK_ITEM));
if(pWorkItem == NULL)
return FALSE;
//工作线程函数指针和参数
pWorkItem->UserProc = UserProc;
pWorkItem->UserParam = UserParam;
EnterCriticalSection(&pThreadPool->WorkItemLock);
InsertTailList(&pThreadPool->WorkItemHeader, &pWorkItem->List);
LeaveCriticalSection(&pThreadPool->WorkItemLock);
// 工作线程数量+1计数
InterlockedIncrement(&pThreadPool->WorkItemCount);
ReleaseSemaphore(pThreadPool->WorkItemSemaphore, 1, NULL);
return TRUE;
}
//用户的工作内容
VOID UserProc1(PVOID dwParam)
{
//WorkItem(dwParam);
}
// 将工作任务(数量)分配给ThreadNum个线程去处理
void TestSimpleThreadPool(BOOL bWaitMode, LONG ThreadNum)
{
THREAD_POOL ThreadPool;
InitializeThreadPool(&ThreadPool, ThreadNum);
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
int ItemCount = 20;//工作任务数量
for(int i = 0; i < ItemCount; i++)
{
PostWorkItem(&ThreadPool, UserProc1, (PVOID)bWaitMode);
}
//等待退出信号 //退出SetEvent(CompleteEvent);
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
DestroyThreadPool(&ThreadPool);
}
文章评论
1401人参与,0条评论