跳至內容

Python/協程

維基教科書,自由的教學讀本

Python中的協程是處理並發任務的一種方式,它們基於異步編程模型,可以更高效地處理I/O密集型操作。協程有兩種,一種無棧協程,python中以標準庫asyncio為代表;一種有棧協程,python 中以第三方庫gevent為代表。本文主要講解asyncio協程。

協程通過asyncawait關鍵字來實現,能夠讓代碼在等待I/O操作時繼續執行其他任務,從而提高程序的性能和響應性。

協程(Coroutine)本質上是把程序的任務分成多個部分,在遇到阻塞的I/O操作時主動掛起當前部分,執行緒執行其他待執行的部分。協程相比多執行緒的一大優勢就是省去了多執行緒之間的切換開銷,獲得了更高的運行效率。由於協程本質上在單進程單執行緒內執行程序,因此,同時執行的任務只能有一個。在切換執行任務時不存在系統級的上下文切換,都是在用戶態內進行的執行代碼塊的切換。

asyncio 是Python 3.4版本引入的標準庫,直接內置了對異步IO的支持。asyncio包的運行核心就是event loop(事件循環)。可等待對象: 如果一個對象可以使用 await 關鍵字修飾,那麼它就是可等待對象(awaitable object)。在Asyncio包中,許多API都是接收 可等待對象 作為參數。主要有三種可等待對象:coroutine、task、future。 其中coroutine 有兩層相關定義:coroutine function(協程函數)、 coroutine object(協程對象)。

asyncio 的編程模型本質是一個消息循環,我們一般先定義一個協程函數(或任務), 從 asyncio 模塊中獲取事件循環loop,然後把需要執行的協程任務(或任務列表)扔到 loop中執行,就實現了異步IO。 實際應用中,我們先由協程函數創建協程任務,然後把它們加入協程任務列表,最後一起交由事件循環執行。

asyncio.iscoroutine() 驗證協程函數 asyncio.ensure_future

基本概念

[編輯]
  • 協程函數:通過async def關鍵字定義的函數。可以將coroutines理解為一種特殊的,具有停止執行然後根據特殊io完成信息拿到結果後可以繼續執行的任務。這裡我們通過async關鍵字來定義協程、通過await關鍵字來暫停協程的執行。直接調用協程並不會開始執行(實際上得到了一個coroutine object),需要將其顯式的放入event loop中才能執行。
  • 協程對象:調用協程函數時返回的對象,必須用await關鍵字來執行。res = work()創建協程對象,函數內部代碼不會執行。如果要運行協程函數內部代碼,必須要將協程對象交給事件循環來處理。
  • await:用於掛起協程的執行,直到await後的任務完成。await必須在async函數中使用。await後跟可等待對象,可等待對象包括協程對象、Future和Task對象,這些都是IO等待。等IO操作完成之後再繼續往下執行,當前協程(任務)掛起時,事件循環可以執行其他協程(任務)。

同一個協程任務中,多個await,會依次等待等待對象執行完成;不同協程任務中,遇到await會交替執行。

  • 事件循環:管理協程的執行,不斷地檢查並運行已經註冊的任務或協程。通常使用asyncio庫中的事件循環。asyncio.run() 是最常用的啟動事件循環並運行協程的方式,它會創建一個新的事件循環,運行傳遞的協程,直到協程完成,然後關閉事件循環。通常不需要直接操作事件循環。
    • loop.run_until_complete(future) 函數運行直到 future (Future 的實例) 被完成。
    • loop.run_forever() 函數運行事件循環直到 stop() 被調用。
    • loop.stop() 函數停止事件循環。
    • loop.is_running() 函數返回 True 如果事件循環當前正在運行。
    • loop.is_closed() 函數如果事件循環已經被關閉,返回 True 。
    • loop.close() 函數關閉事件循環。
    • loop.create_future() 函數創建一個附加到事件循環中的 asyncio.Future 對象。
    • loop.create_task(coro, *, name=None) 函數安排一個 協程 的執行。返回一個 Task 對象。
    • loop.set_task_factory(factory) 函數設置一個 task 工廠 , 被用於 loop.create_task() 。
    • loop.get_task_factory() 函數返回一個任務工廠,或者如果是使用默認值則返回 None。
    • loop.call_soon(callback, *args, context=None) 函數安排 callback 在事件循環的下一次迭代時附帶 args 參數被調用。回調按其註冊順序被調用。每個回調僅被調用一次。方法不是執行緒安全的。
    • loop.call_soon_threadsafe(callback, *args, context=None) 函數是 call_soon() 的執行緒安全變體。必須被用於安排 來自其他執行緒 的回調。
    • loop.call_later(delay, callback, *args, context=None) 函數安排 callback 在給定的 delay 秒(可以是 int 或者 float)後被調用。
    • loop.call_at(when, callback, *args, context=None) 函數安排 callback 在給定的絕對時間戳的時間(一個 int 或者 float)被調用,使用與 loop.time() 同樣的時間參考。
    • loop.time() 函數根據時間循環內部的單調時鐘,返回當前時間, float 值。
    • loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None) 函數打開一個流式傳輸連接,連接到由 host 和 port 指定的地址。
    • loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) 函數創建TCP服務 (socket 類型 SOCK_STREAM ) 監聽 host 地址的 port 埠。
    • loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) 函數與 loop.create_server() 類似但是專用於 AF_UNIX 套接字族。path 是必要的 Unix 域套接字名稱,除非提供了 sock 參數。 抽象的 Unix 套接字, str, bytes 和 Path 路徑都是受支持的。
    • loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None) 函數將已被接受的連接包裝成一個傳輸/協議對。
    • loop.sock_recv(sock, nbytes) 函數從 sock 接收至多 nbytes。 socket.recv() 的異步版本。
    • loop.sock_recv_into(sock, buf) 函數從 sock 接收數據放入 buf 緩衝區。 模仿了阻塞型的 socket.recv_into() 方法。
    • loop.sock_sendall(sock, data) 函數將 data 發送到 sock 套接字。 socket.sendall() 的異步版本。
    • loop.sock_accept(sock) 函數接受一個連接。 模仿了阻塞型的 socket.accept() 方法。
    • loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True) 函數在可能的情況下使用高性能的 os.sendfile 發送文件。 返回所發送的字節總數。
    • loop.set_exception_handler(handler) 函數將 handler 設置為新的事件循環異常處理器。
    • loop.get_exception_handler() 函數返回當前的異常處理器,如果沒有設置異常處理器,則返回 None 。
    • loop.default_exception_handler(context) 函數默認的異常處理器。
    • loop.call_exception_handler(context) 函數調用當前事件循環的異常處理器。
    • loop.get_debug() 函數獲取事件循環調試模式設置(bool)。
    • loop.set_debug(enabled: bool) 函數設置事件循環的調試模式。
  • 「可等待對象」(awaitable object),其狀態可以是「掛起」(即還未完成),在完成時會通知 await 關鍵字使得掛起的協程恢復執行。可等待對象包括:
    • 協程對象: 通過 async def 定義的函數的調用得到的對象。這些對象表示一個可以被暫停和恢復的異步操作。
    • asyncio.Future 類對象:await future將暫停當前協程的執行,直到future完成。Future 是一種特殊的低級可等待對象,它代表了異步操作的最終結果。當一個 Future 對象被 await 時,意味著協程將會等待直到這個 Future 在其他地方被解決。在 asyncio 中,Future 對象的存在是為了使基於回調的代碼可以與 async/await 語法一起使用。通常,在應用層代碼中並不需要創建 Future 對象。
      • state數據成員: 任務的狀態, 分別是 PENDING , CANCELLED , FINISHED .
      • loop數據成員: 事件循環, 用於執行回調函數.
      • callbacks數據成員: 回調函數列表, 用於存儲回調函數.
      • result數據成員: 任務的結果.
      • exception數據成員: 任務的異常
      • Future對象創建後可以用set_result()方法設置結果。
      • result()返回Future對象的結果;如果任務尚未完成,則拋出asyncio.InvalidStateError。
      • exception()返回 Future 對象的異常(如果有的話)。如果任務尚未完成,則拋出 asyncio.InvalidStateError。
      • set_result(result):設置 Future 對象的結果,並通知所有等待的協程。如果任務已經完成,則拋出asyncio.InvalidStateError。
      • set_exception(exception): 設置 Future 對象的異常,並通知所有等待的協程。如果任務已經完成,則拋出asyncio.InvalidStateError。
      • cancel():取消 Future 對象。如果任務已經完成或不能取消,則返回False。
      • done():返回True如果任務已經完成。
      • cancelled():返回True如果任務已經被取消。
      • asyncio.Future(*, loop=None) 函數是一個 Future 代表一個異步運算的最終結果。執行緒不安全。
      • asyncio.isfuture(obj) 函數用來判斷如果 obj 為一個 asyncio.Future類的示例、 asyncio.Task 類的實例或者一個具有 _asyncio_future_blocking 屬性的對象,返回 True。
      • asyncio.ensure_future(obj, *, loop=None) 函數創建新任務。
      • asyncio.wrap_future(future, *, loop=None) 函數將一個 concurrent.futures.Future 對象封裝到 asyncio.Future 對象中。
      • fut.result() 函數返回 Future 的結果。
      • fut.set_result(result) 函數將 Future 標記為 完成 並設置結果。
      • fut.set_exception(exception) 函數將 Future 標記為 完成 並設置一個異常。
      • fut.done() 函數如果 Future 為已 完成 則返回 True 。
      • fut.cancelled() 函數是如果 Future 已取消則返回 True
      • fut.add_done_callback(callback, *, context=None) 函數添加一個在 Future 完成 時運行的回調函數。
      • fut.remove_done_callback(callback) 函數從回調列表中移除 callback 。
      • fut.cancel() 函數取消 Future 並調度回調函數。
      • fut.exception() 函數返回 Future 已設置的異常。
      • fut.get_loop() 函數返回 Future 對象已綁定的事件循環。
    • Task 對象是asyncio的一種特殊類型的Future,即是Future的子類。Task 對象管理協程的執行並可以提供協程的執行狀態和結果。
      • asyncio.create_task(my_task()) 將 my_task 協程包裝成 Task 對象。await task 將暫停 main 協程的執行,直到 task 完成。Task對象通過add_done_callback方法增加回調函數。
      • cancel(msg=None)請求取消任務。這安排在事件循環的下一次循環中向包裝的協程拋出 CancelledError 異常。協程有機會清理或甚至通過 try … except CancelledError … finally 塊來拒絕請求。因此,與 Future.cancel() 不同,Task.cancel() 不保證任務會被取消,儘管完全抑制取消並不常見並且被積極反對。如果協程決定抑制取消,它需要調用 Task.uncancel() 來補充捕獲異常。
      • task.cancelled() 如果任務被取消了,則返回 True。當取消請求通過 cancel() 請求並且包裝的協程傳播了拋入的 CancelledError 異常時,任務被取消。
      • uncancel() 減少對任務的取消請求計數。返回剩餘的取消請求數量。注意,一旦取消的任務的執行完成,進一步調用 uncancel() 是無效的。這個方法由 asyncio 的內部使用,不建議終端用戶代碼使用。特別是,如果任務成功取消,這允許結構化並發的元素(如任務組和 asyncio.timeout())繼續運行,將取消隔離在相關的結構化塊中。
      • cancelling()返回對任務的待處理取消請求的數量,即調用 cancel() 的次數減去調用 uncancel() 的次數。注意,如果這個數字大於零但任務仍在執行中,cancelled() 仍然返回 False。這是因為這個數字可以通過調用 uncancel() 減少,這可能導致任務最終未被取消。
      • task.done() 如果任務完成了,則返回 True。任務完成的條件是協程返回了值、引發了異常,或者任務被取消了。
      • task.result() 返回任務的結果。如果任務完成了,返回包裝協程的結果(如果協程引發了異常,則重新引發那個異常)。如果任務被取消了,這個方法會引發 CancelledError 異常。如果任務的結果尚不可用,這個方法會引發 InvalidStateError 異常。
      • task.exception() 返回任務的異常。如果包裝的協程引發了異常,則返回那個異常。如果包裝的協程正常返回,則這個方法返回 None。如果任務被取消了,這個方法會引發 CancelledError 異常。如果任務尚未完成,這個方法會引發 InvalidStateError 異常。
      • add_done_callback(callback, *, context=None)添加一個回調函數,當任務完成時運行。這個方法僅應在低級回調代碼中使用。
      • task.remove_done_callback(callback) 從回調列表中移除回調函數。這個方法僅應在低級回調代碼中使用。
      • task.get_stack(*, limit=None) 返回此任務的堆疊幀列表。如果包裝的協程尚未完成,則返回協程掛起時的堆疊。如果協程成功完成或被取消,則返回一個空列表。如果協程因異常終止,則返回回溯幀列表。幀始終按從最舊到最新的順序返回。返回的幀的數量由可選的 limit 參數控制;默認情況下返回所有可用的幀。堆疊或回溯的返回列表順序不同:堆疊的最新幀會被返回,而回溯的最舊幀會被返回。
      • task.print_stack(*, limit=None, file=None) 列印任務的堆疊或回溯。這產生的輸出類似於 traceback 模塊對於 get_stack() 獲取的幀。limit 參數直接傳遞給 get_stack()。file 參數是一個 IO 流,輸出將寫入到該流;默認情況下,輸出寫入 sys.stdout。
      • task.get_coro() 返回由任務包裝的協程對象。注意 對於已經急切完成的任務,這將返回 None。參見急切任務工廠。
      • task.get_name() 返回任務的名稱。如果沒有明確分配名稱,默認的 asyncio 任務實現會在實例化期間生成一個默認名稱。
      • task.set_name(value) 設置任務的名稱。value 參數可以是任何對象,然後會被轉換為字符串。在默認的任務實現中,名稱會顯示在任務對象的 repr() 輸出中。
      • get_context()返回與任務關聯的 contextvars.Context 對象。
    • 實現了 __await__ 方法的類和對象

底層實現

[編輯]

res = await fun1()底層步驟:

  1. 創建協程對象:調用 fun1() 返回一個協程對象。這個對象表示一個尚未完成的異步操作。
  2. 調度協程對象:在Python 3.7及以後的版本中,協程對象會被自動封裝為一個Task對象。這一步實際上是由事件循環完成的。
  3. 等待Future完成:使用await關鍵字暫停當前協程(caller)的執行,直到Task對象完成。這一步是通過 asyncio 的事件循環來實現的。await在內部會阻塞當前協程,直到Task對象的Future完成。
  4. 獲取結果:一旦Task對象完成,await會恢復當前協程的執行,並獲取Task的結果。Task對象的result()方法被調用,獲取異步操作的最終結果。這個結果被賦值給變量res。
  5. 處理異常:如果協程執行過程中拋出了異常,await會將異常重新拋出,允許在調用await的協程中捕獲並處理它。

六種正確的並發調用方式和兩種錯誤調用方式

[編輯]
import asyncio
import time
import io
 

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')


async def s1():
    await a()
    await b()

async def s2():
    await asyncio.create_task(a())
    await asyncio.create_task(b())

async def c1():
    await asyncio.gather(a(), b())

async def c2():
    await asyncio.wait([asyncio.create_task(a()), asyncio.create_task(b())])

async def c3():
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    await task1
    await task2

async def c4():
    task = asyncio.create_task(b())
    await a()
    await task    

async def c5():
    task = asyncio.ensure_future(b())
    await a()
    await task

async def c6():
    loop = asyncio.get_event_loop()
    task = loop.create_task(b())
    await a()
    await task

def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')

show_perf(c2)

示例代碼

[編輯]

下面是一個使用 asyncio 模塊的簡單示例:

import asyncio
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟I/O操作
    print("World")

async def main():
    await asyncio.gather(say_hello(), say_hello())

# 运行协程
if __name__ == "__main__":
    asyncio.run(main())

在這個示例中:

  • say_hello 是一個協程函數,它在列印「Hello」後等待1秒鐘,然後列印「World」。
  • main 協程函數使用 asyncio.gather 並發執行兩個 say_hello 協程。
  • asyncio.run(main()) 啟動事件循環並運行 main 協程。

進階用法:

協程中的異常處理:可以使用 try 和 except 來處理協程中的異常。

async def risky_operation():
    try:
        # 一些可能引发异常的操作
        await asyncio.sleep(1)
        raise ValueError("Something went wrong!")
    except ValueError as e:
        print(f"Error: {e}")

asyncio.run(risky_operation())

並發任務:使用 asyncio.gather 來並發執行多個協程任務。

async def task1():
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(1)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

另一個多任務的例子:

import asyncio

async def fetch_data(name, delay):
    print(f"{name} started, will take {delay} seconds.")
    await asyncio.sleep(delay)
    print(f"{name} completed.")
    return f"Result from {name}"

async def main():
    # 创建多个任务
    tasks = [
        asyncio.create_task(fetch_data("Task 1", 3)),
        asyncio.create_task(fetch_data("Task 2", 1)),
        asyncio.create_task(fetch_data("Task 3", 2)),
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    
    print("All tasks completed.")
    for result in results:
        print(result)

# 运行主函数
asyncio.run(main())

超時控制:可以使用 asyncio.wait_for 來設置協程的超時時間。

async def slow_operation():
    await asyncio.sleep(10)

async def main():
    try:
        await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())

協程是Python中處理異步操作的強大工具,適用於各種需要並發操作的場景,如網絡請求、文件讀寫等。使用它們可以讓代碼更清晰、執行效率更高。

其他

[編輯]

ensure_future

[編輯]

asyncio.ensure_future 用於將協程或 Future 對象調度到事件循環中,並返回一個 Future 對象。它確保協程在事件循環中運行,並允許你檢查任務的狀態或結果。Python 3.7引入了asyncio.create_task, 功能與 ensure_future 類似,但更具語義化。如果你的代碼只需要調度協程而不需要處理 Future 對象,建議使用 asyncio.create_task。

asyncio.gather和asyncio.await的用途區別

[編輯]

asyncio.gather是一個簡單、直接的用法。封裝為Task執行的黑盒,返回協程的執行結果。

 asyncio.gather(*aws, return_exceptions=False)

並發地運行aws序列中的所有可等待對象。如果aws中的任何可等待對象是協程,它會被自動調度為一個任務(Task)。如果所有的可等待對象都成功完成,結果將是一個聚合的返回值列表。結果值的順序與 aws 中的可等待對象的順序相對應。如果參數return_exceptions 為 False(默認值),第一個引發的異常會立即傳遞給等待gather()的任務。aws 序列中的其他可等待對象不會被取消,並將繼續運行。如果參數return_exceptions為True,異常會被視為成功結果,並被聚合到結果列表中。如果gather()被取消,所有提交的(尚未完成的)可等待對象也會被取消。如果aws序列中的任何任務(Task)或未來(Future)被取消,它將被視為引發了CancelledError——在這種情況下,gather() 調用不會被取消。這是為了防止一個被取消的任務/未來導致其他任務/未來也被取消。注意:一個新的替代方案來並發地創建和運行任務並等待它們完成是 asyncio.TaskGroup。TaskGroup 提供比 gather 更強的安全保證,用於調度子任務的嵌套:如果一個任務(或子任務,即由任務調度的任務)引發異常,TaskGroup 會取消其餘調度的任務,而 gather 不會這樣做。

asyncio.await是一個高級、複雜的用法。

done, pending = asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

並發運行aws可迭代對象中的Future和Task實例,並阻塞直到滿足return_when指定的條件。aws可迭代對象不能為空。返回兩個Tasks/Futures集合:(done, pending)。參數timeout(浮點數或整數),如果指定,可以用來控制在返回之前等待的最大秒數。注意,這個函數不會引發TimeoutError。在超時時,如果某些 Futures或Tasks尚未完成,不會取消Futures,它們將簡單地被返回到第二個集合中。參數return_when指定了函數應何時返回。它必須是以下常量之一:

  • asyncio.FIRST_COMPLETED 當任何 Future 完成或被取消時,函數將返回。
  • asyncio.FIRST_EXCEPTION 當任何 Future 因引發異常而完成時,函數將返回。如果沒有 Future 引發異常,則等同於 ALL_COMPLETED。
  • asyncio.ALL_COMPLETED 當所有 Futures 完成或被取消時,函數將返回。

asyncio.wait的返回值有2項,第一項表示完成的任務列表(done),第二項表示等待(Future)完成的任務列表(pending),每個任務都是一個Task實例,由於這2個任務都已經完成,所以可以執行task.result()獲得協程返回值。asyncio.wait支持選擇返回的時機。asyncio.wait的參數return_when,在默認情況下,asyncio.wait會等待全部任務完成(return_when='ALL_COMPLETED'),它還支持FIRST_COMPLETED(第一個協程完成就返回)和FIRST_EXCEPTION(出現第一個異常就返回)。

使用asyncio.wait的場合:

  • 需要拿到封裝好的Task,以便取消或者添加成功回調等
  • 業務上需要FIRST_COMPLETED/FIRST_EXCEPTION即返回的

asyncio.wait的歷史:

  • 3.10 版本中: 移除了 loop 參數。
  • 3.11 版本中: 直接將協程對象傳遞給 wait() 是禁止的。
  • 3.12 版本中: 添加了對生成器生成任務的支持。

3種超時機制

[編輯]

類asyncio.timeout(delay)返回一個異步上下文管理器,以限制用於等待的時間。創建後可用Timeout.reschedule()重新設定超時時間。對超時的協程在事件循環下一次迭代時會被取消。超時的任務會被拋出TimeoutError異常。這是唯一可在上下文管理器捕獲的異常。asyncio.timeout可安全地嵌套。

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

類asyncio.timeout_at(when)用於指出超時的絕對時間:

async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

協程asyncio.wait_for(aw, timeout)用於在一個超時範圍內等待協程完成。參數aw如果是協程則自動調度為Task。超時拋出TimeoutError異常。

asyncio.create_task 與 loop.create_task 與 asyncio.ensure_future的區別

[編輯]

創建一個Task一共有3種方法。

從Python 3.7開始可以統一的使用更高階的asyncio.create_task。其實asyncio.create_task就是用的loop.create_task:

 def create_task(coro):
    loop = events.get_running_loop()
    return loop.create_task(coro)

asyncio.ensure_future除了接受協程,還可以是Future對象或者awaitable對象:

  • 如果參數是協程,其實底層還是用的loop.create_task,返回Task對象
  • 如果是Future對象會直接返回
  • 如果是一個awaitable對象會await這個對象的__await__方法,再執行一次ensure_future,最後返回Task或者Future

所以就像ensure_future名字說的,確保這個是一個Future對象:Task是Future 子類,前面說過一般情況下開發者不需要自己創建Future

在獨立執行緒中運行協程

[編輯]

Python3.9新增加在一個獨立的執行緒中異步運行函數 func:

asyncio.to_thread(func, /, *args, **kwargs)

所有傳遞給這個函數的 *args 和 **kwargs 會直接傳遞給 func。此外,當前的 contextvars.Context 也會被傳播,使得可以在獨立執行緒中訪問來自事件循環執行緒的上下文變量。返回一個協程,可以通過 await 來獲取 func 的最終結果。這個協程函數主要用於執行IO密集的函數/方法,如果這些函數在主執行緒中運行,會阻塞事件循環。例如:

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # 注意,time.sleep() 可以被任何阻塞的
    # IO 绑定操作替代,比如文件操作。
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

直接在任何協程中調用 blocking_io() 會阻塞事件循環,導致額外增加1秒的運行時間。通過使用 asyncio.to_thread(),我們可以在一個獨立的執行緒中運行它,而不會阻塞事件循環。

注意:由於全局解釋器鎖(GIL),asyncio.to_thread() 通常只能用於將IO操作密集的函數變為非阻塞。

Queue

[編輯]

對於任務隨機出現,需要異步執行它們,可以使用asyncio.Queue()來實現生產者-消費者模式。

queue.join()方法會將進程阻塞起來,釋放的條件是_unfinished_tasks計數器清零。該計數器隨著put()方法而增加,隨著task_done()方法而降低。

自省

[編輯]

asyncio.current_task: 返回當前運行的Task實例,如果沒有正在運行的任務則返回 None。如果 loop 為 None 則會使用 get_running_loop()獲取當前事件循環。 asyncio.all_tasks: 返回事件循環所運行的未完成的Task對象的集合。

shield

[編輯]

asyncio.shield,可以屏蔽取消操作:

task1 = asyncio.shield(func_1())

異步上下文管理器

[編輯]

異步上下文管理器是一種用於協程(asyncio)的特殊類型的上下文管理器,用於管理異步資源的分配和釋放。異步上下文管理器通常與 async with 語句一起使用,以確保在異步代碼塊執行前分配資源,並在執行後釋放資源。

要創建一個異步上下文管理器,需要定義一個類,該類必須實現兩個特殊方法 __aenter__ 和 __aexit__。這些方法允許您定義資源的獲取和釋放邏輯。

__aenter__ 方法:在進入 async with 代碼塊時調用。通常在這裡執行資源的分配或初始化操作。

__aexit__ 方法:在退出 async with 代碼塊時調用。通常在這裡執行資源的釋放或清理操作。

async with 語句必須寫在一個協程函數中 ,不能在函數外使用

@asynccontextmanager
async def async_timed(func):
    start = time.perf_counter()
    yield await func()
    print(f'Cost: {time.perf_counter() - start}')

 
class MyAsyncContextManager:
    async def __aenter__(self):
        print("Entering the async context")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """
         with语句运行结束之后触发此方法的运行
         exc_type:如果抛出异常, 这里获取异常类型
         exc_val:如果抛出异常, 这里显示异常内容
         exc_tb:如果抛出异常, 这里显示所在位置, traceback
         """
        print("Exiting the async context") 

async def main():
    async with MyAsyncContextManager() as manager:
        print("Inside the async context")

    async with async_timed(s1) as rv:
        print(f'Result: {rv}')

asyncio.run(main())

給 Task (Future) 添加回調函數

[編輯]
task = loop.create_task(a())
def callback(future):
    print(f'Result: {future.result()}')
task.add_done_callback(callback)
await task

執行同步函數

[編輯]

對於阻塞的time.sleep()等同步邏輯怎麼利用asyncio實現並發呢?答案是用run_in_executor。

def a():
    time.sleep(1)
    return 'A'


async def b():
    await asyncio.sleep(1)
    return 'B'


def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')

async def c1():
    loop = asyncio.get_running_loop()
    await asyncio.gather(loop.run_in_executor(None, a),b() )

show_perf(c1)

loop.run_in_executor()的第一個參數是要傳遞concurrent.futures.Executor實例的,傳遞None會選擇默認的executor:loop._default_executor

當然我們還可以用進程池做執行器:concurrent.futures.ProcessPoolExecutor()

多執行緒執行協程

[編輯]

使用run_coroutine_threadsafe()在其他執行緒執行協程(這是執行緒安全的):

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def shutdown(loop):
    loop.stop()


async def b1():
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()

    future = asyncio.run_coroutine_threadsafe(a(), new_loop)
    print(future)
    print(f'Result: {future.result(timeout=2)}')
    new_loop.call_soon_threadsafe(partial(shutdown, new_loop))

await b1()

注意:

  • 協程應該從另一個執行緒中調用,而非事件循環運行所在執行緒,所以用 asyncio.new_event_loop () 新建一個事件循環
  • 在執行協程前要確保新創建的事件循環是運行著的,所以需要用 start_loop 之類的方式啟動循環
  • 創建的新的事件循環跑在一個執行緒裡面,由於 loop.run_forever 會阻塞程序關閉,所以需要結束時殺掉執行緒,所以用 call_soon_threadsafe 回調函數 shutdown 去停止事件循環

異步迭代器

[編輯]

實現了__aiter__()和__anext__()方法的對象是異步迭代對象。要求__anext__()必須返回一個awaitable對象。async for會處理異步迭代器的__anext__()方法所返回的可等待對象,直到其引發一個StopAsyncIteration異常。

根本上的誤解是期望async for會自動並行化迭代。實際上,它並不這樣做,它只是允許對異步源進行順序迭代。async for的作用是順序地遍歷異步源(如異步生成器或異步迭代器),每次迭代的操作會在異步事件循環中依次執行。異步迭代意味著你可以與事件循環中的其他異步任務(包括其他類似的迭代)並行運行它。普通的for無法進行異步迭代,至少不能在不阻塞事件循環的情況下進行。這是因為普通的for調用__next__作為一個阻塞函數,並且不await其結果。而且你不能手動await通過for獲取的元素。for循環依賴__next__引發StopIteration來結束迭代,因此無法直接用await處理異步操作。如果__next__是一個協程,那麼在等待其結果之前,StopIteration異常不可見。這就是為什麼async for被引入的原因,不僅在Python中,還有其他具有async/await和通用for的語言中。

import asyncio

class Reader:
    """
    自定义异步迭代器(同时也是异步可迭代对象)
    """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        value = await self.readline()
        if value == None:
            raise StopAsyncIteration
        return value
async def main():
    async for i in Reader():
        print(i)

asyncio.run(main())

async for 並不會創建任務以並發執行。它用於允許對異步源進行順序迭代。如果想對迭代的對象並發,參加下例:

import asyncio

async def process_all():
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))
        tasks.append(task)
    
    await asyncio.gather(*tasks)


async def process_obj(obj):
    ...

asyncio.as_completed

[編輯]

asyncio.as_completed()是 Python 的 asyncio 庫中用於處理異步任務的一個函數。它允許你以異步方式逐個處理一組協程對象(coroutines),並按它們完成的順序返回結果。這個函數特別適用於你需要在協程列表中逐個處理結果,而不是等到所有協程完成後一次性處理它們的場景。

import asyncio
import time

async def task(n):
    print(f"Task {n} starting")
    await asyncio.sleep(n)  # 模拟耗时操作
    print(f"Task {n} completed")
    return n

async def main():
    tasks = [task(1), task(1), task(1)]  # 创建三个协程任务
    for completed_task in asyncio.as_completed(tasks):
        result = await completed_task
        print(f"Result: {result}")

start = time.perf_counter()
asyncio.run(main())
print(f'Cost: {time.perf_counter() - start}')

上述例子耗時1秒,而不是3秒。

asyncio.TaskGroup

[編輯]

Python3.11提供了更為現代化的asyncio.TaskGroup類,可替代create_task() 。

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))

        task2 = tg.create_task(
            say_after(2, 'world'))

        print(f"started at {time.strftime('%X')}")

    # The await is implicit when the context manager exits.

    print(f"finished at {time.strftime('%X')}")

async with 語句將等待組中的所有任務完成。在等待期間,新的任務仍然可以被添加到組中(例如,通過將 tg 傳遞給某個協程並在該協程中調用 tg.create_task())。一旦最後一個任務完成並且 async with 塊被退出,組中將不能再添加新任務。

如果組中的任何任務第一次遇到除了 asyncio.CancelledError 之外的異常,剩餘的任務將會被取消。之後無法再向組中添加新的任務。此時,如果 async with 語句的主體仍在活動中(即,__aexit__() 尚未被調用),直接包含 async with 語句的任務也會被取消。 resulting asyncio.CancelledError 將中斷一個 await,但不會從包含的 async with 語句中向外傳播。

Eager Task Factory

[編輯]

Python3.12增加了用於急切任務執行的任務工廠:

asyncio.eager_task_factory(loop, coro, *, name=None, context=None)

當使用這個工廠(通過 loop.set_task_factory(asyncio.eager_task_factory))時,協程在任務構造期間會同步開始執行。只有在協程阻塞時,任務才會被調度到事件循環。這可以提高性能,因為避免了對同步完成的協程進行事件循環調度的開銷。一個常見的例子是,使用緩存或備忘錄來避免實際的 I/O 操作的協程,這樣可以在可能的情況下提高性能。注意:協程的立即執行是一種語義上的變化。如果協程返回或引發異常,則任務不會被調度到事件循環中。如果協程執行阻塞,則任務會被調度到事件循環中。這種變化可能會引入對現有應用程式的行為更改。例如,應用程式的任務執行順序可能會發生變化。

Python3.12增加了創建一個急切任務工廠:

asyncio.create_eager_task_factory(custom_task_constructor)

類似於 eager_task_factory(),在創建新任務時使用提供的 custom_task_constructor,而不是默認的 Task。custom_task_constructor 必須是一個符合 Task.__init__ 簽名的可調用對象。該可調用對象必須返回一個與 asyncio.Task 兼容的對象。此函數返回一個可調用對象,旨在通過 loop.set_task_factory(factory) 作為事件循環的任務工廠使用。

歷史

[編輯]

在最早的Python 3.4中,協程函數是通過@asyncio.coroutine 和 yeild from 實現的, 如下所示。

 import asyncio
 
 @asyncio.coroutine
 def func1(i):
     print("协程函数{}马上开始执行。".format(i))
     yield from asyncio.sleep(2)
     print("协程函数{}执行完毕!".format(i))
 
 if __name__ == '__main__':
     # 获取事件循环
     loop = asyncio.get_event_loop()
 
     # 执行协程任务
     loop.run_until_complete(func1(1))
 
     # 关闭事件循环
     loop.close()

這裡我們定義了一個func1的協程函數,我們可以使用asyncio.iscoroutinefunction來驗證。定義好協程函數後,我們首先獲取事件循環loop,使用它的run_until_complete方法執行協程任務,然後關閉loop。

 print(asyncio.iscoroutinefunction(func1(1))) # True

Python 3.5以後引入了async/await 語法定義協程函數,代碼如下所示。每個協程函數都以async聲明,以區別於普通函數,對於耗時的代碼或函數我們使用await聲明,表示碰到等待時掛起,以切換到其它任務。 Python 3.7提供了一個更簡便的asyncio.run方法,提供的asyncio.create_task方法

# 方法1:使用ensure_future方法。future代表一个对象,未执行的任务。
 task1 = asyncio.ensure_future(func1(1))
 task2 = asyncio.ensure_future(func1(2))
 
 # 方法2:使用loop.create_task方法
 task1 = loop.create_task(func1(1))
 task2 = loop.create_task(func1(2))
 
 # 方法3:使用Python 3.7提供的asyncio.create_task方法
 task1 = asyncio.create_task(func1(1))
 task2 = asyncio.create_task(func1(2))

創建多個協程任務列表後,我們還要使用asyncio.wait方法收集協程任務,並交由事件循環處理執行。新的asyncio.gather方法,它的作用asyncio.wait方法類似,但更強大。如果列表中傳入的不是create_task方法創建的協程任務,它會自動將協程對象封裝成協程任務。

     tasks = []
     for i in range(1, 5):
         # 这里未由协程函数创建协程任务
         tasks.append(func1(i))
         
     # 注意这里*号。gather自动将函数列表封装成了协程任务。
     await asyncio.gather(*tasks)

兩者更大的區別在協程任務執行完畢後對於返回結果的處理上。通常獲取任務執行結果通常對於一個程序至關重要。asyncio.wait 會返回兩個值:done 和 pending,done 為已完成的協程任務列表,pending 為超時未完成的協程任務類別,需通過task.result()方法可以獲取每個協程任務返回的結果;而asyncio.gather 返回的是所有已完成協程任務的 result,不需要再進行調用或其他操作,就可以得到全部結果。

import asyncio
 
 async def func1(i):
     print(f"协程函数{i}马上开始执行。")
     await asyncio.sleep(2)
     return i
 
 async def main():
     tasks = []
     for i in range(1, 5):
         tasks.append(asyncio.create_task(func1(i)))
         
     # 获取任务执行结果。
     done, pending = await asyncio.wait(tasks)
     for task in done:
         print(f"执行结果: {task.result()}")
 
 if __name__ == '__main__':
     asyncio.run(main())

#-*- coding:utf-8 -*-
 import asyncio
 
 async def func1(i):
     print(f"协程函数{i}马上开始执行。")
     await asyncio.sleep(2)
     return i
 
 async def main():
     tasks = []
     for i in range(1, 5):
         tasks.append(func1(i))
 
     results = await asyncio.gather(*tasks)
     for result in results:
         print(f"执行结果: {result}")
 
 if __name__ == '__main__':
     asyncio.run(main())

gather返回的任務執行結果是按照添加順序有序的,wait方法獲取的結果是無序的。