C++线程池的设计与测试

openkk 12年前

编写了一个最基本的线程池类,处理用c_work表示的工作任务。

///////////////////////////////////////////////////////  //线程池类   ///////////////////////////////////////////////////////  #include <pthread.h>  #include <stdlib.h>  #include <stdio.h>  #include <unistd.h>  #include <assert.h>    const int DEFAULT_MAX_THREAD_NUM = 10;  const int MAX_WORK_NUM = 100000;  //c_worker类  class c_work  {   public:    c_work():process(NULL), arg(NULL), next(NULL){}    c_work(void *(*prcss)(void *), void *arg):     process(prcss), arg(arg), next(NULL) {}    ~c_work();       void *(*process)(void *);    void *arg;    unsigned char type; //最高位表示arg是否需要delete操作    c_work *next;  };    c_work::~c_work()  {   unsigned char ifdel = type >> 7;   if (ifdel)   {    delete arg;    arg = NULL;   }  }    class c_thread_pool  {   public:    c_thread_pool();    c_thread_pool(const int max_thread_num);    ~c_thread_pool();      int add_work(c_work work);    static void *thread_routine(void *arg);      pthread_mutex_t queue_lock;    pthread_cond_t queue_cond;    // private:    c_work *queue_head;    c_work *queue_tail;    int shutdown;      pthread_t *threadid;    int max_thread_num;    int cur_queue_size;  };    c_thread_pool::c_thread_pool()  {     pthread_mutex_init(&queue_lock, NULL);   pthread_cond_init(&queue_cond, NULL);     //工作队列初始化   queue_head = NULL;   queue_tail = NULL;      max_thread_num = max_thread_num;   cur_queue_size = 0;     shutdown = 0;     max_thread_num = DEFAULT_MAX_THREAD_NUM;   threadid = new pthread_t[max_thread_num];   int i = 0;     for (i = 0; i < max_thread_num; i++)   {    pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);   }  }      c_thread_pool::c_thread_pool(int max_thread_num)  {     pthread_mutex_init(&queue_lock, NULL);   pthread_cond_init(&queue_cond, NULL);     //工作队列初始化   queue_head = NULL;   queue_tail = NULL;      max_thread_num = max_thread_num;   cur_queue_size = 0;     threadid = new pthread_t[max_thread_num];   int i = 0;   for (i = 0; i < max_thread_num; i++)   {    pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);   }  }    /*向线程池中的任务队列加入任务*/  int c_thread_pool::add_work(c_work work)  {   c_work *newwork = new c_work;   newwork->process = work.process;   newwork->arg = work.arg;   newwork->next = NULL;     pthread_mutex_lock(&queue_lock);      /*将任务加入到等待队列中*/   if (queue_head != NULL && queue_tail != NULL)   {    queue_tail->next = newwork;    queue_tail = newwork;    }   else   {    //空队列    queue_head = newwork;    queue_tail = newwork;   }     cur_queue_size++;   pthread_mutex_unlock(&queue_lock);   /*等待队列中有任务了,唤醒一个等待线程,注意如果所有线程都在忙碌,这句没有任何作用*/   pthread_cond_signal(&(queue_cond));    printf("add work returned!\n");   return 0;  }      void* c_thread_pool::thread_routine(void *arg)  {   c_thread_pool *pool = (c_thread_pool *)arg;   int i = 0;   while (1)   {    pthread_mutex_lock(&(pool->queue_lock));    //如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意     // pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁      //标注:注意这一如果任务队列不为空的话,while语句将被跳过,直接执行下面的调用。    while (pool->cur_queue_size == 0 && pool->shutdown)    {     pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock));    }        //等待队列长度减去1,并取出链表中的头元素    if (pool->cur_queue_size > 0 && pool->queue_head != NULL)    {     printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size);     pool->cur_queue_size--;     c_work *work = pool->queue_head;     pool->queue_head = work->next;     pthread_mutex_unlock(&(pool->queue_lock));       //调用回调函数,执行测试任务     //////////////////////////////////////////     (*(work->process))(work->arg);     free(work);     work = NULL;    }    else //不可达    {     pthread_mutex_unlock(&(pool->queue_lock));    }   }     }    c_thread_pool::~c_thread_pool()  {   for (int i = 0; i < max_thread_num; ++i)     pthread_cancel(threadid[i]);   for (c_work *w_t = queue_head; w_t != NULL;)   {    c_work *temp = w_t->next;    delete w_t;    w_t = temp;   }   delete [] threadid;  }
转自:http://blog.csdn.net/naturebe/article/details/7901130