线程池原理详解及如何用C语言实现线程池
任务队列的存在形式与线程数组相似;在线程池初始化时根据传入的最大任务数开辟空间;当服务器前方后请求到来后,分类并打包消息成为任务,将任务放入任务队列并通知空闲线程来取;不同之处在于任务队列有明显的先后顺序,先进先出;而线程数组中的线程则是一个竞争关系去拿到互斥锁争取任务; /*向线程池的任务队列中添加一个任务*/ int threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg) { pthread_mutex_lock(&(pool->lock)); /*如果队列满了,调用wait阻塞*/ while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) pthread_cond_wait(&(pool->queue_not_full), &(pool->lock)); /*如果线程池处于关闭状态*/ if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); return -1; } /*清空工作线程的回调函数的参数arg*/ if (pool->task_queue[pool->queue_rear].arg != NULL) { free(pool->task_queue[pool->queue_rear].arg); pool->task_queue[pool->queue_rear].arg = NULL; } /*添加任务到任务队列*/ pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 逻辑环 */ pool->queue_size++; /*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/ pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0; } 四、管理者线程 作为线程池的管理者,该线程的主要功能包括:检查线程池内线程的存活状态,工作状态;负责根据服务器当前的请求状态去动态的增加或删除线程,保证线程池中的线程数量维持在一个合理高效的平衡上; 说到底,它就是一个单独的线程,定时的去检查,根据我们的一个维持平衡算法去增删线程; /*管理线程*/ void * admin_thread(void *threadpool) { int i; threadpool_t *pool = (threadpool_t *)threadpool; while (!pool->shutdown) { printf("admin -----------------n"); sleep(DEFAULT_TIME); /*隔一段时间再管理*/ pthread_mutex_lock(&(pool->lock)); /*加锁*/ int queue_size = pool->queue_size; /*任务数*/ int live_thr_num = pool->live_thr_num; /*存活的线程数*/ pthread_mutex_unlock(&(pool->lock)); /*解锁*/ pthread_mutex_lock(&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; /*忙线程数*/ pthread_mutex_unlock(&(pool->thread_counter)); printf("admin busy live -%d--%d-n", busy_thr_num, live_thr_num); /*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/ if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num) { printf("admin add-----------n"); pthread_mutex_lock(&(pool->lock)); int add=0; /*一次增加 DEFAULT_THREAD_NUM 个线程*/ for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM && pool->live_thr_num < pool->max_thr_num; i++) { if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); add++; pool->live_thr_num++; printf("new thread -----------------------n"); } } pthread_mutex_unlock(&(pool->lock)); } /*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/ if ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num) { // printf("admin busy --%d--%d----n", busy_thr_num, live_thr_num); /*一次销毁DEFAULT_THREAD_NUM个线程*/ pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_NUM; pthread_mutex_unlock(&(pool->lock)); for (i=0; i<DEFAULT_THREAD_NUM; i++) { //通知正在处于空闲的线程,自杀 pthread_cond_signal(&(pool->queue_not_empty)); printf("admin cler --n"); } } } return NULL; /*线程是否存活*/ int is_thread_alive(pthread_t tid) { int kill_rc = pthread_kill(tid, 0); //发送0号信号,测试是否存活 if (kill_rc == ESRCH) //线程不存在 { return false; } return true; } 五、释放 /*释放线程池*/ int threadpool_free(threadpool_t *pool) { if (pool == NULL) return -1; if (pool->task_queue) free(pool->task_queue); if (pool->threads) { free(pool->threads); pthread_mutex_lock(&(pool->lock)); /*先锁住再销毁*/ pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free(pool); pool = NULL; return 0; } /*销毁线程池*/ int threadpool_destroy(threadpool_t *pool) { int i; if (pool == NULL) { return -1; } pool->shutdown = true; /*销毁管理者线程*/ pthread_join(pool->admin_tid, NULL); //通知所有线程去自杀(在自己领任务的过程中) for (i=0; i<pool->live_thr_num; i++) { pthread_cond_broadcast(&(pool->queue_not_empty)); } /*等待线程结束 先是pthread_exit 然后等待其结束*/ for (i=0; i<pool->live_thr_num; i++) { pthread_join(pool->threads[i], NULL); } threadpool_free(pool); return 0; } 六、接口 /* 线程池初始化,其管理者线程及工作线程都会启动 */ threadpool_t *thp = threadpool_create(10, 100, 100); printf("threadpool init ... ... n"); /* 接收到任务后添加 */ threadpool_add_task(thp, do_work, (void *)p); // ... ... /* 销毁 */ threadpool_destroy(thp); (编辑:应用网_阳江站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |