C++线程池的具体实现(带源码和解析)
线程池是一种管理线程的机制,在程序运行期间维护一组预先创建的线程,用于执行后台任务,避免重复地创建和销毁线程以节省系统的调度时间。线程池可以提高程序的性能和资源利用率,同时也减少了线程创建和销毁的开销。
线程池通常包含任务队列、线程池管理、工作线程:
使用线程池的主要好处是可以控制线程数量,方便在并发和资源调配上找出一个比较合理的平衡。线程的创建和销毁是一项昂贵的操作,线程池可以重用已创建的线程,减少这些开销,避免系统资源过度消耗和竞争,能够提供更好的系统资源管理。线程池可以根据系统资源的情况来动态地调整线程的数量,从而更好地利用系统资源。
在 C++11 标准库中,提供了主要的线程操作模块,但是并没有实现线程池。线程池作为一种高效的线程处理手段在实际开发中会经常用到。本节实现了一个简易的线程池,成员变量定义的代码如下:

图 1 线程池调度流程图
run_task__() 函数用于在线程池中循环执行任务,是线程池处理调度的核心代码。它的主要作用是不断地从任务队列中获取任务并执行它们,实际上会根据线程池的池容量启动若干个 run_task__() 线程。当线程池被标记为停止并且任务队列为空时,该函数会退出,具体步骤见图 1,run_task__() 函数的代码如下:
add() 用于向线程池中添加任务,对需要执行的任务进行排队。它接收一个函数对象 f 和任意数量的参数 args,并返回一个 std::future 对象,后期用于获取任务的返回值。模板参数 Func_t 是函数类型,Args... 是 Func_t 参数表类型。
在 add() 函数的实现中返回的返回值使用类型推导的方式实现,所以返回值类型使用了 auto,通过 -> 操作对返回值进行推导。在这里是 std::future<返回值类型>,无论函数的实际返回值是什么都返回一个 std::future 类型。函数的实际返回值类型则通过 std::result_of<> 元函数获取 std::result_of<F(Args...)>::type,代码如下:
std::packaged_task() 用于包装任何可调用目标(如函数、lambda 表达式、bind 表达式或函数对象),以便它可以被异步调用,其主要功能是将可调用对象的执行结果传递给一个 std::future 对象,这样可以在另一个线程中异步获取该结果。要返回的 std::future 对象通过 std::packaged_task 的成员函数 get_future() 获取。
最后任务的返回值通过 std::future 读取,代码如下:
start() 函数的功能是启动线程池操作,函数参数 start 的默认值为 true,在参数值为 true 的情况下启动线程池,在参数值为 false 的情况下停止线程池。for 循环根据构造函数配置的线程池数量启动对应数量的线程,线程执行的任务就是 run_task__() 函数。
当需要结束线程池时首先清理掉还没有执行的任务列表,然后根据操作系统结束正在运行的线程,代码如下:
在这个模块完成了基础的线程池功能,但是并没有考虑任务数量的上限,这可能会造成内存负载过高,在使用时需要注意这点,读者可以根据自己的需要调整代码,以便添加任务上限的约束,例如当任务数量达到上限时拒绝添加任务。如果要进行这样的修改,则可以将判断的内容添加到 add() 函数的实现中。
下面是一个简单的使用线程池的示例程序,演示如何使用 threadPool 模块,代码如下:
线程池通常包含任务队列、线程池管理、工作线程:
- 任务队列用于存储待执行的任务。任务可以是任意的代码块或函数;
- 线程池管理负责创建和销毁线程,以及监控任务队列的状态,并处理好多个线程的执行顺序和资源的竞争;
- 工作线程用于获取队列中的任务并执行任务。
使用线程池的主要好处是可以控制线程数量,方便在并发和资源调配上找出一个比较合理的平衡。线程的创建和销毁是一项昂贵的操作,线程池可以重用已创建的线程,减少这些开销,避免系统资源过度消耗和竞争,能够提供更好的系统资源管理。线程池可以根据系统资源的情况来动态地调整线程的数量,从而更好地利用系统资源。
在 C++11 标准库中,提供了主要的线程操作模块,但是并没有实现线程池。线程池作为一种高效的线程处理手段在实际开发中会经常用到。本节实现了一个简易的线程池,成员变量定义的代码如下:
//threadPool.h
class threadPool
{
private:
std::vector<std::thread > m_works__; //线程池运行中的线程组
std::queue<std::function<void() >> m_tasks__; //任务队列
std::mutex m_queue_mutex__; //互斥锁
std::condition_variable m_condition__; //调度控制的条件变量
std::atomic< bool > m_stop__; //是否已经停止
std::atomic< int > m_count__; //线程池容量
private:
线程调度模块的处理流程如下图所示:
图 1 线程池调度流程图
- 首先检查线程池是否在运行,如果在运行,则锁定运行数据,检查是否存在任务;
- 如果不存在任务,则等待任务。如果存在任务,则从任务队列中读取任务调度以执行任务;
- 当不存在任务时使用条件变量等候通知,新增任务的新增函数会发出通知以调度程序执行调度操作。
run_task__() 函数用于在线程池中循环执行任务,是线程池处理调度的核心代码。它的主要作用是不断地从任务队列中获取任务并执行它们,实际上会根据线程池的池容量启动若干个 run_task__() 线程。当线程池被标记为停止并且任务队列为空时,该函数会退出,具体步骤见图 1,run_task__() 函数的代码如下:
// threadPool.h
void run_task__() {
while(!m_stop__.load()) {
std::function<void()> task; // 实际要执行的任务
{
std::unique_lock<std::mutex> lock(m_queue_mutex__);
// 等候队列任务就绪
m_condition__.wait(lock, [&] {
return m_stop__ || !m_tasks__.empty();
});
// 结束执行
if (m_stop__ && m_tasks__.empty()) { return; }
// 获取任务
task = std::move(m_tasks__.front());
m_tasks__.pop();
}
// 执行任务
task();
}
}
public:
// 参数 numThreads 是线程池的线程容量
threadPool(size_t numThreads) : m_stop__(true), m_count__(numThreads) {}
~threadPool() { stop(); }
add() 用于向线程池中添加任务,对需要执行的任务进行排队。它接收一个函数对象 f 和任意数量的参数 args,并返回一个 std::future 对象,后期用于获取任务的返回值。模板参数 Func_t 是函数类型,Args... 是 Func_t 参数表类型。
在 add() 函数的实现中返回的返回值使用类型推导的方式实现,所以返回值类型使用了 auto,通过 -> 操作对返回值进行推导。在这里是 std::future<返回值类型>,无论函数的实际返回值是什么都返回一个 std::future 类型。函数的实际返回值类型则通过 std::result_of<> 元函数获取 std::result_of<F(Args...)>::type,代码如下:
//threadPool.h
template<class Func_t, class...Args >
auto add(Func_t&&f, Args&&...args) -> std::future<typename std::result_of<F(Args...) >::type >
{
using return_type = typename std::result_of<F(Args...) >::type;
std::packaged_task() 用于包装任何可调用目标(如函数、lambda 表达式、bind 表达式或函数对象),以便它可以被异步调用,其主要功能是将可调用对象的执行结果传递给一个 std::future 对象,这样可以在另一个线程中异步获取该结果。要返回的 std::future 对象通过 std::packaged_task 的成员函数 get_future() 获取。
最后任务的返回值通过 std::future 读取,代码如下:
//threadPool.h
auto task =std::make_shared<std::packaged_task<return_type() >>( std::bind(std::forward<F >(f),std::forward<Args >(args)...));
std::future<return_type > res = task->get_future();
{
std::unique_lock<std::mutex > lock(m_queue_mutex__);
m_tasks__.emplace([task](){(* task)();});
}
m_condition__.notify_one();
return res;
}
start() 函数的功能是启动线程池操作,函数参数 start 的默认值为 true,在参数值为 true 的情况下启动线程池,在参数值为 false 的情况下停止线程池。for 循环根据构造函数配置的线程池数量启动对应数量的线程,线程执行的任务就是 run_task__() 函数。
当需要结束线程池时首先清理掉还没有执行的任务列表,然后根据操作系统结束正在运行的线程,代码如下:
//container/variant.h
void start(bool sw = true) {
if(sw) {
if(!m_stop__.load()) return; // 线程池正在运行
m_stop__ = !sw;
// 根据线程池的线程容量启动 m_count__ 个线程
for (int i = 0; i < m_count__.load(); ++i) {
m_works__.emplace_back(
std::bind(&threadPool::run_task__, this));
}
m_condition__.notify_all();
} else {
clearNotRunning();
m_stop__ = !sw;
m_condition__.notify_all();
// 根据平台情况停止正在运行的线程任务
for (std::thread& worker : m_works__) {
#if defined(__POSIX__) || defined(__LINUX__)
pthread_t id = worker.native_handle();
pthread_cancel(id);
#elif defined(WIN32)
HANDLE id = worker.native_handle();
TerminateThread(id, 0);
#endif
}
}
}
inline void stop() { start(false); }
使用 std::queue 针对任务进行排队处理,使用 std::future 处理异步返回值,并利用条件变量进行任务调度和数据的同步处理,可以有效地控制系统负载和任务平衡。在实际使用时可以根据计算机的硬件特点选择线程数量,能够保证在每个核上运行一个任务以达到对 CPU 算力的最好利用。在这个模块完成了基础的线程池功能,但是并没有考虑任务数量的上限,这可能会造成内存负载过高,在使用时需要注意这点,读者可以根据自己的需要调整代码,以便添加任务上限的约束,例如当任务数量达到上限时拒绝添加任务。如果要进行这样的修改,则可以将判断的内容添加到 add() 函数的实现中。
下面是一个简单的使用线程池的示例程序,演示如何使用 threadPool 模块,代码如下:
//threadPool.cpp
class a
{
public:
int taskFunction(int id) {
std::cout << "Task " << id << " started" << std::endl;
// 模拟一些工作任务,使用 sleep_for 函数等候一段时间
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task " << id << " finished" << std::endl;
return id * 2;
}
};
int main()
{
a a1;
threadPool pool(4); // 创建一个线程池,有 4 个工作线程
std::vector<std::future<int> > rst(8); // 结果数组
// 将任务提交到线程池
for (int i = 0; i < 8; ++i) {
rst[i] = pool.add(
std::bind(&a::taskFunction, &a1, std::placeholders::_1), i);
}
// 启动线程池
pool.start(true);
// 等待所有任务完成
std::this_thread::sleep_for(std::chrono::seconds(5));
// 输出结果
for(int i = 0; i < 8; i++) {
std::cout << "thread " << i << " result: " << rst[i].get() << std::endl;
}
return 0;
}
ICP备案:
公安联网备案: