跳至內容

Python/Threading

維基教科書,自由的教學讀本

Python 中的執行緒用於同時運行多個執行緒(任務、函數調用)。請注意,這並不意味著它們在不同的 CPU 上執行。如果程序已經使用了 100% 的 CPU 時間,Python 執行緒將不會讓您的程序運行得更快。在這種情況下,您可能需要研究並行編程。如果您對使用 Python 進行並行編程感興趣,請參閱 此處

Python 執行緒用於執行任務需要等待的情況。一個例子是與另一台計算機上託管的服務(如 Web 伺服器)交互。執行緒允許 Python 在等待時執行其他代碼;這可以通過 sleep 函數輕鬆模擬。

Python 3已經停用了thread模塊,並改名為 _thread 模塊。Python 3在_thread 模塊的基礎上開發了更高級的 threading 模塊。threading 模塊除了包含 _thread 模塊中的所有方法外,還提供的其他方法,常用的方法如下:

  • threading.current_thread() 返回當前執行緒的信息
  • threading.enumerate() 返回一個包含正在運行的執行緒的list。正在運行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  • threading.active_count() 返回正在運行的執行緒數量,與len(threading.enumerate())有相同的結果

直接創建執行緒

[編輯]
threading.Thread(target=None, name=None, args=(), kwargs={})

target 指要創建的執行緒的方法名,name 指給此執行緒命名,命名後可以調用 threading.current_thread().name 方法輸出該執行緒的名字, args/kwargs 指 target 指向的方法需要傳遞的參數,必須是元組形式,如果只有一個參數,需要以添加逗號。

創建一個執行緒,列印 1-10 的數字,並在每次列印之間等待一秒鐘:

import threading
import time

def loop1_10():
    for i in range(1, 11):
        time.sleep(1)
        print(i)

threading.Thread(target=loop1_10).start()

類的繼承創建執行緒

[編輯]

通過直接從 threading.Thread 繼承創建一個新的子類後,必須要重寫其中的 run 方法。實例化後調用 start() 方法啟動新執行緒,即相當於它調用了執行緒的 run() 方法。

#!/usr/bin/env python

import threading
import time


class MyThread(threading.Thread):
    def run(self):                                         # Default called function with mythread.start()
        print("{} started!".format(self.getName()))        # "Thread-x started!"
        time.sleep(1)                                      # Pretend to work for a second
        print("{} finished!".format(self.getName()))       # "Thread-x finished!"

def main():
    for x in range(4):                                     # Four times...
        mythread = MyThread(name = "Thread-{}".format(x))  # ...Instantiate a thread and pass a unique ID to it
        mythread.start()                                   # ...Start the thread, run method will be invoked
        time.sleep(.9)                                     # ...Wait 0.9 seconds before starting another

if __name__ == '__main__':
    main()

輸出為:

Thread-0 started!
Thread-1 started!
Thread-0 finished!
Thread-2 started!
Thread-1 finished!
Thread-3 started!
Thread-2 finished!
Thread-3 finished!

守護執行緒

[編輯]

如果當前python執行緒是守護執行緒,那麼意味著這個執行緒是「不重要」的,「不重要」意味著如果他的主執行緒結束了但該守護執行緒沒有運行完,守護執行緒就會被強制結束。如果執行緒是非守護執行緒,那麼父進程只有等到非守護執行緒運行完畢後才能結束。

只要當前主執行緒中尚存任何一個非守護執行緒沒有結束,守護執行緒就全部工作;只有當最後一個非守護執行緒結束是,守護執行緒隨著主執行緒一同結束工作。

import threading
import time

# 每1秒加1
def job1(num):
    while True:
        num += 1
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(1)

# 每2秒加2
def job2(num):
    while True:
        num += 2
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(2)

# 线程1,一秒加一
new_job1 = threading.Thread(target=job1, name='Add1', args=(100,))
# 设置为守护线程
new_job1.setDaemon(True)
new_job1.start()

# 线程2,两秒加二
new_job2 = threading.Thread(target=job2, name='Add2', args=(1,))
new_job2.setDaemon(True)
new_job2.start()

# 主线程等待9秒
time.sleep(9)
print('{} Ending'.format(threading.current_thread().name))

隨著輸出 MainThread Ending 後,程序就運行結束了,這表明子執行緒全為守護執行緒時,會隨著主執行緒的結束而強制結束。

執行緒的阻塞會合

[編輯]

join([timeout])方法會使執行緒進入等待狀態(阻塞),直到調用join()方法的子執行緒運行結束。同時也可以通過設置 timeout 參數來設定等待的時間。

執行緒並發控制

[編輯]

互斥鎖(Lock)

[編輯]

互斥鎖只能加鎖一次然後釋放一次。

import threading
import time


num = 0

lock = threading.Lock()

def job1():
    global num
    for i in range(1000000):
        lock.acquire() # 加锁
        num += 1
        lock.release() # 释放锁
        # 上述代码也可以直接写为
        # with lock:
        # 	 num += 1

new_job1 = threading.Thread(target=job1, name='Add1')
new_job1.start()

for i in range(1000000):
    lock.acquire() # 加锁
    num += 2
    lock.release() # 释放锁

# 等待线程执行完毕
time.sleep(3)
print('num = {}'.format(num))

遞歸鎖

[編輯]

遞歸鎖(Rlock)允許多層加鎖、釋放鎖:

import threading, time


def run1():
    lock.acquire()
    print("grab the first part data")
    global num
    num += 1
    lock.release()
    return num


def run2():
    lock.acquire()
    print("grab the second part data")
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(3):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

信號量

[編輯]
import threading
import time

# 设置信号量,即同时执行的线程数为3
lock = threading.BoundedSemaphore(3)

def job1():
    lock.acquire()
    print('{} is coming, {}'.format(threading.current_thread().name, time.strftime('%H:%M:%S',time.localtime(time.time()))))
    time.sleep(3)
    lock.release()

for i in range(10):
    new_job1 = threading.Thread(target=job1, name='Thread{}'.format(i))
    new_job1.start()

事件

[編輯]

threading.Event()類會在全局定義一個Flag,當 Flag=False 時,調用 wait()方法會阻塞所有執行緒;而當 Flag=True 時,調用 wait() 方法不再阻塞。形象的比喻就是「紅綠燈」:在紅燈時阻塞所有執行緒,而在綠燈時又會一次性放行所有排隊中的執行緒。Event類有四個方法:

  • set() 將Flag設置為True
  • wait() 阻塞所有執行緒
  • clear() 將Flag設置為False
  • is_set() 返回bool值,判斷Flag是否為True

執行緒局部存儲

[編輯]
import threading

# 创建全局ThreadLocal对象
local_data = threading.local()
def add():
	# 取出ThreadLocal中的数据
    n = local_data.num
    local_data.num_add = n + n

def divid():
    n = local_data.num
    local_data.num_divid = n / 2

def times():
    local_data.result = local_data.num_add * local_data.num_divid

def job1(num):
	# 将数据存入ThreadLocal中
    local_data.num = num
    add()
    divid()
    times()
    print('{} result >> {}'.format(threading.current_thread().name, local_data.result))

for i in range(5):
    t = threading.Thread(target=job1, args=(i,), name='Thread{}'.format(i))
    t.start()

執行緒池

[編輯]

concurrent.futures.ThreadPoolExecutor

[編輯]

python3新引入的庫concurrent.futures.ThreadPoolExecutor可以並行執行多個執行緒,適用於 I/O 密集型任務。

concurrent.futures.ThreadPoolExecutor類的常用方法:

  • map(func, *iterables, timeout=None, chunksize=1):並行執行一個函數對多個輸入迭代器進行映射。使用 map 方法,有兩個特點:無需提前使用submit方法;返回結果的順序和元素的順序相同,即使子執行緒先返回也不會獲取結果.
  • shutdown(wait=True):停止池的操作,並等待所有提交的任務完成(wait=True)或立即返回(wait=False)。
  • submit(fn, *args, **kwargs) 在執行緒池中提交任務。返回一個 concurrent.futures.Future 對象,用於表示異步操作的結果。

示例:

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

concurrent.futures.Future

[編輯]
  • concurrent.futures.Future 是一個表示異步操作結果的對象,它提供了一種機制來檢查異步操作是否完成、獲取結果以及處理可能出現的異常。Future 對象是 concurrent.futures 模塊中的核心組件,主要用於執行緒池和進程池中的異步任務管理。
  • cancel():嘗試取消這個任務。如果任務已經完成或已經被取消,返回 False。如果任務尚未執行,則任務會被取消,返回 True。
  • cancelled():檢查這個任務是否已經被取消。
  • done():檢查任務是否已經完成(無論是成功還是失敗)。
  • result(timeout=None): 輸出對應的執行緒運行後方法的返回結果,如果執行緒還在運行,那麼其會一直阻塞在那裡,直到該執行緒運行完,當然,也可以設置
  • result(timeout),即如果調用還沒完成那麼這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。
  • exception(timeout=None):獲取任務拋出的異常。如果任務成功完成,則返回 None。如果任務拋出異常,exception() 方法會返回該異常。如果指定了 timeout 參數並且任務還未完成,則會阻塞至超時或任務完成。
  • add_done_callback(fn):在任務完成時調用 fn 回調函數。fn 必須是一個可調用對象,接收一個 Future 對象作為參數示例:
  • remove_done_callback(fn):從回調函數列表中移除 fn。只有在回調函數還沒有執行時,這個方法才有效
from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

as_completed

[編輯]
concurrent.futures.as_completed(fs, timeout=None)

返回一個包含fs所指定的Future實例的迭代器。在沒有任務完成的時候,會一直阻塞;如果設置了 timeout 參數,timeout 秒之後結果仍不可用,則返回的迭代器將引發concurrent.futures.TimeoutError。如果timeout未指定或為 None,則不限制等待時間。當有某個任務完成的時候,該方法會 yield 這個任務,就能執行 for循環體的語句,然後繼續阻塞住,循環到所有的任務結束。先完成的任務會先返回給主執行緒。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def add(a, b):
    time.sleep(3)
    return a + b

task = []
list_a = [1, 2, 3, 4, 5]
list_b = [6, 7, 8, 9, 10]
with ThreadPoolExecutor(2) as pool:
    for i in range(len(list_a)):
        task.append(pool.submit(add, list_a[i], list_b[i]))
	
	# 使用as_completed遍历
    for i in as_completed(task):
        print('result = {}'.format(i.result()))

該方法與第一種的直接遍歷所具有的優勢是,不需要等待所有執行緒全部返回,而是每返回一個子執行緒就能夠處理,上面的result方法會阻塞後面的執行緒。

wait方法

[編輯]
wait(fs, timeout=None, return_when=ALL_COMPLETED)

fs 為指定的 Future 實例,timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為 None ,則不限制等待時間。return_when 指定此函數應在何時返回必須為以下常數之一:

  • FIRST_COMPLETED 等待第一個執行緒結束時返回,即結束等待
  • FIRST_EXCEPTION 函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當於 ALL_COMPLETED
  • ALL_COMPLETED 函數將在所有可等待對象結束或取消時返回