likes
comments
collection
share

Python标准库 - concurrent.futuresPython 在执行时,通常是采用同步的任务处理模式 ( 一

作者站长头像
站长
· 阅读数 20

Python 在执行时,通常是采用同步的任务处理模式 ( 一个处理完成后才会接下去处理第二个 ),然而 Python 的标准函数“concurrent.futures”,提供了异步任务处理的功能,能够同时处理多个任务,这篇教程会介绍 concurrent.futures 的用法。

同步与非同步

同步和非同步的常见说法是:“ 同步模式下,每个任务必须按照顺序执行,后面的任务必须等待前面的任务执行完成,在非同步模式下,后面的任务不用等前面的,各自执行各自的任务”,也可以想像成“ 同一条路 vs 不同的多条路”,通过道路的方式,会更容易明白同步和非同步。

  • 同步:“同一条路”,只能依序排队前进

  • 非同步:“不 ( 非 ) 同的多条路”,可以各走各的

Python标准库 - concurrent.futuresPython 在执行时,通常是采用同步的任务处理模式 ( 一

Thread 和 Process

concurrent.futures 提供了 ThreadPoolExecutorProcessPoolExecutor 两种可以平行处理任务的实现方法,ThreadPoolExecutor 是针对 Thread ( 执行绪 ),ProcessPoolExecutor 是针对 Process ( 程序 ),下方是 Thread 和 Process 的简单差异说明:

英文中文说明
Thread执行绪程序执行任务的基本单位。
Process程序启动应用程序时产生的执行实体,需要一定的 CPU 与内存资源,Process 由一到多个 Thread 组成,同一个 Process 里的 Thread 可以共用内存资源。

import concurrent.futures

要使用 concurrent.futures 必须先 import concurrent.futures 模块,或使用 from 的方式,单独 import 特定的类型。

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

ThreadPoolExecutor

ThreadPoolExecutor 会通过 Thread 的方式创建多个 Executors ( 执行器 ) ,执行并处理多个任务 ( tasks ), ThreadPoolExecutor 有四个参数,最常用的为 max_workers:

参数说明
max_workersThread 的数量,默认 5 ( CPU number * 5,每个 CPU 可以处理 5 个 Thread),数量越多,运行速度会越快,如果设置小于等于 0 会发生错误。
thread_name_prefixThread 的名称,默认 ''。
initializer每个 Thread 启动时调用的可调用对象,默认 None。
initargs传递给初始化程序的参数,使用 tuple,默认 ()。

使用 ThreadPoolExecutor 后,就能使用 Executors 的相关方法:

方法参数说明
submitfn, *args, **kwargs执行某个函数。
mapfunc, *iterables使用 map 的方式,使用某个函数执行可迭代的内容。
shutdownwait完成执行后返回信号,释放正在使用的任何资源,wait 默认 True 会在所有对象完成后才返回信号,wait 设置 False 则会在执行后立刻返回。

举例来说,下方的代码执行后,会按照顺序 ( 同步 ) 显示出数字,前一个任务尚未处理完,就不会执行后续的工作。

import time
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
test(2)
test(3)
test(4)
0 1 0 1 2 0 1 2 3

如果改成 ThreadPoolExecutor 的方式,就会发现三个函数就会一起进行 ( 如果执行的函数大于 5,可再设置 max_workers 的数值 )。

import time
from concurrent.futures import ThreadPoolExecutor
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
executor = ThreadPoolExecutor()  # 设置一个执行 Thread 的启动器
a = executor.submit(test, 2)     # 启动第一个 test 函数
b = executor.submit(test, 3)     # 启动第二个 test 函数
c = executor.submit(test, 4)     # 启动第三个 test 函数
executor.shutdown()              # 关闭启动器 ( 如果没有使用,则启动器会处在锁住的状态而无法继续 )
# 0 0 0 1 1 1 2 2 3

上述的做法,可以改用 with...as 的方式。

import time
from concurrent.futures import ThreadPoolExecutor
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
with ThreadPoolExecutor() as executor:    # 改用 with...as
    executor.submit(test, 2)
    executor.submit(test ,3)
    executor.submit(test, 4)
# 0 0 0 1 1 1 2 2 3

上述的示例,也可以改用 map 的做法:

import time
from concurrent.futures import ThreadPoolExecutor
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
with ThreadPoolExecutor() as executor:
    executor.map(test, [2,3,4])
# 0 0 0 1 1 1 2 2 3

输入文字,停止函数执行

通过平行任务处理的方法,就能轻松做到“输入文字,停止正在执行的函数”,以下方的例子而言, run 是一个具有“无穷循环”的函数,如果不使用平行任务处理,在 run 后方的程序都无法运作 ( 会被无穷循环卡住 ),而 keyin 是一个具有“input”指令的函数,如果不使用平行任务处理,在 keyin 后方的程序也无法运作 ( 会被 input 卡住 ), 因此如果使用 concurrent.futures,就能让两个函数同时运行,搭配全域变量的做法,就能在输入特定指令时,停止另外函数的运作。

import time
from concurrent.futures import ThreadPoolExecutor
a = True               # 定义 a 为 True
def run():
    global a           # 定义 a 是全域变量
    while a:           # 如果 a 为 True
        print(123)     # 不断显示 123
        time.sleep(1)  # 每隔一秒
def keyin():
    global a           # 定义 a 是全域变量
    if input() == 'a':
        a = False      # 如果输入的是 a,就让 a 为 False,停止 run 函数中的循环
executor = ThreadPoolExecutor()
e1 = executor.submit(run)
e2 = executor.submit(keyin)
executor.shutdown()

ProcessPoolExecutor

ProcessPoolExecutor 会通过 Process 的方式创建多个 Executors ( 执行器 ),执行并处理多个程序,ProcessPoolExecutor 有四个参数,最常用的为 max_workers:

参数说明
max_workersProcess 的数量,默认为机器的 CPU 数量,如果 max_workers 小于等于 0 或大于等于 61 会发生错误。
thread_name_prefixThread 的名称,默认 ''。
initializer每个 Thread 启动时调用的可调用对象,默认 None。
initargs传递给初始化程序的参数,使用 tuple,默认 ()。

使用 ProcessPoolExecutor 后,就能使用 Executors 的相关方法:

方法参数说明
submitfn, *args, **kwargs执行某个函数。
mapfunc, *iterables使用 map 的方式,使用某个函数执行可迭代的内容。
shutdownwait完成执行后返回信号,释放正在使用的任何资源,wait 默认 True 会在所有对象完成后才返回信号,wait 设置 False 则会在执行后立刻返回。

ProcessPoolExecutor 的用法基本上和 ThreadPoolExecutor 很像,但 ProcessPoolExecutor 主要会用做处理比较需要运算的程序,ThreadPoolExecutor 会使用于等待输入和输出 ( I/O ) 的程序,两者执行后也会有些差别,ProcessPoolExecutor 执行后最后是显示运算结果,而 ThreadPoolExecutor 则是显示过程。

import time
from concurrent.futures import ProcessPoolExecutor
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
    print()
with ProcessPoolExecutor() as executor:
    executor.map(test, [4,5,6])

Python标准库 - concurrent.futuresPython 在执行时,通常是采用同步的任务处理模式 ( 一

如果是使用 ThreadPoolExecutor 则会如下图的结果:

Python标准库 - concurrent.futuresPython 在执行时,通常是采用同步的任务处理模式 ( 一

此外,Python 3.5 之后 map() 方法多了 chunksize 参数可以使用,该参数只对 ProcessPoolExecutor 有效,可以提升处理大量可迭代对象的执行性能,chunksize 默认 1,数值越大性能越好 ( 以电脑本身 CPU 的性能为主 )。

import time
from concurrent.futures import ProcessPoolExecutor
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
    print()
with ProcessPoolExecutor() as executor:
    executor.map(test, [4,5,6], chunksize=5)  # 设置 chunksize

小结

Python 的 concurrent.futures 内置函数库是一个相当方便的函数库,不仅可以让原本同步的执行变成非同步,大幅减少工作时间,用法上也比使用 multiprocessing、threading、asyncio 容易得多,是相当推荐的内置函数库。

转载自:https://juejin.cn/post/7416562712023777334
评论
请登录