likes
comments
collection
share

如何使用 Python 多处理模块

作者站长头像
站长
· 阅读数 12

本文中,我们将学习如何使用多处理模块中的特定 Python 类(进程类)。我将通过示例为您提供快速概述。

什么是多处理模块?

还有什么比从官方文档中提取模块更好的方式来描述模块呢? Multiprocessing 是一个使用类似于线程模块的 API 支持生成进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程有效地回避全局解释器锁。

线程模块不是本文的重点,但总而言之,线程模块将处理一小段代码执行(轻量级且具有共享内存),而多处理模块将处理程序执行(较重且完全隔离) 。

一般来说,多处理模块提供了各种其他类、函数和实用程序,可用于处理程序执行期间执行的多个进程。如果程序需要在其工作流程中应用并行性,该模块专门设计为交互的主要点。我们不会讨论多处理模块中的所有类和实用程序,而是将重点关注一个非常具体的类,即进程类。

什么是进程类?

在本节中,我们将尝试更好地介绍进程是什么,以及如何在 Python 中识别、使用和管理进程。正如 GNU C 库中所解释的:“进程是分配系统资源的基本单位。每个进程都有自己的地址空间和(通常)一个控制线程。一个进程执行一个程序;可以让多个进程执行相同的程序程序,但每个进程在其自己的地址空间内都有自己的程序副本,并独立于其他副本执行它。”

但这在 Python 中是什么样子的呢?到目前为止,我们已经设法对进程是什么、进程和线程之间的区别进行了一些描述和参考,但到目前为止我们还没有触及任何代码。好吧,让我们改变一下,用 Python 做一个非常简单的流程示例:

#!/usr/bin/env python
import os

# A very, very simple process.
if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

这将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 144112

正如您所看到的,任何正在运行的 Python 脚本或程序都是它自己的一个进程。

创建子进程

那么在父进程中生成不同的子进程又如何呢?好吧,要做到这一点,我们需要多处理模块中的 Process 类的帮助,它看起来像这样:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    print(f"Hi! I'm a child process {os.getpid()}")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed.
    process = multiprocessing.Process(target=child_process)

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    process.join()

这将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 144078
Hi! I'm a child process 144079

关于上一个脚本的一个非常重要的注意事项:如果您不使用 process.join() 来等待子进程执行并完成,那么该点的任何其他后续代码将实际执行,并且可能会变得有点难以同步您的工作流程。

考虑以下示例:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    print(f"Hi! I'm a child process {os.getpid()}")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed.
    process = multiprocessing.Process(target=child_process)

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    #process.join()

    print("AFTER CHILD EXECUTION! RIGHT?!")

该代码片段将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 145489
AFTER CHILD EXECUTION! RIGHT?!
Hi! I'm a child process 145490

当然,断言上面的代码片段是错误的也是不正确的。这完全取决于您想要如何使用该模块以及您的子进程将如何执行。所以要明智地使用它。

创建各种子进程

如果要生成多个进程,可以利用 for 循环(或任何其他类型的循环)。它们将允许您创建对所需流程的尽可能多的引用,并在稍后阶段启动/加入它们。

#!/usr/bin/env python
import os
import multiprocessing

def child_process(id):
    print(f"Hi! I'm a child process {os.getpid()} with id#{id}")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")
    list_of_processes = []

    # Loop through the number 0 to 10 and create processes for each one of
    # them.
    for i in range(0, 10):
        # Here we create a new instance of the Process class and assign our
        # `child_process` function to be executed. Note the difference now that
        # we are using the `args` parameter now, this means that we can pass
        # down parameters to the function being executed as a child process.
        process = multiprocessing.Process(target=child_process, args=(i,))
        list_of_processes.append(process)

    for process in list_of_processes:
        # We then start the process
        process.start()

        # And finally, we join the process. This will make our script to hang
        # and wait until the child process is done.
        process.join()

这将产生以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 146056
Hi! I'm a child process 146057 with id#0
Hi! I'm a child process 146058 with id#1
Hi! I'm a child process 146059 with id#2
Hi! I'm a child process 146060 with id#3
Hi! I'm a child process 146061 with id#4
Hi! I'm a child process 146062 with id#5
Hi! I'm a child process 146063 with id#6
Hi! I'm a child process 146064 with id#7
Hi! I'm a child process 146065 with id#8
Hi! I'm a child process 146066 with id#9

数据通信

在上一节中,我描述了向 multiprocessing.Process 类构造函数添加一个新参数 args。此参数允许您将值传递给子进程以在函数内部使用。但你知道如何从子进程返回数据吗?

您可能会认为,要从子级返回数据,必须使用其中的 return 语句才能真正检索数据。进程非常适合以隔离的方式执行函数,而不会干扰共享资源,这意味着我们知道从函数返回数据的正常且常用的方式。在这里,由于其隔离而不允许。

相反,我们可以使用队列类,它将为我们提供一个在父进程与其子进程之间通信数据的接口。在这种情况下,队列是一个普通的 FIFO(先进先出),具有用于处理多处理的内置机制。

考虑以下示例:

#!/usr/bin/env python
import os
import multiprocessing

def child_process(queue, number1, number2):
    print(f"Hi! I'm a child process {os.getpid()}. I do calculations.")
    sum = number1 + number2

    # Putting data into the queue
    queue.put(sum)

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Defining a new Queue()
    queue = multiprocessing.Queue()

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed. Note the difference now that
    # we are using the `args` parameter now, this means that we can pass
    # down parameters to the function being executed as a child process.
    process = multiprocessing.Process(target=child_process, args=(queue,1, 2))

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    process.join()

    # Accessing the result from the queue.
    print(f"Got the result from child process as {queue.get()}")

它将给出以下输出:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 149002
Hi! I'm a child process 149003. I do calculations.
Got the result from child process as 3

异常处理

处理异常是一项特殊且有些困难的任务,我们在使用流程模块时必须不时地完成它。原因是,默认情况下,子进程内发生的任何异常将始终由生成它的 Process 类处理。

下面的代码引发带有文本的异常:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    print(f"Hi! I'm a child process {os.getpid()}.")
    raise Exception("Oh no! :(")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed. Note the difference now that
    # we are using the `args` parameter now, this means that we can pass
    # down parameters to the function being executed as a child process.
    process = multiprocessing.Process(target=child_process)

    try:
        # We then start the process
        process.start()

        # And finally, we join the process. This will make our script to hang and
        # wait until the child process is done.
        process.join()

        print("AFTER CHILD EXECUTION! RIGHT?!")
    except Exception:
        print("Uhhh... It failed?")

输出结果:

[r0x0d@fedora ~]$ python /tmp/tmp.iuW2VAurGG/scratch.py
Hi! I'm process 149505
Hi! I'm a child process 149506.
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib64/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib64/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/tmp.iuW2VAurGG/scratch.py", line 7, in child_process
    raise Exception("Oh no! :(")
Exception: Oh no! :(
AFTER CHILD EXECUTION! RIGHT?!

如果您跟踪代码,您将能够注意到在 process.join() 调用之后仔细放置了一条 print 语句,以模拟父进程仍在运行,即使在子进程中引发了未处理的异常之后也是如此。

克服这种情况的一种方法是在子进程中实际处理异常,如下所示:

#!/usr/bin/env python
import os
import multiprocessing

def child_process():
    try:
        print(f"Hi! I'm a child process {os.getpid()}.")
        raise Exception("Oh no! :(")
    except Exception:
        print("Uh, I think it's fine now...")

if __name__ == "__main__":
    print(f"Hi! I'm process {os.getpid()}")

    # Here we create a new instance of the Process class and assign our
    # `child_process` function to be executed. Note the difference now that
    # we are using the `args` parameter now, this means that we can pass
    # down parameters to the function being executed as a child process.
    process = multiprocessing.Process(target=child_process)

    # We then start the process
    process.start()

    # And finally, we join the process. This will make our script to hang and
    # wait until the child process is done.
    process.join()

    print("AFTER CHILD EXECUTION! RIGHT?!")

现在,您的异常将在您的子进程内处理,这意味着您可以控制它会发生什么以及在这种情况下应该做什么。

总结

当工作和实现依赖于并行方式执行的解决方案时,多处理模块非常强大,特别是与 Process 类一起使用时。这增加了在其自己的隔离进程中执行任何函数的惊人可能性。

本文由mdnice多平台发布