加入收藏 | 设为首页 | 会员中心 | 我要投稿 应用网_阳江站长网 (https://www.0662zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

手写Linux/C语言线程线程池实现

发布时间:2022-10-20 21:04:06 所属栏目:Linux 来源:转载
导读: 线程池简介
本线程池采用C语言实现线程池linux,C++版本可参考基于C++11新特性手写线程池实现
线程池的场景:
线程池的一般特点:
线程池中线程数量的选择:
有一个经验公式: 线程数量 =(

线程池简介

本线程池采用C语言实现线程池linux,C++版本可参考基于C++11新特性手写线程池实现

线程池的场景:

线程池的一般特点:

线程池中线程数量的选择:

有一个经验公式: 线程数量 =(io等待时间+cpu运算时间)*核心数/cpu运算时间

因此可以根据经验公式得出下面两种场景的线程数量:

线程池的组成:

线程池结构体分析

由于C语言不像C++可以用类封装函数,因此线程池会使用结构体来封装一些变量或者函数指针。

task_t

typedef struct task_t {
    handler_pt func;
    void * arg;
} task_t;

封装任务的入口指针以及参数。

task_queue_t

typedef struct task_queue_t {
    uint32_t head;
    uint32_t tail;
    uint32_t count;
    task_t *queue;
} task_queue_t;

封装任务队列,为了不频繁移动队列中数据,此处采用头尾索引来标记任务。

thread_pool_t

struct thread_pool_t {
    pthread_mutex_t mutex;
    pthread_cond_t condition;
    pthread_t *threads;
    task_queue_t task_queue;
    int closed;
    int started; // 当前运行的线程数
    int thrd_count;
    int queue_size;
};

包含互斥锁,条件变量,任务队列等信息

线程池的组成 thread_pool_create

thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {
    thread_pool_t *pool;
    if (thrd_count <= 0 || queue_size <= 0) {
        return NULL;
    }
    pool = (thread_pool_t*) malloc(sizeof(*pool));
    if (pool == NULL) {
        return NULL;
    }
    pool->thrd_count = 0;
    pool->queue_size = queue_size;
    pool->task_queue.head = 0;
    pool->task_queue.tail = 0;
    pool->task_queue.count = 0;
    pool->started = pool->closed = 0;
    pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
    if (pool->task_queue.queue == NULL) {
        // TODO: free pool
        return NULL;
    }
    pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads == NULL) {
        // TODO: free pool
        return NULL;
    }
    int i = 0;
    for (; i < thrd_count; i++) {
        if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
            // TODO: free pool
            return NULL;
        }
        pool->thrd_count++;
        pool->started++;
    }
    return pool;
}

初始化一些线程的属性。

通过循环pthread_create函数创建子线程。

thread_pool_post

int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
    if (pool == NULL || func == NULL) {
        return -1;
    }
    task_queue_t *task_queue = &(pool->task_queue);
//此处用自旋锁会更节省消耗,因为锁里面的逻辑比较简单
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }
    if (pool->closed) {
        pthread_mutex_unlock(&(pool->mutex));
        return -3;
    }
    if (task_queue->count == pool->queue_size) {
        pthread_mutex_unlock(&(pool->mutex));
        return -4;
    }
//避免queue数据的变化,采用头尾索引来标识
    task_queue->queue[task_queue->tail].func = func;
    task_queue->queue[task_queue->tail].arg = arg;
    task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
    task_queue->count++;
//唤醒一个休眠的线程
    if (pthread_cond_signal(&(pool->condition)) != 0) {
        pthread_mutex_unlock(&(pool->mutex));
        return -5;
    }
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

在任务队列保存任务

通过pthread_cond_signal通知子唤醒子线程的pthread_cond_wait

thread_pool_destroy

int thread_pool_destroy(thread_pool_t *pool) {
    if (pool == NULL) {
        return -1;
    }
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }
    if (pool->closed) {
        thread_pool_free(pool);
        return -3;
    }
    pool->closed = 1;
//广播形式,通知所有阻塞在condition的线程接触阻塞
    if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
            pthread_mutex_unlock(&(pool->mutex)) != 0) {
        thread_pool_free(pool);
        return -4;
    }
    wait_all_done(pool);//等待所有线程退出
    thread_pool_free(pool);
    return 0;
}

pool->closed = 1;

通过pthread_cond_broadcast唤醒线程池所有线程

wait_all_done

int wait_all_done(thread_pool_t *pool) {
    int i, ret=0;
    for (i=0; i < pool->thrd_count; i++) {
        if (pthread_join(pool->threads[i], NULL) != 0) {
            ret=1;
        }
    }
    return ret;
}

使用pthread_join,等待所有子线程任务执行完毕,回收线程

thread_worker

static void *thread_worker(void *thrd_pool) {
    thread_pool_t *pool = (thread_pool_t*)thrd_pool;
    task_queue_t *que;
    task_t task;
    for (;;) {
        pthread_mutex_lock(&(pool->mutex));
        que = &pool->task_queue;
        //此处判断que->count == 0的原因:
        // 1.可能虚假唤醒   linux  pthread_cond_signal
        // 2.linux 中可能被信号唤醒
        // 3.业务逻辑不严谨,被其他线程抢了该任务
        while (que->count == 0 && pool->closed == 0) {
            // pthread_mutex_unlock(&(pool->mutex))
            // 阻塞在 condition
            // ===================================
            // 解除阻塞
            // pthread_mutex_lock(&(pool->mutex));
            pthread_cond_wait(&(pool->condition), &(pool->mutex));
        }
        if (pool->closed == 1) break;
        task = que->queue[que->head];
        que->head = (que->head + 1) % pool->queue_size;
        que->count--;
        pthread_mutex_unlock(&(pool->mutex));
        (*(task.func))(task.arg);
    }
    pool->started--;
    pthread_mutex_unlock(&(pool->mutex));
    pthread_exit(NULL);
    return NULL;
}

pthread_cond_wait等待任务的唤醒

(*(task.func))(task.arg);执行任务

文章参考与的C/C++linux服务器高级架构系统教程学习

(编辑:应用网_阳江站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!