线程池中的线程,在任务队列为空的时候线程池linux,等待任务的到来,任务队列中有任务时,则依次获取任务来执行,任务队列需要同步。
Linux线程同步有多种方法:互斥量、信号量、条件变量等。
下面是根据互斥量、信号量、条件变量封装的三个类。
线程池中用到了互斥量和信号量。
#ifndef _LOCKER_H_
#define _LOCKER_H_
#include
#include
#include
/*信号量的类*/
class sem_locker
{
private:
sem_t m_sem;
public:
//初始化信号量
sem_locker()
{
if(sem_init(&m_sem, 0, 0) != 0)
printf("sem init error\n");
}
//销毁信号量
~sem_locker()
{
sem_destroy(&m_sem);
}
//等待信号量
bool wait()
{
return sem_wait(&m_sem) == 0;
}
//添加信号量
bool add()
{
return sem_post(&m_sem) == 0;
}
};
/*互斥 locker*/
class mutex_locker
{
private:
pthread_mutex_t m_mutex;
public:
mutex_locker()
{
if(pthread_mutex_init(&m_mutex, NULL) != 0)
printf("mutex init error!");
}
~mutex_locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool mutex_lock() //lock mutex
{
return pthread_mutex_lock(&m_mutex) == 0;
}
bool mutex_unlock() //unlock
{
return pthread_mutex_unlock(&m_mutex) == 0;
}
};
/*条件变量 locker*/
class cond_locker
{
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
public:
// 初始化 m_mutex and m_cond
cond_locker()
{
if(pthread_mutex_init(&m_mutex, NULL) != 0)
printf("mutex init error");
if(pthread_cond_init(&m_cond, NULL) != 0)
{ //条件变量初始化是被,释放初始化成功的mutex
pthread_mutex_destroy(&m_mutex);
printf("cond init error");
}
}
// destroy mutex and cond
~cond_locker()
{
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
//等待条件变量
bool wait()
{
int ans = 0;
pthread_mutex_lock(&m_mutex);
ans = pthread_cond_wait(&m_cond, &m_mutex);
pthread_mutex_unlock(&m_mutex);
return ans == 0;
}
//唤醒等待条件变量的线程
bool signal()
{
return pthread_cond_signal(&m_cond) == 0;
}
};
#endif
下面的是线程池类,是一个模版类:
#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_
#include "locker.h"
#include
#include
#include
#include
#include
#include
template
class threadpool
{
private:
int thread_number; //线程池的线程数
int max_task_number; //任务队列中的最大任务数
pthread_t *all_threads; //线程数组
std::list task_queue; //任务队列
mutex_locker queue_mutex_locker; //互斥锁
sem_locker queue_sem_locker; //信号量
bool is_stop; //是否结束线程
public:
threadpool(int thread_num = 20, int max_task_num = 30);
~threadpool();
bool append_task(T *task);
void start();
void stop();
private:
//线程运行的函数。执行run()函数
static void *worker(void *arg);
void run();
};
template
threadpool::threadpool(int thread_num, int max_task_num):
thread_number(thread_num), max_task_number(max_task_num),
is_stop(false), all_threads(NULL)
{
if((thread_num <= 0) || max_task_num <= 0)
printf("threadpool can't init because thread_number = 0"
" or max_task_number = 0");
all_threads = new pthread_t[thread_number];
if(!all_threads)
printf("can't init threadpool because thread array can't new");
}
template
threadpool::~threadpool()
{
delete []all_threads;
is_stop = true;
}
template
void threadpool::stop()
{
is_stop = true;
//queue_sem_locker.add();
}
template
void threadpool::start()
{
for(int i = 0; i < thread_number; ++i)
{
printf("create the %dth pthread\n", i);
if(pthread_create(all_threads + i, NULL, worker, this) != 0)
{//创建线程失败,清除成功申请的资源并抛出异常
delete []all_threads;
throw std::exception();
}
if(pthread_detach(all_threads[i]))
{//将线程设置为脱离线程,失败则清除成功申请的资源并抛出异常
delete []all_threads;
throw std::exception();
}
}
}
//添加任务进入任务队列
template
bool threadpool::append_task(T *task)
{ //获取互斥锁
queue_mutex_locker.mutex_lock();
//判断队列中任务数是否大于最大任务数
if(task_queue.size() > max_task_number)
{//是则释放互斥锁
queue_mutex_locker.mutex_unlock();
return false;
}
//添加进入队列
task_queue.push_back(task);
queue_mutex_locker.mutex_unlock();
//唤醒等待任务的线程
queue_sem_locker.add();
return true;
}
template
void *threadpool::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template
void threadpool::run()
{
while(!is_stop)
{ //等待任务
queue_sem_locker.wait();
if(errno == EINTR)
{
printf("errno");
continue;
}
//获取互斥锁
queue_mutex_locker.mutex_lock();
//判断任务队列是否为空
if(task_queue.empty())
{
queue_mutex_locker.mutex_unlock();
continue;
}
//获取队头任务并执行
T *task = task_queue.front();
task_queue.pop_front();
queue_mutex_locker.mutex_unlock();
if(!task)
continue;
// printf("pthreadId = %ld\n", (unsigned long)pthread_self());
task->doit(); //doit是T对象中的方法
}
//测试用
printf("close %ld\n", (unsigned long)pthread_self());
}
#endif
以上参考《Linux高性能服务器编程》
写个程序对线程池进行测试:
#include
#include
#include
#include "thread_pool.h"
class task
{
private:
int number;
public:
task(int num) : number(num)
{
}
~task()
{
}
void doit()
{
printf("this is the %dth task\n", number);
}
};
int main()
{
task *ta;
threadpool pool(10, 15);
// pool.start();
for(int i = 0; i < 20; ++i)
{
ta = new task(i);
// sleep(2);
pool.append_task(ta);
}
pool.start();
sleep(10);
printf("close the thread pool\n");
pool.stop();
pause();
return 0;
}
经测试,线程池可以正常使用。
下一篇博客,使用线程池来实现回射服务器,测试可以达到多大的并发量。
(编辑:成都站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|