C++实战——线程池设计
文章代码来源:高并发Web服务器
首先,在线程池的私有部分 定义结构体
Pool
并且实例化一个共享智能指针.另外可以看到任务队列实际上,就是一个参数为返回值为void
,参数为空的函数的队列。1
2
3
4
5
6
7
8struct Pool
{
std::mutex mtx; //互斥锁
std::condition_variable cond; //条件变量
bool isClosed; //是否关闭
std::queue<std::function<void()>> tasks; //任务队列
};
std::shared_ptr<Pool> pool_;
显式实例化构造函数定义,在这之前需要先看看 std::thread的构造方法,
1
2
3
4
5
6
7
8
9
10//强制显式实例化 放置A a = 8 必须 A a = new A(8);
explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
assert(threadCount > 0);
//创建8个子线程
for(size_t i = 0; i < threadCount; i++) {
//创建线程表示不需要父线程对线程进行资源的释放
std::thread([pool = pool_] { ... }).detach();
}
}上述thread方法的
...
为线程的具体要执行的命令内容。.detach()
代表线程不需要父线程释放资源,该方法会自动释放资源。具体命令内容如下,可以看到通过互斥锁实现对
pool->tasks
队列的互斥访问,:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17std::thread([pool = pool_] {
std::unique_lock<std::mutex> locker(pool->mtx); //设置一个锁
while(true) {
//从任务队列中取任务
if(!pool->tasks.empty()) {
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
//开关锁
locker.unlock();
task(); //具体任务指定的代码 functional库里边的东西
locker.lock();
}
else if(pool->isClosed) break;
else pool->cond.wait(locker); //条件变量使之阻塞
}
}).detach();//设置线程分离?表示不需要父线程对线程进行资源的释放另外上述代码中,使用条件变量在任务队列为空时讲线程阻塞,可以防止他一直不断的循环浪费资源
1
pool->cond.wait(locker);
与之相应的,向任务队列中添加任务后,通过条件变量将一个线程唤醒:
1
2
3
4
5
6
7
8template<class F>
void AddTask(F&& task) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}
pool_->cond.notify_one(); //条件变量去唤醒一个线程
}析构函数 联系上部分的线程内容,将
pool->isClosed
置为真,并且唤醒所有线程,才会让线程执行完推出并自动释放资源。1
2
3
4
5
6
7
8
9~ThreadPool() {
if(static_cast<bool>(pool_)) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}
整体上来看是一个生产者消费者模型,完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79/*
* @Author : mark
* @Date : 2020-06-15
* @copyleft Apache 2.0
*/
class ThreadPool {
private:
//定义pool结构体
struct Pool {
std::mutex mtx; //互斥锁
std::condition_variable cond; //条件变量
bool isClosed; //是否关闭
std::queue<std::function<void()>> tasks; //任务队列
};
std::shared_ptr<Pool> pool_; //实例化一个pool指针 而且是共享指针
public:
//显式实例化
explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
assert(threadCount > 0);
//创建8个子线程
for(size_t i = 0; i < threadCount; i++) {
std::thread([pool = pool_] { //花括号内为线程的具体任务
std::unique_lock<std::mutex> locker(pool->mtx); //设置一个锁
while(true) {
//从任务队列中取任务
if(!pool->tasks.empty()) {
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task(); //具体任务指定的代码 functional库里边的东西
locker.lock();
}
else if(pool->isClosed) break;
else pool->cond.wait(locker); //条件变量使之阻塞
}
}).detach();//设置线程分离?表示不需要父线程对线程进行资源的释放
}
}
ThreadPool() = default; //无参用默认
ThreadPool(ThreadPool&&) = default; //没定义用默认
~ThreadPool() {
if(static_cast<bool>(pool_)) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}
template<class F>
void AddTask(F&& task) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}
pool_->cond.notify_one(); //条件变量去唤醒一个线程
}
};