Python/Threading
Python 中的線程用於同時運行多個線程(任務、函數調用)。請注意,這並不意味着它們在不同的 CPU 上執行。如果程序已經使用了 100% 的 CPU 時間,Python 線程將不會讓您的程序運行得更快。在這種情況下,您可能需要研究多進程並行編程。如果您對使用 Python 進行並行編程感興趣,請參閱 此處。
Python 線程用於執行任務需要等待的情況。一個例子是與另一台計算機上託管的服務(如 Web 伺服器)交互。線程允許 Python 在等待時執行其他代碼;這可以通過 sleep 函數輕鬆模擬。
Python 3已經停用了thread模塊,並改名為 _thread 模塊。Python 3在_thread 模塊的基礎上開發了更高級的 threading 模塊。threading 模塊除了包含 _thread 模塊中的所有方法外,還提供的其他方法,常用的方法如下:
- threading.current_thread() 返回當前線程的信息
- threading.enumerate() 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
- threading.active_count() 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
GIL
[編輯]GIL(Global Interpreter Lock)是 CPython 解釋器進程級的互斥鎖,它讓「同一時刻只有一個線程在執行 Python 字節碼」,從而保證解釋器內部對象的安全,但也使多線程無法真正並行(CPU 密集任務無效),I/O 密集任務仍可並發。
一個進程一把鎖,嵌在 ceval.c 的字節碼執行循環入口;任何線程想執行 Python 字節碼,必須先搶 GIL;因此同一進程內多個線程在 CPython 層面永遠串行。
為什麼當初要加 GIL?(1992 年引入)早期大量全局狀態(引用計數、內存管理、對象頭)無鎖保護;單鎖方案實現簡單、性能高(CPU單核單線程時代);避免細粒度鎖帶來的複雜度和死鎖風險。
線程切換機制(默認時間片 + 信號量)
- 時間片:默認 5 ms(sys.setswitchinterval,可調);
- I/O 阻塞:內部 lock->wait() 前主動 drop_gil();
- 線程搶鎖優先級:等待時間最長者優先,防止飢餓。
能否去掉 GIL?理論可行,實現極難:需給所有對象頭、引用計數、容器、異常體系加細粒度鎖或無鎖算法;
PEP 703(「No-GIL」)2023 年合併到主分支,Python 3.12 實驗性構建可用,默認仍保留 GIL,預計 3.13/3.14 才提供生產級開關。
創建線程
[編輯]直接創建線程
[編輯]threading.Thread(target=None, name=None, args=(), kwargs={})
target 指要創建的線程的方法名,name 指給此線程命名,命名後可以調用 threading.current_thread().name 方法輸出該線程的名字, args/kwargs 指 target 指向的方法需要傳遞的參數,必須是元組形式,如果只有一個參數,需要以添加逗號。
創建一個線程,打印 1-10 的數字,並在每次打印之間等待一秒鐘:
import threading
import time
def loop1_10():
for i in range(1, 11):
time.sleep(1)
print(i)
threading.Thread(target=loop1_10).start()
類的繼承創建線程
[編輯]通過直接從 threading.Thread 繼承創建一個新的子類。只能重載基類的run方法和_init__構造器。實例化後調用 start() 方法啟動新線程,即相當於它調用了線程的 run() 方法。
#!/usr/bin/env python
import threading
import time
class MyThread(threading.Thread):
def run(self): # Default called function with mythread.start()
print("{} started!".format(self.getName())) # "Thread-x started!"
time.sleep(1) # Pretend to work for a second
print("{} finished!".format(self.getName())) # "Thread-x finished!"
def main():
for x in range(4): # Four times...
mythread = MyThread(name = "Thread-{}".format(x)) # ...Instantiate a thread and pass a unique ID to it
mythread.start() # ...Start the thread, run method will be invoked
time.sleep(.9) # ...Wait 0.9 seconds before starting another
if __name__ == '__main__':
main()
輸出為:
Thread-0 started! Thread-1 started! Thread-0 finished! Thread-2 started! Thread-1 finished! Thread-3 started! Thread-2 finished! Thread-3 finished!
守護線程
[編輯]如果當前python線程是守護線程,那麼意味着這個線程是「不重要」的,「不重要」意味着如果他的主線程結束了但該守護線程沒有運行完,守護線程就會被強制結束。如果線程是非守護線程,那麼父進程只有等到非守護線程運行完畢後才能結束。
守護daemon這裏的含義是替主線程打雜,主線程一旦退出,它立即被拋棄;「後台隨時可扔的僕從」。
只要當前主線程中尚存任何一個非守護線程沒有結束,守護線程就全部工作;只有當最後一個非守護線程結束是,守護線程隨着主線程一同結束工作。
import threading
import time
# 每1秒加1
def job1(num):
while True:
num += 1
print('{} is running >> {}'.format(threading.current_thread().name, num))
time.sleep(1)
# 每2秒加2
def job2(num):
while True:
num += 2
print('{} is running >> {}'.format(threading.current_thread().name, num))
time.sleep(2)
# 线程1,一秒加一
new_job1 = threading.Thread(target=job1, name='Add1', args=(100,))
# 设置为守护线程
new_job1.setDaemon(True)
new_job1.start()
# 线程2,两秒加二
new_job2 = threading.Thread(target=job2, name='Add2', args=(1,))
new_job2.setDaemon(True)
new_job2.start()
# 主线程等待9秒
time.sleep(9)
print('{} Ending'.format(threading.current_thread().name))
隨着輸出 MainThread Ending 後,程序就運行結束了,這表明子線程全為守護線程時,會隨着主線程的結束而強制結束。
線程的阻塞會合
[編輯]join([timeout])方法會使線程進入等待狀態(阻塞),直到調用join()方法的子線程運行結束。同時也可以通過設置 timeout 參數來設定等待的時間。
線程並發控制
[編輯]互斥鎖(Lock)
[編輯]互斥鎖只能加鎖一次然後釋放一次。
import threading
import time
num = 0
lock = threading.Lock()
def job1():
global num
for i in range(1000000):
lock.acquire() # 加锁
num += 1
lock.release() # 释放锁
# 上述代码也可以直接写为
# with lock:
# num += 1
new_job1 = threading.Thread(target=job1, name='Add1')
new_job1.start()
for i in range(1000000):
lock.acquire() # 加锁
num += 2
lock.release() # 释放锁
# 等待线程执行完毕
time.sleep(3)
print('num = {}'.format(num))
遞歸鎖
[編輯]遞歸鎖(Rlock)允許多層加鎖、釋放鎖:
import threading, time
def run1():
lock.acquire()
print("grab the first part data")
global num
num += 1
lock.release()
return num
def run2():
lock.acquire()
print("grab the second part data")
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock()
for i in range(3):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)
信號量
[編輯]import threading
import time
# 设置信号量,即同时执行的线程数为3
lock = threading.BoundedSemaphore(3)
def job1():
lock.acquire()
print('{} is coming, {}'.format(threading.current_thread().name, time.strftime('%H:%M:%S',time.localtime(time.time()))))
time.sleep(3)
lock.release()
for i in range(10):
new_job1 = threading.Thread(target=job1, name='Thread{}'.format(i))
new_job1.start()
事件
[編輯]threading.Event()類會在全局定義一個Flag,當 Flag=False 時,調用 wait()方法會阻塞所有線程;而當 Flag=True 時,調用 wait() 方法不再阻塞。形象的比喻就是「紅綠燈」:在紅燈時阻塞所有線程,而在綠燈時又會一次性放行所有排隊中的線程。Event類有四個方法:
- set() 將Flag設置為True
- wait() 阻塞所有線程
- clear() 將Flag設置為False
- is_set() 返回bool值,判斷Flag是否為True
例如,主線程通知所有子線程可以開工:
import threading
import time
evt = threading.Event() # 默认 False
def worker():
print("worker: 等待信号...")
evt.wait() # 阻塞
print("worker: 收到信号,开始干活")
t = threading.Thread(target=worker)
t.start()
time.sleep(2)
print("main: 发信号")
evt.set() # 置位并唤醒等待线程
t.join()
隊列
[編輯]queue模塊是Python內置的標準模塊,模塊實現了三種類型的隊列,它們的區別僅僅是條目取回的順序,分別對應3個類:Queue,LifoQueue,PriorityQueue。共同的成員方法:
- Queue.qsize()返回隊列的大致大小。
- Queue.empty()如果隊列為空,返回 True ,否則返回 False 。
- Queue.full()如果隊列是滿的返回 True ,否則返回 False 。
- Queue.put(item, block=True, timeout=None) 將 item 放入隊列。如果可選參數 block 是 true 並且 timeout 是 None (默認),則在必要時阻塞至有空閒插槽可用。如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間沒有可用的空閒插槽,將引發 Full 異常。反之 (block 是 false),如果空閒插槽立即可用,則把 item 放入隊列,否則引發 Full 異常 ( 在這種情況下,timeout 將被忽略)。
- Queue.put_nowait(item)相當於 put(item, block=False)。
- Queue.get(block=True, timeout=None)從隊列中移除並返回一個項目。如果可選參數 block 是 true 並且 timeout 是 None (默認值),則在必要時阻塞至項目可得到。如果 timeout 是個正數,將最多阻塞 timeout 秒,如果在這段時間內項目不能得到,將引發 Empty 異常。反之 (block 是 false) , 如果一個項目立即可得到,則返回一個項目,否則引發 Empty 異常 (這種情況下,timeout 將被忽略)。
- Queue.get_nowait()相當於 get(block=False) 。
- Queue.task_done()在完成一項工作以後,task_done()告訴隊列,該任務已處理完成。注意,從業務邏輯上,獲取數據get()不等價於該任務已經處理完成。
- Queue.join()阻塞至隊列中所有的元素都被接收put()和處理完畢task_done()。隊列添加新工作時,未完成任務的計數就會增一,當調用task_done()函數後,就代表執行完一個工作,未完成任務的計數就會減一,當計數為0時 join() 阻塞被解除。
先進先出隊列 queue.Queue(maxsize=0)
maxsize 是個整數,用於設置可以放入隊列中的項目數的上限。當達到這個大小的時候,插入操作將被阻塞,直至隊列中的項目被消費掉。如果 maxsize 小於等於零,隊列尺寸為無限大。
from queue import Queue
# FIFO
queue_obj = Queue() # 创建一个队列对象
for i in range(4):
queue_obj.put(i)
while not queue_obj.empty():
print(queue_obj.get())
後進先出隊列即棧 queue.LifoQueue(maxsize=0)
from queue import Queue,LifoQueue
# LIFO
queue_obj = LifoQueue() # 创建一个队列对象
for i in range(4):
queue_obj.put(i)
while not queue_obj.empty():
print(queue_obj.get())
優先級隊列queue.PriorityQueue(maxsize=0),按照級別順序取出元素,級別最低的最先取出。隊列中的元素一般採取元組(priority_number, data)的形式進行存儲
- 優先級不同,可以比較大小
- 優先級一樣,數據部分可以比較大小
- 優先級一樣,數據部分不可以比較大小則報錯 ypeError: '<' not supported between instances of 'dict' and 'dict'
優先級隊列本質上是元素可以全序(total order)比較,內部用堆(heap)實現。所以,可以自己實現一個類作為優先級隊列的元素的類型,將數據包裝到類中,在類中自定義或重寫 def __lt__(self, other)魔法方法。
SimpleQueue queue.SimpleQueue是Python 3.7版本新增的一個輕量級FIFO隊列類,提供了線程安全的簡單隊列實現。與標準的queue.Queue相比,它功能更簡單,但性能更好,並且具有重入性安全特性。主要特性
- 無界隊列:SimpleQueue是無界的,不支持設置最大容量(沒有maxsize參數)
- 線程安全:支持多生產者、多消費者場景
- 重入性安全:可以在同一線程中被安全地中斷調用(例如同一線程中signal_handler函數讀寫隊列),適合在析構函數、weakref回調或信號處理器中使用
- 高性能:相比Queue,實現更簡單,性能更好
- 內存效率:比Queue更節省內存
內存屏障Barrier
[編輯]threading.Barrier(parties, action=None, timeout=None)
條件鎖
[編輯]cond = threading.Condition()
with cond: # 必须先拿底层锁
cond.wait() # 挂起直到被 notify(会暂时放锁)
cond.wait_for(谓词) # 挂起直到谓词为真,自带重试
cond.notify(n=1) # 唤醒 n 个等待线程
cond.notify_all() # 唤醒全部
線程局部存儲
[編輯]import threading
# 创建全局ThreadLocal对象
local_data = threading.local()
def add():
# 取出ThreadLocal中的数据
n = local_data.num
local_data.num_add = n + n
def divid():
n = local_data.num
local_data.num_divid = n / 2
def times():
local_data.result = local_data.num_add * local_data.num_divid
def job1(num):
# 将数据存入ThreadLocal中
local_data.num = num
add()
divid()
times()
print('{} result >> {}'.format(threading.current_thread().name, local_data.result))
for i in range(5):
t = threading.Thread(target=job1, args=(i,), name='Thread{}'.format(i))
t.start()
線程池
[編輯]concurrent.futures.ThreadPoolExecutor
[編輯]python3新引入的庫concurrent.futures.ThreadPoolExecutor可以並行執行多個線程,適用於 I/O 密集型任務。
concurrent.futures.ThreadPoolExecutor類的常用方法:
- map(func, *iterables, timeout=None, chunksize=1):並行執行一個函數對多個輸入迭代器進行映射。使用 map 方法,有兩個特點:無需提前使用submit方法;返回結果的順序和元素的順序相同,即使子線程先返回也不會獲取結果.
- shutdown(wait=True):停止池的操作,並等待所有提交的任務完成(wait=True)或立即返回(wait=False)。
- submit(fn, *args, **kwargs) 在線程池中提交任務。返回一個 concurrent.futures.Future 對象,用於表示異步操作的結果。
示例:
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
future = executor.submit(task, 2)
print(future.result()) # 输出: 4
concurrent.futures.Future
[編輯]- concurrent.futures.Future 是一個表示異步操作結果的對象,它提供了一種機制來檢查異步操作是否完成、獲取結果以及處理可能出現的異常。Future 對象是 concurrent.futures 模塊中的核心組件,主要用於線程池和進程池中的異步任務管理。
- cancel():嘗試取消這個任務。如果任務已經完成或已經被取消,返回 False。如果任務尚未執行,則任務會被取消,返回 True。
- cancelled():檢查這個任務是否已經被取消。
- done():檢查任務是否已經完成(無論是成功還是失敗)。
- result(timeout=None): 輸出對應的線程運行後方法的返回結果,如果線程還在運行,那麼其會一直阻塞在那裏,直到該線程運行完,當然,也可以設置
- result(timeout),即如果調用還沒完成那麼這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。
- exception(timeout=None):獲取任務拋出的異常。如果任務成功完成,則返回 None。如果任務拋出異常,exception() 方法會返回該異常。如果指定了 timeout 參數並且任務還未完成,則會阻塞至超時或任務完成。
- add_done_callback(fn):在任務完成時調用 fn 回調函數。fn 必須是一個可調用對象,接收一個 Future 對象作為參數示例:
- remove_done_callback(fn):從回調函數列表中移除 fn。只有在回調函數還沒有執行時,這個方法才有效
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 2)
print(future.result()) # 输出: 4
as_completed
[編輯]concurrent.futures.as_completed(fs, timeout=None)
返回一個包含fs所指定的Future實例的迭代器。在沒有任務完成的時候,會一直阻塞;如果設置了 timeout 參數,timeout 秒之後結果仍不可用,則返回的迭代器將引發concurrent.futures.TimeoutError。如果timeout未指定或為 None,則不限制等待時間。當有某個任務完成的時候,該方法會 yield 這個任務,就能執行 for循環體的語句,然後繼續阻塞住,循環到所有的任務結束。先完成的任務會先返回給主線程。
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def add(a, b):
time.sleep(3)
return a + b
task = []
list_a = [1, 2, 3, 4, 5]
list_b = [6, 7, 8, 9, 10]
with ThreadPoolExecutor(2) as pool:
for i in range(len(list_a)):
task.append(pool.submit(add, list_a[i], list_b[i]))
# 使用as_completed遍历
for i in as_completed(task):
print('result = {}'.format(i.result()))
該方法與第一種的直接遍歷所具有的優勢是,不需要等待所有線程全部返回,而是每返回一個子線程就能夠處理,上面的result方法會阻塞後面的線程。
wait方法
[編輯]wait(fs, timeout=None, return_when=ALL_COMPLETED)
fs 為指定的 Future 實例,timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為 None ,則不限制等待時間。return_when 指定此函數應在何時返回必須為以下常數之一:
- FIRST_COMPLETED 等待第一個線程結束時返回,即結束等待
- FIRST_EXCEPTION 函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當於 ALL_COMPLETED
- ALL_COMPLETED 函數將在所有可等待對象結束或取消時返回