线程池原理详解及如何用C语言实现线程池
线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效的利用高并发服务器上的线程资源;线程与进程用于处理各项分支子功能,我们通常的操作是:接收消息 ==> 消息分类 ==> 线程创建 ==> 传递消息到子线程 ==> 线程分离 ==> 在子线程中执行任务 ==> 任务结束退出; 对大多数小型局域网的通信来说,上述方法足够满足需求;但当我们的通信范围扩大到广域网或大型局域网通信中时,我们将面临大量消息频繁请求服务器;在这种情况下,创建与销毁线程都已经成为一种奢侈的开销,特别对于嵌入式服务器来说更应保证内存资源的合理利用; 因此,线程池技术应运而生;线程池允许一个线程可以多次复用,且每次复用的线程内部的消息处理可以不相同,将创建与销毁的开销省去而不必来一个请求开一个线程; 结构讲解: 线程池是一个抽象的概念,其内部由任务队列,一堆线程,管理者线程组成; 我们将以上图为例,实现一个最基础的线程池,接下来将分部分依次讲解;讲解顺序为:1.线程池总体结构 2.线程数组 3.任务队列 4.管理者线程 5.使用线程池接口的例子 一、线程池总体结构 这里讲解线程池在逻辑上的结构体;看下方代码,该结构体threadpool_t中包含线程池状态信息,任务队列信息以及多线程操作中的互斥锁;在任务结构体中包含了一个可以放置多种不同任务函数的函数指针,一个传入该任务函数的void*类型的参数; 注意:在使用时需要将你的消息分类处理函数装入任务的(*function);然后放置到任务队列并通知空闲线程; 线程池状态信息:描述当前线程池的基本信息,如是否开启、最小线程数、最大线程数、存活线程数、忙线程数、待销毁线程数等… … 任务队列信息:描述当前任务队列基本信息,如最大任务数、队列不为满条件变量、队列不为空条件变量等… … 多线程互斥锁:保证在同一时间点上只有一个线程在任务队列中取任务并修改任务队列信息、修改线程池信息; 函数指针:在打包消息阶段,将分类后的消息处理函数放在(*function); void*类型参数:用于传递消息处理函数需要的信息; /*任务*/ typedef struct { void *(*function)(void *); void *arg; } threadpool_task_t; /*线程池管理*/ struct threadpool_t{ pthread_mutex_t lock; /* 锁住整个结构体 */ pthread_mutex_t thread_counter; /* 用于使用忙线程数时的锁 */ pthread_cond_t queue_not_full; /* 条件变量,任务队列不为满 */ pthread_cond_t queue_not_empty; /* 任务队列不为空 */ pthread_t *threads; /* 存放线程的tid,实际上就是管理了线 数组 */ pthread_t admin_tid; /* 管理者线程tid */ threadpool_task_t *task_queue; /* 任务队列 */ /*线程池信息*/ int min_thr_num; /* 线程池中最小线程数 */ int max_thr_num; /* 线程池中最大线程数 */ int live_thr_num; /* 线程池中存活的线程数 */ int busy_thr_num; /* 忙线程,正在工作的线程 */ int wait_exit_thr_num; /* 需要销毁的线程数 */ /*任务队列信息*/ int queue_front; /* 队头 */ int queue_rear; /* 队尾 */ int queue_size; /* 存在的任务数 */ int queue_max_size; /* 队列能容纳的最大任务数 */ /*线程池状态*/ int shutdown; /* true为关闭 */ }; **/*创建线程池*/** threadpool_t * threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size) { /* 最小线程数 最大线程数 最大任务数*/ int i; threadpool_t *pool = NULL; do { /* 线程池空间开辟 */ if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool false; n"); break; } /*信息初始化*/ pool->min_thr_num = min_thr_num; pool->max_thr_num = max_thr_num; pool->busy_thr_num = 0; pool->live_thr_num = min_thr_num; pool->wait_exit_thr_num = 0; pool->queue_front = 0; pool->queue_rear = 0; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->shutdown = false; /* 根据最大线程数,给工作线程数组开空间,清0 */ pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) { printf("malloc threads false;n"); break; } memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num); /* 队列开空间 */ pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size); if (pool->task_queue == NULL) { printf("malloc task queue false;n"); break; } /* 初始化互斥锁和条件变量 */ if ( pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) !=0 || pthread_cond_init(&(pool->queue_not_empty), NULL) !=0 || pthread_cond_init(&(pool->queue_not_full), NULL) !=0) { printf("init lock or cond false;n"); break; } /* 启动min_thr_num个工作线程 */ for (i=0; i<min_thr_num; i++) { /* pool指向当前线程池 threadpool_thread函数在后面讲解 */ pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); printf("start thread 0x%x... n", (unsigned int)pool->threads[i]); } /* 管理者线程 admin_thread函数在后面讲解 */ pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool); return pool; } while(0); /* 释放pool的空间 */ threadpool_free(pool); return NULL; } 二、线程数组 (编辑:应用网_阳江站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |