Why and how to use Condition Variables in C++

Sleeping worker threads efficiently until work is ready

Introduction

In concurrent applications, sometimes it makes sense to wait on a condition before proceeding. C++ provides an idiomatic way of doing this, with the use of std::condition_variable.

This class works together with a mutex, enabling threads to efficiently wait for a predicate condition to be satisfied.

The problem it solves

Sometimes threads should wait for some condition to occur before proceeding. For example, a pool of worker threads might process large, intermittent bursts of data from an incoming queue.

If the data comes in bursts, the queue could remain empty for, say, 80% of its lifetime, leaving the workers idle most of the time. A simple solution is to put the workers to sleep and wake them periodically (every 50ms, for example) to check whether the queue has data ready.

It may seem ok to let the thread sleep and periodically wake up to poll whether the queue has data in it. After all, this is a concurrent, multithreaded system – meaning the OS will let the CPU’s hardware thread do something else until it wakes up.

Why this is wasteful

This naive solution doesn’t seem that bad on paper, since we know the operating system is letting other tasks use the CPU thread while it’s sleeping –

// periodically checking if a queue
//  has data in it to consume, by putting
//  the thread to sleep between checks
while (true) {
    std::this_thread::sleep_for(50ms);
    {
        std::scoped_lock lock{ mutex };
        if (!queue.empty()) {
            // ..do something with data..
        }
    }
}

.. but, it turns out, there is a lot that goes on when you put a thread to sleep and then wake it up again. The thread you’re putting to sleep undergoes a context switch, which carries significant overhead: a trip into the kernel, plus the CPU work of swapping one thread’s state for another’s.

The cost of context switching

When you put a thread to sleep, the operating system has to “save” the exact state of the outgoing routine, and then load the state of a new, completely different routine to run on the hardware thread in the meantime. When the sleeping thread eventually wakes up, the whole process happens again in reverse.

This process turns out to be quite the long-winded ordeal. Estimating the exact cost of context switching is difficult, but it’s enough to say that it’s usually thousands of CPU instructions and should best be avoided when not necessary.

Now, imagine that every 50ms (or some other time interval) you’re waking up a dozen or more worker threads, and that 80% of the time the queue is empty. On each of those wakeups, a thread spends thousands of CPU instructions doing nothing but going back to sleep, and thus starting another context switch. Not great!
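
To put rough numbers on that, here is a small back-of-the-envelope sketch using the illustrative figures from this section (50ms polling, a dozen workers, a queue that is empty 80% of the time). The function name is made up for this example, and the result only counts wakeups, not their actual cost.

```cpp
#include <chrono>

// Hypothetical helper: how many wakeups per second find nothing to do.
constexpr long long wasted_wakeups_per_second(
    std::chrono::milliseconds poll_interval,
    long long n_workers,
    long long idle_percent) {
  // how many times each worker wakes up per second
  const long long polls_per_second = 1000 / poll_interval.count();
  // wakeups across all workers that hit an empty queue
  return polls_per_second * n_workers * idle_percent / 100;
}

// 50ms interval, 12 workers, queue empty 80% of the time
static_assert(
    wasted_wakeups_per_second(std::chrono::milliseconds{ 50 }, 12, 80) == 192);
```

Under those assumptions, that is 192 pointless sleep/wake cycles every second, each one paying for a context switch in both directions.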

How to use it

The std::condition_variable (aka: CV) provides an idiomatic way to keep a thread blocked until you want it to wake up and reevaluate the condition it is waiting on.

On the waiting thread

Have a thread conditionally wait by pairing the CV with a std::unique_lock. After locking the mutex, you call .wait() on the CV, passing the lock as the first argument and a predicate as the second. If the predicate is false, the mutex is automatically released so other threads can use it, and the thread goes to sleep until another thread signals it to wake up. On waking, it re-acquires the mutex and evaluates the predicate again, which also guards against spurious wakeups.
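
As a minimal sketch of the waiting side, assuming a single boolean flag as the condition (the names ready_mutex, ready_cv, ready and the three functions are invented for this illustration):

```cpp
#include <condition_variable>
#include <mutex>

std::mutex ready_mutex;
std::condition_variable ready_cv;
bool ready{ false };  // the condition, guarded by ready_mutex

// blocks the calling thread until `ready` becomes true
void wait_until_ready() {
  std::unique_lock lock{ ready_mutex };
  // if the predicate is false, wait() unlocks the mutex
  //  and sleeps; it re-locks before checking again
  ready_cv.wait(lock, []{ return ready; });
  // here the mutex is held and ready == true
}

// the same wait, written as the loop that the
//  predicate overload performs for you
void wait_until_ready_manual() {
  std::unique_lock lock{ ready_mutex };
  while (!ready)
    ready_cv.wait(lock);
}

// called from another thread to release the waiters
void mark_ready() {
  {
    std::scoped_lock lock{ ready_mutex };
    ready = true;
  }
  ready_cv.notify_all();
}
```

The predicate overload is usually the safer choice, since it is hard to forget the loop. Note that wait() needs a std::unique_lock rather than a std::scoped_lock, because it has to unlock and re-lock the mutex itself.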

On the signaling thread

To let the waiting thread know it’s time to wake up, you simply have to signal from the CV using its .notify_one() or .notify_all() method. In general, avoid using .notify_all() unless it makes sense, since the aim here is to avoid unnecessary wakeups. For example, if only one piece of data is available to consume, it makes no sense to tell multiple workers to wake up.
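
The difference between the two calls can be sketched like this, assuming a simple queue of int jobs (jobs_mutex, jobs_cv, jobs and the function names are hypothetical, chosen only for this example):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>

std::mutex jobs_mutex;
std::condition_variable jobs_cv;
std::queue<int> jobs;

// one new item: waking a single worker is enough
void push_one(int job) {
  {
    std::scoped_lock lock{ jobs_mutex };
    jobs.push(job);
  }
  jobs_cv.notify_one();
}

// a whole batch: several workers can make progress,
//  so waking all of them is reasonable
void push_batch(const std::vector<int>& batch) {
  {
    std::scoped_lock lock{ jobs_mutex };
    for (int job : batch)
      jobs.push(job);
  }
  jobs_cv.notify_all();
}

// worker side: sleep until a job is available, then take it
int pop_blocking() {
  std::unique_lock lock{ jobs_mutex };
  jobs_cv.wait(lock, []{ return !jobs.empty(); });
  int job = jobs.front();
  jobs.pop();
  return job;
}
```

In both producers the queue is modified while holding the mutex, and the notification is sent after the lock is released, so a woken worker does not immediately block on a mutex the notifier still holds.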

Example application

This simplified example illustrates multiple threads consuming a data stream from a single queue. It uses a condition_variable to efficiently put worker threads to sleep until there is data in the queue to process.

The producer thread loads bulk chunks of data sporadically (eg: from a database), and each time a batch of data has finished being placed in the queue, it signals all of the worker threads with cv.notify_all() to let them know they can check the queue for data to process.

The consumer (worker) routine is where the CV’s condition is checked, via wait(). When the condition evaluates to false, the thread goes to sleep until it receives the notify signal to wake up and check the queue for data once again.

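#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>

using namespace std::chrono_literals; // for 50ms, 600ms

std::mutex m;
std::queue<int> queue;
std::condition_variable cv;
bool should_stop{ false };

// placeholder (not part of the original listing)
//  standing in for the real per-item work
void do_something_with(int) {}

// helper to make some random data
std::vector<int> get_random_ints(std::size_t n_ints=10) {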
  thread_local std::mt19937 rng{ std::random_device{}() };
  std::uniform_int_distribution<int> random_int{ 0, 100 };
  std::vector<int> xs(n_ints);
  for (auto& x: xs)
    x = random_int(rng);
  return xs;
}

// called to stop all the running workers
void stop() {
  {
    std::scoped_lock lock{ m };
    should_stop = true;
  }
  cv.notify_all();
}

void produce_data() {
  // simulate a slow database read
  const auto get_data_from_db = [](){
    std::this_thread::sleep_for(50ms);
    return get_random_ints();
  };
  // simulate 5 bursts of data
  for (int i{}; i < 5; ++i) {
    auto data = get_data_from_db();
    {
      std::scoped_lock lock{ m };
      for (auto& d: data)
        queue.push(d);
    }
    // let threads know there's data
    cv.notify_all();
  }
}

void consume_data() {
  int data;
  while (true) {
    {
      std::unique_lock lock{ m };
      // if there is nothing to do, release the
      //  mutex and sleep until notified
      cv.wait(lock, [&]{ return !queue.empty()
                                || should_stop; });
      if (should_stop) return;
      data = queue.front();
      queue.pop();
    }
    do_something_with(data);
  }
}


int main() {
  // demo with 1 producer, 4 consumers
  std::vector<std::jthread> consumers;
  for (int i{}; i < 4; ++i) {
    consumers.emplace_back(consume_data);
  }
  auto producer = std::jthread{ produce_data };
  std::this_thread::sleep_for(600ms); // run for 600ms
  stop(); // shutdown
}
