C++/STL/Future

維基教科書,自由的教學讀本
< C++
跳至導覽 跳至搜尋

futureC++標準庫中的一個頭文件,定義了C++11標準中的一些表示執行緒並發控制時數據共享的類與方法、數據類型,實現了「生產者」(provider)-「消費者」(consumer)並發模型的數據同步。

該頭文件主要聲明了:

  • Providers 類:std::promise, std::package_task
  • Futures 類:std::future, std::shared_future
  • Providers 函數:std::async()
  • 其他類型:std::future_error, std::future_errc, std::future_status, std::launch.

在並發編程中,常用到一組非阻塞的模型:promise與future。future表示一個可能還沒有實際完成的異步任務的結果。並發程序中的消費者使用future可以等待(wait)、超時等待(wait_for或wait_until)、獲取(共享狀態上設置的,許諾提供的)值(get)等操作。而promise是future的源頭,任務執行者(並發程序生產者)可以設置結果值(set_value)、標記任務完成或者失敗。如果promise在設置結果值之前就被摧毀了,在對應的future上阻塞的執行緒會得到std::broken_promise異常。這一套模型是很多異步非阻塞架構的基礎。

一個std::future是一個異步返回對象( is an asynchronous return object),它從一個共享狀態讀取結果。一個std::promise是一個一步提供者(asynchronous provider),該對象提供一個結果給一個共享狀態。異步提供者在初始化時創建一個共享狀態,一個future引用這個共享狀態。異步提供者可以是std::promise、std::packaged_task、std::async內部的實現等。

std::promise類模板[編輯]

std::promise類模板用於數據同步共享模型中的「生產者」(provider)執行緒提供數據。std::promise提供了一套設施可存儲一個值或異常,std::promise的對象創建的std::future對象稍後可以異步獲取這個值或異常。類模板promise對象與一個共享狀態(通常是std::future)相關聯,在這個共享狀態上保存一個可能還沒有求出(evaluated)的類型為T的值(可以為void)或求出為一個異常。該值可被(可能在另外一個執行緒中的)future對象讀取,因此promise提供了一種執行緒數據同步的手段。promise對象是異步的數據生產者(provider),它可以在未來某一時刻設置共享狀態的值。promise對共享狀態可有三種操作:

  • make ready: promise在共享狀態存儲結果或異常。使狀態變為ready,對與這個共享狀態關聯的future對象上等待(waiting)的所有執行緒解除阻塞(unblock)。
  • release: promise放棄引用(reference to)共享狀態。如果這是共享狀態的最後一個引用,這將導致共享狀態被摧毀。但這不適用於std::async創建的還未ready的共享狀態。
  • abandon: promise存儲異常類型為std::future_error並具有錯誤碼std::future_errc::broken_promise,使共享狀態ready,然後再releases共享狀態。

promise是promise-future通信渠道的推送端( "push" end ):存儲一個值到共享狀態會同步任何等待這個共享狀態的函數(通過std::future::get)成功返回。對同一個共享狀態的並發訪問可能會彼此衝突:例如,std::shared_future::get的多個調用者必須或者只讀或者提供外部同步機制。

  • 構造函數
    • promise(); 預設構造函數
    • template <class Alloc> promise (allocator_arg_t aa, const Alloc& alloc); 帶分配器(可省略)的構造函數
    • promise (const promise&) = delete; 禁止複製構造函數
    • promise (promise&& x) noexcept; 允許移動構造函數
  • 操作符
    • 禁止複製賦值操作符
    • 允許移動賦值操作符
  • 成員函數
    • get_future():該函數返回一個與promise對象的共享狀態相關聯的future對象。返回的future對象可以訪問由 promise對象設置在共享狀態上的值或者某個異常對象。在調用該函數之後,promise對象通常會在某個時間點準備好一個值或者一個異常對象,此時promise對象的共享狀態為ready。如果不設置值或者異常,promise對象在析構時會自動地設置一個future_error異常對象(broken_promise)來設置其自身的準備狀態。
    • set_value():設置promise對象的用於共享的值,此後promise對象的共享狀態標誌變為ready。該操作是atomic的。若無共享狀態或共享狀態已設置,則拋出異常std::future_error。
      • void set_value (const T& val);
      • void set_value (T&& val);
      • void promise<R&>::set_value (R& val); // 當promise的模板參數是引用類型(R&)
      • void promise<void>::set_value (void); // 當promise的模板參數是void
    • set_exception():設置promise對象用於共享的異常,此後promise對象的共享狀態標誌變為ready.
    • set_value_at_thread_exit(): 設置promise對象共享狀態的值,但是不將共享狀態的標誌設置為 ready,當執行緒退出時該 promise對象會自動設置為ready。注意,調用該函數後,當前執行緒已經設置了promise共享狀態的值,如果在執行緒結束之前有其他設置或者修改共享狀態的值的操作,則會拋出future_error(promise_already_satisfied )。
    • swap():交換promise對象的共享狀態。

std::future類模板[編輯]

std::future用來獲取異步任務的結果。std::future 通常由某個 Provider 創建,Provider 在某個執行緒中設置共享狀態的值,與該共享狀態相關聯的 std::future 對象調用 get(通常在另外一個執行緒中) 獲取共享狀態的值。如果共享狀態的標誌不是ready,則該future對象所在的執行緒被阻塞(block),直到 Provider 設置了共享狀態的值(此時共享狀態的標誌變為 ready)調用future::get()的執行緒被解除阻塞,std::future::get 返回異步任務的值或異常(如果發生了異常)。

一個有效(valid)的 std::future 對象通常由以下三種 Provider 創建,並和某個共享狀態相關聯:

  • std::async()
  • std::promise::get_future()
  • std::packaged_task::get_future()

future的成員:

  • 構造函數
    • future() noexcept;
    • future (const future&) = delete;
    • future (future&& x) noexcept;
  • 運算符
    • 禁止拷貝賦值
    • 允許移動賦值
  • 成員函數
    • share():返回一個 std::shared_future 對象;調用該函數之後,該 std::future 對象本身已經不和任何共享狀態相關聯,因此該 std::future 的狀態不再是 valid。
    • get():當與該 std::future 對象相關聯的共享狀態標誌變為 ready 後,調用該函數將返回保存在共享狀態中的值;如果共享狀態的標誌不為 ready,則調用該函數會阻塞當前的調用者,而此後一旦共享狀態的標誌變為 ready,get 返回 Provider 所設置的共享狀態的值或者異常(如果拋出了異常)。
    • valid():檢查當前的 std::future 對象是否有效,即是否與某個共享狀態相關聯。一個有效的 std::future 對象只能通過 std::async(), std::future::get_future()或者 std::packaged_task::get_future()的返回值移動構造或移動賦值。由 std::future 默認構造函數創建的 std::future 對象是無效的(invalid)。
    • wait():如果共享狀態的標誌不是 ready,調用該成員函數會被阻塞當前執行緒,直到共享狀態的標誌變為 ready,當前執行緒被解除阻塞,但是 wait() 並不讀取共享狀態的值或者異常。
    • wait_for():設置一個時間段,如果共享狀態的標誌在該時間段結束之前沒有被 Provider 設置為 ready,則調用 wait_for 的執行緒被阻塞,然後在等待了該時間段的時間長度後該函數返回。返回值為future_status::ready、future_status::timeout、future_status::deferred(共享狀態包含一個 deferred 函數)。
    • wait_until():設置一個系統絕對時間點 abs_time,如果共享狀態的標誌在該時間點到來之前沒有被 Provider 設置為 ready,則調用 wait_until 的執行緒被阻塞,在 abs_time 這一時刻到來之後 wait_for() 返回。返回值為future_status::ready、future_status::timeout、future_status::deferred。

std::shared_future類模板[編輯]

std::shared_future 與 std::future 類似,但是 std::shared_future 可以拷貝、多個 std::shared_future 可以共享某個共享狀態的值或者異常。

shared_future 可以通過某個 std::future 對象隱式轉換,或者通過 std::future::share() 顯示轉換,無論哪種轉換,被轉換的那個 std::future 對象都會變為 invalid(因為std::future僅是moveable).

  • 構造函數
    • shared_future() noexcept;
    • shared_future (const shared_future& x);
    • shared_future (shared_future&& x) noexcept;
    • shared_future (future<T>&& x) noexcept;
  • 運算符
    • 支持move賦值
    • 支持拷貝賦值
  • 成員函數
    • get():
    • valid():
    • wait():
    • wait_for():
    • wait_until():

下述例子用於啟動2個執行緒,在確定執行緒啟動就緒後,同時通知多個執行緒,類似於std::condition_variable::notify_all()。

#include <iostream>
#include <future>
#include <chrono>
 
int main()
{   
    std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
    std::shared_future<void> ready_future(ready_promise.get_future());
 
    std::chrono::time_point<std::chrono::high_resolution_clock> start;
 
    auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t1_ready_promise.set_value();
        ready_future.wait(); // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
 
    auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t2_ready_promise.set_value();
        ready_future.wait(); // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
    auto fut1 = t1_ready_promise.get_future();
    auto fut2 = t2_ready_promise.get_future();
 
    auto result1 = std::async(std::launch::async, fun1);
    auto result2 = std::async(std::launch::async, fun2);
 
    // wait for the threads to become ready
    fut1.wait();
    fut2.wait();
 
    // the threads are ready, start the clock
    start = std::chrono::high_resolution_clock::now();
 
    // signal the threads to go
    ready_promise.set_value();
 
    std::cout << "Thread 1 received the signal "
              << result1.get().count() << " ms after start\n"
              << "Thread 2 received the signal "
              << result2.get().count() << " ms after start\n";
}

std::packaged_task類模板[編輯]

std::packaged_task是對promise/future的簡化。可以更方便的寫出異步執行的代碼。

std::packaged_task實例對象為一個可執行對象、是數據的異步提供者(provider)、被包裝的任務執行結束時std::packaged_task會設置共享狀態的值。

std::packaged_task可用於把一些計算任務包裝好,壓入一個任務棧。一批執行緒從任務棧上取下一項並執行它。

std::packaged_task類模板包裝一個可調用的對象(諸如b:函數指針成員函數指針b:lambda表達式b:bind表達式或者b:函數對象),該對象通常在另外一個執行緒中被自動執行,通過與 std::packaged_task相關聯的std::future對象異步獲取該可調用對象執行產生的結果。std::future對象是一個數據的異步返回對象,通過它可以獲得共享狀態的值,如果等待共享狀態標誌不為ready則調用std::future::get()的執行緒被掛起。std::packaged_task 的共享狀態的生命周期一直持續到最後一個與之相關聯的對象被釋放或者銷毀為止。

  • 構造函數
    • packaged_task() noexcept;默認構造函數,初始化一個空的共享狀態,並且該 packaged_task 對象無包裝任務。
    • template <class Fn> explicit packaged_task (Fn&& fn);初始化一個共享狀態,並且被包裝任務由參數 fn 指定。
    • template <class Fn, class Alloc> explicit packaged_task (allocator_arg_t aa, const Alloc& alloc, Fn&& fn);帶自定義內存分配器的構造函數,與默認構造函數類似,但是使用自定義分配器來分配共享狀態。
    • packaged_task (const packaged_task&) = delete;拷貝構造函數,被禁用。
    • packaged_task (packaged_task&& x) noexcept;移動構造函數。
  • 運算符
    • 禁用了普通的賦值操作運算
    • 允許 move 賦值運算
    • operator()(Args... args):調用該 packaged_task 對象所包裝的對象(通常為函數指針,函數對象,lambda 表達式等),傳入的參數為 args. 調用該函數一般會發生兩種情況: 如果成功調用 packaged_task 所包裝的對象,則返回值(如果被包裝的對象有返回值的話)被保存在 packaged_task 的共享狀態中。 如果調用 packaged_task 所包裝的對象失敗,並且拋出了異常,則異常也會被保存在 packaged_task 的共享狀態中。以上兩種情況都使共享狀態的標誌變為 ready,因此其他等待該共享狀態的執行緒可以獲取共享狀態的值或者異常並繼續執行下去。
  • 成員函數
    • valid():檢查當前packaged_task是否和一個有效的共享狀態相關聯,對於由默認構造函數生成的 packaged_task 對象,該函數返回 false;除非構造後已經進行了 move 賦值操作或者 swap 操作。
    • get_future():返回一個與 packaged_task 對象共享狀態相關的 future 對象。返回的 future 對象可以獲得別的執行緒在該 packaged_task 對象的共享狀態上設置的某個值或者異常。
    • make_ready_at_thread_exit():該函數會調用被包裝的任務,並向任務傳遞參數,類似 std::packaged_task::operator() 成員函數。但不同的是,make_ready_at_thread_exit 並不會立即設置共享狀態的標誌為 ready,而是在執行緒退出時設置共享狀態的標誌。注意,該函數已經設置了 promise 共享狀態的值,如果在執行緒結束之前有其他設置或者修改共享狀態的值的操作,則會拋出 future_error( promise_already_satisfied )。
    • reset():重置 packaged_task 的共享狀態,但是保留之前的被包裝的任務。[1]
    • swap():交換 packaged_task 的共享狀態。
#include <iostream> 
#include <future>
#include <functional>  

int Test_Fun(int a, int b, int& c)
{
	//突出效果,休眠5s
	std::this_thread::sleep_for(std::chrono::seconds(5));

	//c=233
	c = a + b + 230;

	return c;
}

int main()
{
	//声明一个std::packaged_task对象pt1,包装函数Test_Fun
	std::packaged_task<int(int, int, int&)> pt1(Test_Fun);
	//声明一个std::future对象fu1,包装Test_Fun的返回结果类型,并与pt1关联
	std::future<int> fu1 = pt1.get_future();

	//声明一个变量
	int c = 0;

	//创建一个线程t1,将pt1及对应的参数放到线程里面执行
	std::thread t1(std::move(pt1), 1, 2, std::ref(c));

	//阻塞至线程t1结束(函数Test_Fun返回结果)
	int iResult = fu1.get();

	std::cout << "执行结果:" << iResult << std::endl;	//执行结果:233
	std::cout << "c:" << c << std::endl;	//c:233
	t1.join();
	return 1;
}

std::future_error類[編輯]

class future_error : public logic_error;

std::async()函數模板[編輯]

std::async()把一個工作負載(workload)函數作為一項任務(task)來調度執行。與std::thread()不同,std::async()由runtime來負責啟動與調度。

  • template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async(Fn&& fn, Args&&... args);
  • template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async(launch policy, Fn&& fn, Args&&... args);

第二種形式的模板函數,用第一個參數指定了啟動策略, policy 參數可以是:

  • launch::async:完全異步執行。調用者(caller)執行緒與工作執行緒不是同一個。
  • launch::deferred:工作負載函數在第一次需求獲取結果時才被執行。即b:惰性求值(lazy evaluation)。實際上,async()的返回值std::future對象的get()或wait()成員函數被調用時,調用者所在的執行緒被用來執行工作負載函數。
  • 上述兩者的按位或,即launch::async|launch::deferred。這也是默認情形。這給了運行時最大的調度靈活性。但是,async()的返回值std::future對象用while循環調用wait_for將總是返回std::future_status::deferred,永遠不會等於std::future_status::ready,所以while循環永遠不會終止。這種bug在開發或單元測試中很容易被忽略,因為它只會在機器負載很重時才會顯現。在機器過載(over subscription)或執行緒消耗完的狀況下,任務很可能會被推遲(如果使用的是默認啟動策略)。解決辦法是檢查std::async返回的future的wait_for函數,看返回值是否為std::future_status::deferred。

和直接使用std::thread相比,std::async有一些優勢:

  • std::async 返回的future對象,可以方便地等待callable對象執行完成並獲取其返回值
  • 能從實現庫的一些高級功能中獲益,比如執行緒池等,並大大減少異常的產生。
future<T> result = async(policy, func, this_ptr, param1);
result.get();

其中

  • T 為返回參數類型
  • para和構造普通執行緒的參數類似,不同的是,第一個參數可以選擇launch::async、deferred或者不填,不填則默認為launch::async。

注意:std::async()總是返回一個std::future對象。如果滿足下述全部條件,這個future對象析構的時就會發生阻塞:

  • 共享狀態是通過std::async創建
  • 共享狀態還不是ready狀態
  • 被析構的future對象是共享狀態的最後一個引用

可以把future對象向函數調用棧上層傳播等方法,延長這個共享狀態的生命期來避免這個問題。

定義的類型[編輯]

  • enum class future_errc; 該枚舉類型表示std::future對象的各種情形,枚舉值的意義如下:
    • broken_promise:與該std::future共享狀態相關聯的std::promise對象在設置值或者設置異常之前已被銷毀。
    • future_already_retrieved:與該 std::future對象相關聯的共享狀態的值已經被獲取過了,即調用了std::future::get函數。
    • promise_already_satisfied: std::promise對象已經對共享狀態設置了某一值或者異常。
    • no_state:無共享狀態。
  • enum class future_status;主要用在 std::future(或std::shared_future)中的 wait_for 和 wait_until 兩個函數中
    • future_status::ready:wait_for(或wait_until) 因為共享狀態的標誌變為 ready 而返回。
    • future_status::timeout:超時,即 wait_for(或wait_until) 因為在指定的時間段(或時刻)內共享狀態的標誌依然沒有變為 ready 而返回。
    • future_status::deferred:共享狀態包含了 deferred 函數。
  • enum class launch;在調用 std::async 設置異步任務的啟動策略
    • launch::async:異步任務會在另外一個執行緒中調用,並通過共享狀態返回異步任務的結果
    • launch::deferred:異步任務將會在共享狀態被訪問時調用,相當與按需調用

例子程序[編輯]

#include <thread>
#include <future>
#include <cctype>
#include <vector>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <sstream>
 
int main()
{
    std::istringstream iss_numbers{"3 4 1 42 23 -23 93 2 -289 93"};
    std::istringstream iss_letters{" a 23 b,e a2 k k?a;si,ksa c"};
 
    std::vector<int> numbers;
    std::vector<char> letters;
    std::promise<void> numbers_promise, letters_promise;   //std::promise<void>对象初始化后的共享状态没有ready
 
    auto numbers_ready = numbers_promise.get_future();     //std::future对象
    auto letter_ready = letters_promise.get_future();
 
    std::thread value_reader([&]                          //创建一个异步线程并立即开始执行
    {
        // I/O operations.
        std::copy(std::istream_iterator<int>{iss_numbers},
                  std::istream_iterator<int>{},
                  std::back_inserter(numbers));
 
        //Notify for numbers.
        numbers_promise.set_value();                     //std::promise<void>对象numbers_promise的共享状态设置为ready
 
        std::copy_if(std::istreambuf_iterator<char>{iss_letters},
                     std::istreambuf_iterator<char>{},
                     std::back_inserter(letters),
                     ::isalpha);
 
        //Notify for letters.
        letters_promise.set_value();                     //std::promise<void>对象letters_promise的共享状态设置为ready
    });
 
 
    numbers_ready.wait();                               //主线程等待数值IO工作的完成
 
    std::sort(numbers.begin(), numbers.end());
 
    if (letter_ready.wait_for(std::chrono::seconds(1)) ==  //在std::future上超时等待共享状态变为ready
            std::future_status::timeout)
    {
        //output the numbers while letters are being obtained.
        for (int num : numbers) std::cout << num << ' ';
        numbers.clear(); //Numbers were already printed.
    }
 
    letter_ready.wait();                               //主线程等待字母IO工作的完成
    std::sort(letters.begin(), letters.end());
 
    //If numbers were already printed, it does nothing.
    for (int num : numbers) std::cout << num << ' ';
    std::cout << '\n';
 
    for (char let : letters) std::cout << let << ' ';
    std::cout << '\n';
 
    value_reader.join();
}

參考文獻[編輯]

  1. Microsot Visual C++ 2013與GCC 4.9.3都尚未實現std::packaged_task::reset()