线程池简介
本线程池采用C语言实现线程池linux,C++版本可参考基于C++11新特性手写线程池实现
线程池的场景:
线程池的一般特点:
线程池中线程数量的选择:
有一个经验公式: 线程数量 =(io等待时间+cpu运算时间)*核心数/cpu运算时间
因此可以根据经验公式得出下面两种场景的线程数量:
线程池的组成:
线程池结构体分析
由于C语言不像C++可以用类封装函数,因此线程池会使用结构体来封装一些变量或者函数指针。
task_t
typedef struct task_t {
handler_pt func;
void * arg;
} task_t;
封装任务的入口指针以及参数。
task_queue_t
typedef struct task_queue_t {
uint32_t head;
uint32_t tail;
uint32_t count;
task_t *queue;
} task_queue_t;
封装任务队列,为了不频繁移动队列中数据,此处采用头尾索引来标记任务。
thread_pool_t
struct thread_pool_t {
pthread_mutex_t mutex;
pthread_cond_t condition;
pthread_t *threads;
task_queue_t task_queue;
int closed;
int started;
int thrd_count;
int queue_size;
};
包含互斥锁,条件变量,任务队列等信息
线程池的组成 thread_pool_create
thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {
thread_pool_t *pool;
if (thrd_count <= 0 || queue_size <= 0) {
return NULL;
}
pool = (thread_pool_t*) malloc(sizeof(*pool));
if (pool == NULL) {
return NULL;
}
pool->thrd_count = 0;
pool->queue_size = queue_size;
pool->task_queue.head = 0;
pool->task_queue.tail = 0;
pool->task_queue.count = 0;
pool->started = pool->closed = 0;
pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
if (pool->task_queue.queue == NULL) {
return NULL;
}
pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
if (pool->threads == NULL) {
return NULL;
}
int i = 0;
for (; i < thrd_count; i++) {
if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
return NULL;
}
pool->thrd_count++;
pool->started++;
}
return pool;
}
初始化一些线程的属性。
通过循环pthread_create函数创建子线程。
thread_pool_post
int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
if (pool == NULL || func == NULL) {
return -1;
}
task_queue_t *task_queue = &(pool->task_queue);
if (pthread_mutex_lock(&(pool->mutex)) != 0) {
return -2;
}
if (pool->closed) {
pthread_mutex_unlock(&(pool->mutex));
return -3;
}
if (task_queue->count == pool->queue_size) {
pthread_mutex_unlock(&(pool->mutex));
return -4;
}
task_queue->queue[task_queue->tail].func = func;
task_queue->queue[task_queue->tail].arg = arg;
task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
task_queue->count++;
if (pthread_cond_signal(&(pool->condition)) != 0) {
pthread_mutex_unlock(&(pool->mutex));
return -5;
}
pthread_mutex_unlock(&(pool->mutex));
return 0;
}
在任务队列保存任务
通过pthread_cond_signal通知子唤醒子线程的pthread_cond_wait
thread_pool_destroy
int thread_pool_destroy(thread_pool_t *pool) {
if (pool == NULL) {
return -1;
}
if (pthread_mutex_lock(&(pool->mutex)) != 0) {
return -2;
}
if (pool->closed) {
thread_pool_free(pool);
return -3;
}
pool->closed = 1;
if (pthread_cond_broadcast(&(pool->condition)) != 0 ||
pthread_mutex_unlock(&(pool->mutex)) != 0) {
thread_pool_free(pool);
return -4;
}
wait_all_done(pool);
thread_pool_free(pool);
return 0;
}
pool->closed = 1;
通过pthread_cond_broadcast唤醒线程池所有线程
wait_all_done
int wait_all_done(thread_pool_t *pool) {
int i, ret=0;
for (i=0; i < pool->thrd_count; i++) {
if (pthread_join(pool->threads[i], NULL) != 0) {
ret=1;
}
}
return ret;
}
使用pthread_join,等待所有子线程任务执行完毕,回收线程
thread_worker
static void *thread_worker(void *thrd_pool) {
thread_pool_t *pool = (thread_pool_t*)thrd_pool;
task_queue_t *que;
task_t task;
for (;;) {
pthread_mutex_lock(&(pool->mutex));
que = &pool->task_queue;
while (que->count == 0 && pool->closed == 0) {
pthread_cond_wait(&(pool->condition), &(pool->mutex));
}
if (pool->closed == 1) break;
task = que->queue[que->head];
que->head = (que->head + 1) % pool->queue_size;
que->count--;
pthread_mutex_unlock(&(pool->mutex));
(*(task.func))(task.arg);
}
pool->started--;
pthread_mutex_unlock(&(pool->mutex));
pthread_exit(NULL);
return NULL;
}
pthread_cond_wait等待任务的唤醒
(*(task.func))(task.arg);执行任务
文章参考与的C/C++linux服务器高级架构系统教程学习
(编辑:应用网_阳江站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|