Python标准库 - concurrent.futuresPython 在执行时,通常是采用同步的任务处理模式 ( 一
Python 在执行时,通常是采用同步的任务处理模式 ( 一个处理完成后才会接下去处理第二个 ),然而 Python 的标准函数“concurrent.futures”,提供了异步任务处理的功能,能够同时处理多个任务,这篇教程会介绍 concurrent.futures 的用法。
同步与非同步
同步和非同步的常见说法是:“ 同步模式下,每个任务必须按照顺序执行,后面的任务必须等待前面的任务执行完成,在非同步模式下,后面的任务不用等前面的,各自执行各自的任务”,也可以想像成“ 同一条路 vs 不同的多条路”,通过道路的方式,会更容易明白同步和非同步。
同步:“同一条路”,只能依序排队前进。
非同步:“不 ( 非 ) 同的多条路”,可以各走各的。
Thread 和 Process
concurrent.futures 提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两种可以平行处理任务的实现方法,ThreadPoolExecutor 是针对 Thread ( 执行绪 ),ProcessPoolExecutor 是针对 Process ( 程序 ),下方是 Thread 和 Process 的简单差异说明:
英文 | 中文 | 说明 |
---|---|---|
Thread | 执行绪 | 程序执行任务的基本单位。 |
Process | 程序 | 启动应用程序时产生的执行实体,需要一定的 CPU 与内存资源,Process 由一到多个 Thread 组成,同一个 Process 里的 Thread 可以共用内存资源。 |
import concurrent.futures
要使用 concurrent.futures 必须先 import concurrent.futures 模块,或使用 from 的方式,单独 import 特定的类型。
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
ThreadPoolExecutor
ThreadPoolExecutor 会通过 Thread 的方式创建多个 Executors ( 执行器 ) ,执行并处理多个任务 ( tasks ), ThreadPoolExecutor 有四个参数,最常用的为 max_workers:
参数 | 说明 |
---|---|
max_workers | Thread 的数量,默认 5 ( CPU number * 5,每个 CPU 可以处理 5 个 Thread),数量越多,运行速度会越快,如果设置小于等于 0 会发生错误。 |
thread_name_prefix | Thread 的名称,默认 ''。 |
initializer | 每个 Thread 启动时调用的可调用对象,默认 None。 |
initargs | 传递给初始化程序的参数,使用 tuple,默认 ()。 |
使用 ThreadPoolExecutor 后,就能使用 Executors 的相关方法:
方法 | 参数 | 说明 |
---|---|---|
submit | fn, *args, **kwargs | 执行某个函数。 |
map | func, *iterables | 使用 map 的方式,使用某个函数执行可迭代的内容。 |
shutdown | wait | 完成执行后返回信号,释放正在使用的任何资源,wait 默认 True 会在所有对象完成后才返回信号,wait 设置 False 则会在执行后立刻返回。 |
举例来说,下方的代码执行后,会按照顺序 ( 同步 ) 显示出数字,前一个任务尚未处理完,就不会执行后续的工作。
import time
def test(n):
for i in range(n):
print(i, end=' ')
time.sleep(0.2)
test(2)
test(3)
test(4)
# 0 1 0 1 2 0 1 2 3
如果改成 ThreadPoolExecutor 的方式,就会发现三个函数就会一起进行 ( 如果执行的函数大于 5,可再设置 max_workers 的数值 )。
import time
from concurrent.futures import ThreadPoolExecutor
def test(n):
for i in range(n):
print(i, end=' ')
time.sleep(0.2)
executor = ThreadPoolExecutor() # 设置一个执行 Thread 的启动器
a = executor.submit(test, 2) # 启动第一个 test 函数
b = executor.submit(test, 3) # 启动第二个 test 函数
c = executor.submit(test, 4) # 启动第三个 test 函数
executor.shutdown() # 关闭启动器 ( 如果没有使用,则启动器会处在锁住的状态而无法继续 )
# 0 0 0 1 1 1 2 2 3
上述的做法,可以改用 with...as 的方式。
import time
from concurrent.futures import ThreadPoolExecutor
def test(n):
for i in range(n):
print(i, end=' ')
time.sleep(0.2)
with ThreadPoolExecutor() as executor: # 改用 with...as
executor.submit(test, 2)
executor.submit(test ,3)
executor.submit(test, 4)
# 0 0 0 1 1 1 2 2 3
上述的示例,也可以改用 map 的做法:
import time
from concurrent.futures import ThreadPoolExecutor
def test(n):
for i in range(n):
print(i, end=' ')
time.sleep(0.2)
with ThreadPoolExecutor() as executor:
executor.map(test, [2,3,4])
# 0 0 0 1 1 1 2 2 3
输入文字,停止函数执行
通过平行任务处理的方法,就能轻松做到“输入文字,停止正在执行的函数”,以下方的例子而言, run 是一个具有“无穷循环”的函数,如果不使用平行任务处理,在 run 后方的程序都无法运作 ( 会被无穷循环卡住 ),而 keyin 是一个具有“input”指令的函数,如果不使用平行任务处理,在 keyin 后方的程序也无法运作 ( 会被 input 卡住 ), 因此如果使用 concurrent.futures,就能让两个函数同时运行,搭配全域变量的做法,就能在输入特定指令时,停止另外函数的运作。
import time
from concurrent.futures import ThreadPoolExecutor
a = True # 定义 a 为 True
def run():
global a # 定义 a 是全域变量
while a: # 如果 a 为 True
print(123) # 不断显示 123
time.sleep(1) # 每隔一秒
def keyin():
global a # 定义 a 是全域变量
if input() == 'a':
a = False # 如果输入的是 a,就让 a 为 False,停止 run 函数中的循环
executor = ThreadPoolExecutor()
e1 = executor.submit(run)
e2 = executor.submit(keyin)
executor.shutdown()
ProcessPoolExecutor
ProcessPoolExecutor 会通过 Process 的方式创建多个 Executors ( 执行器 ),执行并处理多个程序,ProcessPoolExecutor 有四个参数,最常用的为 max_workers:
参数 | 说明 |
---|---|
max_workers | Process 的数量,默认为机器的 CPU 数量,如果 max_workers 小于等于 0 或大于等于 61 会发生错误。 |
thread_name_prefix | Thread 的名称,默认 ''。 |
initializer | 每个 Thread 启动时调用的可调用对象,默认 None。 |
initargs | 传递给初始化程序的参数,使用 tuple,默认 ()。 |
使用 ProcessPoolExecutor 后,就能使用 Executors 的相关方法:
方法 | 参数 | 说明 |
---|---|---|
submit | fn, *args, **kwargs | 执行某个函数。 |
map | func, *iterables | 使用 map 的方式,使用某个函数执行可迭代的内容。 |
shutdown | wait | 完成执行后返回信号,释放正在使用的任何资源,wait 默认 True 会在所有对象完成后才返回信号,wait 设置 False 则会在执行后立刻返回。 |
ProcessPoolExecutor 的用法基本上和 ThreadPoolExecutor 很像,但 ProcessPoolExecutor 主要会用做处理比较需要运算的程序,ThreadPoolExecutor 会使用于等待输入和输出 ( I/O ) 的程序,两者执行后也会有些差别,ProcessPoolExecutor 执行后最后是显示运算结果,而 ThreadPoolExecutor 则是显示过程。
import time
from concurrent.futures import ProcessPoolExecutor
def test(n):
for i in range(n):
print(i, end=' ')
time.sleep(0.2)
print()
with ProcessPoolExecutor() as executor:
executor.map(test, [4,5,6])
如果是使用 ThreadPoolExecutor 则会如下图的结果:
此外,Python 3.5 之后 map() 方法多了 chunksize 参数可以使用,该参数只对 ProcessPoolExecutor 有效,可以提升处理大量可迭代对象的执行性能,chunksize 默认 1,数值越大性能越好 ( 以电脑本身 CPU 的性能为主 )。
import time
from concurrent.futures import ProcessPoolExecutor
def test(n):
for i in range(n):
print(i, end=' ')
time.sleep(0.2)
print()
with ProcessPoolExecutor() as executor:
executor.map(test, [4,5,6], chunksize=5) # 设置 chunksize
小结
Python 的 concurrent.futures 内置函数库是一个相当方便的函数库,不仅可以让原本同步的执行变成非同步,大幅减少工作时间,用法上也比使用 multiprocessing、threading、asyncio 容易得多,是相当推荐的内置函数库。
转载自:https://juejin.cn/post/7416562712023777334