首页 > 编程笔记 > C++笔记 阅读:12

C++ std::condition_variable条件变量的用法(附带实例)

互斥量是保护访问共享数据的同步原语。然而,标准库提供了叫作条件变量的同步原语,用来通知其他线程发生了特定条件。在条件变量上等待的线程将被阻塞直到条件变量被触发或超时或虚假唤醒。

在本节中,我们将看到如何使用条件变量在线程生产的数据和线程消费的数据间发送通知。

条件变量在 <condition_variable> 头文件的 std 命名空间中可用。


使用以下模式在线程间用条件变量来同步通知:
1) 定义条件变量(在合适的上下文):
std::condition_variable cv;

2) 定义给线程加锁用的互斥量。第二个互斥量用于同步访问不同线程的标准控制台:
std::mutex cv_mutex; // data mutex
std::mutex io_mutex; // I/O mutex

3) 定义线程间共享的数据:
int data = 0;

4) 在生产线程中,在修改数据前锁定互斥量:
std::thread p[&](){
    // simulate long running operation
    {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(2s);
    }

    // produce
    {
        std::unique_lock lock(cv_mutex);
        data = 42;
    }

    // print message
    {
        std::lock_guard l(io_mutex);
        std::cout << "produced " << data << '\n';
    }
    // continued at 5.
};

5) 在生产线程中,调用 notify_one() 或 notify_all() 给条件变量发信号(在互斥量保护的共享数据解锁后调用):
// continued from 4.
cv.notify_one();

6) 在消费线程中,获取互斥量唯一锁并在锁上等待条件变量。需要留意虚假唤醒可能会发生,我们将在之后讨论:
std::thread c[&](){
    // wait for notification
    {
        std::unique_lock lock(cv_mutex);
        cv.wait(lock);
    }

    // continued at 7.
}};

7) 在消费线程中,当条件变量被通知后使用共享数据:
// continued from 6.
{
    std::lock_guard lock(io_mutex);
    std::cout << "consumed " << data << '\n';
}

深度剖析C++条件变量

在前面示例中展示了两个线程共享数据(即整型变量)。在冗长的计算后(用睡眠模拟),一个线程生产数据,同时另一个线程只有当数据生产后才消费。为此,它们通过互斥量和条件变量的同步机制来阻塞消费线程,直到生产者线程发出了通知,表示数据已经生产完毕。

这种交流通道的关键是消费线程等待在条件变量上,直到生产线程通知它。两个线程都在同一时间启动。生产者线程开始冗长的计算用来生产给消费线程的数据。与此同时,消费线程直到数据可用时才能继续执行。在被通知数据生产完前,消费线程必须被阻塞。一旦被通知,消费线程可继续执行。

整个机制如下:
在多处理器系统中,条件变量不是完全可预测的。因此,虚假唤醒可能发生,即使没人给条件变量发信号,线程可能也会变成非阻塞。故而,当线程变成非阻塞时,检查条件变量是否为真是必要的。然而,虚假唤醒可能发生多次,所以在循环里检查条件变量是必要的。

C++ 标准提供了条件变量的两种实现:

1) 本节使用的 std::condition_variable,定义了和 std::unique_lock 关联的条件变量。

2) std::condition_variable_any 是更通用的实现,可以和任何满足基本锁要求(实现了 lock() 和 unlock() 方法)的锁一起使用。这种实现可能的一种场景是提供可中断的等待。

Anthony William 在 C++ Concurrency in Action(2012)中提及:“自定义锁操作将相关联的互斥量加锁,也会在中断信号收到时通知条件变量。”

条件变量的所有等待方法都有两种重载实现:
1) 第一个重载接收 std::unique_lock<std::mutex>(基于类型,即时长或时间点)并阻塞线程直到通知条件变量。此重载自动释放互斥量,阻塞当前线程,然后将其加入等待此条件变量的线程列表中。当条件变量被 notify_one() 或 notify_all() 通知,虚假唤醒发生或超时发生(取决于重载的函数)时,线程变成非阻塞,再次自动获取互斥量。

2) 第二个重载除了其他重载的参数外还接收一个断言。此断言用来避免虚假唤醒,直到条件变量为真。此重载跟以下等价:
while(!pred())
    wait(lock);
以下代码展示了跟前面类似但更复杂的例子。生产线程在循环中生成数据(示例中为有限循环),消费线程等待新的数据并消费它(输出在控制台上)。在生产数据完后,生产线程停止,当没有更多的数据消费时,消费线程停止。数据被添加进 queue<int>,布尔变量给消费线程指明生产数据是否完成。

以下代码片段展示了 producer 线程的实现:
std::mutex g_lockprint;
std::mutex g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int> g_buffer;
bool g_done;

void producer(
    int const id,
    std::mt19937& generator,
    std::uniform_int_distribution<int>& dsleep,
    std::uniform_int_distribution<int>& dcode)
{
    for (int i = 0; i < 5; ++i)
    {
        // simulate work
        std::this_thread::sleep_for(
            std::chrono::seconds(dsleep(generator)));

        // generate data
        {
            std::unique_lock<std::mutex> locker(g_lockqueue);
            int value = id * 100 + dcode(generator);
            g_buffer.push(value);
        }
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout << "[produced(" << id << ")]: " << value << '\n';
        }

        // notify consumers
        g_queuecheck.notify_one();
    }
}

消费者线程的实现如下:
void consumer()
{
    // Loop until end is signaled
    while (!g_done)
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);

        g_queuecheck.wait_for(
            locker,
            std::chrono::seconds(1),
            [&]() {return !g_buffer.empty(); });

        // if there are values in the queue process them
        while (!g_done && !g_buffer.empty())
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout
                << "[consumed]: " << g_buffer.front()
                << '\n';
            g_buffer.pop();
        }
    }
}
消费者线程做了如下工作:
我们可以启动多个生产线程和一个消费线程来测试。生产者线程随机生成数据,共享一套伪随机数生成器引擎和分布。如以下示例所示:
auto seed_data = std::array<int, std::mt19937::state_size> {};
std::random_device rd {};
std::generate(std::begin(seed_data), std::end(seed_data),
              std::ref(rd));
std::seed_seq seq(std::begin(seed_data), std::end(seed_data));
auto generator = std::mt19937{ seq };
auto dsleep = std::uniform_int_distribution<>{ 1, 5 };
auto dcode = std::uniform_int_distribution<>{ 1, 99 };

std::cout << "start producing and consuming..." << '\n';

std::thread consumerthread(consumer);
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i)
{
    threads.emplace_back(producer,
                        i + 1,
                        std::ref(generator),
                        std::ref(dsleep),
                        std::ref(dcode));
}

// work for the workers to finish
for (auto& t : threads)
    t.join();

// notify the logger to finish and wait for it
g_done = true;
consumerthread.join();

std::cout << "done producing and consuming" << '\n';

这段程序可能的输出如下(实际输出每次执行可能不同):

start producing and consuming...
[produced(5)]: 550
[consumed]: 550
[produced(5)]: 529
[consumed]: 529
[produced(5)]: 537
[consumed]: 537
[produced(1)]: 122
[produced(2)]: 224
[produced(3)]: 326
[produced(4)]: 458
[consumed]: 122
[consumed]: 224
[consumed]: 326
[consumed]: 458
...
done producing and consuming


标准库还提供了帮助函数 notify_all_at_thread_exit(),可通过 condition_variable 对象用来通知其他线程,线程完全结束执行,包括销毁所有 thread_local 对象。这个函数有两个参数:condition_variable 和与条件变量关联的 std::unique_lock<std::mutex>(拥有此条件变量所有权)。此函数的典型用例是就在结束前,运行调用此函数的分离线程。

相关文章