跳转到内容

Python/Threading

维基教科书,自由的教学读本

Python 中的线程用于同时运行多个线程(任务、函数调用)。请注意,这并不意味着它们在不同的 CPU 上执行。如果程序已经使用了 100% 的 CPU 时间,Python 线程将不会让您的程序运行得更快。在这种情况下,您可能需要研究并行编程。如果您对使用 Python 进行并行编程感兴趣,请参阅 此处

Python 线程用于执行任务需要等待的情况。一个例子是与另一台计算机上托管的服务(如 Web 服务器)交互。线程允许 Python 在等待时执行其他代码;这可以通过 sleep 函数轻松模拟。

Python 3已经停用了thread模块,并改名为 _thread 模块。Python 3在_thread 模块的基础上开发了更高级的 threading 模块。threading 模块除了包含 _thread 模块中的所有方法外,还提供的其他方法,常用的方法如下:

  • threading.current_thread() 返回当前线程的信息
  • threading.enumerate() 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.active_count() 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果

直接创建线程

[编辑]
threading.Thread(target=None, name=None, args=(), kwargs={})

target 指要创建的线程的方法名,name 指给此线程命名,命名后可以调用 threading.current_thread().name 方法输出该线程的名字, args/kwargs 指 target 指向的方法需要传递的参数,必须是元组形式,如果只有一个参数,需要以添加逗号。

创建一个线程,打印 1-10 的数字,并在每次打印之间等待一秒钟:

import threading
import time

def loop1_10():
    for i in range(1, 11):
        time.sleep(1)
        print(i)

threading.Thread(target=loop1_10).start()

类的继承创建线程

[编辑]

通过直接从 threading.Thread 继承创建一个新的子类后,必须要重写其中的 run 方法。实例化后调用 start() 方法启动新线程,即相当于它调用了线程的 run() 方法。

#!/usr/bin/env python

import threading
import time


class MyThread(threading.Thread):
    def run(self):                                         # Default called function with mythread.start()
        print("{} started!".format(self.getName()))        # "Thread-x started!"
        time.sleep(1)                                      # Pretend to work for a second
        print("{} finished!".format(self.getName()))       # "Thread-x finished!"

def main():
    for x in range(4):                                     # Four times...
        mythread = MyThread(name = "Thread-{}".format(x))  # ...Instantiate a thread and pass a unique ID to it
        mythread.start()                                   # ...Start the thread, run method will be invoked
        time.sleep(.9)                                     # ...Wait 0.9 seconds before starting another

if __name__ == '__main__':
    main()

输出为:

Thread-0 started!
Thread-1 started!
Thread-0 finished!
Thread-2 started!
Thread-1 finished!
Thread-3 started!
Thread-2 finished!
Thread-3 finished!

守护线程

[编辑]

如果当前python线程是守护线程,那么意味着这个线程是“不重要”的,“不重要”意味着如果他的主线程结束了但该守护线程没有运行完,守护线程就会被强制结束。如果线程是非守护线程,那么父进程只有等到非守护线程运行完毕后才能结束。

只要当前主线程中尚存任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束是,守护线程随着主线程一同结束工作。

import threading
import time

# 每1秒加1
def job1(num):
    while True:
        num += 1
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(1)

# 每2秒加2
def job2(num):
    while True:
        num += 2
        print('{} is running >> {}'.format(threading.current_thread().name, num))
        time.sleep(2)

# 线程1,一秒加一
new_job1 = threading.Thread(target=job1, name='Add1', args=(100,))
# 设置为守护线程
new_job1.setDaemon(True)
new_job1.start()

# 线程2,两秒加二
new_job2 = threading.Thread(target=job2, name='Add2', args=(1,))
new_job2.setDaemon(True)
new_job2.start()

# 主线程等待9秒
time.sleep(9)
print('{} Ending'.format(threading.current_thread().name))

随着输出 MainThread Ending 后,程序就运行结束了,这表明子线程全为守护线程时,会随着主线程的结束而强制结束。

线程的阻塞会合

[编辑]

join([timeout])方法会使线程进入等待状态(阻塞),直到调用join()方法的子线程运行结束。同时也可以通过设置 timeout 参数来设定等待的时间。

线程并发控制

[编辑]

互斥锁(Lock)

[编辑]

互斥锁只能加锁一次然后释放一次。

import threading
import time


num = 0

lock = threading.Lock()

def job1():
    global num
    for i in range(1000000):
        lock.acquire() # 加锁
        num += 1
        lock.release() # 释放锁
        # 上述代码也可以直接写为
        # with lock:
        # 	 num += 1

new_job1 = threading.Thread(target=job1, name='Add1')
new_job1.start()

for i in range(1000000):
    lock.acquire() # 加锁
    num += 2
    lock.release() # 释放锁

# 等待线程执行完毕
time.sleep(3)
print('num = {}'.format(num))

递归锁

[编辑]

递归锁(Rlock)允许多层加锁、释放锁:

import threading, time


def run1():
    lock.acquire()
    print("grab the first part data")
    global num
    num += 1
    lock.release()
    return num


def run2():
    lock.acquire()
    print("grab the second part data")
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(3):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

信号量

[编辑]
import threading
import time

# 设置信号量,即同时执行的线程数为3
lock = threading.BoundedSemaphore(3)

def job1():
    lock.acquire()
    print('{} is coming, {}'.format(threading.current_thread().name, time.strftime('%H:%M:%S',time.localtime(time.time()))))
    time.sleep(3)
    lock.release()

for i in range(10):
    new_job1 = threading.Thread(target=job1, name='Thread{}'.format(i))
    new_job1.start()

事件

[编辑]

threading.Event()类会在全局定义一个Flag,当 Flag=False 时,调用 wait()方法会阻塞所有线程;而当 Flag=True 时,调用 wait() 方法不再阻塞。形象的比喻就是“红绿灯”:在红灯时阻塞所有线程,而在绿灯时又会一次性放行所有排队中的线程。Event类有四个方法:

  • set() 将Flag设置为True
  • wait() 阻塞所有线程
  • clear() 将Flag设置为False
  • is_set() 返回bool值,判断Flag是否为True

线程局部存储

[编辑]
import threading

# 创建全局ThreadLocal对象
local_data = threading.local()
def add():
	# 取出ThreadLocal中的数据
    n = local_data.num
    local_data.num_add = n + n

def divid():
    n = local_data.num
    local_data.num_divid = n / 2

def times():
    local_data.result = local_data.num_add * local_data.num_divid

def job1(num):
	# 将数据存入ThreadLocal中
    local_data.num = num
    add()
    divid()
    times()
    print('{} result >> {}'.format(threading.current_thread().name, local_data.result))

for i in range(5):
    t = threading.Thread(target=job1, args=(i,), name='Thread{}'.format(i))
    t.start()

线程池

[编辑]

concurrent.futures.ThreadPoolExecutor

[编辑]

python3新引入的库concurrent.futures.ThreadPoolExecutor可以并行执行多个线程,适用于 I/O 密集型任务。

concurrent.futures.ThreadPoolExecutor类的常用方法:

  • map(func, *iterables, timeout=None, chunksize=1):并行执行一个函数对多个输入迭代器进行映射。使用 map 方法,有两个特点:无需提前使用submit方法;返回结果的顺序和元素的顺序相同,即使子线程先返回也不会获取结果.
  • shutdown(wait=True):停止池的操作,并等待所有提交的任务完成(wait=True)或立即返回(wait=False)。
  • submit(fn, *args, **kwargs) 在线程池中提交任务。返回一个 concurrent.futures.Future 对象,用于表示异步操作的结果。

示例:

from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

concurrent.futures.Future

[编辑]
  • concurrent.futures.Future 是一个表示异步操作结果的对象,它提供了一种机制来检查异步操作是否完成、获取结果以及处理可能出现的异常。Future 对象是 concurrent.futures 模块中的核心组件,主要用于线程池和进程池中的异步任务管理。
  • cancel():尝试取消这个任务。如果任务已经完成或已经被取消,返回 False。如果任务尚未执行,则任务会被取消,返回 True。
  • cancelled():检查这个任务是否已经被取消。
  • done():检查任务是否已经完成(无论是成功还是失败)。
  • result(timeout=None): 输出对应的线程运行后方法的返回结果,如果线程还在运行,那么其会一直阻塞在那里,直到该线程运行完,当然,也可以设置
  • result(timeout),即如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。
  • exception(timeout=None):获取任务抛出的异常。如果任务成功完成,则返回 None。如果任务抛出异常,exception() 方法会返回该异常。如果指定了 timeout 参数并且任务还未完成,则会阻塞至超时或任务完成。
  • add_done_callback(fn):在任务完成时调用 fn 回调函数。fn 必须是一个可调用对象,接收一个 Future 对象作为参数示例:
  • remove_done_callback(fn):从回调函数列表中移除 fn。只有在回调函数还没有执行时,这个方法才有效
from concurrent.futures import ThreadPoolExecutor

def task(n):
    return n * n

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 2)
    print(future.result())  # 输出: 4

as_completed

[编辑]
concurrent.futures.as_completed(fs, timeout=None)

返回一个包含fs所指定的Future实例的迭代器。在没有任务完成的时候,会一直阻塞;如果设置了 timeout 参数,timeout 秒之后结果仍不可用,则返回的迭代器将引发concurrent.futures.TimeoutError。如果timeout未指定或为 None,则不限制等待时间。当有某个任务完成的时候,该方法会 yield 这个任务,就能执行 for循环体的语句,然后继续阻塞住,循环到所有的任务结束。先完成的任务会先返回给主线程。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def add(a, b):
    time.sleep(3)
    return a + b

task = []
list_a = [1, 2, 3, 4, 5]
list_b = [6, 7, 8, 9, 10]
with ThreadPoolExecutor(2) as pool:
    for i in range(len(list_a)):
        task.append(pool.submit(add, list_a[i], list_b[i]))
	
	# 使用as_completed遍历
    for i in as_completed(task):
        print('result = {}'.format(i.result()))

该方法与第一种的直接遍历所具有的优势是,不需要等待所有线程全部返回,而是每返回一个子线程就能够处理,上面的result方法会阻塞后面的线程。

wait方法

[编辑]
wait(fs, timeout=None, return_when=ALL_COMPLETED)

fs 为指定的 Future 实例,timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None ,则不限制等待时间。return_when 指定此函数应在何时返回必须为以下常数之一:

  • FIRST_COMPLETED 等待第一个线程结束时返回,即结束等待
  • FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
  • ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回