跳转到内容

C++/STL/Future

维基教科书,自由的教学读本
< C++

futureC++標準庫中的一個头文件,定义了C++11标准中的一些表示线程并发控制时数据共享的类与方法、数据类型,实现了“生产者”(provider)-“消费者”(consumer)并发模型的数据同步。

该头文件主要声明了:

  • Providers 类:std::promise, std::package_task
  • Futures 类:std::future, std::shared_future
  • Providers 函数:std::async()
  • 其他类型:std::future_error, std::future_errc, std::future_status, std::launch.

在并发编程中,常用到一组非阻塞的模型:promise与future。future表示一个可能还没有实际完成的异步任务的结果。实际上,一个std::future对象在内部分配了一个未来会赋的值的存储,并且还提供了一种访问该值的机制,即使用get()成员函数。如果通过get()函数尝试在此关联值可用之前访问该值,则get()函数将阻塞直到该值可用。并发程序中的消费者使用future可以等待(wait)、超时等待(wait_for或wait_until)、获取(共享状态上设置的,许诺提供该值(get方法)等操作。promise是future的源头,promise对象承诺将来会设置该值。任务执行者(并发程序生产者)可以设置结果值(set_value)、标记任务完成或者失败。如果promise在设置结果值之前就被摧毁了,在对应的future上阻塞的线程会得到std::broken_promise异常。这一套模型是很多异步非阻塞架构的基础。

一个std::future是一个异步返回对象( is an asynchronous return object),它从一个共享状态读取结果。一个std::promise是一个一步提供者(asynchronous provider),该对象提供一个结果给一个共享状态。异步提供者在初始化时创建一个共享状态,一个future引用这个共享状态。异步提供者可以是std::promise、std::packaged_task、std::async内部的实现等。

std::promise类模板

[编辑]

std::promise类模板用于数据同步共享模型中的“生产者”(provider)线程提供数据。std::promise提供了一套设施可存储一个值或异常,std::promise的对象创建的std::future对象稍后可以异步获取这个值或异常。类模板promise对象与一个共享状态(通常是std::future)相关联,在这个共享状态上保存一个可能还没有求出(evaluated)的类型为T的值(可以为void)或求出为一个异常。该值可被(可能在另外一个线程中的)future对象读取,因此promise提供了一种线程数据同步的手段。promise对象是异步的数据生产者(provider),它可以在未来某一时刻设置共享状态的值。promise对共享状态可有三种操作:

  • make ready: promise在共享状态存储结果或异常。使状态变为ready,对与这个共享状态关联的future对象上等待(waiting)的所有线程解除阻塞(unblock)。
  • release: promise放弃引用(reference to)共享状态。如果这是共享状态的最后一个引用,这将导致共享状态被摧毁。但这不适用于std::async创建的还未ready的共享状态。
  • abandon: promise存储异常类型为std::future_error并具有错误码std::future_errc::broken_promise,使共享状态ready,然后再releases共享状态。

promise是promise-future通信渠道的推送端( "push" end ):存储一个值到共享状态会同步任何等待这个共享状态的函数(通过std::future::get)成功返回。对同一个共享状态的并发访问可能会彼此冲突:例如,std::shared_future::get的多个调用者必须或者只读或者提供外部同步机制。

  • 构造函数
    • promise(); 缺省构造函数
    • template <class Alloc> promise (allocator_arg_t aa, const Alloc& alloc); 带分配器(可省略)的构造函数
    • promise (const promise&) = delete; 禁止复制构造函数
    • promise (promise&& x) noexcept; 允许移动构造函数
  • 操作符
    • 禁止复制赋值操作符
    • 允许移动赋值操作符
  • 成员函数
    • get_future():该函数返回一个与promise对象的共享状态相关联的future对象。返回的future对象可以访问由 promise对象设置在共享状态上的值或者某个异常对象。在调用该函数之后,promise对象通常会在某个时间点准备好一个值或者一个异常对象,此时promise对象的共享状态为ready。如果不设置值或者异常,promise对象在析构时会自动地设置一个future_error异常对象(broken_promise)来设置其自身的准备状态。
    • set_value():设置promise对象的用于共享的值,此后promise对象的共享状态标志变为ready。该操作是atomic的。若无共享状态或共享状态已设置,则抛出异常std::future_error。
      • void set_value (const T& val);
      • void set_value (T&& val);
      • void promise<R&>::set_value (R& val); // 当promise的模板参数是引用类型(R&)
      • void promise<void>::set_value (void); // 当promise的模板参数是void
    • set_exception():设置promise对象用于共享的异常,此后promise对象的共享状态标志变为ready.
    • set_value_at_thread_exit(): 设置promise对象共享状态的值,但是不将共享状态的标志设置为 ready,当线程退出时该 promise对象会自动设置为ready。注意,调用该函数后,当前线程已经设置了promise共享状态的值,如果在线程结束之前有其他设置或者修改共享状态的值的操作,则会抛出future_error(promise_already_satisfied )。
    • swap():交换promise对象的共享状态。

std::future类模板

[编辑]

std::future用来获取异步任务的结果。std::future 通常由某个 Provider 创建,Provider 在某个线程中设置共享状态的值,与该共享状态相关联的 std::future 对象调用 get(通常在另外一个线程中) 获取共享状态的值。如果共享状态的标志不是ready,则该future对象所在的线程被阻塞(block),直到 Provider 设置了共享状态的值(此时共享状态的标志变为 ready)调用future::get()的线程被解除阻塞,std::future::get 返回异步任务的值或异常(如果发生了异常)。

一个有效(valid)的 std::future 对象通常由以下三种 Provider 创建,并和某个共享状态相关联:

  • std::async()
  • std::promise::get_future()
  • std::packaged_task::get_future()

future的成员:

  • 构造函数
    • future() noexcept;
    • future (const future&) = delete;
    • future (future&& x) noexcept;
  • 运算符
    • 禁止拷贝赋值
    • 允许移动赋值
  • 成员函数
    • share():返回一个 std::shared_future 对象;调用该函数之后,该 std::future 对象本身已经不和任何共享状态相关联,因此该 std::future 的状态不再是 valid。
    • get():当与该 std::future 对象相关联的共享状态标志变为 ready 后,调用该函数将返回保存在共享状态中的值;如果共享状态的标志不为 ready,则调用该函数会阻塞当前的调用者,而此后一旦共享状态的标志变为 ready,get 返回 Provider 所设置的共享状态的值或者异常(如果抛出了异常)。
    • valid():检查当前的 std::future 对象是否有效,即是否与某个共享状态相关联。一个有效的 std::future 对象只能通过 std::async(), std::future::get_future()或者 std::packaged_task::get_future()的返回值移动构造或移动赋值。由 std::future 默认构造函数创建的 std::future 对象是无效的(invalid)。
    • wait():如果共享状态的标志不是 ready,调用该成员函数会被阻塞当前线程,直到共享状态的标志变为 ready,当前线程被解除阻塞,但是 wait() 并不读取共享状态的值或者异常。
    • wait_for():设置一个时间段,如果共享状态的标志在该时间段结束之前没有被 Provider 设置为 ready,则调用 wait_for 的线程被阻塞,然后在等待了该时间段的时间长度后该函数返回。返回值为future_status::ready、future_status::timeout、future_status::deferred(共享状态包含一个 deferred 函数)。
    • wait_until():设置一个系统绝对时间点 abs_time,如果共享状态的标志在该时间点到来之前没有被 Provider 设置为 ready,则调用 wait_until 的线程被阻塞,在 abs_time 这一时刻到来之后 wait_for() 返回。返回值为future_status::ready、future_status::timeout、future_status::deferred。

std::shared_future类模板

[编辑]

std::shared_future 与 std::future 类似,但是 std::shared_future 可以拷贝、多个 std::shared_future 可以共享某个共享状态的值或者异常。

shared_future 可以通过某个 std::future 对象隐式转换,或者通过 std::future::share() 显示转换,无论哪种转换,被转换的那个 std::future 对象都会变为 invalid(因为std::future仅是moveable).

  • 构造函数
    • shared_future() noexcept;
    • shared_future (const shared_future& x);
    • shared_future (shared_future&& x) noexcept;
    • shared_future (future<T>&& x) noexcept;
  • 运算符
    • 支持move赋值
    • 支持拷贝赋值
  • 成员函数
    • get():
    • valid():
    • wait():
    • wait_for():
    • wait_until():

下述例子用于启动2个线程,在确定线程启动就绪后,同时通知多个线程,类似于std::condition_variable::notify_all()。

#include <iostream>
#include <future>
#include <chrono>
 
int main()
{   
    std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
    std::shared_future<void> ready_future(ready_promise.get_future());
 
    std::chrono::time_point<std::chrono::high_resolution_clock> start;
 
    auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t1_ready_promise.set_value();
        ready_future.wait(); // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
 
    auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t2_ready_promise.set_value();
        ready_future.wait(); // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
    auto fut1 = t1_ready_promise.get_future();
    auto fut2 = t2_ready_promise.get_future();
 
    auto result1 = std::async(std::launch::async, fun1);
    auto result2 = std::async(std::launch::async, fun2);
 
    // wait for the threads to become ready
    fut1.wait();
    fut2.wait();
 
    // the threads are ready, start the clock
    start = std::chrono::high_resolution_clock::now();
 
    // signal the threads to go
    ready_promise.set_value();
 
    std::cout << "Thread 1 received the signal "
              << result1.get().count() << " ms after start\n"
              << "Thread 2 received the signal "
              << result2.get().count() << " ms after start\n";
}

std::packaged_task类模板

[编辑]

std::packaged_task是对promise/future的简化。可以更方便的写出异步执行的代码。

std::packaged_task实例对象为一个可执行对象、是数据的异步提供者(provider)、被包装的任务执行结束时std::packaged_task会设置共享状态的值。

std::packaged_task可用于把一些计算任务包装好,压入一个任务栈。一批线程从任务栈上取下一项并执行它。

std::packaged_task类模板包装一个可调用的对象(诸如b:函数指针成员函数指针b:lambda表达式b:bind表达式或者b:函数对象),该对象通常在另外一个线程中被自动执行,通过与 std::packaged_task相关联的std::future对象异步获取该可调用对象执行产生的结果。std::future对象是一个数据的异步返回对象,通过它可以获得共享状态的值,如果等待共享状态标志不为ready则调用std::future::get()的线程被挂起。std::packaged_task 的共享状态的生命周期一直持续到最后一个与之相关联的对象被释放或者销毁为止。

  • 构造函数
    • packaged_task() noexcept;默认构造函数,初始化一个空的共享状态,并且该 packaged_task 对象无包装任务。
    • template <class Fn> explicit packaged_task (Fn&& fn);初始化一个共享状态,并且被包装任务由参数 fn 指定。
    • template <class Fn, class Alloc> explicit packaged_task (allocator_arg_t aa, const Alloc& alloc, Fn&& fn);带自定义内存分配器的构造函数,与默认构造函数类似,但是使用自定义分配器来分配共享状态。
    • packaged_task (const packaged_task&) = delete;拷贝构造函数,被禁用。
    • packaged_task (packaged_task&& x) noexcept;移动构造函数。
  • 运算符
    • 禁用了普通的赋值操作运算
    • 允许 move 赋值运算
    • operator()(Args... args):调用该 packaged_task 对象所包装的对象(通常为函数指针,函数对象,lambda 表达式等),传入的参数为 args. 调用该函数一般会发生两种情况: 如果成功调用 packaged_task 所包装的对象,则返回值(如果被包装的对象有返回值的话)被保存在 packaged_task 的共享状态中。 如果调用 packaged_task 所包装的对象失败,并且抛出了异常,则异常也会被保存在 packaged_task 的共享状态中。以上两种情况都使共享状态的标志变为 ready,因此其他等待该共享状态的线程可以获取共享状态的值或者异常并继续执行下去。
  • 成员函数
    • valid():检查当前packaged_task是否和一个有效的共享状态相关联,对于由默认构造函数生成的 packaged_task 对象,该函数返回 false;除非构造后已经进行了 move 赋值操作或者 swap 操作。
    • get_future():返回一个与 packaged_task 对象共享状态相关的 future 对象。返回的 future 对象可以获得别的线程在该 packaged_task 对象的共享状态上设置的某个值或者异常。
    • make_ready_at_thread_exit():该函数会调用被包装的任务,并向任务传递参数,类似 std::packaged_task::operator() 成员函数。但不同的是,make_ready_at_thread_exit 并不会立即设置共享状态的标志为 ready,而是在线程退出时设置共享状态的标志。注意,该函数已经设置了 promise 共享状态的值,如果在线程结束之前有其他设置或者修改共享状态的值的操作,则会抛出 future_error( promise_already_satisfied )。
    • reset():重置 packaged_task 的共享状态,但是保留之前的被包装的任务。[1]
    • swap():交换 packaged_task 的共享状态。
#include <iostream> 
#include <future>
#include <functional>  

int Test_Fun(int a, int b, int& c)
{
	//突出效果,休眠5s
	std::this_thread::sleep_for(std::chrono::seconds(5));

	//c=233
	c = a + b + 230;

	return c;
}

int main()
{
	//声明一个std::packaged_task对象pt1,包装函数Test_Fun
	std::packaged_task<int(int, int, int&)> pt1(Test_Fun);
	//声明一个std::future对象fu1,包装Test_Fun的返回结果类型,并与pt1关联
	std::future<int> fu1 = pt1.get_future();

	//声明一个变量
	int c = 0;

	//创建一个线程t1,将pt1及对应的参数放到线程里面执行
	std::thread t1(std::move(pt1), 1, 2, std::ref(c));

	//阻塞至线程t1结束(函数Test_Fun返回结果)
	int iResult = fu1.get();

	std::cout << "执行结果:" << iResult << std::endl;	//执行结果:233
	std::cout << "c:" << c << std::endl;	//c:233
	t1.join();
	return 1;
}

std::future_error类

[编辑]
class future_error : public logic_error;

std::async()函数模板

[编辑]

std::async()把一个工作负载(workload)函数作为一项任务(task)来调度执行。与std::thread()不同,std::async()由runtime来负责启动与调度。

  • template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async(Fn&& fn, Args&&... args);
  • template <class Fn, class... Args> future<typename result_of<Fn(Args...)>::type> async(launch policy, Fn&& fn, Args&&... args);

第二种形式的模板函数,用第一个参数指定了启动策略, policy 参数可以是:

  • launch::async:完全异步执行。调用者(caller)线程与工作线程不是同一个。
  • launch::deferred:工作负载函数在第一次非超时获取结果时才被执行。即惰性求值(lazy evaluation)。实际上,async()的返回值std::future对象的get()或wait()成员函数被调用时,调用者所在的线程被用来执行工作负载函数。
  • 上述两者的按位或,即launch::async|launch::deferred。这也是默认情形。这给了运行时最大的调度灵活性。但是,async()的返回值std::future对象用while循环调用wait_for将总是返回std::future_status::deferred,永远不会等于std::future_status::ready,所以while循环永远不会终止。这种bug在开发或单元测试中很容易被忽略,因为它只会在机器负载很重时才会显现。在机器过载(over subscription)或线程消耗完的状况下,任务很可能会被推迟(如果使用的是默认启动策略)。解决办法是检查std::async返回的future的wait_for函数,看返回值是否为std::future_status::deferred。

和直接使用std::thread相比,std::async有一些优势:

  • std::async 返回的future对象,可以方便地等待callable对象执行完成并获取其返回值。这意味着std::async必须会合(join)。
  • 能从实现库的一些高级功能中获益,比如线程池等,并大大减少异常的产生。
future<T> result = async(policy, func, this_ptr, param1);
result.get();

其中

  • T 为返回参数类型
  • para和构造普通线程的参数类似,不同的是,第一个参数可以选择launch::async、deferred或者不填,不填则默认为launch::async。

注意:std::async()总是返回一个std::future对象。如果满足下述全部条件,这个future对象析构的时就会发生阻塞:

  • 共享状态是通过std::async创建
  • 共享状态还不是ready状态
  • 被析构的future对象是共享状态的最后一个引用

可以把future对象向函数调用栈上层传播等方法,延长这个共享状态的生命期来避免这个问题。

定义的类型

[编辑]
  • enum class future_errc; 该枚举类型表示std::future对象的各种情形,枚举值的意义如下:
    • broken_promise:与该std::future共享状态相关联的std::promise对象在设置值或者设置异常之前已被销毁。
    • future_already_retrieved:与该 std::future对象相关联的共享状态的值已经被获取过了,即调用了std::future::get函数。
    • promise_already_satisfied: std::promise对象已经对共享状态设置了某一值或者异常。
    • no_state:无共享状态。
  • enum class future_status;主要用在 std::future(或std::shared_future)中的 wait_for 和 wait_until 两个函数中
    • future_status::ready:wait_for(或wait_until) 因为共享状态的标志变为 ready 而返回。
    • future_status::timeout:超时,即 wait_for(或wait_until) 因为在指定的时间段(或时刻)内共享状态的标志依然没有变为 ready 而返回。
    • future_status::deferred:共享状态包含了 deferred 函数。
  • enum class launch;在调用 std::async 设置异步任务的启动策略
    • launch::async:异步任务会在另外一个线程中调用,并通过共享状态返回异步任务的结果
    • launch::deferred:异步任务将会在共享状态被访问时调用,相当与按需调用

例子程序

[编辑]
#include <thread>
#include <future>
#include <cctype>
#include <vector>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <sstream>
 
int main()
{
    std::istringstream iss_numbers{"3 4 1 42 23 -23 93 2 -289 93"};
    std::istringstream iss_letters{" a 23 b,e a2 k k?a;si,ksa c"};
 
    std::vector<int> numbers;
    std::vector<char> letters;
    std::promise<void> numbers_promise, letters_promise;   //std::promise<void>对象初始化后的共享状态没有ready
 
    auto numbers_ready = numbers_promise.get_future();     //std::future对象
    auto letter_ready = letters_promise.get_future();
 
    std::thread value_reader([&]                          //创建一个异步线程并立即开始执行
    {
        // I/O operations.
        std::copy(std::istream_iterator<int>{iss_numbers},
                  std::istream_iterator<int>{},
                  std::back_inserter(numbers));
 
        //Notify for numbers.
        numbers_promise.set_value();                     //std::promise<void>对象numbers_promise的共享状态设置为ready
 
        std::copy_if(std::istreambuf_iterator<char>{iss_letters},
                     std::istreambuf_iterator<char>{},
                     std::back_inserter(letters),
                     ::isalpha);
 
        //Notify for letters.
        letters_promise.set_value();                     //std::promise<void>对象letters_promise的共享状态设置为ready
    });
 
 
    numbers_ready.wait();                               //主线程等待数值IO工作的完成
 
    std::sort(numbers.begin(), numbers.end());
 
    if (letter_ready.wait_for(std::chrono::seconds(1)) ==  //在std::future上超时等待共享状态变为ready
            std::future_status::timeout)
    {
        //output the numbers while letters are being obtained.
        for (int num : numbers) std::cout << num << ' ';
        numbers.clear(); //Numbers were already printed.
    }
 
    letter_ready.wait();                               //主线程等待字母IO工作的完成
    std::sort(letters.begin(), letters.end());
 
    //If numbers were already printed, it does nothing.
    for (int num : numbers) std::cout << num << ' ';
    std::cout << '\n';
 
    for (char let : letters) std::cout << let << ' ';
    std::cout << '\n';
 
    value_reader.join();
}

参考文献

[编辑]
  1. Microsot Visual C++ 2013与GCC 4.9.3都尚未实现std::packaged_task::reset()