C++实战——线程池设计

C++实战——线程池设计

文章代码来源:高并发Web服务器

  1. 首先,在线程池的私有部分 定义结构体 Pool 并且实例化一个共享智能指针.另外可以看到任务队列实际上,就是一个参数为返回值为void ,参数为空的函数的队列。

    1
    2
    3
    4
    5
    6
    7
    8
    struct Pool 
    {
    std::mutex mtx; //互斥锁
    std::condition_variable cond; //条件变量
    bool isClosed; //是否关闭
    std::queue<std::function<void()>> tasks; //任务队列
    };
    std::shared_ptr<Pool> pool_;
  1. 显式实例化构造函数定义,在这之前需要先看看 std::thread的构造方法,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //强制显式实例化   放置A a = 8  必须 A a = new A(8); 
    explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
    assert(threadCount > 0);

    //创建8个子线程
    for(size_t i = 0; i < threadCount; i++) {
    //创建线程表示不需要父线程对线程进行资源的释放
    std::thread([pool = pool_] { ... }).detach();
    }
    }

    上述thread方法的 ...为线程的具体要执行的命令内容。.detach()代表线程不需要父线程释放资源,该方法会自动释放资源。

    具体命令内容如下,可以看到通过互斥锁实现对 pool->tasks队列的互斥访问,:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    std::thread([pool = pool_] {                       
    std::unique_lock<std::mutex> locker(pool->mtx); //设置一个锁

    while(true) {
    //从任务队列中取任务
    if(!pool->tasks.empty()) {
    auto task = std::move(pool->tasks.front());
    pool->tasks.pop();
    //开关锁
    locker.unlock();
    task(); //具体任务指定的代码 functional库里边的东西
    locker.lock();
    }
    else if(pool->isClosed) break;
    else pool->cond.wait(locker); //条件变量使之阻塞
    }
    }).detach();//设置线程分离?表示不需要父线程对线程进行资源的释放

    另外上述代码中,使用条件变量在任务队列为空时讲线程阻塞,可以防止他一直不断的循环浪费资源

    1
    pool->cond.wait(locker);

    与之相应的,向任务队列中添加任务后,通过条件变量将一个线程唤醒:

    1
    2
    3
    4
    5
    6
    7
    8
    template<class F>
    void AddTask(F&& task) {
    {
    std::lock_guard<std::mutex> locker(pool_->mtx);
    pool_->tasks.emplace(std::forward<F>(task));
    }
    pool_->cond.notify_one(); //条件变量去唤醒一个线程
    }
  2. 析构函数 联系上部分的线程内容,将pool->isClosed置为真,并且唤醒所有线程,才会让线程执行完推出并自动释放资源。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ~ThreadPool() {
    if(static_cast<bool>(pool_)) {
    {
    std::lock_guard<std::mutex> locker(pool_->mtx);
    pool_->isClosed = true;
    }
    pool_->cond.notify_all();
    }
    }
  1. 整体上来看是一个生产者消费者模型,完整代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    /*
    * @Author : mark
    * @Date : 2020-06-15
    * @copyleft Apache 2.0
    */

    #ifndef THREADPOOL_H
    #define THREADPOOL_H

    #include <mutex> //锁
    #include <condition_variable>//条件变量
    #include <queue> //队列
    #include <thread> //线程库 c++的库
    #include <functional> //
    class ThreadPool {
    private:
    //定义pool结构体
    struct Pool {
    std::mutex mtx; //互斥锁
    std::condition_variable cond; //条件变量
    bool isClosed; //是否关闭
    std::queue<std::function<void()>> tasks; //任务队列
    };
    std::shared_ptr<Pool> pool_; //实例化一个pool指针 而且是共享指针
    public:
    //显式实例化
    explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
    assert(threadCount > 0);

    //创建8个子线程
    for(size_t i = 0; i < threadCount; i++) {
    std::thread([pool = pool_] { //花括号内为线程的具体任务

    std::unique_lock<std::mutex> locker(pool->mtx); //设置一个锁

    while(true) {
    //从任务队列中取任务
    if(!pool->tasks.empty()) {
    auto task = std::move(pool->tasks.front());
    pool->tasks.pop();
    locker.unlock();
    task(); //具体任务指定的代码 functional库里边的东西
    locker.lock();
    }
    else if(pool->isClosed) break;
    else pool->cond.wait(locker); //条件变量使之阻塞
    }
    }).detach();//设置线程分离?表示不需要父线程对线程进行资源的释放
    }
    }

    ThreadPool() = default; //无参用默认

    ThreadPool(ThreadPool&&) = default; //没定义用默认

    ~ThreadPool() {
    if(static_cast<bool>(pool_)) {
    {
    std::lock_guard<std::mutex> locker(pool_->mtx);
    pool_->isClosed = true;
    }
    pool_->cond.notify_all();
    }
    }

    template<class F>
    void AddTask(F&& task) {
    {
    std::lock_guard<std::mutex> locker(pool_->mtx);
    pool_->tasks.emplace(std::forward<F>(task));
    }
    pool_->cond.notify_one(); //条件变量去唤醒一个线程
    }


    };


    #endif //THREADPOOL_H