likes
comments
collection
share

用举例来学习Python中的并行性、并发性和异步性

作者站长头像
站长
· 阅读数 5

Parallelism, Concurrency, and AsyncIO in Python - by example

用举例来学习Python中的并行性、并发性和异步性

一篇中文译文,来自:(Parallelism, Concurrency, and AsyncIO in Python - by example | TestDriven.io) Posted by Amal Shaji

This tutorial looks at how to speed up CPU-bound and IO-bound operations with multiprocessing, threading, and AsyncIO. 本教程介绍如何通过多进程、线程和 AsyncIO 来加速 CPU 密集型和 IO 密集型操作。

Concurrency vs Parallelism 并发与并行

Concurrency and parallelism are similar terms, but they are not the same thing. 并发和并行是相似的术语,但它们不是同一件事。

Concurrency is the ability to run multiple tasks on the CPU at the same time. Tasks can start, run, and complete in overlapping time periods. In the case of a single CPU, multiple tasks are run with the help of context switching, where the state of a process is stored so that it can be called and executed later. 并发性是指在 CPU 上同时运行多个任务的能力。任务可以在重叠的时间段内启动、运行和完成。在单CPU的情况下,多个任务在上下文切换的帮助下运行,其中(上下文中)存储了进程的状态,以便稍后调用和执行。

Parallelism, meanwhile, is the ability to run multiple tasks at the same time across multiple CPU cores. 与此同时,并行(Parallelism)是指在多个 CPU 核心上同时运行多个任务(multiple tasks)的能力。

Though they can increase the speed of your application, concurrency and parallelism should not be used everywhere. The use case depends on whether the task is CPU-bound or IO-bound. 尽管它们可以提高应用程序的速度,但并发(concurrency)和并行(Parallelism)不应该在所有的地方使用。是否使用取决于任务是 CPU 密集型(CPU-bound)还是 IO 密集型(IO-bound)。

Tasks that are limited by the CPU are CPU-bound. For example, mathematical computations are CPU-bound since computational power increases as the number of computer processors increases. Parallelism is for CPU-bound tasks. 受 CPU 限制的任务是 CPU 密集型的。例如,数学计算受 CPU 限制,因为计算能力随着计算机处理器数量的增加而增加。并行(Parallelism)适用于 CPU 密集型任务。

In theory, If a task is divided into n-subtasks, each of these n-tasks can run in parallel to effectively reduce the time to 1/n of the original non-parallel task. Concurrency is preferred for IO-bound tasks, as you can do something else while the IO resources are being fetched. 理论上,如果一个任务被划分为n个子任务,那么这n个子任务中的每一个都可以并行运行,从而有效地将运行时间减少到原来非并行运行时的1/n。对于 IO 密集型(IO-bound)任务来说,并发(Concurrency)是首选,因为您可以在 IO 资源正在被获取的同时去做其他操作。

The best example of CPU-bound tasks is in data science. Data scientists deal with huge chunks of data. For data preprocessing, they can split the data into multiple batches and run them in parallel, effectively decreasing the total time to process.Increasing the number of cores results in faster processing. CPU 密集型(CPU-bound)任务的最佳示例是数据科学。数据科学家处理大量数据。对于数据预处理,他们可以将数据分成多个批次并行(parallel)运行,从而有效减少总处理时间。增加核心数量可以加快处理速度。

Web scraping is IO-bound. Because the task has little effect on the CPU since most of the time is spent on reading from and writing to the network. Other common IO-bound tasks include database calls and reading and writing files to disk.Web applications, like Django and Flask, are IO-bound applications. Web 抓取是 IO 密集型(IO-bound)。因为该任务对 CPU 的影响很小,因为大部分时间都花在网络读写上。其他常见的 IO 密集型任务包括数据库调用以及向磁盘读取和写入文件。Web 应用程序(例如 Django 和 Flask)是 IO 密集型应用程序。

If you're interested in learning more about the differences between threads, multiprocessing, and async in Python, check out the Speeding Up Python with Concurrency, Parallelism, and asyncio article. 如果您有兴趣了解有关 Python 中线程、多处理和异步之间差异的更多信息,请查看通过并发、并行和异步加速 Python 文章

Scenario 设想

With that, let's take a look at how to speed up the following tasks: 接下来,让我们看看如何加快以下任务的速度:

# tasks.py

import os
from multiprocessing import current_process
from threading import current_thread

import requests


def make_request(num):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    requests.get("https://httpbin.org/ip")


async def make_request_async(num, client):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    await client.get("https://httpbin.org/ip")


def get_prime_numbers(num):
    # cpu-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    numbers = []

    prime = [True for i in range(num + 1)]
    p = 2

    while p * p <= num:
        if prime[p]:
            for i in range(p * 2, num + 1, p):
                prime[i] = False
        p += 1

    prime[0] = False
    prime[1] = False

    for p in range(num + 1):
        if prime[p]:
            numbers.append(p)

    return numbers

All of the code examples in this tutorial can be found in the parallel-concurrent-examples-python repo.z 本教程中的所有代码示例都可以在parallel-concurrent-examples-python 存储库中找到。

Notes: 笔记(上面代码中的函数说明):

  • make_request makes an HTTP request to httpbin.org/ip X number of times.
  • make_requesthttpbin.org/ip 发出 HTTP 请求 X 次。
  • make_request_async makes the same HTTP request asynchronously with HTTPX.
  • make_request_async 使用 HTTPX 异步(asynchronously )发出相同的 HTTP 请求。
  • get_prime_numbers calculates the prime numbers, via the Sieve of Eratosthenes method, from two to the provided limit.
  • get_prime_numbers 通过埃拉托斯特尼筛法计算从 2 到 指定数值 间的素数。

We'll be using the following libraries from the standard library to speed up the above tasks: 我们将使用标准库中的以下库来加速上述任务:

  • threading for running tasks concurrently

  • threading线程用于并发运行任务的

  • multiprocessing for running tasks in parallel

  • multiprocessing 多进程并行运行任务的

  • concurrent.futures for running tasks concurrently and in parallel from a single interface

  • concurrent.futures 异步并发 用于并发运行任务,以及从单个接口并行

    (注:concurrent.futures是重要的异步编程库。内部实现机制非常复杂,简单来说就是开辟一个固定大小为n的进程池/线程池。进程池中最多执行n个进程/线程,当任务完成后,从任务队列中取新任务。若池满,则排队等待。)

  • asyncio for running tasks concurrency with coroutines managed by the Python interpreter

  • asyncio 异步IO用于使用 Python 解释器管理的协程 coroutines 并发运行任务

Library 图书馆Class/Method 类/方法Processing Type 加工类型
threading 线程Thread 线程concurrent 并发
concurrent.futures 异步并发ThreadPoolExecutor 线程池执行器concurrent 并发
asyncio 异步IOgather 收集concurrent (via coroutines) 并发(通过协程)
multiprocessing 多进程Pool 池parallel 并行
concurrent.futures 异步并发ProcessPoolExecutor 进程池执行器parallel 并行

IO-bound Operation IO 绑定操作

Again, IO-bound tasks spend more time on IO than on the CPU. 同样,IO 密集型任务在 IO 上花费的时间比在 CPU 上花费的时间更多。

Since web scraping is IO bound, we should use threading to speed up the processing as the retrieving of the HTML (IO) is slower than parsing it (CPU). 由于网页抓取受 IO 限制,因此我们应该使用线程(threading )来加快处理速度,因为 HTML 的检索 (IO) 比解析它 (CPU) 慢。

Scenario: How to speed up a Python-based web scraping and crawling script? 场景:如何加速基于Python的网页抓取和爬行脚本?

Sync Example 同步示例

Let's start with a benchmark. 让我们从一个基准测试程序(benchmark)开始。

# io-bound_sync.py

import time

from tasks import make_request


def main():
    for num in range(1, 101):
        make_request(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we made 100 HTTP requests using the make_request function. Since requests happen synchronously, each task is executed sequentially. 在这里,我们使用 make_request 函数发出 100 个 HTTP 请求。由于请求是同步(synchronously)发生的,因此每个任务都是按顺序执行的。

Elapsed run time: 15.710984757 seconds.

So, that's roughly 0.16 seconds per request. 因此,每个请求大约需要 0.16 秒。

Threading Example 线程示例

# io-bound_concurrent_1.py

import threading
import time

from tasks import make_request


def main():
    tasks = []

    for num in range(1, 101):
        tasks.append(threading.Thread(target=make_request, args=(num,)))
        tasks[-1].start()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, the same make_request function is called 100 times. This time the threading library is used to create a thread for each request. 此处,相同的 make_request 函数被调用 100 次。这次使用 threading 库为每个请求创建一个线程。

Elapsed run time: 1.020112515 seconds.

The total time decreases from ~16s to ~1s. 总时间从约 16 秒减少到约 1 秒。

Since we're using separate threads for each request, you might be wondering why the whole thing didn't take ~0.16s to finish. This extra time is the overhead for managing threads. The Global Interpreter Lock (GIL) in Python makes sure that only one thread uses the Python bytecode at a time. 由于我们对每个请求使用单独的线程,因此您可能想知道为什么整个事情不是花费大约 0.16 秒(16 秒/100个线程=0.16 秒)完成。这个额外的耗费时间就是管理线程的开销。 Python 中的全局解释器锁 (GIL) 确保一次只有一个线程使用 Python 字节码。

concurrent.futures Example concurrent.futures示例

# io-bound_concurrent_2.py

import time
from concurrent.futures import ThreadPoolExecutor, wait

from tasks import make_request


def main():
    futures = []

    with ThreadPoolExecutor() as executor:
        for num in range(1, 101):
            futures.append(executor.submit(make_request, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here we used concurrent.futures.ThreadPoolExecutor to achieve multithreading. After all the futures/promises are created, we used wait to wait for all of them to complete. 这里我们使用 concurrent.futures.ThreadPoolExecutor 来实现多线程。创建所有 futures/promise 后,我们使用 wait 等待它们全部完成。

Elapsed run time: 1.340592231 seconds

concurrent.futures.ThreadPoolExecutor is actually an abstraction around the multithreading library, which makes it easier to use. In the previous example, we assigned each request to a thread and in total 100 threads were used. But ThreadPoolExecutor defaults the number of worker threads to min(32, os.cpu_count() + 4). ThreadPoolExecutor exists to ease the process of achieving multithreading. If you want more control over multithreading, use the multithreading library instead. concurrent.futures.ThreadPoolExecutor 实际上是对 multithreading 库的抽象,这使得它更易于使用。在前面的示例中,我们将每个请求分配给一个线程,总共使用了 100 个线程。但 ThreadPoolExecutor 默认工作线程数为 min(32, os.cpu_count() + 4) 。 ThreadPoolExecutor 的存在是为了简化实现多线程的过程。如果您想要对多线程进行更多控制,请改用 multithreading 库。

AsyncIO Example 异步IO示例

# io-bound_concurrent_3.py

import asyncio
import time

import httpx

from tasks import make_request_async


async def main():
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[make_request_async(num, client) for num in range(1, 101)]
        )


if __name__ == "__main__":
    start_time = time.perf_counter()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

httpx is used here since requests does not support async operations. 此处使用 httpx ,因为 requests 不支持异步操作。

Here, we used asyncio to achieve concurrency. 这里,我们使用 asyncio 来实现并发。

Elapsed run time: 0.553961068 seconds

asyncio is faster than the other methods, because threading makes use of OS (Operating System) threads. So the threads are managed by the OS, where thread switching is preempted by the OS. asyncio uses coroutines, which are defined by the Python interpreter. With coroutines, the program decides when to switch tasks in an optimal way. This is handled by the even_loop in asyncio. asyncio 比其他方法更快,因为 threading 使用 OS(操作系统)线程。因此线程由操作系统管理,其中线程切换由操作系统抢占。 asyncio 使用由 Python 解释器定义的协程。通过协程,程序可以决定何时以最佳方式切换任务。这是由 asyncio 中的 even_loop 处理的。

CPU-bound Operation CPU 密集型操作

Scenario: How to speed up a simple data processing script? 场景:如何加速一个简单的数据处理脚本?

Sync Example 同步示例

Again, let's start with a benchmark. 再次,让我们从基准测试程序开始。

# cpu-bound_sync.py

import time

from tasks import get_prime_numbers


def main():
    for num in range(1000, 16000):
        get_prime_numbers(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we executed the get_prime_numbers function for numbers from 1000 to 16000. 在这里,我们对 1000 到 16000 之间的数字执行 get_prime_numbers 函数。

Elapsed run time: 17.863046316 seconds.

Multiprocessing Example 多进程示例

# cpu-bound_parallel_1.py

import time
from multiprocessing import Pool, cpu_count

from tasks import get_prime_numbers


def main():
    with Pool(cpu_count() - 1) as p:
        p.starmap(get_prime_numbers, zip(range(1000, 16000)))
        p.close()
        p.join()


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we used multiprocessing to calculate the prime numbers. 在这里,我们使用 multiprocessing 来计算素数。

Elapsed run time: 2.9848740599999997 seconds.

concurrent.futures Example concurrent.futures示例

# cpu-bound_parallel_2.py

import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

from tasks import get_prime_numbers


def main():
    futures = []

    with ProcessPoolExecutor(cpu_count() - 1) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(get_prime_numbers, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we achieved multiprocessing using concurrent.futures.ProcessPoolExecutor. Once the jobs are added to futures, wait(futures) waits for them to finish. 在这里,我们使用 concurrent.futures.ProcessPoolExecutor 实现了多处理。将作业添加到 future 后, wait(futures) 就会等待它们完成。

Elapsed run time: 4.452427557 seconds.

concurrent.futures.ProcessPoolExecutor is a wrapper around multiprocessing.Pool. It has the same limitations as the ThreadPoolExecutor. If you want more control over multiprocessing, use multiprocessing.Pool. concurrent.futures provides an abstraction over both multiprocessing and threading, making it easy to switch between the two. concurrent.futures.ProcessPoolExecutormultiprocessing.Pool 的包装器。它具有与 ThreadPoolExecutor 相同的限制。如果您想更好地控制多处理,请使用 multiprocessing.Poolconcurrent.futures 提供了对多处理和线程的抽象,使得在两者之间切换变得容易。

Conclusion 结论

It's worth noting that using multiprocessing to execute the make_request function will be much slower than the threading flavor since the processes will be need to wait for the IO. The multiprocessing approach will be faster then the sync approach, though. 值得注意的是,使用多进程来执行 make_request 函数将比线程的方式慢得多,因为进程需要等待 IO。不过,多进程方法还是会比同步方式更快。

Similarly, using concurrency for CPU-bound tasks is not worth the effort when compared to parallelism. 同样,与并行相比,对 CPU 密集型任务使用并发并不值得。

That being said, using concurrency or parallelism to execute your scripts adds complexity. Your code will generally be harder to read, test, and debug, so only use them when absolutely necessary for long-running scripts. 话虽这么说,使用并发或并行来执行脚本会增加复杂性。您的代码通常更难阅读、测试和调试,因此仅在长时间运行的脚本绝对必要时才使用它们。

concurrent.futures is where I generally start since- concurrent.futures 是我通常开始的地方,因为-

  1. It's easy to switch back and forth between concurrency and parallelism
  2. 可以轻松地在并发和并行之间来回切换
  3. The dependent libraries don't need to support asyncio (requests vs httpx)
  4. 依赖库不需要支持 asyncio ( requestshttpx
  5. It's cleaner and easier to read over the other approaches
  6. 与其他方法相比,它更干净、更容易阅读

Grab the code from the parallel-concurrent-examples-python repo on GitHub. 从 GitHub 上的 parallel-concurrent-examples-python 存储库获取代码。