跳至內容

Python/Process

維基教科書,自由的教學讀本

Python中的進程類使用的是 multiprocessing 模塊,該模塊的API大部分複製了 threading 模塊的API。

每一個進程啟動時都會最先產生一個線程,即主線程。主線程所在的進程為主進程。使用多進程的時候需要特別注意,必須要有 if __name__ == '__main__':, 該語句下的代碼相當於主進程,沒有該語句會報錯。

multiprocessing 模塊的API

[編輯]
import multiprocessing

print('子进程的列表:{}'.format(multiprocessing.active_children()))
print('电脑的CPU数量:{}'.format(multiprocessing.cpu_count()))
print('现在运行的进程:{}'.format(multiprocessing.current_process()))

創建線程

[編輯]

創建線程的方法有兩種,一種是直接使用 multiprocessing 模塊裏面的類來進行創建,一種是繼承 multiprocessing 模塊的類寫一個類來對線程進行創建。

直接創建線程

[編輯]

直接從 multiprocessing.Process 繼承創建一個新的子類,並實例化後調用 start() 方法啟動新進程,即相當於它調用了進程的 run() 方法。

該方法的參數如下:

Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • target 指要創建的進程的方法名
  • name 指給此進程命名,命名後可以調用 multiprocessing.current_process().name 方法輸出該進程的名字
  • args/kwargs 指 target 指向的方法需要傳遞的參數,必須是元組形式,如果只有一個參數,需要以添加逗號。
  • daemon參數設置為True,則進程為守護進程。如果主進程結束了但該守護進程沒有運行完,守護進程就會被強制結束。

創建一個新進程,每隔兩秒對傳入的數加2,而主進程每隔1秒對傳入的數加1,代碼示例如下:

import multiprocessing
from multiprocessing import Process
import time

def process1(num):
    while True:
        num += 2
        print('{} is running >> {}'.format(multiprocessing.current_process().name, num))
        time.sleep(2)

if __name__ == '__main__':
	# 设置进程
    new_pro = Process(target=process1, name='Add2', args=(100,))
    # 进程开始
    new_pro.start()

    n = 0
    while True:
        n += 1
        print('{} is running >> {}'.format(multiprocessing.current_process().name, n))
        time.sleep(1)

繼承創建

[編輯]

繼承Process定義進程類,重寫run方法。

import multiprocessing
from multiprocessing import Process
import time

# 继承进程类
class MyProcess(Process):
    def __init__(self, num):
        super().__init__() # 必须调用父类的初始化方法
        self.num = num

    def run(self) -> None:
        while True:
            self.num += 2
            print('{} is running >> {}'.format(multiprocessing.current_process().name, self.num))
            time.sleep(2)


if __name__ == '__main__':
    new_pro = MyProcess(100)
    # 设置进程名字
    new_pro.name = "Add2"
    new_pro.start()

    n = 0
    while True:
        n += 1
        print('{} is running >> {}'.format(multiprocessing.current_process().name, n))
        time.sleep(1)

Process的方法

[編輯]

join方法會阻塞除了被join外的所有進程。參數為阻塞timeout秒數。如果參數為None,阻塞至被join的進程終止。

進程鎖

[編輯]
lock = multiprocessing.Lock() 

進程通信

[編輯]

進程之間不共享數據。如果進程之間需要通信,則要用到 Queue 模塊或者 Pipe 模塊或共享內存來實現。

Queue

[編輯]

任何一個子進程都可以對Queue進行存(put)和取(get)。

  • multiprocessing.Queue([maxsize])
  • put(obj[, block[, timeout]]) 將 obj 放入隊列。如果可選參數 block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到有空的緩衝槽。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之後還是沒有可用的緩衝槽時拋出 queue.Full 異常。反之 (block 是 False 時),僅當有可用緩衝槽時才放入對象,否則拋出 queue.Full 異常 (在這種情形下 timeout 參數會被忽略)。
  • get([block[, timeout]]) 從隊列中取出並返回對象。如果可選參數 block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到隊列中出現可用的對象。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之後還是沒有可用的對象時拋出 queue.Empty 異常。反之 (block 是 False 時),僅當有可用對象能夠取出時返回,否則拋出 queue.Empty 異常 (在這種情形下 timeout 參數會被忽略)。

Pipe

[編輯]

Pipe只提供兩個端點,只允許兩個子進程進行存(send)和取(recv)。Pipe實現了兩個子進程之間的通信。

from multiprocessing import Pool

def main():
    with (Pool() as pool):
        pool.starmap(process_function, [param for param in range(25)])

Value

[編輯]

Value數據共享類最多能夠共享一個值,該函數的參數如下:

Value(typecode_or_type, args, lock=True)

上述方法中,參數 typecode_or_type 定義 ctypes() 對象的類型,可以傳 Type code 或 C Type,具體對照表見下文。args 傳遞給 typecode_or_type 構造函數的參數,lock 默認為True,創建一個互斥鎖來限制對Value對象的訪問,如果傳入一個鎖,如Lock或RLock的實例,將用於同步。如果傳入False,Value的實例就不會被鎖保護,它將不是進程安全的。

標題文本
Type code C Type Python Type Minimum size in bytes
'b' signed char int 1
'B' unsigned char int 1
'u' Py_UNICODE Unicode character 2
'h' signed short int 2
'H' unsigned short int 2
'i' signed int int 2
'I' unsigned int int 2
'l' signed long int 4
'L' unsigned long int 4
'q' signed long long int 8
'Q' unsigned long long int 8
'f' float float 4
'd' double float 8

例如下面我們在一個進程中傳入值並對其進行改動,另一個進程輸出傳入的值,代碼如下:

from multiprocessing import Process, Value

def process1(n):
    n.value = 101

def process2(n):
    print('改变后的参数 = {}'.format(n.value))

if __name__ == '__main__':
    # 初始化value
    num = Value('d', 0)
    new_pro1 = Process(target=process1, args=(num,))
    new_pro1.start()
    new_pro1.join()

    new_pro2 = Process(target=process2, args=(num,))
    new_pro2.start()

Array

[編輯]

上述的 Value 類只能傳遞一個參數,但是 Array 可以傳遞很多的參數,該方法參數如下:

Array(typecode_or_type, size_or_initializer, **kwds[, lock])

typecode_or_type 參數與上面的相同,同樣參照上表,size_or_initializer 如果它是一個整數,那麼它確定數組的長度,並且數組將被初始化為0。否則,size_or_initializer 是用於初始化數組的序列,其長度決定數組的長度。kwds 是傳遞給 typecode_or_type 構造函數的參數,lock 參數與上面的相同。

例如,我們傳入一個0數組給一個進程,然後在另一個進程中計算其和,代碼如下:

from multiprocessing import Process, Array

def process1(n):
    n[3] = 0
    n[4] = 0

def process2(n):
    # 从队列中得到数据
    print('数组的和为 = {}'.format(sum(n)))

if __name__ == '__main__':
    # 初始化数组
    num = Array('d', range(5))
    new_pro1 = Process(target=process1, args=(num,))
    new_pro1.start()
    new_pro1.join()

    new_pro2 = Process(target=process2, args=(num,))
    new_pro2.start()

進程池

[編輯]

進程池可以提供指定數量的進程給用戶使用。即當有新的請求提交到進程池中時,如果池未滿,則會創建一個新的進程用來執行該請求;反之,如果池中的進程數已經達到規定最大值,那麼該請求就會等待,只要池中有進程空閒下來,該請求就能得到執行。

進程池的話Python中有兩個方法可以實現,一個是 multiprocessing 模塊自帶的 Pool 類;還有一個是 concurrent.futures.ProcessPoolExecutor 進行實現,其中後者的API與線程池的API一模一樣。下面介紹前一種方式。

  • Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) 該類一般初始化使用的是processes ,該參數是要使用的工作進程數目。如果 processes 為 None,則使用 os.cpu_count() 返回的值。
  • apply(func[, args[, kwds]]) 使用阻塞的方式調用func,必須等待上⼀個進程執行完任務後才能執行下一個進程,了解即可,幾乎不用
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) 使用非阻塞的方式調用 func(任務並行執行),args 為傳遞給 func 的參數列表,kwds 為傳遞給 func 的關鍵字參數列表。
  • terminate() 不管任務是否完成,立即終止
  • close() 關閉Pool,使其不再接受新的任務。
  • join()主進程阻塞,等待子進程的退出,必須在 close 或 terminate 之後使用。

下面創建一個容量為4的進程池,並讓10個進程都停留三秒輸出進程名,代碼如下:

import multiprocessing
from multiprocessing import Process, Pool
import time

def proc():
    time.sleep(3)
    print('{} Already Endding >> {}'.format(multiprocessing.current_process().name,
                                            time.strftime('%H:%M:%S',time.localtime(time.time()))))

if __name__ == '__main__':
	# 创建容量为4的进程池
    pool = Pool(4)

    for i in range(10):
        pool.apply_async(proc)
    pool.close()
    # 阻塞主进程,等所有子进程运行完后再通过
    pool.join()
    print('{} Already Endding >> {}'.format(multiprocessing.current_process().name,
                                            time.strftime('%H:%M:%S', time.localtime(time.time()))))