线程池(Thread Pool)是一种用于管理多线程环境的技术。Thread Pool 会在 User Process 中预先创建一组 User Threads,并在需要时重复使用它们,而不是频繁的创建新线程。
线程池可以有效提高程序性能和可靠性:
一个线程池架构通常需要包含以下组件:
Task(任务):由 Task Producer(任务生产者)发出的 Task Request。
Task Queue(任务队列):用于缓冲 Tasks 的 FIFO 队列。
Thread Pool Manager(线程池管理器):用于管理 Thread Pool 资源:例如:多线程、互斥锁、条件变量等。
Worker(工作者):分配来真正处理 Task 的 Worker Thread(工作线程)。
当有 Task 需要处理时,Manager 就从 Pool 中获取一个可用的 Thread 来处理这个 Task。当 Task 完成后,Manager 回收 Thread,而不是销毁它。
C-Thread-Pool 是一个开源的轻量级线程池,实现了以下功能。
$ gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example$ ./example
Making threadpool with 4 threads
THPOOL_DEBUG: Created thread 0 in pool
THPOOL_DEBUG: Created thread 1 in pool
THPOOL_DEBUG: Created thread 2 in pool
THPOOL_DEBUG: Created thread 3 in pool
Adding 40 tasks to threadpool
Thread #245600256 working on 0
Thread #246136832 working on 2
Thread #246673408 working on 3
Thread #246673408 working on 6
Thread #246673408 working on 7
...
Killing threadpool
// 描述一个信号量
typedef struct bsem {pthread_mutex_t mutex; // 互斥锁pthread_cond_t cond; // 条件变量int v; // 0、1 二进制信号量
} bsem;// 描述一个任务
typedef struct job{struct job* prev; // 指向下一个(previous)jobvoid (*function)(void* arg); // 线程入口函数void* arg; // 线程入口函数的参数
} job;// 描述一个任务队列
typedef struct jobqueue{pthread_mutex_t rwmutex; // 读写 queue 的互斥锁job *front; // 指向队头job *rear; // 指向队尾bsem *has_jobs; // 指向一个信号量int len; // 队列长度
} jobqueue;// 描述一个线程
typedef struct thread{int id; // 线程池分配的 ID,不是 TID。pthread_t pthread; // 指向一个线程struct thpool_* thpool_p; // 指向线程池
} thread;// 描述一个线程池
typedef struct thpool_{thread** threads; // 指向线程指针数组volatile int num_threads_alive; // 当前活跃的线程数量volatile int num_threads_working; // 当前正在工作中的线程数量pthread_mutex_t thcount_lock; // used for thread count etcpthread_cond_t threads_all_idle; // 空闲条件:等待全部任务完成jobqueue jobqueue; // 指向任务队列
} thpool_;
typedef struct thpool_* threadpool;// 创建一个包含有指定数量线程的线程池
threadpool thpool_init(int num_threads);// 添加 task 到 task queue 中。
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);// 等待所有 tasks 执行完。
void thpool_wait(threadpool);// 暂停所有 tasks,使它们进入 sleep 状态。通过信号机制来实现。
void thpool_pause(threadpool);// 恢复所有 tasks 继续执行。
void thpool_resume(threadpool);// 销毁所有 tasks,如果有 task 正在执行,则先等待 task 执行完。
void thpool_destroy(threadpool);// 获取当前正在工作中的线程数量。
int thpool_num_threads_working(threadpool);
// 构造 struct thread,并调用 pthread_create() 创建线程
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id) // 当线程被暂停时会在这里休眠
static void thread_hold(int sig_id) // 线程在此函数中执行任务
static void* thread_do(struct thread* thread_p) // 销毁 struct thread
static void thread_destroy (thread* thread_p) // 任务队列相关的操作集合
static int jobqueue_init(jobqueue* jobqueue_p)
static void jobqueue_clear(jobqueue* jobqueue_p)
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob)
static struct job* jobqueue_pull(jobqueue* jobqueue_p)
static void jobqueue_destroy(jobqueue* jobqueue_p) // 信号量相关的操作集合
static void bsem_init(bsem *bsem_p, int value)
static void bsem_reset(bsem *bsem_p)
static void bsem_post(bsem *bsem_p)
static void bsem_post_all(bsem *bsem_p)
static void bsem_wait(bsem* bsem_p)