跳至內容

C++/STL/ConditionVariable

維基教科書,自由的教學讀本
< C++

condition_variable標準程式庫中的一個頭文件,定義了C++11標準中的一些用於並發編程時表示條件變量的類與方法等。從g++ 4.8.1、Visual C++ 2012都已經支持了C++11標準中定義的condition_variable頭文件。

背景簡介

[編輯]

條件變量並發程序設計中的一種控制結構。多個執行緒訪問一個共享資源(或稱臨界區)時,不但需要用互斥鎖實現獨享訪問以避免並發錯誤(稱為競爭危害),在獲得互斥鎖進入臨界區後還需要檢驗特定條件是否成立:

  • 如果不滿足該條件,擁有互訴鎖的執行緒應該釋放該互斥鎖、把自身阻塞(block)並掛到(suspend)條件變量的執行緒隊列中;
  • 如果滿足該條件,擁有互訴鎖的執行緒在臨界區內訪問共享資源,在退出臨界區時通知(notify)在條件變量的執行緒隊列中處於阻塞狀態的執行緒,被通知的執行緒必須重新申請對該互斥鎖加鎖。

如上,實際上使用了兩個處於阻塞狀態的執行緒的隊列,分別為條件變量與互斥鎖所擁有。

C++11的標準庫中新增加的條件變量的實現,與pthread的實現語義完全一致。

使用條件變量做並發控制時,某一時刻阻塞在一個條件變量上的各個執行緒應該在調用wait操作時指明同一個互斥鎖,此時該條件變量與該互斥鎖綁定;否則程序的行為未定義。條件變量必須與互斥鎖配合使用,其理由是程序需要判定某個條件(condition或稱predict)是否成立,該條件可以是任意複雜。通行的編程樣例為(偽代碼):

 mutex.lock();//互斥锁加锁
 while(predict()!=true) // predict可以任意复杂,但必须互斥独占访问
    conditionVariable.wait();   
 //退出while循环执行至此时,既有predict()为真且获得了mutex加锁 

離開臨界區的執行緒用notify操作解除阻塞(unblock)在條件變量上的各個執行緒時,按照公平性(fairness)這些執行緒應該有平等的獲得互斥鎖的機會,不應讓某個執行緒始終難以獲得互斥鎖被餓死(starvation),並且比後來到臨界區的其它執行緒更為優先(即基本上FIFO)。一種辦法是調用了notify_all的執行緒保持互斥鎖,直到所有從條件變量上解除阻塞的執行緒都已經掛起(suspend)到互斥鎖上,然後發起了notify_all的執行緒再釋放互斥鎖。[1]互斥鎖上一般都有比較完善的阻塞執行緒調度算法,一般會按照執行緒優先級調度,相同優先級按照FIFO調度。

發起notify的執行緒不需要擁有互斥鎖。

即將離開臨界區的執行緒是先釋放互斥鎖還是先notify操作解除在條件變量上掛起執行緒的阻塞?表面看兩種順序都可以。但一般建議是先notify操作,後對互斥鎖解鎖。因為這既有利於上述的公平性,同時還避免了相反順序時可能的w:優先級倒置。這種先notify後解鎖的做法是悲觀的(pessimization),因為被通知(notified)執行緒將立即被阻塞,等待通知(notifying)執行緒釋放互斥鎖。很多實現(特別是pthreads的很多實現)為了避免這種「匆忙與等待」(hurry up and wait)情形,把在條件變量的執行緒隊列上處於等待的被通知執行緒直接移到互斥鎖的執行緒隊列上,而不喚醒這些執行緒。

std::condition_variable類

[編輯]

std::condition_variable類表示w:條件變量。效果上相當於包裝了w:pthread庫中的pthread_cond_*()系列的函數。

  • 構造函數
    • condition_variable();預設構造函數
    • condition_variable (const condition_variable&) = delete;禁止拷貝構造函數
  • 成員函數
    • void wait (unique_lock<mutex>& lck); 無條件被阻塞。調用該函數前,當前執行緒應該已經對unique_lock<mutex> lck完成了加鎖。所有使用同一個條件變量的執行緒必須在wait函數中使用同一個unique_lock<mutex>。該wait函數內部會自動調用lck.unlock()對互斥鎖解鎖,使得其他被阻塞在互斥鎖上的執行緒恢復執行。使用本函數被阻塞的當前執行緒在獲得通知(notified,通過別的執行緒調用 notify_*系列的函數)而被喚醒後,wait()函數恢復執行並自動調用lck.lock()對互斥鎖加鎖。
    • template <class Predicate> void wait (unique_lock<mutex>& lck, Predicate pred);帶條件的被阻塞。wait函數設置了謂詞(Predicate),只有當pred條件為false時調用該wait函數才會阻塞當前執行緒,並且在收到其他執行緒的通知後只有當pred為true時才會被解除阻塞。因此,等效於while (!pred()) wait(lck);
    • template <class Rep, class Period> cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time);指定一個時間段,在當前執行緒收到通知(notify)或者超過指定時間段,wait_for 返回
    • template <class Rep, class Period, class Predicate> bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred); 有條件阻塞且超時返回。
    • template <class Clock, class Duration> cv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time);指定一個絕對時間點,超時wait_until返回。
    • template <class Clock, class Duration, class Predicate> bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred);有條件阻塞且超時返回。
    • notify_one():喚醒某個等待執行緒,該執行緒是通過該條件變量的某個wait函數阻塞在該條件變量的執行緒隊列上。如果當前沒有等待執行緒,則該函數什麼也不做
    • notify_all():喚醒所有的等待(wait)執行緒。如果當前沒有等待執行緒,則該函數什麼也不做。

std::condition_variable_any類

[編輯]

與std::condition_variable用法一樣,區別僅在於std::condition_variable_any 的 wait 函數可以接受任何 lockable 參數,而 std::condition_variable 只能接受 std::unique_lock<std::mutex> 類型的參數。

std::cv_status枚舉類型

[編輯]
  • std::cv_status::no_timeout: wait_for 或者 wait_until 沒有超時即返回,即在規定的時間段內執行緒收到了通知。
  • std::cv_status::timeout: wait_for 或者 wait_until 超時後返回。

函數std::notify_all_at_thread_exit()

[編輯]

函數原型為:

void notify_all_at_thread_exit (condition_variable& cond, unique_lock<mutex> lck);

當調用該函數的執行緒退出時,所有在 cond 條件變量上等待的執行緒都會收到通知。Microsoft Visual C++ 2013已經支持了該函數;但GCC 4.9.3尚未支持該函數。

例子程序

[編輯]
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // 等待主线程设置好ready变量为真
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;}); //或者为 cv.wait(lk);
 
    // 现在拥有了互斥锁m,变量ready为真,已进入了临界区
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // Send data back to main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // 手工解锁,并通知阻塞在cv上的某个线程。  
    cv.notify_one();
    lk.unlock();
}
 
int main()
{
    std::thread worker(worker_thread); //启动工作线程
 
    data = "Example data";
    //把ready变量由false变为true,这使得工作线程进入临界区
    {
        std::lock_guard<std::mutex> lk(m); //由于工作线程不可能更早得到ready为真,所以主线程很快就会获得互斥锁m
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one(); //通知已经阻塞在cv上的某个线程;如果没有线程被阻塞,则什么也不做
 
    // 等待工作线程——主线程需要获得互斥锁m且processed变量变为真
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
 
    worker.join();
}

示例:實現信號量

[編輯]
#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};

參考文獻

[編輯]
  1. Douglas C. Schmidt and Irfan Pyarali:《Strategies for Implementing POSIX Condition Variables on Win32》§3.4. The SignalObjectAndWait Solution