likes
comments
collection
share

Python并发编程之Futures:简化异步编程的利器

作者站长头像
站长
· 阅读数 11

前言

Python是一门流行且强大的编程语言,具备灵活的异步编程能力。在并发编程中,Futures模块是Python提供的一个强大工具,它简化了异步编程的复杂性,使得编写并发代码变得更加直观和易于阅读。本文将介绍Futures的基本概念和用法,并通过实例来引导读者深入理解。

区别并发与并行

并发指的是同时处理多个任务的能力。在并发执行中,任务按照交替、间断的方式进行,看起来好像是同时执行。通过在任务之间快速切换,实现了看似同时进行的效果。并发适用于单核处理器系统,其中只有一个物理处理单元。

并行指的是真正同时执行多个任务的能力。在并行执行中,任务可以在多个物理处理单元(例如多核处理器、多个计算机节点等)上同时进行,每个任务都有自己的处理资源。并行适用于多核处理器系统,可以充分利用多个处理单元提高任务的执行效率。

简而言之, 并发指的是任务的交替执行,通过任务之间的快速切换来实现并行的假象;并行则是真正的同时执行多个任务。

以下是并发和并行的一些关键区别:

  1. 单元数量:并发适用于单个处理单元(如单核CPU),并行适用于多个处理单元(如多核CPU)。
  2. 实际执行:并发是任务交替执行,看起来像是同时进行;并行是真正同时执行多个任务。
  3. 物理资源:并发共享物理资源,任务之间快速切换;并行每个任务都有自己的物理资源。
  4. 提高性能:并行可以通过同时执行多个任务来提高整体性能,而并发主要用于提高响应性和减少等待时间。

并发,是指遇到I/O阻塞时(一般是网络I/O或磁盘I/O),通过多个线程之间切换执行多个任务(多线程)或单线程内多个任务之间切换执行的方式来最大化利用CPU时间,但同一时刻,只允许有一个线程或任务执行。适合I/O阻塞频繁的业务场景。 并行,是指多个进程完全同步同时的执行。适合CPU密集型业务场景。

Futures简介

Futures是Python标准库中concurrent.futures模块提供的一种并发编程概念。它允许我们在主线程中定义任务,并在后台进行并发执行。通过使用Futures,我们可以异步地执行任务、获取任务的结果,以及处理异常情况,而无需显式地管理线程或进程。

Futures的基本用法

要使用Futures,首先需要导入concurrent.futures模块:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

接下来,我们需要创建一个线程池或进程池对象,以便执行我们定义的任务。ThreadPoolExecutor用于开启线程并执行任务,ProcessPoolExecutor则用于开启进程。

下面是一个使用ThreadPoolExecutor的示例,计算斐波那契数列的值:

def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

def main():
    start_time = time.perf_counter()
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(fibonacci, i) for i in range(10)]
        results = [future.result() for future in as_completed(futures)]

    print(results)
    end_time = time.perf_counter()

    print(end_time - start_time) # 0.0010214539999999772

在上述示例中,我们使用executor.submit()方法将任务fibonacci(i)提交给线程池,返回一个Future对象。通过遍历所有的Future对象,并使用as_completed()函数等待并获取任务的结果,我们可以得到每个任务的计算结果。

这里我们创建了一个线程池,总共有 2个线程可以分配使用。虽然线程的数量可以自己定义,但是线程数并不是越多越好,因为线程的创建、维护和删除也会有一定的开销。所以如果你设置的很大,反而可能会导致速度变慢。我们往往需要根据实际的需求做一些测试,来寻找最优的线程数量。

多线程与单线程的比较

import time

def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

def main():
    start_time = time.perf_counter()
    for i in range(10):
        fibonacci(i)
    end_time = time.perf_counter()

    print(end_time - start_time)  # 5.414600000000491

这是单线程的执行时间,单线程的优点是简单明了,但是明显效率低下

异步执行和获取结果

Futures提供了异步执行任务的能力。调用executor.submit()方法时,任务会立即开始执行,并返回一个Future对象,代表该任务的未来结果。

要获取任务的结果,可以使用Future对象的result()方法。它会阻塞主线程,直到任务完成并返回结果。另外,还可以使用add_done_callback()方法注册一个回调函数,当任务完成时自动触发。

下面是一个使用回调函数处理任务结果的示例:

def callback(future):
    result = future.result()
    print("Task completed with result:", result)

def main():
    with ThreadPoolExecutor() as executor:
        future = executor.submit(fibonacci, 10)
        future.add_done_callback(callback)

    # Do other work here

    # 阻塞主线程,等待任务的完成
    futures.wait([future])

在上述示例中,我们通过future.add_done_callback()方法注册了一个回调函数。当任务完成时,回调函数会被自动调用,并传递Future对象作为参数。我们可以在回调函数中使用future.result()获取任务的计算结果。

控制并发度

Futures允许我们通过控制并发度来管理并发执行的任务。并发度指的是同时进行的任务数量。

通过传递max_workers参数给线程池或进程池对象,我们可以限制并发执行的任务数量。如果未指定max_workers,则默认使用系统可用的核心数。

下面是一个控制并发度的示例,通过设置max_workers为2,限制了同时执行的任务数量:

def main():
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(fibonacci, i) for i in range(10)]
        results = [future.result() for future in as_completed(futures)]

    print(results)

在上述示例中,我们使用max_workers=2创建了一个最大并发度为2的线程池。这意味着最多同时执行两个任务,其余的任务会等待前面的任务完成后再执行。

错误处理

当任务执行过程中出现异常时,Futures提供了异常处理机制,使得我们可以捕获和处理任务中的异常情况。

可以通过在任务函数内部抛出异常,或者使用future.set_exception()方法手动设置异常,来模拟任务的执行失败。

下面是一个处理任务异常的示例:

def task():
    raise Exception("Task execution failed")

def main():
    with ThreadPoolExecutor() as executor:
        future = executor.submit(task)
        try:
            result = future.result()
        except Exception as e:
            print("Task execution failed:", e)

在上述示例中,我们通过在任务函数task()中抛出异常模拟了任务执行失败的情况。在获取任务结果时,使用try-except语句捕获异常,并进行处理。

为啥多线程每次只有一个线程执行

同一时刻,Python 主程序只允许有一个线程执行,所以 Python 的并发,是通过多线程的切换完成的。你可能会疑惑这到底是为什么呢?这里我简单提一下全局解释器锁的概念。事实上,Python 的解释器并不是线程安全的,为了解决由此带来的 race condition 等问题,Python 便引入了全局解释器锁,也就是同一时刻,只允许一个线程执行。当然,在执行 I/O 操作时,如果一个线程被 block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行。

最后

Futures是Python并发编程的一个强大工具,它简化了异步编程的复杂性,使得编写并发代码变得更加直观和易于阅读。本文介绍了Futures的基本用法,包括任务提交和执行、获取结果、控制并发度以及错误处理等方面。

再举个race condition的例子

"Race condition"是并发编程中的一个常见问题,指的是多个进程或线程之间的执行顺序不确定,导致程序的行为变得不可预测。

假设有一个共享的计数器变量 count,初始值为0。现在有两个线程同时对 count 进行增加操作,代码如下

import threading

count = 0

def increment():
    global count
    for _ in range(1000000):
        count += 1

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("Final count:", count)

预期结果应该是2000000,即两个线程各加了1000000次,但是实际执行结果并不是这样。这就是Race condition问题。

在这个例子中,当两个线程同时执行 count += 1 操作时,它们会读取当前的 count 值,然后分别进行加一操作,最后再写回 count。然而,由于线程之间的切换和调度是不确定的,可能在某个线程读取 count 后,还未来得及进行加一操作之前,另一个线程也已经读取了 count ,这样导致了Race condition。

对于 threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换。很显然,这样的好处是代码容易书写,因为程序员不需要做任何切换操作的处理;但是切换线程的操作,容易出现在语句执行过程中,这样就容易出现 race condition 的情况。

而对于 asyncio,主程序想要切换任务时,必须得到此任务可以被切换的通知,这样一来也就可以避免刚刚提到的 race condition 的情况。