Main Content

Multiple threads waiting on same condition variable

Using cnd_signal or std::condition_variable::notify_one() to notify one of the threads waiting for the same condition variable might result in indefinite blocking

Since R2020a

Description

This checker is deactivated in a default Polyspace® as You Code analysis. See Checkers Deactivated in Polyspace as You Code Analysis (Polyspace Access).

This defect occurs when you use cnd_signal family functions or the std::condition_variable::notify_one() function to notify one of at least two threads that are concurrently waiting for the same condition variable. For threads with the same priority level, these functions cause the thread scheduler to arbitrarily notify one of the threads waiting on the condition variable.

Polyspace reports this check on the function call. See the Event column in the Results Details pane to view the threads waiting on the same condition variable.

Risk

When multiple threads use the same condition variable, then the cnd_signal family function or the std::condition_variable::notify_one() function arbitrarily notifies one of the waiting threads. The notified thread usually tests for a condition predicate. If the condition predicate is false, the thread continues to wait until it is notified again. Because this method notifies an arbitrary thread, it is possible that the condition predicate for the notified thread never becomes true. The program ends up in a state where no thread is awake to signal the condition variable, resulting in indefinite blocking.

Fix

Use cnd_broadcast family functions or std::condition_variable::notify_all() to notify all threads waiting on the condition variable, or use a unique condition variable for each thread.

Examples

expand all

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <threads.h>

typedef int thrd_return_t;

static void fatal_error(void)
{
    exit(1);
}

enum { NTHREADS = 5 };

mtx_t mutex;
cnd_t cond;

thrd_return_t next_step(void* t)
{
    static size_t current_step = 0;
    size_t my_step = *(size_t*)t;

    if (thrd_success != mtx_lock(&mutex)) {
        /* Handle error */
        fatal_error();
    }

    printf("Thread %zu has the lock\n", my_step);
    while (current_step != my_step) {
        printf("Thread %zu is sleeping...\n", my_step);
        if (thrd_success !=
            cnd_wait(&cond, &mutex)) {
            /* Handle error */
            fatal_error();
        }
        printf("Thread %zu woke up\n", my_step);
    }
    /* Do processing ... */
    printf("Thread %zu is processing...\n", my_step);
    current_step++;

    /* Signal a waiting task */
    if (thrd_success !=
        cnd_signal(&cond)) {
        /* Handle error */
        fatal_error();
    }

    printf("Thread %zu is exiting...\n", my_step);

    if (thrd_success != mtx_unlock(&mutex)) {
        /* Handle error */
        fatal_error();
    }
    return (thrd_return_t)0;
}

int main(void)
{
    thrd_t threads[NTHREADS];
    size_t step[NTHREADS];

    if (thrd_success != mtx_init(&mutex, mtx_plain)) {
        /* Handle error */
        fatal_error();
    }
    if (thrd_success != cnd_init(&cond)) {
        /* Handle error */
        fatal_error();
    }
    /* Create threads */
    for (size_t i = 0; i < NTHREADS; ++i) {
        step[i] = i;
        if (thrd_success != thrd_create(&threads[i],
                                        next_step,
                                        &step[i])) {
            /* Handle error */
            fatal_error();
        }
    }
    /* Wait for all threads to complete */
    for (size_t i = NTHREADS; i != 0; --i) {
        if (thrd_success != thrd_join(threads[i - 1], NULL)) {
            /* Handle error */
            fatal_error();
        }
    }
    (void)mtx_destroy(&mutex);
    (void)cnd_destroy(&cond);
    return 0;
}

In this example, multiple threads are created and assigned step level. Each thread checks if its assigned step level matches the current step level (condition predicate). If the predicate is false, the thread goes back to waiting on the condition variable cond. The use of cnd_signal to signal the cond causes the thread scheduler to arbitrarily wake up one of the threads waiting on cond. This can result in indefinite blocking when the condition predicate of woken up thread is false and no other thread is available to signal cond.

Correction — Use cnd_broadcast to Wake up All the Threads

One possible correction is to use cnd_broadcast instead to signal cond. The function cnd_signal wakes up all the thread that are waiting on cond.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <threads.h>

typedef int thrd_return_t;

static void fatal_error(void)
{
    exit(1);
}

enum { NTHREADS = 5 };

mtx_t mutex;
cnd_t cond;

thrd_return_t next_step(void* t)
{
    static size_t current_step = 0;
    size_t my_step = *(size_t*)t;

    if (thrd_success != mtx_lock(&mutex)) {
        /* Handle error */
        fatal_error();
    }

    printf("Thread %zu has the lock\n", my_step);
    while (current_step != my_step) {
        printf("Thread %zu is sleeping...\n", my_step);
        if (thrd_success !=
            cnd_wait(&cond, &mutex)) {
            /* Handle error */
            fatal_error();
        }
        printf("Thread %zu woke up\n", my_step);
    }
    /* Do processing ... */
    printf("Thread %zu is processing...\n", my_step);
    current_step++;

    /* Signal a waiting task */
    if (thrd_success !=
        cnd_broadcast(&cond)) {
        /* Handle error */
        fatal_error();
    }

    printf("Thread %zu is exiting...\n", my_step);

    if (thrd_success != mtx_unlock(&mutex)) {
        /* Handle error */
        fatal_error();
    }
    return (thrd_return_t)0;
}

int main_test_next_step(void)
{
    thrd_t threads[NTHREADS];
    size_t step[NTHREADS];

    if (thrd_success != mtx_init(&mutex, mtx_plain)) {
        /* Handle error */
        fatal_error();
    }
    if (thrd_success != cnd_init(&cond)) {
        /* Handle error */
        fatal_error();
    }
    /* Create threads */
    for (size_t i = 0; i < NTHREADS; ++i) {
        step[i] = i;
        if (thrd_success != thrd_create(&threads[i],
                                        next_step,
                                        &step[i])) {
            /* Handle error */
            fatal_error();
        }
    }
    /* Wait for all threads to complete */
    for (size_t i = NTHREADS; i != 0; --i) {
        if (thrd_success != thrd_join(threads[i - 1], NULL)) {
            /* Handle error */
            fatal_error();
        }
    }
    (void)mtx_destroy(&mutex);
    (void)cnd_destroy(&cond);
    return 0;
}

In this example, multiple threads are created and assigned step numbers. All these threads are controlled using the same std::condition_variable conVar. Each thread checks if its assigned step number matches the current step number. If the assigned step number of a thread does not match the current step number, the thread goes back to waiting on the condition variable. Because the code uses std::condition_variable::notify_one() to notify conVar, it is possible that the arbitrarily notified thread has a different stepNumber than the currentStep, which causes the notified thread to resume waiting. If no other thread is awake, then conVar is never notified again, resulting in indefinite blocking.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
  
std::mutex myMutex;
std::condition_variable conVar;
 
void stepFunc(size_t stepNumber) {
  static size_t currentStep = 0;
  std::unique_lock<std::mutex> uLock(myMutex);
 
 
  while (currentStep != stepNumber) {
    conVar.wait(uLock);
  }
 
  // Do processing...
  //....
  currentStep++;
 
  // Signal awaiting task.
  conVar.notify_one(); 
 
}
 
void foo() {
  constexpr size_t nThreads = 5;
  std::thread threads[nThreads];
 
  // Create threads.
  for (size_t i = 0; i < nThreads; ++i) {
    threads[i] = std::thread(stepFunc, i);
  }
 
  // Wait for all threads to complete.
  for (size_t i = nThreads; i != 0; --i) {
    threads[i - 1].join();
  }
}
Correction — Use std::condition_variable::notify_all() to Notify All Threads

One possible correction is to use std::condition_variable::notify_all() instead to signal conVar. The function std::condition_variable::notify_all() notifies all the thread that are waiting on conVar. At least one of the threads fail the test (currentStep != stepNumber) and currentStep is incremented.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
  
std::mutex myMutex;
std::condition_variable conVar;
 
void stepFunc(size_t stepNumber) {
  static size_t currentStep = 0;
  std::unique_lock<std::mutex> uLock(myMutex);
 
 
  while (currentStep != stepNumber) {
    conVar.wait(uLock);
  }
 
  // Do processing...
  //....
  currentStep++;
 
  // Signal ALL awaiting task.
  conVar.notify_all(); //Compliant
 
}
 
void foo() {
  constexpr size_t nThreads = 5;
  std::thread threads[nThreads];
 
  // Create threads.
  for (size_t i = 0; i < nThreads; ++i) {
    threads[i] = std::thread(stepFunc, i);
  }
 
  // Wait for all threads to complete.
  for (size_t i = nThreads; i != 0; --i) {
    threads[i - 1].join();
  }
}
Correction — Use Unique Condition Variable For Each Thread

Notifying all threads by using std::condition_variable::notify_all() is not always thread-safe. An alternative is to use unique condition variables for each thread. This way, a thread can choose which thread to notify. In this code, each thread signals the thread corresponding to the next stepNumber. The notified threads does not resume waiting and currentStep is incremented.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
  constexpr size_t numThreads = 5;
std::mutex myMutex;
std::condition_variable conVar[numThreads];
 
void stepFunc(size_t stepNumber) {
  static size_t currentStep = 0;
  std::unique_lock<std::mutex> uLock(myMutex);
 
 
  while (currentStep != stepNumber) {
    conVar[stepNumber].wait(uLock);
  }
 
  // Do processing...
  //....
  currentStep++;
 
  // Signal awaiting task.
  if ((stepNumber + 1) < numThreads) {
    conVar[stepNumber + 1].notify_one();
  } //Noncompliant
 
}
 
void foo() {
  constexpr size_t nThreads = 5;
  std::thread threads[nThreads];
 
  // Create threads.
  for (size_t i = 0; i < nThreads; ++i) {
    threads[i] = std::thread(stepFunc, i);
  }
 
  // Wait for all threads to complete.
  for (size_t i = nThreads; i != 0; --i) {
    threads[i - 1].join();
  }
}

Result Information

Group: Concurrency
Language: C | C++
Default: Off
Command-Line Syntax: SIGNALED_COND_VAR_NOT_UNIQUE
Impact: Low

Version History

Introduced in R2020a