跳转到内容

C++/coroutine

维基教科书,自由的教学读本
< C++

协程概念

[编辑]

C++20引入了协程(coroutine)。协程是函数的一种泛化,它允许函数被挂起,然后在稍后被恢复执行。使用了co_awaitco_yieldco_return的函数都是协程。协程的返回类型必须满足特定条件:协程的返回类型Result必须能匹配__coroutine_traits_impl<Result>的模板参数要求。简单说,就是返回值类型Result应当有Result::promise_type子类型。__coroutine_traits_impl>coroutine_traits的空基类模板,只是用于检查返回结果是否含有内部类型promise_type子类型。

协程在“调用”和“返回”操作外,还有3种额外的操作:“挂起”、“恢复”、“摧毁”。[1]

  • 挂起(Suspend)操作是协程执行到co_awaitco_yield关键字处(称为挂起点),挂起自身的执行,把执行转交给协程的(最初的)调用者或者(中途的)恢复者。挂起操作首先准备堆上的协程帧(coroutine frame),包括写入是哪个挂起点以便将来作为恢复执行的地址。如果要把执行转交给协程的调用者或者恢复者,则协程的栈帧被弹出运行栈。
  • 恢复(Resume)操作:当一个函数要恢复一个已经被挂起的协程,它会调用协程帧(coroutine frame)中void resume()方法。resume()将分配协程的栈帧、把调用者的返回地址保存在栈帧中,然后转移到协程的上次挂起点继续执行。当协程再次被挂起或者执行完毕, resume()的调用者将恢复秩序。
  • 摧毁(Destroy)操作将摧毁一个挂起的协程。首先它类似于恢复操作将恢复协程的栈帧、保存摧毁操作调用者的返回地址。然后调用调用协程帧(coroutine frame)中void destroy()方法,调用所有作用域内的局部变量的析构函数,最后释放堆上的协程帧。
  • 调用(Call)操作:从调用者角度,调用一个协程和调用普通函数并无区别。但协程执行到第一个挂起点后,将恢复调用者的执行。协程被调用后,首先在堆上创建协程帧,从栈帧复制/移动参数到协程帧,这些参数的生命期需要跨越第一个挂起点。
  • 返回(Return)操作:与普通函数的返回略有不同。协程执行co_return时,在(可定制的)某处存储返回值,然后析构作用域内的局部变量(不含参数)。然后协程可执行一些可定制逻辑。最后协程挂起操作或者摧毁操作,把执行交给调用者或者恢复者。需要注意的是,传递给返回操作的返回值与从调用操作返回的返回值并不相同,因为返回操作可能在调用者从最初的调用操作恢复后很长时间才执行。

一个C++协程可以先在线程A中执行并挂起(这时线程A继续在协程调用点或者协程的上次恢复点处返回并继续向下执行),然后再被线程B恢复,并在B线程中继续执行协程体。因为协程的本质是“挂起点保存状态,resume时从该状态继续”,并没有绑定到某个线程。谁调用 coroutine_handle.resume(),协程体的代码就在谁的线程里继续执行。这允许在多线程环境下,把协程的恢复操作交给不同的线程,实现协程在多个线程间“迁移”。

协程语法

[编辑]

C++20提供了3个新的运算符关键字:

  • co_await
  • co_yield
  • co_return

新的类型:

  • coroutine_handle

  • coroutine_traits<Ts...>
  • suspend_always

协程原理

[编辑]

promise机制

[编辑]

可以认为协程是一个状态机。协程的promise对象可看作“协程状态控制器”对象,控制了协程的行为,可用于跟踪协程的状态。

协程执行到某一点时,在协程帧中创建协程的promise对象。编译器产生promise对象的若干方法。协程的执行大致为:

{
  co_await promise.initial_suspend();
  try
  {
    <body-statements>
  }
  catch (...)
  {
    promise.unhandled_exception();
  }
FinalSuspend:
  co_await promise.final_suspend();
}

协程的判定标准

[编辑]

C++ 的编译器如何识别协程呢?是通过它的返回值。C++ 协程的返回类型有要求,返回类型(例如result)必须有一个子类型为承诺(promise),呈现为Result::promise_type。承诺(promise)是一个接口,里面实现get_return_object等接口。而通过std::coroutine_handle<promise_type>::from_promise( promise& p )这个静态函数,可以得到协程句柄(coroutine handle)。协程句柄(coroutine handle)是协程的唯一标示,用于恢复协程执行或销毁协程帧。而协程的运行状态 ,协程函数的形参,内部变量,临时变量,挂起暂停在什么点,被保存在协程状态 (coroutine state)中。

调用协程时的初始化

[编辑]

调用协程时初始化要执行:

  1. 使用operator new分配一个协程帧(可重载)。
  2. 将协程函数实参复制到协程帧中, 复制时可以按值传递或按引用传递。如果参数是通过值传递给协程的,那么这些参数将通过调用类型的移动构造函数复制到协程帧中。如果参数是通过引用(无论是左值还是右值)传递给协程的,那么只有引用被复制到协程帧中,而不是它们指向的值。在将参数通过引用传递到协程时,涉及许多陷阱,因为协程的生命周期内,引用不一定始终保持有效。许多通常用于普通函数的技术,如完美转发和通用引用,如果用于协程,可能会导致代码具有未定义行为。
  3. 构造return_type::promise_type对象,其中return_type是协程返回类型。构造promise_type对象时, 若它有一个构造函数形参和协程的形参一致, 则会调用那个构造函数, 并传入所有协程的实参; 否则调用默认构造器。
  4. 调用promise.get_return_object()方法并将返回值保留在局部变量中。当协程首次挂起时,该值为返回给调用者的结果。
  5. 调用promise.initial_suspend()方法并使用co_await等待结果。这给了编程者一个机会在执行协程体之前先挂起执行。initial_suspend()方法要么返回std::suspend_always(如果操作是延迟启动的),要么返回std::suspend_never(如果操作是立即启动的),这两种都是 noexcept 的可等待对象。
  6. 当co_await promise.initial_suspend()表达式恢复执行时(无论是立即还是异步),协程就开始执行协程体, 直到遇到 co_await、co_yield或co_return

co_await运算符

[编辑]

co_await运算符用于挂起协程并等待某个异步操作(即可等待对象)完成。执行co_await expr;时,expr可以是:

  • 一个等待对象(awaiter)的构造表达式。例如:co_await MyAwaiter{...};
  • 返回等待对象的函数。例如:co_await switch_to_new_thread(out);
  • 可以是一个普通对象,只要它能被转成“等待者”。如有 operator co_await,即可被 co_await 自动转换。

进一步,执行co_await awaitable_obj;时,如果使用默认的co_await运算符, 那么自动执行的流程为:

  1. 调用awaitable_obj.await_ready()
    • 如果返回 true 则直接执行协程体
    • 如果返回 false 则表示协程未准备好, 协程将被挂起
      1. 开始执行await_suspend(), 随后控制权交给调用协程处的程序
      2. 如果在挂起后被恢复, 则执行await_resume(),其返回值作为co_await表达式的结果,这个返回值是协程体需要的。
      3. 协程恢复完成, 继续执行co_await之后的协程体。

注意, std::suspend_always和std::suspend_never都是等待对象(awaiter),所以可以写 co_await std::suspend_always; 立即把当前协程挂起。

下例的注释1、2、3、.... 为实际执行顺序:

struct Myawaiter {
    bool await_ready() {                            //5
        std::cerr << "await_ready\n";
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) { //6
        std::cerr << "await_suspend\n";
    }

    int await_resume() {                            //8
        std::cerr << "await_resume\n";
        return 44;
    }
};

struct promise_type;

struct RT {
public:
    using promise_type = ::promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    handle_type _handle; //句柄成员

    handle_type get_handle() { return _handle; }

    ~RT() {
    }
};

struct promise_type {
    RT get_return_object() {                      //1
        std::cerr << "get_return_object\n";
        return RT{ ._handle = std::coroutine_handle<promise_type>::from_promise(*this) };
    }

    std::suspend_always initial_suspend() {       //2
        std::cerr << "initial_suspend\n";
        return {};
    }

    std::suspend_never final_suspend() noexcept { //11
        std::cerr << "final_suspend\n";
        return {};
    }

    void return_void() {                          //10
        std::cerr << "return_void\n";
    }

    void unhandled_exception() { std::terminate(); }
};

RT cofun() {
    co_await Myawaiter{};  //4
    co_return;             //9
}

int main() {
    RT r = cofun();
    RT::handle_type h = r.get_handle();
    h.resume(); //3
    h.resume(); //7
    return 0;
}

/* 打印结果
get_return_object
initial_suspend
await_ready
await_suspend
await_resume
return_void
final_suspend
*/

co_return运算符

[编辑]

co_return 作用:

  • co_return 可以将一个值返回给协程的调用者。 这个值可以是任何类型,包括基本类型、自定义类型、甚至是 void。
  • co_return 语句会终止当前协程的执行,并调用 promise_type::final_suspend(), 并将控制权返回给调用者。
  • 最后协程结束

当执行到达一个 co_return 语句时(协程体执行完毕时,当作有一个co_return):

  1. 先计算返回值 (如果有的话),其过程为:
    • co_return调用与协程关联的promise_type对象的promise.return_void() 或 promise.return_value(<expr>)成员函数,并将计算得到的返回值传递给它。
    • promise::return_value()函数负责处理返回值,例如将其存储在promise对象的成员变量中,或者进行其他自定义操作。
    • 如果promise对象没有定义return_value()函数,或者co_return语句没有返回值(例如 co_return;)则不会调用return_value()函数。
  2. 以它们被创建的相反顺序销毁所有具有自动存储期限的变量。
  3. 调用 promise.final_suspend()
  4. 协程的控制权返回给调用者。
    • 调用者可以通过 std::coroutine_handle::promise() 函数获取 promise 对象,并访问其中存储的返回值。

下例中,注释中的序号1、2、3、...为执行顺序:

struct Myawaiter {
    bool await_ready() {
        std::cerr << "await_ready\n"; //5
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) { 
        std::cerr << "await_suspend\n";  //6
    }

    int await_resume() {
        std::cerr << "await_resume\n"; //8
        return 44;
    }
};

struct promise_type;

struct RT {
public:

    using promise_type = ::promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    handle_type _handle; //句柄成员

    handle_type get_handle() { return _handle; }

    RT(std::coroutine_handle<promise_type> h) : _handle(h) {
        std::cerr << "RT 构造函数\n";        // 1.1
    }
    ~RT() {
        std::cerr << "RT 析构函数\n";  // 17
    }

    int get_val();
};

struct promise_type {
    int value{};
    RT get_return_object() { 
        std::cerr << "get_return_object()\n"; // 1
        return RT{ 
            std::coroutine_handle<promise_type>::from_promise(*this)  
        };
    }

    std::suspend_always initial_suspend() {
        std::cerr << "initial_suspend()\n"; // 2
        return {};
    }

    std::suspend_always final_suspend() noexcept { 
        std::cerr << "final_suspend()\n"; // 12
        return {};
    }

    void return_value(int x) {
        std::cerr << "return_value()\n";  // 11

    }

    void unhandled_exception() { std::terminate(); }

    promise_type() { 
        std::cerr << "promise_type 构造函数()\n";  // 0
    }

    ~promise_type() {
        std::cerr << "promise_type 析构函数()\n";  // 16
    }
};


int RT::get_val() { 
    return _handle.promise().value;  // 14
}

RT cofun() {
    std::cerr << "----------------------------------开始co_await...\n"; //4
    co_await Myawaiter{};
    std::cerr << "----------------------------------结束co_await...\n";  //9

    std::cerr << "----------------------------------开始co_return...\n";
    co_return 1;                                                         // 10
    std::cerr << "----------------------------------结束co_return...\n";  // 不会执行
}   


int main() {
    RT r = cofun();
    RT::handle_type h = r.get_handle();
    h.resume(); // 3
    h.resume(); // 7

    std::cerr << "协程设置的值为:  " << r.get_val() << "\n"; // 13
    h.destroy(); // 15
    return 0; // 18
}

/* 输出结果

promise_type 构造函数()
get_return_object()
RT 构造函数
initial_suspend()
----------------------------------开始co_await...
await_ready
await_suspend
await_resume
----------------------------------结束co_await...
----------------------------------开始co_return...
return_value()
final_suspend()
协程设置的值为:  0
promise_type 析构函数()
RT 析构函数

*/

co_yield关键字

[编辑]

co_yield 作用:用于暂停协程的执行并返回值给调用者。每次调用 co_yield 都会产生一个挂起点, 并调用函数 promise_type::yield_value()。co_yield 表达式的值会作为参数传递给 yield_value() 函数。

co_yield 流程:

  1. 当协程执行到 co_yield expression; 语句时,会发生以下事情:
    1. expression 被计算,其结果将作为返回值, 返回给调用者
    2. 协程的执行状态被保存,包括局部变量, 寄存器值等。
    3. 控制权返回给协程的调用者。
  2. 协程的调用者会接收到 co_yield 表达式的结果。
    1. promise_type 中的成员变量来保存结果
    2. 调用者可以通过一些接口来查询该结果.
  3. 当调用者希望继续执行协程时,可以调用 std::coroutine_handle<promise_type>::resume() 函数。
    1. 协程会从之前 co_yield 语句中断的地方恢复执行。

下例的注释中序号1、2、3、... 为执行顺序:

struct GeneratorPromise;
// 定义协程类型
struct Generator {
    using promise_type = GeneratorPromise;
    std::coroutine_handle<GeneratorPromise> handle;

    explicit Generator(std::coroutine_handle<GeneratorPromise> h) : handle(h) {}
    ~Generator() { if (handle) handle.destroy(); }

    // 获取 co_yield 返回的值
    int next_value();
    bool done() const { return handle.done(); }
};

// 定义 promise 类型,用于自定义协程的行为
struct GeneratorPromise {
    int current_value;

    std::suspend_always initial_suspend() { 
        return {}; // 3
    }
    std::suspend_always final_suspend() noexcept { 
        return {}; 
    }
    void return_void() {}
    void unhandled_exception() {}

    // 用于获取协程返回的对象
    Generator get_return_object() { 
        return Generator{ // 2
            std::coroutine_handle<GeneratorPromise>::from_promise(*this) 
        }; 
    }

    // 用于处理 co_yield 表达式
    std::suspend_always yield_value(int value) {
        current_value = value;  // 6
        return {};
    }
};

int Generator::next_value() {
    return handle.promise().current_value; // 7
}
// 定义一个生成从 0 到 max 的整数序列的协程
Generator generate_numbers(int max) {
    for (int i = 0; i <= max; ++i) {
        co_yield i;        // 5
    }
}

int main() {
    // 创建协程
    auto generator = generate_numbers(5); // 1
    // 首次 resume 进入协程体
    generator.handle.resume(); // 4
    while (!generator.handle.done()) { //当协程句柄有效时, 则可以继续
        
        std::cout << generator.next_value() << " ";
        generator.handle.resume(); //继续协程
    }

    std::cout << std::endl;
    return 0;
}

协程的未处理的异常

[编辑]

当执行由于未处理的异常而离开 <body-statements> 时:

  1. 捕获异常:在 catch 块内调用 promise.unhandled_exception()。典型是用std::current_exception()捕获异常。
  2. 调用 promise.final_suspend() 并使用 co_await 等待结果。因此有机会发布结果、发出完成信号或恢复继续执行。它还允许协程在执行运行完成并且协程帧被销毁之前选择性地立即挂起。请注意,对处于 final_suspend 点的协程调用 resume() 是未定义行为。你唯一能对挂起在这里的协程做的就是销毁它。

协程的销毁

[编辑]

一旦执行离开了协程体,那么协程帧就会被销毁。销毁协程帧涉及以下步骤:

  1. 调用promise对象的析构函数。
  2. 调用函数参数副本的析构函数。
  3. 使用操作符delete释放协程帧所使用的内存(可选)。
  4. 将执行转回调用者/恢复者。

其他

[编辑]

当执行首次到达 co_await 表达式内的 <return-to-caller-or-resumer> 点,或者如果协程在没有遇到 <return-to-caller-or-resumer> 点的情况下运行完毕,那么协程要么被挂起要么被销毁,而之前从调用 promise.get_return_object() 返回的返回对象随后被返回给协程的调用者。

头文件<coroutine>

[编辑]

头文件<coroutine>提供了

  • 类和类模板
    • coroutine_traits
    • coroutine_handle
    • std::hash<std::coroutine_handle>
    • noop_coroutine_promise类
    • noop_coroutine_handle类型
    • suspend_never类
    • suspend_always类
  • 函数
    • operator==
    • operator<=>
    • noop_coroutine

应用例子

[编辑]

替换回调函数

[编辑]

协程一大用途是可以优雅地替换传统的回调(callback)编程方式。其优点:

  1. 异常处理机制比回调函数返回值hResult, 更丰富、完善。回调方式的错误处理分散、不直观。
  2. 直接复用了上下文,比lambda回调函数的上下文捕获更高效
  3. 多个协程在一个主线程中异步执行,无需并发控制了。所以一个协程执行时可以处置同线程的其他协程的状态数据。
  4. 如果需要依次地异步获取数据,层层嵌套的回调地狱(callback hell),代码难读难维护。这种情况协程易于组合和复用。

下述例子为一个简单对比协程和传统的回调(callback)方式异步读取数据:

#include <coroutine>
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
#include <functional>


// ----------- 回调方式异步API -----------
void async_read_callback(std::function<void(std::string)> callback) {
    std::thread([callback] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        callback("data from async read (callback)");
        }).detach();
}

void process_with_callback() {
    std::cout << "[callback] Begin async read..." << std::endl;
    async_read_callback([](std::string data) {
        std::cout << "[callback] Received: " << data << std::endl;
        });
}

// ----------- 协程方式异步API -----------

struct async_read_awaiter {
    bool await_ready() const noexcept { return false; }
    void await_suspend(std::coroutine_handle<> h) {
        std::thread([this, h] {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            result = "data from async read (coroutine)";
            h.resume();
            }).detach();
    }
    std::string await_resume() { return result; }
    std::string result;
};

async_read_awaiter async_read_coro() {
    return {};
}

struct task {
    struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

task process_with_coroutine() {
    std::cout << "[coroutine] Begin async read..." << std::endl;
    std::string data = co_await async_read_coro();
    std::cout << "[coroutine] Received: " << data << std::endl;
}

int main() {
    process_with_callback();    // 回调方式演示
    process_with_coroutine();   // 协程方式演示

    // 等待异步线程输出
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

如果需要依次地异步获取数据,层层嵌套的回调地狱(callback hell),代码难读难维护。回调地狱是指:大部分异步编程框架都是基于回调的,当一个业务需要多个步骤时回调函数会分布在不同的执行单元中,这对代码的维护与理解造成了压力。当执行链条非常长时回调链路也会很深,基于事件与回调的编码风格将业务割裂到不同的 handle 函数中,理解与维护起来比较麻烦。

  1. . 可读性变差:逻辑顺序被“倒挂”在多层回调里,写法越来越深,阅读和维护变困难。
  2. . 变量传递复杂,如果想在更深层的回调里用到前面异步结果(比如 data1),必须通过捕获(捕获列表 [data1])传递下去。
  3. . 异常处理困难:各层回调必须单独处理异常,无法用 try/catch 包裹整个异步流程。
  4. . 扩展性差:增加更多异步步骤时,嵌套会越来越深,“金字塔型”结构,极易导致“回调地狱”。

用协程替代回调地狱,逻辑顺序直观,变量作用域清晰,异常处理自然(try/catch 即可)。下例为回调方式的2次异步读写:

#include <iostream>
#include <thread>
#include <functional>
#include <chrono>
#include <string>

// 模拟异步读取数据的方法
void async_read(std::function<void(std::string)> callback) {
    std::thread([callback] {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
        static int count = 1;
        std::string result = "data" + std::to_string(count++); // 生成不同的数据
        callback(result); // 调用回调,通知数据就绪
    }).detach();
}

int main() {
    std::cout << "1. 主程序开始,准备获取第一个数据..." << std::endl;

    // 第一次异步获取
    async_read([](std::string data1){
        std::cout << "2. 第一次异步获取成功,得到: " << data1 << std::endl;
        std::cout << "3. 开始获取第二个数据..." << std::endl;
        
        // 在第一次回调里发起第二次异步获取
        async_read([data1](std::string data2){
            std::cout << "4. 第二次异步获取成功,得到: " << data2 << std::endl;
            std::cout << "5. 现在可以同时使用 data1 和 data2: " 
                      << data1 << ", " << data2 << std::endl;
        });
    });

    // 主线程等待子线程完成输出
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "6. 主程序结束。" << std::endl;
    return 0;
}
/*
//如果用协程替代,可以这样写:
task process() {
    auto data1 = co_await async_read();
    auto data2 = co_await async_read();
    // 这里可以顺序访问 data1 和 data2
}*/

std::future<T>

[编辑]

std::future<T> 通常用于异步任务,结合协程(C++20及以后)可以实现异步等待。

#include <future>
#include <iostream>
#include <thread>

std::future<int> async_add(int a, int b) {
    // 在新线程异步计算
    co_return a + b;
}

int main() {
    auto fut = async_add(2, 3);  // fut 是 std::future<int>
    std::cout << fut.get() << std::endl;  // 输出: 5
}

std:generator

[编辑]

std::generator<T> 适用于C++23,代表可以逐步“产出”多个值的协程(类似Python的生成器)。std::generator 实例只能遍历一次,遍历完后不能重头再来获取(除非重新创建新的 generator)。std::generator 的每个实例只能遍历一次。这和 Python 的生成器(generator/iterator)是一样的。

#include <generator>
#include <iostream>

std::generator<int> gen() {
    for (int i = 0; i < 3; ++i)
        co_yield i;
}

int main() {
    auto g = gen();
    //for (auto v : g) {
    //    std::cout << v << std::endl;
    //}
    //// 或者手动用迭代器
    auto it = g.begin();
    while (it != g.end()) {
        std::cout << *it << std::endl;
        ++it;
    }
}

参考文献

[编辑]
  1. Asymmetric Transfer