跳至內容

Python/Threading

維基教科書,自由的教學讀本

Python 中的線程用於同時運行多個線程(任務、函數調用)。請注意,這並不意味着它們在不同的 CPU 上執行。如果程序已經使用了 100% 的 CPU 時間,Python 線程將不會讓您的程序運行得更快。在這種情況下,您可能需要研究多進程並行編程。如果您對使用 Python 進行並行編程感興趣,請參閱 此處

Python 線程用於執行任務需要等待的情況。一個例子是與另一台計算機上託管的服務(如 Web 伺服器)交互。線程允許 Python 在等待時執行其他代碼;這可以通過 sleep 函數輕鬆模擬。

Python 3已經停用了thread模塊,並改名為 _thread 模塊。Python 3在_thread 模塊的基礎上開發了更高級的 threading 模塊。threading 模塊除了包含 _thread 模塊中的所有方法外,還提供的其他方法,常用的方法如下:

  • threading.current_thread() 返回當前線程的信息
  • threading.enumerate() 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
  • threading.active_count() 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果

GIL

[編輯]

GIL(Global Interpreter Lock)是 CPython 解釋器進程級的互斥鎖,它讓「同一時刻只有一個線程在執行 Python 字節碼」,從而保證解釋器內部對象的安全,但也使多線程無法真正並行(CPU 密集任務無效),I/O 密集任務仍可並發。

一個進程一把鎖,嵌在 ceval.c 的字節碼執行循環入口;任何線程想執行 Python 字節碼,必須先搶 GIL;因此同一進程內多個線程在 CPython 層面永遠串行。

為什麼當初要加 GIL?(1992 年引入)早期大量全局狀態(引用計數、內存管理、對象頭)無鎖保護;單鎖方案實現簡單、性能高(CPU單核單線程時代);避免細粒度鎖帶來的複雜度和死鎖風險。

線程切換機制(默認時間片 + 信號量)

  • 時間片:默認 5 ms(sys.setswitchinterval,可調);
  • I/O 阻塞:內部 lock->wait() 前主動 drop_gil();
  • 線程搶鎖優先級:等待時間最長者優先,防止飢餓。

能否去掉 GIL?理論可行,實現極難:需給所有對象頭、引用計數、容器、異常體系加細粒度鎖或無鎖算法;

PEP 703(「No-GIL」)2023 年合併到主分支,Python 3.12 實驗性構建可用,默認仍保留 GIL,預計 3.13/3.14 才提供生產級開關。

創建線程

[編輯]

直接創建線程

[編輯]
threading.Thread(target=None, name=None, args=(), kwargs={})

target 指要創建的線程的方法名,name 指給此線程命名,命名後可以調用 threading.current_thread().name 方法輸出該線程的名字, args/kwargs 指 target 指向的方法需要傳遞的參數,必須是元組形式,如果只有一個參數,需要以添加逗號。

創建一個線程,打印 1-10 的數字,並在每次打印之間等待一秒鐘:

import threading
import time

def loop1_10():
    for i in range(1, 11):
        time.sleep(1)
        print(i)

threading.Thread(target=loop1_10).start()

類的繼承創建線程

[編輯]

通過直接從 threading.Thread 繼承創建一個新的子類。只能重載基類的run方法和_init__構造器。實例化後調用 start() 方法啟動新線程,即相當於它調用了線程的 run() 方法。

#!/usr/bin/env python

import threading
import time


class MyThread(threading.Thread):
    def run(self):                                         # Default called function with mythread.start()
        print("{} started!".format(self.getName()))        # "Thread-x started!"
        time.sleep(1)                                      # Pretend to work for a second
        print("{} finished!".format(self.getName()))       # "Thread-x finished!"

def main():
    for x in range(4):                                     # Four times...
        mythread = MyThread(name = "Thread-{}".format(x))  # ...Instantiate a thread and pass a unique ID to it
        mythread.start()                                   # ...Start the thread, run method will be invoked
        time.sleep(.9)                                     # ...Wait 0.9 seconds before starting another

if __name__ == '__main__':
    main()

輸出為:

Thread-0 started!
Thread-1 started!
Thread-0 finished!
Thread-2 started!
Thread-1 finished!
Thread-3 started!
Thread-2 finished!
Thread-3 finished!

守護線程

[編輯]

如果當前python線程是守護線程,那麼意味着這個線程是「不重要」的,「不重要」意味着如果他的主線程結束了但該守護線程沒有運行完,守護線程就會被強制結束。如果線程是非守護線程,那麼父進程只有等到非守護線程運行完畢後才能結束。

守護daemon這裏的含義是替主線程打雜,主線程一旦退出,它立即被拋棄;「後台隨時可扔的僕從」。

只要當前主線程中尚存任何一個非守護線程沒有結束,守護線程就全部工作;只有當最後一個非守護線程結束是,守護線程隨着主線程一同結束工作。

import threading
import time

# 每1秒加1
def job1(num):
    while True:
        num += 1
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(1)

# 每2秒加2
def job2(num):
    while True:
        num += 2
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(2)

# 线程1,一秒加一
new_job1 = threading.Thread(target=job1, name='Add1', args=(100,))
# 设置为守护线程
new_job1.setDaemon(True)
new_job1.start()

# 线程2,两秒加二
new_job2 = threading.Thread(target=job2, name='Add2', args=(1,))
new_job2.setDaemon(True)
new_job2.start()

# 主线程等待9秒
time.sleep(9)
print('{} Ending'.format(threading.current_thread().name))

隨着輸出 MainThread Ending 後,程序就運行結束了,這表明子線程全為守護線程時,會隨着主線程的結束而強制結束。

線程的阻塞會合

[編輯]

join([timeout])方法會使線程進入等待狀態(阻塞),直到調用join()方法的子線程運行結束。同時也可以通過設置 timeout 參數來設定等待的時間。

線程並發控制

[編輯]

互斥鎖(Lock)

[編輯]

互斥鎖只能加鎖一次然後釋放一次。

import threading
import time


num = 0

lock = threading.Lock()

def job1():
    global num
    for i in range(1000000):
        lock.acquire() # 加锁
        num += 1
        lock.release() # 释放锁
        # 上述代码也可以直接写为
        # with lock:
        # 	 num += 1

new_job1 = threading.Thread(target=job1, name='Add1')
new_job1.start()

for i in range(1000000):
    lock.acquire() # 加锁
    num += 2
    lock.release() # 释放锁

# 等待线程执行完毕
time.sleep(3)
print('num = {}'.format(num))

遞歸鎖

[編輯]

遞歸鎖(Rlock)允許多層加鎖、釋放鎖:

import threading, time


def run1():
    lock.acquire()
    print("grab the first part data")
    global num
    num += 1
    lock.release()
    return num


def run2():
    lock.acquire()
    print("grab the second part data")
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(3):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

信號量

[編輯]
import threading
import time

# 设置信号量,即同时执行的线程数为3
lock = threading.BoundedSemaphore(3)

def job1():
    lock.acquire()
    print('{} is coming, {}'.format(threading.current_thread().name, time.strftime('%H:%M:%S',time.localtime(time.time()))))
    time.sleep(3)
    lock.release()

for i in range(10):
    new_job1 = threading.Thread(target=job1, name='Thread{}'.format(i))
    new_job1.start()

事件

[編輯]

threading.Event()類會在全局定義一個Flag,當 Flag=False 時,調用 wait()方法會阻塞所有線程;而當 Flag=True 時,調用 wait() 方法不再阻塞。形象的比喻就是「紅綠燈」:在紅燈時阻塞所有線程,而在綠燈時又會一次性放行所有排隊中的線程。Event類有四個方法:

  • set() 將Flag設置為True
  • wait() 阻塞所有線程
  • clear() 將Flag設置為False
  • is_set() 返回bool值,判斷Flag是否為True

例如,主線程通知所有子線程可以開工:

import threading
import time

evt = threading.Event()          # 默认 False

def worker():
    print("worker: 等待信号...")
    evt.wait()                   # 阻塞
    print("worker: 收到信号,开始干活")

t = threading.Thread(target=worker)
t.start()

time.sleep(2)
print("main: 发信号")
evt.set()                        # 置位并唤醒等待线程
t.join()

隊列

[編輯]

queue模塊是Python內置的標準模塊,模塊實現了三種類型的隊列,它們的區別僅僅是條目取回的順序,分別對應3個類:Queue,LifoQueue,PriorityQueue。共同的成員方法:

  • Queue.qsize()返回隊列的大致大小。
  • Queue.empty()如果隊列為空,返回 True ,否則返回 False 。
  • Queue.full()如果隊列是滿的返回 True ,否則返回 False 。
  • Queue.put(item, block=True, timeout=None) 將 item 放入隊列。如果可選參數 block 是 true 並且 timeout 是 None (默認),則在必要時阻塞至有空閒插槽可用。如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間沒有可用的空閒插槽,將引發 Full 異常。反之 (block 是 false),如果空閒插槽立即可用,則把 item 放入隊列,否則引發 Full 異常 ( 在這種情況下,timeout 將被忽略)。
  • Queue.put_nowait(item)相當於 put(item, block=False)。
  • Queue.get(block=True, timeout=None)從隊列中移除並返回一個項目。如果可選參數 block 是 true 並且 timeout 是 None (默認值),則在必要時阻塞至項目可得到。如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間內項目不能得到,將引發 Empty 異常。反之 (block 是 false) , 如果一個項目立即可得到,則返回一個項目,否則引發 Empty 異常 (這種情況下,timeout 將被忽略)。
  • Queue.get_nowait()相當於 get(block=False) 。
  • Queue.task_done()在完成一項工作以後,task_done()告訴隊列,該任務已處理完成。注意,從業務邏輯上,獲取數據get()不等價於該任務已經處理完成。
  • Queue.join()阻塞至隊列中所有的元素都被接收put()和處理完畢task_done()。隊列添加新工作時,未完成任務的計數就會增一,當調用task_done()函數後,就代表執行完一個工作,未完成任務的計數就會減一,當計數為0時 join() 阻塞被解除。

先進先出隊列 queue.Queue(maxsize=0)

maxsize 是個整數,用於設置可以放入隊列中的項目數的上限。當達到這個大小的時候,插入操作將被阻塞,直至隊列中的項目被消費掉。如果 maxsize 小於等於零,隊列尺寸為無限大。

from queue import Queue
# FIFO
queue_obj = Queue()  # 创建一个队列对象
for i in range(4):
    queue_obj.put(i)
while not queue_obj.empty():
    print(queue_obj.get())

後進先出隊列即棧 queue.LifoQueue(maxsize=0)

from queue import Queue,LifoQueue
# LIFO
queue_obj = LifoQueue()  # 创建一个队列对象
for i in range(4):
    queue_obj.put(i)
while not queue_obj.empty():
    print(queue_obj.get())

優先級隊列queue.PriorityQueue(maxsize=0),按照級別順序取出元素,級別最低的最先取出。隊列中的元素一般採取元組(priority_number, data)的形式進行存儲

  • 優先級不同,可以比較大小
  • 優先級一樣,數據部分可以比較大小
  • 優先級一樣,數據部分不可以比較大小則報錯 ypeError: '<' not supported between instances of 'dict' and 'dict'

優先級隊列本質上是元素可以全序(total order)比較,內部用堆(heap)實現。所以,可以自己實現一個類作為優先級隊列的元素的類型,將數據包裝到類中,在類中自定義或重寫 def __lt__(self, other)魔法方法。

SimpleQueue queue.SimpleQueue是Python 3.7版本新增的一個輕量級FIFO隊列類,提供了線程安全的簡單隊列實現。與標準的queue.Queue相比,它功能更簡單,但性能更好,並且具有重入性安全特性。主要特性

  • 無界隊列:SimpleQueue是無界的,不支持設置最大容量(沒有maxsize參數)
  • 線程安全:支持多生產者、多消費者場景
  • 重入性安全:可以在同一線程中被安全地中斷調用(例如同一線程中signal_handler函數讀寫隊列),適合在析構函數、weakref回調或信號處理器中使用
  • 高性能:相比Queue,實現更簡單,性能更好
  • 內存效率:比Queue更節省內存

內存屏障Barrier

[編輯]

threading.Barrier(parties, action=None, timeout=None)

條件鎖

[編輯]
cond = threading.Condition()
with cond:            # 必须先拿底层锁
    cond.wait()        # 挂起直到被 notify(会暂时放锁)
    cond.wait_for(谓词) # 挂起直到谓词为真,自带重试
    cond.notify(n=1)   # 唤醒 n 个等待线程
    cond.notify_all()  # 唤醒全部

線程局部存儲

[編輯]
import threading

# 创建全局ThreadLocal对象
local_data = threading.local()
def add():
	# 取出ThreadLocal中的数据
    n = local_data.num
    local_data.num_add = n + n

def divid():
    n = local_data.num
    local_data.num_divid = n / 2

def times():
    local_data.result = local_data.num_add * local_data.num_divid

def job1(num):
	# 将数据存入ThreadLocal中
    local_data.num = num
    add()
    divid()
    times()
    print('{} result >> {}'.format(threading.current_thread().name, local_data.result))

for i in range(5):
    t = threading.Thread(target=job1, args=(i,), name='Thread{}'.format(i))
    t.start()

線程池

[編輯]

concurrent.futures.ThreadPoolExecutor

[編輯]

python3新引入的庫concurrent.futures.ThreadPoolExecutor可以並行執行多個線程,適用於 I/O 密集型任務。

concurrent.futures.ThreadPoolExecutor類的常用方法:

  • map(func, *iterables, timeout=None, chunksize=1):並行執行一個函數對多個輸入迭代器進行映射。使用 map 方法,有兩個特點:無需提前使用submit方法;返回結果的順序和元素的順序相同,即使子線程先返回也不會獲取結果.
  • shutdown(wait=True):停止池的操作,並等待所有提交的任務完成(wait=True)或立即返回(wait=False)。
  • submit(fn, *args, **kwargs) 在線程池中提交任務。返回一個 concurrent.futures.Future 對象,用於表示異步操作的結果。

示例:

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

concurrent.futures.Future

[編輯]
  • concurrent.futures.Future 是一個表示異步操作結果的對象,它提供了一種機制來檢查異步操作是否完成、獲取結果以及處理可能出現的異常。Future 對象是 concurrent.futures 模塊中的核心組件,主要用於線程池和進程池中的異步任務管理。
  • cancel():嘗試取消這個任務。如果任務已經完成或已經被取消,返回 False。如果任務尚未執行,則任務會被取消,返回 True。
  • cancelled():檢查這個任務是否已經被取消。
  • done():檢查任務是否已經完成(無論是成功還是失敗)。
  • result(timeout=None): 輸出對應的線程運行後方法的返回結果,如果線程還在運行,那麼其會一直阻塞在那裏,直到該線程運行完,當然,也可以設置
  • result(timeout),即如果調用還沒完成那麼這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。
  • exception(timeout=None):獲取任務拋出的異常。如果任務成功完成,則返回 None。如果任務拋出異常,exception() 方法會返回該異常。如果指定了 timeout 參數並且任務還未完成,則會阻塞至超時或任務完成。
  • add_done_callback(fn):在任務完成時調用 fn 回調函數。fn 必須是一個可調用對象,接收一個 Future 對象作為參數示例:
  • remove_done_callback(fn):從回調函數列表中移除 fn。只有在回調函數還沒有執行時,這個方法才有效
from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

as_completed

[編輯]
concurrent.futures.as_completed(fs, timeout=None)

返回一個包含fs所指定的Future實例的迭代器。在沒有任務完成的時候,會一直阻塞;如果設置了 timeout 參數,timeout 秒之後結果仍不可用,則返回的迭代器將引發concurrent.futures.TimeoutError。如果timeout未指定或為 None,則不限制等待時間。當有某個任務完成的時候,該方法會 yield 這個任務,就能執行 for循環體的語句,然後繼續阻塞住,循環到所有的任務結束。先完成的任務會先返回給主線程。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def add(a, b):
    time.sleep(3)
    return a + b

task = []
list_a = [1, 2, 3, 4, 5]
list_b = [6, 7, 8, 9, 10]
with ThreadPoolExecutor(2) as pool:
    for i in range(len(list_a)):
        task.append(pool.submit(add, list_a[i], list_b[i]))
	
	# 使用as_completed遍历
    for i in as_completed(task):
        print('result = {}'.format(i.result()))

該方法與第一種的直接遍歷所具有的優勢是,不需要等待所有線程全部返回,而是每返回一個子線程就能夠處理,上面的result方法會阻塞後面的線程。

wait方法

[編輯]
wait(fs, timeout=None, return_when=ALL_COMPLETED)

fs 為指定的 Future 實例,timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為 None ,則不限制等待時間。return_when 指定此函數應在何時返回必須為以下常數之一:

  • FIRST_COMPLETED 等待第一個線程結束時返回,即結束等待
  • FIRST_EXCEPTION 函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當於 ALL_COMPLETED
  • ALL_COMPLETED 函數將在所有可等待對象結束或取消時返回