线程池
线程池(英語:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数上限一般取CPU逻辑核心数+2,线程数过多会导致额外的线程切换开销。 任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。 线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。
线程池的伸缩性对性能有较大的影响。
Windows API的线程池函数Windows操作系统的API提供了一套线程池的实现接口。[1]可以方便地创建、使用线程池。Windows线程池API被设计为一组协同对象, 其中有些对象表示工作单位、计时器、异步I/O 等等。使用下述用户模式的对象来管理线程池及相关的数据:
相关的API函数:[2]
示例示例程序1如下: #include <windows.h>
#include <tchar.h>
#include <stdio.h>
//
// Thread pool wait callback function template
//
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
{
// Instance, Parameter, Wait, and WaitResult not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Wait); UNREFERENCED_PARAMETER(WaitResult);
// Do something when the wait is over.
_tprintf(_T("MyWaitCallback: wait is over.\n"));
}
//
// Thread pool timer callback function template
//
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_TIMER Timer)
{
// Instance, Parameter, and Timer not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Timer);
// Do something when the timer fires.
_tprintf(_T("MyTimerCallback: timer has fired.\n"));
}
//
// This is the thread pool work callback function.
//
VOID CALLBACK MyWorkCallback( PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WORK Work)
{
// Instance, Parameter, and Work not used in this example.
UNREFERENCED_PARAMETER(Instance);UNREFERENCED_PARAMETER(Parameter);UNREFERENCED_PARAMETER(Work);
// Do something when the work callback is invoked.
_tprintf(_T("MyWorkCallback: Task performed.\n"));
}
int main(void)
{
PTP_WAIT Wait = NULL;
PTP_WAIT_CALLBACK waitcallback = MyWaitCallback;
HANDLE hEvent = NULL;
UINT i = 0;
UINT rollback = 0;
// Create an auto-reset event and initialized as nonsignaled.
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (NULL == hEvent) {
// Error Handling
return 0;
}
rollback = 1; // CreateEvent succeeded
Wait = CreateThreadpoolWait(waitcallback,
NULL, // 回调函数的输入参数
NULL); // 使用缺省的回调环境
if (NULL == Wait) {
_tprintf(_T("CreateThreadpoolWait failed. LastError: %u\n"),GetLastError());
goto new_wait_cleanup;
}
rollback = 2; // CreateThreadpoolWait succeeded
// must re-register the event with the wait object before signaling it each time to trigger the wait callback
// each time before signaling the event to trigger the wait callback.
for (i = 0; i < 5; i++) {
SetThreadpoolWait(Wait, //线程池等待对象
hEvent, //内核等待对象
NULL); //超时设定
SetEvent(hEvent); //触发内核等待对象
// Delay for the waiter thread to act if necessary.
Sleep(500);
// Block here until the callback function is done executing.
WaitForThreadpoolWaitCallbacks(Wait, FALSE);
}
new_wait_cleanup:
switch (rollback) {
case 2:
// Unregister the wait by setting the event to NULL.
SetThreadpoolWait(Wait, NULL, NULL); //the wait object will cease to queue new callbacks (but callbacks already queued will still occur
// Close the wait.
CloseThreadpoolWait(Wait);
case 1:
// Close the event.
CloseHandle(hEvent);
default:
break;
}
BOOL bRet = FALSE;
PTP_WORK work = NULL;
PTP_TIMER timer = NULL;
PTP_POOL pool = NULL;
PTP_WORK_CALLBACK workcallback = MyWorkCallback;
PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
TP_CALLBACK_ENVIRON CallBackEnviron;
PTP_CLEANUP_GROUP cleanupgroup = NULL;
FILETIME FileDueTime;
ULARGE_INTEGER ulDueTime;
rollback = 0;
InitializeThreadpoolEnvironment(&CallBackEnviron); //不使用缺省的线程池与缺省的回调环境
// Create a custom, dedicated thread pool.
pool = CreateThreadpool(NULL);
if (NULL == pool) {
_tprintf(_T("CreateThreadpool failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 1; // pool creation succeeded
// The thread pool is made persistent simply by setting both the minimum and maximum threads to 1.
SetThreadpoolThreadMaximum(pool, 1);
bRet = SetThreadpoolThreadMinimum(pool, 1);
if (FALSE == bRet) {
_tprintf(_T("SetThreadpoolThreadMinimum failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
// Create a cleanup group for this thread pool.
cleanupgroup = CreateThreadpoolCleanupGroup();
if (NULL == cleanupgroup) {
_tprintf(_T("CreateThreadpoolCleanupGroup failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
rollback = 2; // Cleanup group creation succeeded
// Associate the callback environment with our thread pool.
SetThreadpoolCallbackPool(&CallBackEnviron, pool);
// Associate the cleanup group with our thread pool.
// Objects created with the same callback environment as the cleanup group become members of the cleanup group.
SetThreadpoolCallbackCleanupGroup(&CallBackEnviron, //回调环境
cleanupgroup, //Cleanup Group
NULL); //Cleanup Group的回调函数,当释放其所包含的对象之前先调用该回调函数
// Create work with the callback environment.
work = CreateThreadpoolWork(workcallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == work) {
_tprintf(_T("CreateThreadpoolWork failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 3; // Creation of work succeeded
// Submit the work to the pool. Because this was a pre-allocated work item (using CreateThreadpoolWork), it is guaranteed to execute.
SubmitThreadpoolWork(work);
// Create a timer with the same callback environment.
timer = CreateThreadpoolTimer(timercallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == timer) {
_tprintf(_T("CreateThreadpoolTimer failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 4; // Timer creation succeeded
// Set the timer to fire in one second.
ulDueTime.QuadPart = (ULONGLONG)-(1 * 10 * 1000 * 1000);
FileDueTime.dwHighDateTime = ulDueTime.HighPart;
FileDueTime.dwLowDateTime = ulDueTime.LowPart;
SetThreadpoolTimer(timer, //线程池定时器对象
&FileDueTime, //到期时间
0, //周期时期,为0则表示一次性定时器
0); //操作系统调用回调函数的最大延迟时间
// Delay for the timer to be fired
Sleep(1500);
// Wait for all callbacks to finish.
// CloseThreadpoolCleanupGroupMembers also releases objects that are members of the cleanup group,
// so it is not necessary to call close functions on individual objects after calling CloseThreadpoolCleanupGroupMembers.
CloseThreadpoolCleanupGroupMembers(cleanupgroup, //Cleanup Group
FALSE, //为真则取消还未开始执行的pending的回调函数
NULL); //CleanupGroup回调函数的输入参数
// Already cleaned up the work item with the
// CloseThreadpoolCleanupGroupMembers, so set rollback to 2.
rollback = 2;
goto main_cleanup;
main_cleanup:
// Clean up any individual pieces manually
// Notice the fall-through structure of the switch.
// Clean up in reverse order.
switch (rollback) {
case 4:
case 3:
// Clean up the cleanup group members.
CloseThreadpoolCleanupGroupMembers(cleanupgroup,FALSE, NULL);
case 2:
// Clean up the cleanup group.
CloseThreadpoolCleanupGroup(cleanupgroup);
case 1:
// Clean up the pool.
CloseThreadpool(pool);
default:
break;
}
return 0;
}
关于IO线程池的一个示例: #include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>
#include <iostream>
#include <limits>
void PressEnterToContinue()
{
std::cout << "Press ENTER to continue... " << std::flush;
std::cin.ignore( (std::numeric_limits<std::streamsize>::max)( ), '\n');
}
//////////////////////////////////////////////////////////////////////////
#define QMLX_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz)
#define QMLX_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define QMLX_SAFEFREE(p) if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}
#define QMLX_ASSERT(s) if(!(s)){DebugBreak();}
#define QMLX_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,\
(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);
//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 2 //每个线程最大写入次数
#define MAXWRITETHREAD 2 //写入线程的数量
#define OP_READ 0x01 //读操作
#define OP_WRITE 0x02 //写操作
//#pragma pack(show)
//单IO数据
typedef struct __declspec(align(16)) _tagPerIoData {
OVERLAPPED m_ol;
HANDLE m_hFile; //操作的文件句柄
DWORD m_dwOp; //操作类型,OP_READ或OP_WRITE
LPVOID m_pData; //操作的数据
UINT m_nLen; //操作的数据长度
DWORD m_dwWrite; //写入的字节数
DWORD m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA, *PPER_IO_DATA;
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio);
//写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam);
//当前操作的文件对象的指针
LARGE_INTEGER g_liFilePointer = { 0 };
//IOCP线程池
PTP_IO g_pThreadpoolIo = NULL;
//////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘\’)
VOID GetAppPath(LPTSTR pszBuffer) {
DWORD dwLen = 0;
if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
return;
for (DWORD i = dwLen; i > 0; i--) {
if ('\\' == pszBuffer[i]) {
pszBuffer[i + 1] = '\0';
break;
}
}
}
int _tmain() {
_tsetlocale(LC_ALL, _T("chs"));
TCHAR pFileName[MAX_PATH] = {};
GetAppPath(pFileName);
StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));
HANDLE ahWThread[MAXWRITETHREAD] = {};
DWORD dwWrited = 0;
//创建文件
HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == hTxtFile) {
_tprintf(_T("CreateFile(%s)失败,错误码:%u\n"), GetLastError());
_tsystem(_T("PAUSE"));
return 0;
}
//初始化线程池回调环境
TP_CALLBACK_ENVIRON poolEnv = {};
InitializeThreadpoolEnvironment(&poolEnv);
//创建IOCP线程池
g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback, hTxtFile, &poolEnv);
//启动IOCP线程池
StartThreadpoolIo(g_pThreadpoolIo);
//写入UNICODE文件的前缀码,以便正确打开
PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = hTxtFile;
pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
QMLX_ASSERT(pIo->m_pData != NULL);
*((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
pIo->m_nLen = sizeof(WORD);
//偏移文件指针
pIo->m_ol.Offset = g_liFilePointer.LowPart;
pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
g_liFilePointer.QuadPart += pIo->m_nLen;
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
//等待IOCP线程池完成操作
WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);
//启动写入线程进行日志写入操作
for (int i = 0; i < MAXWRITETHREAD; i++) {
ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
}
//让主线程等待这些写入线程结束
WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);
for (int i = 0; i < MAXWRITETHREAD; i++) {
CloseHandle(ahWThread[i]);
}
//关闭IOCP线程池
CloseThreadpoolIo(g_pThreadpoolIo);
//关闭日志文件
if (INVALID_HANDLE_VALUE != hTxtFile) {
CloseHandle(hTxtFile);
hTxtFile = INVALID_HANDLE_VALUE;
}
_tsystem(_T("PAUSE"));
return 0;
}
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
if (NO_ERROR != IoResult) {
_tprintf(_T("I/O操作出错,错误码:%u\n"), IoResult);
return;
}
PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
DWORD dwCurTimestamp = GetTickCount();
switch (pIo->m_dwOp)
{
case OP_WRITE://写操作结束
{//写入操作结束
_tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)\n"),
GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);
QMLX_SAFEFREE(pIo->m_pData);
QMLX_SAFEFREE(pIo);
}
break;
case OP_READ: //读操作结束
break;
default:
break;
}
}
//写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
TCHAR pTxtContext[MAX_LOGLEN] = {};
PPER_IO_DATA pIo = NULL;
size_t szLen = 0;
LPTSTR pWriteText = NULL;
StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入\r\n"),
GetCurrentThreadId());
StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);
szLen += 1;
int i = 0;
for (; i < MAXWRITEPERTHREAD; i++) {
pWriteText = (LPTSTR)QMLX_CALLOC(szLen * sizeof(TCHAR));
QMLX_ASSERT(NULL != pWriteText);
StringCchCopy(pWriteText, szLen, pTxtContext);
//为每个操作申请一个“单IO数据”结构体
pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = (HANDLE)lpParam;
pIo->m_pData = pWriteText;
pIo->m_nLen = (szLen - 1) * sizeof(TCHAR);
//这里使用原子操作同步文件指针,写入不会相互覆盖
//这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
//比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
*((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
StartThreadpoolIo(g_pThreadpoolIo);
//写入
WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
if (ERROR_IO_PENDING != GetLastError()) {
CancelThreadpoolIo(g_pThreadpoolIo);
}
}
return i;
}
.NET Framework的线程池实现命名空间System.Threading中的类ThreadPool提供一个线程池,该线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。[3] 参看参考文献
|
Index:
pl ar de en es fr it arz nl ja pt ceb sv uk vi war zh ru af ast az bg zh-min-nan bn be ca cs cy da et el eo eu fa gl ko hi hr id he ka la lv lt hu mk ms min no nn ce uz kk ro simple sk sl sr sh fi ta tt th tg azb tr ur zh-yue hy my ace als am an hyw ban bjn map-bms ba be-tarask bcl bpy bar bs br cv nv eml hif fo fy ga gd gu hak ha hsb io ig ilo ia ie os is jv kn ht ku ckb ky mrj lb lij li lmo mai mg ml zh-classical mr xmf mzn cdo mn nap new ne frr oc mhr or as pa pnb ps pms nds crh qu sa sah sco sq scn si sd szl su sw tl shn te bug vec vo wa wuu yi yo diq bat-smg zu lad kbd ang smn ab roa-rup frp arc gn av ay bh bi bo bxr cbk-zam co za dag ary se pdc dv dsb myv ext fur gv gag inh ki glk gan guw xal haw rw kbp pam csb kw km kv koi kg gom ks gcr lo lbe ltg lez nia ln jbo lg mt mi tw mwl mdf mnw nqo fj nah na nds-nl nrm nov om pi pag pap pfl pcd krc kaa ksh rm rue sm sat sc trv stq nso sn cu so srn kab roa-tara tet tpi to chr tum tk tyv udm ug vep fiu-vro vls wo xh zea ty ak bm ch ny ee ff got iu ik kl mad cr pih ami pwn pnt dz rmy rn sg st tn ss ti din chy ts kcg ve
Portal di Ensiklopedia Dunia