跳至內容

C++/coroutine

維基教科書,自由的教學讀本
< C++

協程概念

[編輯]

C++20引入了協程(coroutine)。協程是函數的一種泛化,它允許函數被掛起,然後在稍後被恢復執行。使用了co_awaitco_yieldco_return的函數都是協程。協程的返回類型必須滿足特定條件:協程的返回類型Result必須能匹配__coroutine_traits_impl<Result>的模板參數要求。簡單說,就是返回值類型Result應當有Result::promise_type子類型。__coroutine_traits_impl>coroutine_traits的空基類模板,只是用於檢查返回結果是否含有內部類型promise_type子類型。

協程在「調用」和「返回」操作外,還有3種額外的操作:「掛起」、「恢復」、「摧毀」。[1]

  • 掛起(Suspend)操作是協程執行到co_awaitco_yield關鍵字處(稱為掛起點),掛起自身的執行,把執行轉交給協程的(最初的)調用者或者(中途的)恢復者。掛起操作首先準備堆上的協程幀(coroutine frame),包括寫入是哪個掛起點以便將來作為恢復執行的地址。如果要把執行轉交給協程的調用者或者恢復者,則協程的棧幀被彈出運行棧。
  • 恢復(Resume)操作:當一個函數要恢復一個已經被掛起的協程,它會調用協程幀(coroutine frame)中void resume()方法。resume()將分配協程的棧幀、把調用者的返回地址保存在棧幀中,然後轉移到協程的上次掛起點繼續執行。當協程再次被掛起或者執行完畢, resume()的調用者將恢復秩序。
  • 摧毀(Destroy)操作將摧毀一個掛起的協程。首先它類似於恢復操作將恢復協程的棧幀、保存摧毀操作調用者的返回地址。然後調用調用協程幀(coroutine frame)中void destroy()方法,調用所有作用域內的局部變量的析構函數,最後釋放堆上的協程幀。
  • 調用(Call)操作:從調用者角度,調用一個協程和調用普通函數並無區別。但協程執行到第一個掛起點後,將恢復調用者的執行。協程被調用後,首先在堆上創建協程幀,從棧幀複製/移動參數到協程幀,這些參數的生命期需要跨越第一個掛起點。
  • 返回(Return)操作:與普通函數的返回略有不同。協程執行co_return時,在(可定製的)某處存儲返回值,然後析構作用域內的局部變量(不含參數)。然後協程可執行一些可定製邏輯。最後協程掛起操作或者摧毀操作,把執行交給調用者或者恢復者。需要注意的是,傳遞給返回操作的返回值與從調用操作返回的返回值並不相同,因為返回操作可能在調用者從最初的調用操作恢復後很長時間才執行。

一個C++協程可以先在線程A中執行並掛起(這時線程A繼續在協程調用點或者協程的上次恢復點處返回並繼續向下執行),然後再被線程B恢復,並在B線程中繼續執行協程體。因為協程的本質是「掛起點保存狀態,resume時從該狀態繼續」,並沒有綁定到某個線程。誰調用 coroutine_handle.resume(),協程體的代碼就在誰的線程里繼續執行。這允許在多線程環境下,把協程的恢復操作交給不同的線程,實現協程在多個線程間「遷移」。

協程語法

[編輯]

C++20提供了3個新的運算符關鍵字:

  • co_await
  • co_yield
  • co_return

新的類型:

  • coroutine_handle

  • coroutine_traits<Ts...>
  • suspend_always

協程原理

[編輯]

promise機制

[編輯]

可以認為協程是一個狀態機。協程的promise對象可看作「協程狀態控制器」對象,控制了協程的行為,可用於跟蹤協程的狀態。

協程執行到某一點時,在協程幀中創建協程的promise對象。編譯器產生promise對象的若干方法。協程的執行大致為:

{
  co_await promise.initial_suspend();
  try
  {
    <body-statements>
  }
  catch (...)
  {
    promise.unhandled_exception();
  }
FinalSuspend:
  co_await promise.final_suspend();
}

協程的判定標準

[編輯]

C++ 的編譯器如何識別協程呢?是通過它的返回值。C++ 協程的返回類型有要求,返回類型(例如result)必須有一個子類型為承諾(promise),呈現為Result::promise_type。承諾(promise)是一個接口,裏面實現get_return_object等接口。而通過std::coroutine_handle<promise_type>::from_promise( promise& p )這個靜態函數,可以得到協程句柄(coroutine handle)。協程句柄(coroutine handle)是協程的唯一標示,用於恢復協程執行或銷毀協程幀。而協程的運行狀態 ,協程函數的形參,內部變量,臨時變量,掛起暫停在什麼點,被保存在協程狀態 (coroutine state)中。

調用協程時的初始化

[編輯]

調用協程時初始化要執行:

  1. 使用operator new分配一個協程幀(可重載)。
  2. 將協程函數實參複製到協程幀中, 複製時可以按值傳遞或按引用傳遞。如果參數是通過值傳遞給協程的,那麼這些參數將通過調用類型的移動構造函數複製到協程幀中。如果參數是通過引用(無論是左值還是右值)傳遞給協程的,那麼只有引用被複製到協程幀中,而不是它們指向的值。在將參數通過引用傳遞到協程時,涉及許多陷阱,因為協程的生命周期內,引用不一定始終保持有效。許多通常用於普通函數的技術,如完美轉發和通用引用,如果用於協程,可能會導致代碼具有未定義行為。
  3. 構造return_type::promise_type對象,其中return_type是協程返回類型。構造promise_type對象時, 若它有一個構造函數形參和協程的形參一致, 則會調用那個構造函數, 並傳入所有協程的實參; 否則調用默認構造器。
  4. 調用promise.get_return_object()方法並將返回值保留在局部變量中。當協程首次掛起時,該值為返回給調用者的結果。
  5. 調用promise.initial_suspend()方法並使用co_await等待結果。這給了編程者一個機會在執行協程體之前先掛起執行。initial_suspend()方法要麼返回std::suspend_always(如果操作是延遲啟動的),要麼返回std::suspend_never(如果操作是立即啟動的),這兩種都是 noexcept 的可等待對象。
  6. 當co_await promise.initial_suspend()表達式恢復執行時(無論是立即還是異步),協程就開始執行協程體, 直到遇到 co_await、co_yield或co_return

co_await運算符

[編輯]

co_await運算符用於掛起協程並等待某個異步操作(即可等待對象)完成。執行co_await expr;時,expr可以是:

  • 一個等待對象(awaiter)的構造表達式。例如:co_await MyAwaiter{...};
  • 返回等待對象的函數。例如:co_await switch_to_new_thread(out);
  • 可以是一個普通對象,只要它能被轉成「等待者」。如有 operator co_await,即可被 co_await 自動轉換。

進一步,執行co_await awaitable_obj;時,如果使用默認的co_await運算符, 那麼自動執行的流程為:

  1. 調用awaitable_obj.await_ready()
    • 如果返回 true 則直接執行協程體
    • 如果返回 false 則表示協程未準備好, 協程將被掛起
      1. 開始執行await_suspend(), 隨後控制權交給調用協程處的程序
      2. 如果在掛起後被恢復, 則執行await_resume(),其返回值作為co_await表達式的結果,這個返回值是協程體需要的。
      3. 協程恢復完成, 繼續執行co_await之後的協程體。

注意, std::suspend_always和std::suspend_never都是等待對象(awaiter),所以可以寫 co_await std::suspend_always; 立即把當前協程掛起。

下例的註釋1、2、3、.... 為實際執行順序:

struct Myawaiter {
    bool await_ready() {                            //5
        std::cerr << "await_ready\n";
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) { //6
        std::cerr << "await_suspend\n";
    }

    int await_resume() {                            //8
        std::cerr << "await_resume\n";
        return 44;
    }
};

struct promise_type;

struct RT {
public:
    using promise_type = ::promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    handle_type _handle; //句柄成员

    handle_type get_handle() { return _handle; }

    ~RT() {
    }
};

struct promise_type {
    RT get_return_object() {                      //1
        std::cerr << "get_return_object\n";
        return RT{ ._handle = std::coroutine_handle<promise_type>::from_promise(*this) };
    }

    std::suspend_always initial_suspend() {       //2
        std::cerr << "initial_suspend\n";
        return {};
    }

    std::suspend_never final_suspend() noexcept { //11
        std::cerr << "final_suspend\n";
        return {};
    }

    void return_void() {                          //10
        std::cerr << "return_void\n";
    }

    void unhandled_exception() { std::terminate(); }
};

RT cofun() {
    co_await Myawaiter{};  //4
    co_return;             //9
}

int main() {
    RT r = cofun();
    RT::handle_type h = r.get_handle();
    h.resume(); //3
    h.resume(); //7
    return 0;
}

/* 打印结果
get_return_object
initial_suspend
await_ready
await_suspend
await_resume
return_void
final_suspend
*/

co_return運算符

[編輯]

co_return 作用:

  • co_return 可以將一個值返回給協程的調用者。 這個值可以是任何類型,包括基本類型、自定義類型、甚至是 void。
  • co_return 語句會終止當前協程的執行,並調用 promise_type::final_suspend(), 並將控制權返回給調用者。
  • 最後協程結束

當執行到達一個 co_return 語句時(協程體執行完畢時,當作有一個co_return):

  1. 先計算返回值 (如果有的話),其過程為:
    • co_return調用與協程關聯的promise_type對象的promise.return_void() 或 promise.return_value(<expr>)成員函數,並將計算得到的返回值傳遞給它。
    • promise::return_value()函數負責處理返回值,例如將其存儲在promise對象的成員變量中,或者進行其他自定義操作。
    • 如果promise對象沒有定義return_value()函數,或者co_return語句沒有返回值(例如 co_return;)則不會調用return_value()函數。
  2. 以它們被創建的相反順序銷毀所有具有自動存儲期限的變量。
  3. 調用 promise.final_suspend()
  4. 協程的控制權返回給調用者。
    • 調用者可以通過 std::coroutine_handle::promise() 函數獲取 promise 對象,並訪問其中存儲的返回值。

下例中,註釋中的序號1、2、3、...為執行順序:

struct Myawaiter {
    bool await_ready() {
        std::cerr << "await_ready\n"; //5
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) { 
        std::cerr << "await_suspend\n";  //6
    }

    int await_resume() {
        std::cerr << "await_resume\n"; //8
        return 44;
    }
};

struct promise_type;

struct RT {
public:

    using promise_type = ::promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    handle_type _handle; //句柄成员

    handle_type get_handle() { return _handle; }

    RT(std::coroutine_handle<promise_type> h) : _handle(h) {
        std::cerr << "RT 构造函数\n";        // 1.1
    }
    ~RT() {
        std::cerr << "RT 析构函数\n";  // 17
    }

    int get_val();
};

struct promise_type {
    int value{};
    RT get_return_object() { 
        std::cerr << "get_return_object()\n"; // 1
        return RT{ 
            std::coroutine_handle<promise_type>::from_promise(*this)  
        };
    }

    std::suspend_always initial_suspend() {
        std::cerr << "initial_suspend()\n"; // 2
        return {};
    }

    std::suspend_always final_suspend() noexcept { 
        std::cerr << "final_suspend()\n"; // 12
        return {};
    }

    void return_value(int x) {
        std::cerr << "return_value()\n";  // 11

    }

    void unhandled_exception() { std::terminate(); }

    promise_type() { 
        std::cerr << "promise_type 构造函数()\n";  // 0
    }

    ~promise_type() {
        std::cerr << "promise_type 析构函数()\n";  // 16
    }
};


int RT::get_val() { 
    return _handle.promise().value;  // 14
}

RT cofun() {
    std::cerr << "----------------------------------开始co_await...\n"; //4
    co_await Myawaiter{};
    std::cerr << "----------------------------------结束co_await...\n";  //9

    std::cerr << "----------------------------------开始co_return...\n";
    co_return 1;                                                         // 10
    std::cerr << "----------------------------------结束co_return...\n";  // 不会执行
}   


int main() {
    RT r = cofun();
    RT::handle_type h = r.get_handle();
    h.resume(); // 3
    h.resume(); // 7

    std::cerr << "协程设置的值为:  " << r.get_val() << "\n"; // 13
    h.destroy(); // 15
    return 0; // 18
}

/* 输出结果

promise_type 构造函数()
get_return_object()
RT 构造函数
initial_suspend()
----------------------------------开始co_await...
await_ready
await_suspend
await_resume
----------------------------------结束co_await...
----------------------------------开始co_return...
return_value()
final_suspend()
协程设置的值为:  0
promise_type 析构函数()
RT 析构函数

*/

co_yield關鍵字

[編輯]

co_yield 作用:用於暫停協程的執行並返回值給調用者。每次調用 co_yield 都會產生一個掛起點, 並調用函數 promise_type::yield_value()。co_yield 表達式的值會作為參數傳遞給 yield_value() 函數。

co_yield 流程:

  1. 當協程執行到 co_yield expression; 語句時,會發生以下事情:
    1. expression 被計算,其結果將作為返回值, 返回給調用者
    2. 協程的執行狀態被保存,包括局部變量, 寄存器值等。
    3. 控制權返回給協程的調用者。
  2. 協程的調用者會接收到 co_yield 表達式的結果。
    1. promise_type 中的成員變量來保存結果
    2. 調用者可以通過一些接口來查詢該結果.
  3. 當調用者希望繼續執行協程時,可以調用 std::coroutine_handle<promise_type>::resume() 函數。
    1. 協程會從之前 co_yield 語句中斷的地方恢復執行。

下例的註釋中序號1、2、3、... 為執行順序:

struct GeneratorPromise;
// 定义协程类型
struct Generator {
    using promise_type = GeneratorPromise;
    std::coroutine_handle<GeneratorPromise> handle;

    explicit Generator(std::coroutine_handle<GeneratorPromise> h) : handle(h) {}
    ~Generator() { if (handle) handle.destroy(); }

    // 获取 co_yield 返回的值
    int next_value();
    bool done() const { return handle.done(); }
};

// 定义 promise 类型,用于自定义协程的行为
struct GeneratorPromise {
    int current_value;

    std::suspend_always initial_suspend() { 
        return {}; // 3
    }
    std::suspend_always final_suspend() noexcept { 
        return {}; 
    }
    void return_void() {}
    void unhandled_exception() {}

    // 用于获取协程返回的对象
    Generator get_return_object() { 
        return Generator{ // 2
            std::coroutine_handle<GeneratorPromise>::from_promise(*this) 
        }; 
    }

    // 用于处理 co_yield 表达式
    std::suspend_always yield_value(int value) {
        current_value = value;  // 6
        return {};
    }
};

int Generator::next_value() {
    return handle.promise().current_value; // 7
}
// 定义一个生成从 0 到 max 的整数序列的协程
Generator generate_numbers(int max) {
    for (int i = 0; i <= max; ++i) {
        co_yield i;        // 5
    }
}

int main() {
    // 创建协程
    auto generator = generate_numbers(5); // 1
    // 首次 resume 进入协程体
    generator.handle.resume(); // 4
    while (!generator.handle.done()) { //当协程句柄有效时, 则可以继续
        
        std::cout << generator.next_value() << " ";
        generator.handle.resume(); //继续协程
    }

    std::cout << std::endl;
    return 0;
}

協程的未處理的異常

[編輯]

當執行由於未處理的異常而離開 <body-statements> 時:

  1. 捕獲異常:在 catch 塊內調用 promise.unhandled_exception()。典型是用std::current_exception()捕獲異常。
  2. 調用 promise.final_suspend() 並使用 co_await 等待結果。因此有機會發布結果、發出完成信號或恢復繼續執行。它還允許協程在執行運行完成並且協程幀被銷毀之前選擇性地立即掛起。請注意,對處於 final_suspend 點的協程調用 resume() 是未定義行為。你唯一能對掛起在這裏的協程做的就是銷毀它。

協程的銷毀

[編輯]

一旦執行離開了協程體,那麼協程幀就會被銷毀。銷毀協程幀涉及以下步驟:

  1. 調用promise對象的析構函數。
  2. 調用函數參數副本的析構函數。
  3. 使用操作符delete釋放協程幀所使用的內存(可選)。
  4. 將執行轉回調用者/恢復者。

其他

[編輯]

當執行首次到達 co_await 表達式內的 <return-to-caller-or-resumer> 點,或者如果協程在沒有遇到 <return-to-caller-or-resumer> 點的情況下運行完畢,那麼協程要麼被掛起要麼被銷毀,而之前從調用 promise.get_return_object() 返回的返回對象隨後被返回給協程的調用者。

頭文件<coroutine>

[編輯]

頭文件<coroutine>提供了

  • 類和類模板
    • coroutine_traits
    • coroutine_handle
    • std::hash<std::coroutine_handle>
    • noop_coroutine_promise類
    • noop_coroutine_handle類型
    • suspend_never類
    • suspend_always類
  • 函數
    • operator==
    • operator<=>
    • noop_coroutine

應用例子

[編輯]

替換回調函數

[編輯]

協程一大用途是可以優雅地替換傳統的回調(callback)編程方式。其優點:

  1. 異常處理機制比回調函數返回值hResult, 更豐富、完善。回調方式的錯誤處理分散、不直觀。
  2. 直接復用了上下文,比lambda回調函數的上下文捕獲更高效
  3. 多個協程在一個主線程中異步執行,無需並發控制了。所以一個協程執行時可以處置同線程的其他協程的狀態數據。
  4. 如果需要依次地異步獲取數據,層層嵌套的回調地獄(callback hell),代碼難讀難維護。這種情況協程易於組合和復用。

下述例子為一個簡單對比協程和傳統的回調(callback)方式異步讀取數據:

#include <coroutine>
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
#include <functional>


// ----------- 回调方式异步API -----------
void async_read_callback(std::function<void(std::string)> callback) {
    std::thread([callback] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        callback("data from async read (callback)");
        }).detach();
}

void process_with_callback() {
    std::cout << "[callback] Begin async read..." << std::endl;
    async_read_callback([](std::string data) {
        std::cout << "[callback] Received: " << data << std::endl;
        });
}

// ----------- 协程方式异步API -----------

struct async_read_awaiter {
    bool await_ready() const noexcept { return false; }
    void await_suspend(std::coroutine_handle<> h) {
        std::thread([this, h] {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            result = "data from async read (coroutine)";
            h.resume();
            }).detach();
    }
    std::string await_resume() { return result; }
    std::string result;
};

async_read_awaiter async_read_coro() {
    return {};
}

struct task {
    struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

task process_with_coroutine() {
    std::cout << "[coroutine] Begin async read..." << std::endl;
    std::string data = co_await async_read_coro();
    std::cout << "[coroutine] Received: " << data << std::endl;
}

int main() {
    process_with_callback();    // 回调方式演示
    process_with_coroutine();   // 协程方式演示

    // 等待异步线程输出
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

如果需要依次地異步獲取數據,層層嵌套的回調地獄(callback hell),代碼難讀難維護。回調地獄是指:大部分異步編程框架都是基於回調的,當一個業務需要多個步驟時回調函數會分布在不同的執行單元中,這對代碼的維護與理解造成了壓力。當執行鏈條非常長時回調鏈路也會很深,基於事件與回調的編碼風格將業務割裂到不同的 handle 函數中,理解與維護起來比較麻煩。

  1. . 可讀性變差:邏輯順序被「倒掛」在多層回調里,寫法越來越深,閱讀和維護變困難。
  2. . 變量傳遞複雜,如果想在更深層的回調里用到前面異步結果(比如 data1),必須通過捕獲(捕獲列表 [data1])傳遞下去。
  3. . 異常處理困難:各層回調必須單獨處理異常,無法用 try/catch 包裹整個異步流程。
  4. . 擴展性差:增加更多異步步驟時,嵌套會越來越深,「金字塔型」結構,極易導致「回調地獄」。

用協程替代回調地獄,邏輯順序直觀,變量作用域清晰,異常處理自然(try/catch 即可)。下例為回調方式的2次異步讀寫:

#include <iostream>
#include <thread>
#include <functional>
#include <chrono>
#include <string>

// 模拟异步读取数据的方法
void async_read(std::function<void(std::string)> callback) {
    std::thread([callback] {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
        static int count = 1;
        std::string result = "data" + std::to_string(count++); // 生成不同的数据
        callback(result); // 调用回调,通知数据就绪
    }).detach();
}

int main() {
    std::cout << "1. 主程序开始,准备获取第一个数据..." << std::endl;

    // 第一次异步获取
    async_read([](std::string data1){
        std::cout << "2. 第一次异步获取成功,得到: " << data1 << std::endl;
        std::cout << "3. 开始获取第二个数据..." << std::endl;
        
        // 在第一次回调里发起第二次异步获取
        async_read([data1](std::string data2){
            std::cout << "4. 第二次异步获取成功,得到: " << data2 << std::endl;
            std::cout << "5. 现在可以同时使用 data1 和 data2: " 
                      << data1 << ", " << data2 << std::endl;
        });
    });

    // 主线程等待子线程完成输出
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "6. 主程序结束。" << std::endl;
    return 0;
}
/*
//如果用协程替代,可以这样写:
task process() {
    auto data1 = co_await async_read();
    auto data2 = co_await async_read();
    // 这里可以顺序访问 data1 和 data2
}*/

std::future<T>

[編輯]

std::future<T> 通常用於異步任務,結合協程(C++20及以後)可以實現異步等待。

#include <future>
#include <iostream>
#include <thread>

std::future<int> async_add(int a, int b) {
    // 在新线程异步计算
    co_return a + b;
}

int main() {
    auto fut = async_add(2, 3);  // fut 是 std::future<int>
    std::cout << fut.get() << std::endl;  // 输出: 5
}

std:generator

[編輯]

std::generator<T> 適用於C++23,代表可以逐步「產出」多個值的協程(類似Python的生成器)。std::generator 實例只能遍歷一次,遍歷完後不能重頭再來獲取(除非重新創建新的 generator)。std::generator 的每個實例只能遍歷一次。這和 Python 的生成器(generator/iterator)是一樣的。

#include <generator>
#include <iostream>

std::generator<int> gen() {
    for (int i = 0; i < 3; ++i)
        co_yield i;
}

int main() {
    auto g = gen();
    //for (auto v : g) {
    //    std::cout << v << std::endl;
    //}
    //// 或者手动用迭代器
    auto it = g.begin();
    while (it != g.end()) {
        std::cout << *it << std::endl;
        ++it;
    }
}

參考文獻

[編輯]
  1. Asymmetric Transfer