c++版线程池和任务池示例
c++版线程池和任务池示例
发布时间:2016-12-28 来源:查字典编辑
摘要:commondef.h复制代码代码如下://单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任...

commondef.h

复制代码 代码如下:

//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁

const int CHECK_IDLE_TASK_INTERVAL = 300;

//单位秒,任务自动销毁时间间隔

const int TASK_DESTROY_INTERVAL = 60;

//监控线程池是否为空时间间隔,微秒

const int IDLE_CHECK_POLL_EMPTY = 500;

//线程池线程空闲自动退出时间间隔 ,5分钟

const int THREAD_WAIT_TIME_OUT = 300;

taskpool.cpp

复制代码 代码如下:

#include "taskpool.h"

#include <string.h>

#include <stdio.h>

#include <pthread.h>

TaskPool::TaskPool(const int & poolMaxSize)

: m_poolSize(poolMaxSize)

, m_taskListSize(0)

, m_bStop(false)

{

pthread_mutex_init(&m_lock, NULL);

pthread_mutex_init(&m_idleMutex, NULL);

pthread_cond_init(&m_idleCond, NULL);

pthread_attr_t attr;

pthread_attr_init( &attr );

pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行

pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程

pthread_attr_destroy(&attr);

}

TaskPool::~TaskPool()

{

if(!m_bStop)

{

StopPool();

}

if(!m_taskList.empty())

{

std::list<Task*>::iterator it = m_taskList.begin();

for(; it != m_taskList.end(); ++it)

{

if(*it != NULL)

{

delete *it;

*it = NULL;

}

}

m_taskList.clear();

m_taskListSize = 0;

}

if(!m_idleList.empty())

{

std::list<Task*>::iterator it = m_idleList.begin();

for(; it != m_idleList.end(); ++it)

{

if(*it != NULL)

{

delete *it;

*it = NULL;

}

}

m_idleList.clear();

}

pthread_mutex_destroy(&m_lock);

pthread_mutex_destroy(&m_idleMutex);

pthread_cond_destroy(&m_idleCond);

}

void * TaskPool::CheckIdleTask(void * arg)

{

TaskPool * pool = (TaskPool*)arg;

while(1)

{

pool->LockIdle();

pool->RemoveIdleTask();

if(pool->GetStop())

{

pool->UnlockIdle();

break;

}

pool->CheckIdleWait();

pool->UnlockIdle();

}

}

void TaskPool::StopPool()

{

m_bStop = true;

LockIdle();

pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题

UnlockIdle();

pthread_join(m_idleId, NULL);

}

bool TaskPool::GetStop()

{

return m_bStop;

}

void TaskPool::CheckIdleWait()

{

struct timespec timeout;

memset(&timeout, 0, sizeof(timeout));

timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;

timeout.tv_nsec = 0;

pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);

}

int TaskPool::RemoveIdleTask()

{

int iRet = 0;

std::list<Task*>::iterator it, next;

std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();

time_t curTime = time(0);

for(; rit != m_idleList.rend(); )

{

it = --rit.base();

if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)

{

iRet++;

delete *it;

*it = NULL;

next = m_idleList.erase(it);

rit = std::list<Task*>::reverse_iterator(next);

}

else

{

break;

}

}

}

int TaskPool::AddTask(task_fun fun, void *arg)

{

int iRet = 0;

if(0 != fun)

{

pthread_mutex_lock(&m_lock);

if(m_taskListSize >= m_poolSize)

{

pthread_mutex_unlock(&m_lock);

iRet = -1; //task pool is full;

}

else

{

pthread_mutex_unlock(&m_lock);

Task * task = GetIdleTask();

if(NULL == task)

{

task = new Task;

}

if(NULL == task)

{

iRet = -2; // new failed

}

else

{

task->fun = fun;

task->data = arg;

pthread_mutex_lock(&m_lock);

m_taskList.push_back(task);

++m_taskListSize;

pthread_mutex_unlock(&m_lock);

}

}

}

return iRet;

}

Task* TaskPool::GetTask()

{

Task *task = NULL;

pthread_mutex_lock(&m_lock);

if(!m_taskList.empty())

{

task = m_taskList.front();

m_taskList.pop_front();

--m_taskListSize;

}

pthread_mutex_unlock(&m_lock);

return task;

}

void TaskPool::LockIdle()

{

pthread_mutex_lock(&m_idleMutex);

}

void TaskPool::UnlockIdle()

{

pthread_mutex_unlock(&m_idleMutex);

}

Task * TaskPool::GetIdleTask()

{

LockIdle();

Task * task = NULL;

if(!m_idleList.empty())

{

task = m_idleList.front();

m_idleList.pop_front();

}

UnlockIdle();

return task;

}

void TaskPool::SaveIdleTask(Task*task)

{

if(NULL != task)

{

task->fun = 0;

task->data = NULL;

task->last_time = time(0);

LockIdle();

m_idleList.push_front(task);

UnlockIdle();

}

}

taskpool.h

复制代码 代码如下:

#ifndef TASKPOOL_H

#define TASKPOOL_H

/* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务

* 任务池可自动销毁长时间空闲的Task对象

* 可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间

* TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁

* date @ 2013.12.23

* author @ haibin.wang

*/

#include <list>

#include <pthread.h>

#include "commondef.h"

//所有的用户操作为一个task,

typedef void (*task_fun)(void *);

struct Task

{

task_fun fun; //任务处理函数

void* data; //任务处理数据

time_t last_time; //加入空闲队列的时间,用于自动销毁

};

//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池

class TaskPool

{

public:

/* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程

* para @ maxSize 最大任务数,大于0

*/

TaskPool(const int & poolMaxSize);

~TaskPool();

/* pur @ 添加任务到任务队列的尾部

* para @ task, 具体任务

* return @ 0 添加成功,负数 添加失败

*/

int AddTask(task_fun fun, void* arg);

/* pur @ 从任务列表的头获取一个任务

* return @ 如果列表中有任务则返回一个Task指针,否则返回一个NULL

*/

Task* GetTask();

/* pur @ 保存空闲任务到空闲队列中

* para @ task 已被调用执行的任务

* return @

*/

void SaveIdleTask(Task*task);

void StopPool();

public:

void LockIdle();

void UnlockIdle();

void CheckIdleWait();

int RemoveIdleTask();

bool GetStop();

private:

static void * CheckIdleTask(void *);

/* pur @ 获取空闲的task

* para @

* para @

* return @ NULL说明没有空闲的,否则从m_idleList中获取一个

*/

Task* GetIdleTask();

int GetTaskSize();

private:

int m_poolSize; //任务池大小

int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加

bool m_bStop; //是否停止

std::list<Task*> m_taskList;//所有待处理任务列表

std::list<Task*> m_idleList;//所有空闲任务列表

pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务

pthread_mutex_t m_idleMutex; //空闲任务队列锁

pthread_cond_t m_idleCond; //空闲队列等待条件

pthread_t m_idleId;;

};

#endif

threadpool.cpp

复制代码 代码如下:

/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)

* date @ 2014.01.03

* author @ haibin.wang

*/

#include "threadpool.h"

#include <errno.h>

#include <string.h>

/*

#include <iostream>

#include <stdio.h>

*/

Thread::Thread(bool detach, ThreadPool * pool)

: m_pool(pool)

{

pthread_attr_init(&m_attr);

if(detach)

{

pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行

}

else

{

pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );

}

pthread_mutex_init(&m_mutex, NULL); //初始化互斥量

pthread_cond_init(&m_cond, NULL); //初始化条件变量

task.fun = 0;

task.data = NULL;

}

Thread::~Thread()

{

pthread_cond_destroy(&m_cond);

pthread_mutex_destroy(&m_mutex);

pthread_attr_destroy(&m_attr);

}

ThreadPool::ThreadPool()

: m_poolMax(0)

, m_idleNum(0)

, m_totalNum(0)

, m_bStop(false)

{

pthread_mutex_init(&m_mutex, NULL);

pthread_mutex_init(&m_runMutex,NULL);

pthread_mutex_init(&m_terminalMutex, NULL);

pthread_cond_init(&m_terminalCond, NULL);

pthread_cond_init(&m_emptyCond, NULL);

}

ThreadPool::~ThreadPool()

{

/*if(!m_threads.empty())

{

std::list<Thread*>::iterator it = m_threads.begin();

for(; it != m_threads.end(); ++it)

{

if(*it != NULL)

{

pthread_cond_destroy( &((*it)->m_cond) );

pthread_mutex_destroy( &((*it)->m_mutex) );

delete *it;

*it = NULL;

}

}

m_threads.clear();

}*/

pthread_mutex_destroy(&m_runMutex);

pthread_mutex_destroy(&m_terminalMutex);

pthread_mutex_destroy(&m_mutex);

pthread_cond_destroy(&m_terminalCond);

pthread_cond_destroy(&m_emptyCond);

}

int ThreadPool::InitPool(const int & poolMax, const int & poolPre)

{

if(poolMax < poolPre

|| poolPre < 0

|| poolMax <= 0)

{

return -1;

}

m_poolMax = poolMax;

int iRet = 0;

for(int i=0; i<poolPre; ++i)

{

Thread * thread = CreateThread();

if(NULL == thread)

{

iRet = -2;

}

}

if(iRet < 0)

{

std::list<Thread*>::iterator it = m_threads.begin();

for(; it!= m_threads.end(); ++it)

{

if(NULL != (*it) )

{

delete *it;

*it = NULL;

}

}

m_threads.clear();

m_totalNum = 0;

}

return iRet;

}

void ThreadPool::GetThreadRun(task_fun fun, void* arg)

{

//从线程池中获取一个线程

pthread_mutex_lock( &m_mutex);

if(m_threads.empty())

{

pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空闲线程

}

Thread * thread = m_threads.front();

m_threads.pop_front();

pthread_mutex_unlock( &m_mutex);

pthread_mutex_lock( &thread->m_mutex );

thread->task.fun = fun;

thread->task.data = arg;

pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行

pthread_mutex_unlock( &thread->m_mutex );

}

int ThreadPool::Run(task_fun fun, void * arg)

{

pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行

int iRet = 0;

if(m_totalNum <m_poolMax) //

{

if(m_threads.empty() && (NULL == CreateThread()) )

{

iRet = -1;//can not create new thread!

}

else

{

GetThreadRun(fun, arg);

}

}

else

{

GetThreadRun(fun, arg);

}

pthread_mutex_unlock(&m_runMutex);

return iRet;

}

void ThreadPool::StopPool(bool bStop)

{

m_bStop = bStop;

if(bStop)

{

//启动监控所有空闲线程是否退出的线程

Thread thread(false, this);

pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程

//阻塞等待所有空闲线程退出

pthread_join(thread.m_threadId, NULL);

}

/*if(bStop)

{

pthread_mutex_lock(&m_terminalMutex);

//启动监控所有空闲线程是否退出的线程

Thread thread(true, this);

pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程

//阻塞等待所有空闲线程退出

pthread_cond_wait(&m_terminalCond, & m_terminalMutex);

pthread_mutex_unlock(&m_terminalMutex);

}*/

}

bool ThreadPool::GetStop()

{

return m_bStop;

}

Thread * ThreadPool::CreateThread()

{

Thread * thread = NULL;

thread = new Thread(true, this);

if(NULL != thread)

{

int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中

if(0 != iret)

{

delete thread;

thread = NULL;

}

}

return thread;

}

void * ThreadPool::WapperFun(void*arg)

{

Thread * thread = (Thread*)arg;

if(NULL == thread || NULL == thread->m_pool)

{

return NULL;

}

ThreadPool * pool = thread->m_pool;

pool->IncreaseTotalNum();

struct timespec abstime;

memset(&abstime, 0, sizeof(abstime));

while(1)

{

if(0 != thread->task.fun)

{

thread->task.fun(thread->task.data);

}

if( true == pool->GetStop() )

{

break; //确定当前任务执行完毕后再判定是否退出线程

}

pthread_mutex_lock( &thread->m_mutex );

pool->SaveIdleThread(thread); //将线程加入到空闲队列中

abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;

abstime.tv_nsec = 0;

if(ETIMEDOUT == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出

{

pthread_mutex_unlock( &thread->m_mutex );

break;

}

pthread_mutex_unlock( &thread->m_mutex );

}

pool->LockMutex();

pool->DecreaseTotalNum();

if(thread != NULL)

{

pool->RemoveThread(thread);

delete thread;

thread = NULL;

}

pool->UnlockMutex();

return 0;

}

void ThreadPool::SaveIdleThread(Thread * thread )

{

if(thread)

{

thread->task.fun = 0;

thread->task.data = NULL;

LockMutex();

if(m_threads.empty())

{

pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了

}

m_threads.push_front(thread);

UnlockMutex();

}

}

int ThreadPool::TotalThreads()

{

return m_totalNum;

}

void ThreadPool::SendSignal()

{

LockMutex();

std::list<Thread*>::iterator it = m_threads.begin();

for(; it!= m_threads.end(); ++it)

{

pthread_mutex_lock( &(*it)->m_mutex );

pthread_cond_signal(&((*it)->m_cond));

pthread_mutex_unlock( &(*it)->m_mutex );

}

UnlockMutex();

}

void * ThreadPool::TerminalCheck(void* arg)

{

Thread * thread = (Thread*)arg;

if(NULL == thread || NULL == thread->m_pool)

{

return NULL;

}

ThreadPool * pool = thread->m_pool;

while((false == pool->GetStop()) || pool->TotalThreads() >0 )

{

pool->SendSignal();

usleep(IDLE_CHECK_POLL_EMPTY);

}

//pool->TerminalCondSignal();

return 0;

}

void ThreadPool::TerminalCondSignal()

{

pthread_cond_signal(&m_terminalCond);

}

void ThreadPool::RemoveThread(Thread* thread)

{

m_threads.remove(thread);

}

void ThreadPool::LockMutex()

{

pthread_mutex_lock( &m_mutex);

}

void ThreadPool::UnlockMutex()

{

pthread_mutex_unlock( &m_mutex );

}

void ThreadPool::IncreaseTotalNum()

{

LockMutex();

m_totalNum++;

UnlockMutex();

}

void ThreadPool::DecreaseTotalNum()

{

m_totalNum--;

}

threadpool.h

复制代码 代码如下:

#ifndef THREADPOOL_H

#define THREADPOOL_H

/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a

* 当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出

* date @ 2013.12.23

* author @ haibin.wang

*/

#include <list>

#include <string>

#include "taskpool.h"

//通过threadmanager来控制任务调度进程

//threadpool的TerminalCheck线程负责监测线程池所有线程退出

class ThreadPool;

class Thread

{

public:

Thread(bool detach, ThreadPool * pool);

~Thread();

pthread_t m_threadId; //线程id

pthread_mutex_t m_mutex; //互斥锁

pthread_cond_t m_cond; //条件变量

pthread_attr_t m_attr; //线程属性

Task task; //

ThreadPool * m_pool; //所属线程池

};

//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中

class ThreadPool

{

public:

ThreadPool();

~ThreadPool();

/* pur @ 初始化线程池

* para @ poolMax 线程池最大线程数

* para @ poolPre 预创建线程数

* return @ 0:成功

* -1: parameter error, must poolMax > poolPre >=0

* -2: 创建线程失败

*/

int InitPool(const int & poolMax, const int & poolPre);

/* pur @ 执行一个任务

* para @ task 任务指针

* return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败

*/

int Run(task_fun fun, void* arg);

/* pur @ 设置是否停止线程池工作

* para @ bStop true停止,false不停止

*/

void StopPool(bool bStop);

public: //此公有函数主要用于静态函数调用

/* pur @ 获取进程池的启停状态

* return @

*/

bool GetStop();

void SaveIdleThread(Thread * thread );

void LockMutex();

void UnlockMutex();

void DecreaseTotalNum();

void IncreaseTotalNum();

void RemoveThread(Thread* thread);

void TerminalCondSignal();

int TotalThreads();

void SendSignal();

private:

/* pur @ 创建线程

* return @ 非空 成功,NULL失败,

*/

Thread * CreateThread();

/* pur @ 从线程池中获取一个一个线程运行任务

* para @ fun 函数指针

* para @ arg 函数参数

* return @

*/

void GetThreadRun(task_fun fun, void* arg);

static void * WapperFun(void*);

static void * TerminalCheck(void*);//循环监测是否所有线程终止线程

private:

int m_poolMax;//线程池最大线程数

int m_idleNum; //空闲线程数

int m_totalNum; //当前线程总数 小于最大线程数

bool m_bStop; //是否停止线程池

pthread_mutex_t m_mutex; //线程列表锁

pthread_mutex_t m_runMutex; //run函数锁

pthread_mutex_t m_terminalMutex; //终止所有线程互斥量

pthread_cond_t m_terminalCond; //终止所有线程条件变量

pthread_cond_t m_emptyCond; //空闲线程不空条件变量

std::list<Thread*> m_threads; // 线程列表

};

#endif

threadpoolmanager.cpp

复制代码 代码如下:

#include "threadpoolmanager.h"

#include "threadpool.h"

#include "taskpool.h"

#include <errno.h>

#include <string.h>

/*#include <string.h>

#include <sys/time.h>

#include <stdio.h>*/

// struct timeval time_beg, time_end;

ThreadPoolManager::ThreadPoolManager()

: m_threadPool(NULL)

, m_taskPool(NULL)

, m_bStop(false)

{

pthread_mutex_init(&m_mutex_task,NULL);

pthread_cond_init(&m_cond_task, NULL);

/* memset(&time_beg, 0, sizeof(struct timeval));

memset(&time_end, 0, sizeof(struct timeval));

gettimeofday(&time_beg, NULL);*/

}

ThreadPoolManager::~ThreadPoolManager()

{

StopAll();

if(NULL != m_threadPool)

{

delete m_threadPool;

m_threadPool = NULL;

}

if(NULL != m_taskPool)

{

delete m_taskPool;

m_taskPool = NULL;

}

pthread_cond_destroy( &m_cond_task);

pthread_mutex_destroy( &m_mutex_task );

/*gettimeofday(&time_end, NULL);

long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);

printf("manager total time = %dn", total);

gettimeofday(&time_beg, NULL);*/

}

int ThreadPoolManager::Init(

const int &tastPoolSize,

const int &threadPoolMax,

const int &threadPoolPre)

{

m_threadPool = new ThreadPool();

if(NULL == m_threadPool)

{

return -1;

}

m_taskPool = new TaskPool(tastPoolSize);

if(NULL == m_taskPool)

{

return -2;

}

if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))

{

return -3;

}

//启动线程池

//启动任务池

//启动任务获取线程,从任务池中不断拿任务到线程池中

pthread_attr_t attr;

pthread_attr_init( &attr );

pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );

pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程

pthread_attr_destroy(&attr);

return 0;

}

void ThreadPoolManager::StopAll()

{

m_bStop = true;

LockTask();

pthread_cond_signal(&m_cond_task);

UnlockTask();

pthread_join(m_taskThreadId, NULL);

//等待当前所有任务执行完毕

m_taskPool->StopPool();

m_threadPool->StopPool(true); // 停止线程池工作

}

void ThreadPoolManager::LockTask()

{

pthread_mutex_lock(&m_mutex_task);

}

void ThreadPoolManager::UnlockTask()

{

pthread_mutex_unlock(&m_mutex_task);

}

void* ThreadPoolManager::TaskThread(void* arg)

{

ThreadPoolManager * manager = (ThreadPoolManager*)arg;

while(1)

{

manager->LockTask(); //防止任务没有执行完毕发送了停止信号

while(1) //将任务队列中的任务执行完再退出

{

Task * task = manager->GetTaskPool()->GetTask();

if(NULL == task)

{

break;

}

else

{

manager->GetThreadPool()->Run(task->fun, task->data);

manager->GetTaskPool()->SaveIdleTask(task);

}

}

if(manager->GetStop())

{

manager->UnlockTask();

break;

}

manager->TaskCondWait(); //等待有任务的时候执行

manager->UnlockTask();

}

return 0;

}

ThreadPool * ThreadPoolManager::GetThreadPool()

{

return m_threadPool;

}

TaskPool * ThreadPoolManager::GetTaskPool()

{

return m_taskPool;

}

int ThreadPoolManager::Run(task_fun fun,void* arg)

{

if(0 == fun)

{

return 0;

}

if(!m_bStop)

{

int iRet = m_taskPool->AddTask(fun, arg);

if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )

{

pthread_cond_signal(&m_cond_task);

UnlockTask();

}

return iRet;

}

else

{

return -3;

}

}

bool ThreadPoolManager::GetStop()

{

return m_bStop;

}

void ThreadPoolManager::TaskCondWait()

{

struct timespec to;

memset(&to, 0, sizeof to);

to.tv_sec = time(0) + 60;

to.tv_nsec = 0;

pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时

}

threadpoolmanager.h

复制代码 代码如下:

#ifndef THREADPOOLMANAGER_H

#define THREADPOOLMANAGER_H

/* purpose @

* 基本流程:

* 管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中

* 基本功能:

* 1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程

* 2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)

* 3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)

* 线程资源:

* 如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定

* 当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁

* 线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程

* 线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程

* 使用方法:

* ThreadPoolManager manager;

* manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器

* manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL

*

* date @ 2013.12.23

* author @ haibin.wang

*

* 详细参数控制可以修改commondef.h中的相关变量值

*/

#include <pthread.h>

typedef void (*task_fun)(void *);

class ThreadPool;

class TaskPool;

class ThreadPoolManager

{

public:

ThreadPoolManager();

~ThreadPoolManager();

/* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0

* para @ tastPoolSize 任务池大小

* para @ threadPoolMax 线程池最大线程数

* para @ threadPoolPre 预创建线程数

* return @ 0:初始化成功,负数 初始化失败

* -1:创建线程池失败

* -2:创建任务池失败

* -3:线程池初始化失败

*/

int Init(const int &tastPoolSize,

const int &threadPoolMax,

const int &threadPoolPre);

/* pur @ 执行一个任务

* para @ fun 需要执行的函数指针

* para @ arg fun需要的参数,默认为NULL

* return @ 0 任务分配成功,负数 任务分配失败

* -1:任务池满

* -2:任务池new失败

* -3:manager已经发送停止信号,不再接收新任务

*/

int Run(task_fun fun,void* arg=NULL);

public: //以下public函数主要用于静态函数调用

bool GetStop();

void TaskCondWait();

TaskPool * GetTaskPool();

ThreadPool * GetThreadPool();

void LockTask();

void UnlockTask();

void LockFull();

private:

static void * TaskThread(void*); //任务处理线程

void StopAll();

private:

ThreadPool *m_threadPool; //线程池

TaskPool * m_taskPool; //任务池

bool m_bStop; // 是否终止管理器

pthread_t m_taskThreadId; // TaskThread线程id

pthread_mutex_t m_mutex_task;

pthread_cond_t m_cond_task;

};

#endif

main.cpp

复制代码 代码如下:

#include <iostream>

#include <string>

#include "threadpoolmanager.h"

#include <sys/time.h>

#include <string.h>

#include <stdlib.h>

#include <pthread.h>

using namespace std;

int seq = 0;

int billNum =0;

int inter = 1;

pthread_mutex_t m_mutex;

void myFunc(void*arg)

{

pthread_mutex_lock(&m_mutex);

seq++;

if(seq%inter == 0 )

{

cout << "fun 1=" << seq << endl;

}

if(seq>=1000000000)

{

cout << "billion" << endl;

seq = 0;

billNum++;

}

pthread_mutex_unlock(&m_mutex);

//sleep();

}

int main(int argc, char** argv)

{

if(argc != 6)

{

cout << "必须有5个参数 任务执行次数 任务池大小 线程池大小 预创建线程数 输出间隔" << endl;

cout << "eg: ./test 999999 10000 100 10 20" << endl;

cout << "上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999" << endl;

return 0;

}

double loopSize = atof(argv[1]);

int taskSize = atoi(argv[2]);

int threadPoolSize = atoi(argv[3]);

int preSize = atoi(argv[4]);

inter = atoi(argv[5]);

pthread_mutex_init(&m_mutex,NULL);

ThreadPoolManager manager;

if(0>manager.Init(taskSize, threadPoolSize, preSize))

{

cout << "初始化失败" << endl;

return 0;

}

cout << "*******************初始化完成*********************" << endl;

struct timeval time_beg, time_end;

memset(&time_beg, 0, sizeof(struct timeval));

memset(&time_end, 0, sizeof(struct timeval));

gettimeofday(&time_beg, NULL);

double i=0;

for(; i<loopSize; ++i)

{

while(0>manager.Run(myFunc,NULL))

{

usleep(100);

}

}

gettimeofday(&time_end, NULL);

long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);

cout << "total time =" << total << endl;

cout << "total num =" << i << " billion num=" << billNum<< endl;

cout << __FILE__ << "将关闭所有线程" << endl;

//pthread_mutex_destroy(&m_mutex);

return 0;

}

推荐文章
猜你喜欢
附近的人在看
推荐阅读
拓展阅读
相关阅读
网友关注
最新C语言学习
热门C语言学习
编程开发子分类