likes
comments
collection
share

Python编程-并发编程基础梳理与高级特性案例讲解

作者站长头像
站长
· 阅读数 8

Python编程-并发编程基础梳理与高级特性案例讲解

同步、异步通信与IO阻塞

同步(Synchronous)和异步(Asynchronous)通信是两种不同的通信方式,涉及到处理任务的时序和相互等待的关系。同步通信简单直观,但可能导致资源浪费和阻塞;异步通信可以提高系统的并发性和响应性,但需要更复杂的编程模型。

同步通信

在同步通信中,发送方发出请求后,必须等待接收方的响应,直到接收方响应完成后才能继续执行。同步通信是阻塞的,发送方和接收方在通信期间都会被阻塞,直到通信完成。典型的同步通信方式包括函数调用、传统的阻塞式IO等。

异步通信

在异步通信中,发送方发出请求后不需要等待接收方的响应,而是可以继续执行其他任务。异步通信是非阻塞的,发送方和接收方在通信期间可以继续执行其他任务,无需相互等待。异步通信通常通过回调函数、事件驱动等方式实现,比如异步IO、消息队列等。

IO阻塞

I/O阻塞是指在进行输入/输出(I/O)操作时,程序的执行被暂时挂起或阻塞,直到所需的I/O操作完成。在计算机编程中,I/O操作包括从文件读取数据、向文件写入数据、网络通信等。当程序执行一个I/O操作时,它可能需要等待数据从磁盘加载、网络传输完成或者其他外部设备准备好。常见情况:

  • CPU与IO设备的通信

    • 同步:CPU发送IO请求后等待IO设备完成,期间CPU处于阻塞状态
    • 异步:CPU发送IO请求后可以继续执行其他任务,当IO设备完成时通过中断或轮询等方式通知CPU。
  • 进程间通信

    • 同步:一个进程发送消息后等待接收进程的响应,直到接收到响应才继续执行。
    • 异步:一个进程发送消息后可以继续执行其他任务,不需要等待接收进程的响应。

串行、并行与并发的区别

图源网络,并非自设

Python编程-并发编程基础梳理与高级特性案例讲解

串行(Sequential)、并行(Parallel)、并发(Concurrent)是计算机科学中常用的概念,它们描述了程序执行或处理任务的不同方式。

串行(Sequential)

  • 串行是指任务按照顺序一个接一个地执行,一个任务完成后才会开始下一个任务。
  • 在串行执行中,任务之间是相互依赖的,后一个任务的执行依赖于前一个任务的完成。

例如:在餐厅等待服务,你必须等待前面的顾客点完菜、用餐并离开,然后才能轮到你点菜并用餐。每个顾客的服务是按照顺序一个接一个完成的

并行(Parallel)

  • 并行是指多个任务同时执行,它们可以在同一时刻开始和结束。
  • 在并行处理中,任务之间通常是独立的,彼此之间没有相互依赖。

例如:在自助餐厅,每个人都可以同时取菜,独立进行。不同的人可以同时进行不同的任务,而彼此之间没有关联。他们在同一时刻开始并独立完成他们的任务

并发(Concurrent)

  • 并发是指任务在时间上重叠执行,但不一定同时执行。任务之间可能会有交替执行的情况,通过切换执行上下文来实现。
  • 在并发模型中,可以通过多线程、协程等方式来实现任务的并发执行。

并发(concurrengy):一个CPU采用时间片管理方式,交替的处理多个任务。一般是是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务”一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)

进程、线程与协程的区别

进程(Process)、线程(Thread)、协程(Coroutine)是计算机中用于并发执行任务的三个基本概念

进程(Process)

进程(Process) 是一个具有一定独立功能的程序关于某个数据集合的一次运行活动

  • 进程是操作系统中的一个独立的执行单元,有自己独立的地址空间。 拥有自己独立的堆和栈,既不共享堆,也不共享栈
  • 进程之间通信相对复杂,需要通过进程间通信(IPC)机制,如管道、消息队列、共享内存等。
  • 进程拥有独立的资源,包括独立的内存空间、文件句柄等。
  • 进程的创建和销毁开销相对较大,效率低,在Python中可以利用CPU多核特性。

线程(Thread)

线程 (Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。在一个进程内部,要同时干多件事,就需要同时运行多个子任务,我们把进程内的这些子任务称为线程

  • 线程是进程的一个执行流,共享进程的地址空间,但拥有自己的栈,与其他线程共享堆,不共享栈
  • 线程之间通过共享的内存进行通信,但需要注意线程同步问题。
  • 线程拥有独立的执行流,但共享进程的资源,创建和销毁开销相对较小,效率一般(在不考虑GIL的情况下)。
  • 线程之间的切换开销较小,因为共享进程的地址空间,在CPython中不能利用CPU多核特性。

协程(Coroutine)

协程Coroutines,也叫作纤程(Fiber),是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。

  • 协程是一种轻量级的线程,是用户级的线程,不依赖于操作系统的线程和进程,拥有自己独立的栈和堆,共享堆,不共享栈
  • 协程由用户控制调度,可以在代码中显式地切换执行流,而线程的切换通常是由操作系统控制的。
  • 协程之间的切换开销非常小,且效率高,因为不涉及内核态和用户态的切换。
  • 协程通常用于处理大量的、轻量级的任务,比如网络通信、IO操作等。

注意:线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;调度和切换: 线程上下文切换比进程上下文切换要快得多

抢占式与非抢占式资源调度

这里可以参见银行家算法

抢占式调度和非抢占式调度是操作系统中用于管理任务和资源的两种基本调度策略。

  1. 抢占式调度(Preemptive Scheduling):

    • 在抢占式调度中,操作系统具有中断正在运行的任务的能力,即当前任务被抢占运行资源,以便将处理器分配给其他任务。
    • 当具有更高优先级的任务就绪时,操作系统会暂停当前任务的执行,保存其状态,然后切换到更高优先级的任务。
    • 抢占式调度通常能够更灵活地响应紧急任务或高优先级任务,提高系统的响应性。
  2. 非抢占式调度(Non-Preemptive Scheduling):

    • 在非抢占式调度中,一个任务一旦开始执行,就会一直执行到完成或主动释放CPU。
    • 操作系统无法中断正在执行的任务,直到它自愿放弃或完成任务,即当前任务的运行资源不会被抢占
    • 这种调度策略通常简单,但可能导致对于高优先级任务的响应不够及时,因为在当前任务执行完成之前,系统无法进行任务切换。

在选择抢占式或非抢占式调度策略时,需要根据系统的需求和性能要求来决定。抢占式调度通常用于对实时性要求高的系统,以确保及时响应紧急任务。非抢占式调度适用于对实时性要求不太苛刻,且希望减少上下文切换开销的场景。不同的应用场景可能需要不同的调度策略来平衡系统性能和响应能力。

在 Python 中,**默认情况下,新建的线程是抢占式的。**这意味着操作系统可以在任何时刻剥夺一个线程的CPU时间,将CPU时间分配给其他等待执行的线程。这通过操作系统的调度器来实现,可以通过时钟中断等机制来定期触发线程切换。

方法包装的线程创建

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number, or “Thread-N (target)” where “target” is target.__name__ if the target argument is specified.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If not None, daemon explicitly sets whether the thread is daemonic. If None (the default), the daemonic property is inherited from the current thread.

from:【Thread Object】docs.python.org/3/library/t…

在python中我们使用threading模块创建线程一个线程对象Tread,其构造器中有两个常用参数:

  • target:用于接受线程中运行的方法名,其值为一个函数或方法

  • name:用于为线程设置名称,可以通过name属性获取

  • args:用于接受线程中运行方法的位置参数元组

  • kwargs:用于接受线程中运行方法的位置参数字典

下面我们创建一个接收方法的线程实例,来展示几个关键方法:

start新建线程方法

start方法用于新建一个线程,并且在线程中启动run方法执行线程任务。start方法只能启动一次同一个线程,否则将会抛出错误:

RuntimeError: threads can only be started once

from threading import Thread


def occupy_function(test_string: str, sequence: int) -> None:
    print("This is a content of test_striung"
          f" in Thread {sequence} : {test_string}")


if __name__ == "__main__":
    print("Thread main has started")
    thread1: Thread = Thread(target=occupy_function, args=("Hello World", 1))
    thread1.start()
    print("Thread main has stopped")

Python编程-并发编程基础梳理与高级特性案例讲解

这里你会发现,主线程在thresd1之前结束了,解决方案参考join方法

run运行任务方法

run方法用于运行绑定到线程的任务方法,在上述代码中指定的任务方法即为occupy_function,所以在start后运行了该方法,如果单独运行run方法将会使得新线程不会被建立,直接在主调线程中执行线程活动

Python编程-并发编程基础梳理与高级特性案例讲解

join线程等待方法

join方法接收一个参数timeout,用于标识等待时间(默认为None),当 timeout 参数不存在或者是 None ,这个操作会阻塞直到被调线程线程终结(即主调线程将会阻塞)。并且一个线程可以开启多个其他线程,并调用多个join方法。需要注意的是,在当前线程中尝试调用当前线程的join方法将会导致死锁引发 RuntimeError ,如果尝试 join() 一个尚未开始的线程,同样会引发 RuntimeError

注意join方法返回值总为None,如果需要判断超时,必须在join后使用is_alive判断(返回布尔类型)

下面是演示代码:

from threading import Thread
import time


def occupy_function_one(test_string: str, sequence: int) -> None:
    print("This is a content of test_striung"
          f" in Thread {sequence} : {test_string}")
    
def occupy_function_two(test_string: str, sequence: int) -> None:
    time.sleep(1)
    print("This is a content of test_striung"
          f" in Thread {sequence} : {test_string}")


if __name__ == "__main__":
    print("Thread main has started")

    thread1: Thread = Thread(target=occupy_function_one, args=("Hello World", 1))
    thread2: Thread = Thread(target=occupy_function_two, args=("Hello Python", 2))

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    print("Thread main has stopped")   

得到运行结果:

Thread main has started
This is a content of test_striung in Thread 1 : Hello World
This is a content of test_striung in Thread 2 : Hello Python
Thread main has stopped

多线程情况下的换行符输出

你可能注意到了,在join方法的代码中的不同之处,我使用了sleep暂停了thread2线程:

def occupy_function_two(test_string: str, sequence: int) -> None:
    time.sleep(1)
    print("This is a content of test_striung"
          f" in Thread {sequence} : {test_string}")

当你去掉会出现什么情况呢?

Python编程-并发编程基础梳理与高级特性案例讲解

多运行几次你会发现会出现换行符输出不及时的情况,这是因为操作系统的调度是动态的,在多线程环境中,使用 print 函数时可能会出现输出不及时,这是因为 print 在默认情况下是原子性的操作,但在线程间切换时可能会导致部分输出被缓存而没有及时刷新到控制台。

这里需要强调,在多线程或多进程编程时,inputprint尽量不要使用,因为两者是对标准输入输出流进行处理,容易引发意想不到的错误,尤其是会阻塞线程的input

同样的在多进程与多线程情况下传递进入的函数,通常不能够像普通函数那样进行return,因为在多线程与多进程情况下,他们与主进程,主线程不在同一内存空间,但是后文将会提到池的概念,它为返回值提供了一种可行的方式

类包装继承Thread对象

语法较为简单,我们直接进行演示:

from threading import Thread


class MyExtendsThread(Thread):
    def __init__(self, greeting_string: str, sequence: int) -> None:
        super().__init__()		# 有些人也习惯 Thread.__init__(self) 但是我不喜欢 也不推荐
        self.greeting_string: str = greeting_string
        self.sequence: int = sequence

    def run(self) -> None:
        print("This is a content of test_striung"
          f" in Thread Object {self.sequence} : {self.greeting_string}")
        

if __name__ == "__main__":
    print("Thread main has started")

    my_thread_object: MyExtendsThread = MyExtendsThread("Hello World", 1)

    my_thread_object.start()

    my_thread_object.join(timeout=3)

    if not my_thread_object.is_alive():
        print("The called thread has terminated")
    
    print("Thread main has stopped") 
    

注意:在 Python 中,继承 Thread 对象并创建线程类时,不一定需要重写run方法(甚至于其他任何方法)。Thread 类本身提供了默认的 run 方法,该方法为空。如果你的线程类不需要执行特定的操作,可以直接使用 Thread 类的默认 run 方法(一般都需要重写run方法)。

守护线程的概念与使用案例

在 Python 中,守护线程(Daemon Threads)是一种特殊类型的线程,其生命周期受到主线程的影响。当所有的非守护父线程结束时(即开启守护线程的线程),守护线程也会被强制结束,而不管它们是否执行完毕。守护线程常用于为其启动线程服务,例如GC(垃圾回收器)

import threading
import time

def daemon_function() -> None:
    while True:
        print("Daemon thread is running...")
        time.sleep(1)

# 创建一个守护线程
daemon_thread = threading.Thread(target=daemon_function)
daemon_thread.daemon = True # 设置线程为守护线程
daemon_thread.start()

# 使主线程执行一段时间
time.sleep(3)
print("  Main thread is exiting  ")

在老版本中使用的是setDaemon方法来设置守护线程

守护线程的使用场景

  1. 后台任务:当一些后台任务需要执行,而不想让它们阻塞主程序的执行时,守护线程非常适合。例如,日志记录、监控系统资源、或者定期清理任务等。
  2. 定时任务:需要在程序运行期间执行一些定时任务时,例如定时备份数据、定时发送邮件等。可以创建一个守护线程来执行这些定时任务,以避免阻塞主程序的执行。
  3. 持续运行的服务:有时候可能需要在程序运行期间持续提供一些服务,例如网络服务器。可以将这些服务放在守护线程中运行,以确保主程序退出时服务也会被关闭。
  4. 资源监控:如果你需要监控系统资源或其他进程的活动,守护线程是一个很好的选择。可以创建一个守护线程来监控系统的 CPU 使用率、内存占用情况等,并在必要时采取相应的措施。
  5. 事件监听器:守护线程也适用于监听事件并做出响应的场景。例如创建一个守护线程来监听键盘输入或鼠标事件,并根据用户的操作执行相应的任务。

非高性能的Python多线程

在Python中,无论CPU有多少个内核,在Cpython解释器中永远都是假象。因为同一时间执行的线程只有一个线程,它是python的一个开发时候,设计的一个缺陷,换句话说Python的多线程并不能发挥多核CPU的优势,Python多线程的低效体现在以下方面:

  1. 全局解释器锁(GIL):Python 中的 GIL 是一个全局解释器级别的锁,它确保同一时刻只有一个线程执行 Python 字节码。这意味着在多线程环境下,即使有多个线程同时运行,它们也不能并行执行 Python 代码,而是会交替执行,因为只有一个线程能够持有 GIL。这限制了 Python 多线程程序的并行性,因此无法充分利用多核处理器的优势。
  2. I/O 操作阻塞:在 Python 中,很多 I/O 操作(如文件读写、网络通信等)会导致线程阻塞,而 GIL 的存在使得即使一个线程被阻塞,其他线程也无法继续执行 Python 代码。虽然可以使用异步编程或者多进程来规避这个问题,但是在某些场景下,多线程仍然无法避免阻塞。
  3. 线程间切换开销:由于 Python 的线程是由操作系统的原生线程实现的,线程间的切换会涉及到操作系统的上下文切换,这会带来一定的开销。当线程数量较多时,线程间的切换开销可能会变得显著,并降低程序的性能。
  4. C 调用不受 GIL 限制:尽管 Python 的多线程受到 GIL 的限制,但是对于一些耗时的 C 函数调用(如某些数学运算、文件 I/O 等),由于这些函数是由底层的 C 代码实现的,并且不会受到 GIL 的影响,因此在这些函数的调用过程中,GIL 不会成为性能瓶颈。

全局解释器锁GIL

Python代码的执行由Python 虚拟机(CPython)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程的字节码在运行。由于 CPython 解释器并不是线程安全的,因此在多线程环境下同时执行 Python 代码可能会导致数据竞争和不确定的行为。为了解决这个问题,CPython 使用 GIL 来确保同一时刻只有一个线程可以执行 Python 字节码,从而避免了大部分线程安全性的问题。

线程安全(Thread Safety)指的是在多线程环境下,对于共享的数据结构或代码段的操作,不会产生不确定的行为或结果。换句话说,当多个线程同时访问共享资源时,线程安全的设计保证这些操作不会导致数据破坏、数据竞争或者其他不一致性问题。

GIL并非 Python 语言的特性

GIL(全局解释器锁)不是 Python 语言的特性,而是由 Python 解释器(Python)实现的特定设计选择。

  1. 解释器实现选择:GIL 是由 CPython 解释器的实现决定的,而 Python 语言本身并没有规定必须使用 GIL。例如,Jython 和 IronPython 等其他 Python 解释器并没有引入 GIL,它们采用了不同的实现方式来处理线程安全性。

  2. 历史原因:GIL 最初是为了简化 CPython 解释器的实现而引入的。在 Python 的早期版本中,由于解释器的内部数据结构并不是线程安全的,为了避免在多线程环境下出现数据竞争和不一致性问题,引入了 GIL。

  3. 性能和便利性的权衡:虽然 GIL 限制了 Python 程序的并行性能,但是它确实简化了 Python 解释器的实现,并且使得很多 C 扩展库能够更容易地编写和使用。在设计时,Python 的创作者可能更注重了解释器的实现和使用的便利性,而不是追求最大的并行性能。

尽管 GIL 在某些情况下限制了 Python 程序的性能表现,但是 Python 语言本身并不是固有与 GIL 相关的。实际上,有些 Python 的替代实现(如 PyPy)采用了不同的线程模型,以实现更好的并行性能。因此,GIL 可以被看作是 CPython 解释器的一个设计选择,而不是 Python 语言的特性。

线程同步与互斥锁使用案例

处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象。这时候,我们就需要用到“线程同步”。 线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用。

为什么要保证线程同步

在多线程同时修改一个资源时,我们必须要保证数据的修改符合预期,比如说,同一张信用卡,绑定了微信与支付宝,假如你同时在淘宝与京东分别使用支付宝与微信付款,怎样能够保证你的信用卡账户不会在同时支付出现超出余额呢?

import threading
import time

counter: int = 100 

def decrease_counter(money, thread_order) -> None:
    global counter
    if counter < money:
        print(thread_order, 'not sufficient funds:', counter, '\t')
        return
    time.sleep(1)	# 故意休眠此线程 触发判断pass 模拟复杂情况下的不同步问题出现
    print(thread_order, 'Sufficient balance:', counter, '\t')
    counter -= 60

thread1 = threading.Thread(target=decrease_counter, args=(60, 'Thread1'))
thread2 = threading.Thread(target=decrease_counter, args=(60, 'Thread2'))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("Final counter value:", counter)

如上代码中,假如额度为100,运行结束时,余额为-20,这显然是不允许出现的问题,这就是线程同步的意义

什么是互斥锁

互斥锁(Mutex)是一种用于多线程编程的同步机制,用于确保在任何时刻只有一个线程可以访问共享资源,从而避免多个线程同时修改共享资源而导致的数据竞争和不一致性问题。线程的执行需要获得互斥锁,执行完毕后需要释放互斥锁,而在此期间其他线程必须等待互斥锁释放,线程获取互斥锁通常涉及以下步骤:

  1. 尝试获取锁: 线程首先尝试获取互斥锁。如果当前没有其他线程持有该锁,则获取成功,线程可以进入临界区(访问共享资源)。如果有其他线程已经持有该锁,则获取失败,线程进入阻塞状态等待。
  2. 阻塞等待: 当线程无法获取互斥锁时,它会进入阻塞状态,等待互斥锁的释放。在大多数情况下,线程在阻塞状态会暂时放弃 CPU 时间片,直到互斥锁被释放并且线程重新获得调度才能继续执行。
  3. 等待队列: 线程在未获取到互斥锁时,会加入到互斥锁的等待队列中,等待锁的持有者释放锁时,唤醒等待队列中的一个线程继续执行。
  4. 持有锁: 当线程成功获取互斥锁时,它会进入临界区执行相应的操作,并且在完成后释放互斥锁。
  5. 释放锁: 当线程完成了对共享资源的操作,需要释放互斥锁以便其他线程可以获取锁。释放锁后,等待互斥锁的其他线程中的一个会被唤醒并获得锁。

锁的临界区是指在多线程编程中,由锁保护的一段代码或一组操作,这些操作访问了共享资源,需要确保在任何时刻只有一个线程可以进入临界区执行这些操作,以避免多线程并发访问共享资源导致的数据竞争和不一致性问题。

互斥锁有以下特点:

  1. 独占性: 互斥锁只能被一个线程持有。当一个线程获得了互斥锁后,其他线程必须等待该线程释放锁之后才能获得锁。
  2. 原子性: 互斥锁的操作是原子的,即对互斥锁的获取和释放是不可分割的操作,不存在中间状态。
  3. 互斥性: 当一个线程持有互斥锁时,其他线程无法同时持有该锁。只有在当前持有锁的线程释放锁之后,其他线程才有机会获得锁。
  4. 阻塞: 当一个线程尝试获取一个已经被其他线程持有的互斥锁时,它会被阻塞,直到该锁被释放为止。
  5. 非递归性: 通常情况下,同一个线程在持有互斥锁的同时,不能再次获取该锁,否则会导致死锁。

互斥锁使用时需要注意以下方面:

  • 使用互斥锁的线程必须使用同一个锁对象,即获得与释放的都是同一个锁
  • 互斥锁的作用就是保证同一时刻只能有一个线程去操作共享数据,保证共享数据不会出现错误问题
  • 使用互斥锁的好处确保某段关键代码只能由一个线程从头到尾完整地去执行
  • 使用互斥锁会影响代码的执行效率,同时持有多把锁,容易出现死锁的情况

使用互斥锁同步线程

在Python的threading模块中,Lock(互斥锁)是一种同步原语,用于在多线程编程中控制对共享资源的访问。Lock提供了一种简单的机制,确保在任何时刻只有一个线程可以获取锁,从而避免多个线程同时修改共享资源而导致的数据竞争和不一致性问题。

  • 手动操作锁的获取与释放
import threading
import time

counter: int = 100 

def decrease_counter(money, thread_order, lock) -> None:
    global counter
    lock.acquire()
    if counter < money:
        print(thread_order, 'not sufficient funds:', counter, '\t')
        lock.release()
        return
    time.sleep(1)
    print(thread_order, 'Sufficient balance:', counter, '\t')
    counter -= 60
    lock.release()

lock = threading.Lock()

thread1 = threading.Thread(target=decrease_counter, args=(60, 'Thread1', lock))
thread2 = threading.Thread(target=decrease_counter, args=(60, 'Thread2', lock))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("Final counter value:", counter)

  • 使用with自动管理临界区
def decrease_counter(money, thread_order, lock) -> None:
    global counter
    with lock:
        if counter < money:
            print(thread_order, 'not sufficient funds:', counter, '\t')
            return
        time.sleep(1)
        print(thread_order, 'Sufficient balance:', counter, '\t')
        counter -= 60

死锁现象的产生原因

死锁是指在多任务系统中,两个或多个进程(线程)相互等待对方持有的资源而无法继续执行的情况。简单来说,就是进程之间陷入了相互等待的僵局,导致它们都无法向前推进,进而造成系统无法正常工作。

  • 重复获取锁导致的死锁现象
from threading import Thread, Lock


def get_lock_test(lock: Lock) -> None:
    lock.acquire()
    lock.acquire()
    lock.release()

lock = Lock()
thread1 = Thread(target=get_lock_test, args=(lock, ))
thread2 = Thread(target=get_lock_test, args=(lock, ))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

上述代码中,我们尝试重复获取同一个锁,在thread1获取到第一个锁的时候,其他线程被阻塞,而获取同一个锁第二次的时候,由于该锁已被占用,获取不到锁

于是也陷入阻塞状态,而主调线程也因join继续等待

  • 获取多把锁导致的死锁现象
from threading import Thread, Lock
from time import sleep


def get_lock_test_one(lock1: Lock, lock2: Lock) -> None:
    lock1.acquire()
    print('The first lock has been acquired')
    sleep(2) # 故意休眠 使的这个方法包装的线程阻塞 慢一步获得锁
    lock2.acquire()
    print('The second lock has been acquired')
    lock1.release()
    lock2.release()

def get_lock_test_two(lock1: Lock, lock2: Lock) -> None:
    lock2.acquire() # 交换顺序让它先获取第二个锁
    print('The second lock has been acquired')
    lock1.acquire()
    print('The first lock has been acquired')
    lock1.release()
    lock2.release()


lock1 = Lock()
lock2 = Lock()
thread1 = Thread(target=get_lock_test_one, args=(lock1, lock2))
thread2 = Thread(target=get_lock_test_two, args=(lock1, lock2))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

上述的代码也会导致三个进程被阻塞,原因是因为当前线程需要获取的第二个锁被其他线程占用,导致无法获得锁而陷入阻塞状态

并发访问与信号量使用案例

信号量(Semaphore)是一种用于线程同步的机制,用于控制对多个线程共享资源的访问,通常用于控制并发时对资源的访问。它可以被视为一个计数器,表示可用的资源数量,线程需要获取资源时,首先尝试获取信号量,如果信号量计数器大于零,则线程可以获取资源并将信号量计数器减一,表示资源已被占用;如果信号量计数器为零,则线程必须等待,直到有其他线程释放资源并增加信号量计数器。

方法描述
acquire([blocking])获取信号量,如果信号量不可用,可选参数 blocking 控制是否阻塞等待,默认为 True。
release()释放信号量,增加信号量的计数值。
locked()返回 True 如果当前信号量被锁定,否则返回 False。
get_value()返回当前信号量的计数值。
enter()进入上下文管理器时调用,相当于 acquire() 方法。
exit(exc_type, exc_val, exc_tb)退出上下文管理器时调用,相当于 release() 方法。
from threading import Semaphore, Thread
from time import sleep


def used_semaphore_test(semaphore: Semaphore, thread_order: str) -> None:
    semaphore.acquire()
    print(thread_order+' is started')
    sleep(3)
    semaphore.release()

threads: list[Thread] = []
semaphore = Semaphore(3)   #信号量对象
for thread_order in range(6):
    thread_sign = 'Thread'+str(thread_order)
    t = Thread(target=used_semaphore_test,args=(semaphore, thread_sign))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

通过Semaphire设置信号量来设置允许同时访问资源的线程,它接收一个参数作为允许获得信号量的线程数,和互斥锁一样也可以使用with控制临界区:

def used_semaphore_test(semaphore: Semaphore, thread_order: str) -> None:
    with semaphore:
        print(thread_order+' is started')
        sleep(3)

事件机制与线程唤醒使用案例

threading.Event 类是 Python 标准库中用于线程间同步的一种机制。Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置为False。如果有线程等待一个 event 对象,而这个 event 对象为假,那么这个线程将会被一直阻塞直至该eventTrue。当 event 对象的信号标志被设置为True,它将唤醒所有等待该 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行剩余操作。

Event对象主要有以下4种方法:

方法描述
is_set()返回事件的状态,如果事件被设置为 True,则返回 True,否则返回 False。
set()将事件的状态设置为 True,通知等待该事件的线程可以继续执行。
clear()将事件的状态设置为 False,重置事件,导致等待该事件的线程被阻塞。
wait(timeout=None)阻塞当前线程,直到事件的状态变为 True 或者超时(如果提供了超时参数)。如果事件已经被设置,则立即返回。
from threading import Event, Thread
import time

def thread_main_func(thread_sequence: str, event: Event) -> None:
    print(thread_sequence, ' is started')
    time.sleep(2)
    if not event.is_set():
        print('event will be set to True')
        event.set()

def thread_event_func(thread_sequence: str,  event: Event) -> None:
    event.wait()
    print(thread_sequence, ' is started')


event: Event = Event()
thread1 = Thread(target=thread_main_func, args=('thread1', event))
thread2 = Thread(target=thread_event_func, args=('thread2', event))

thread2.start()
thread1.start()

thread1.join()
thread2.join()

我们会得到运行结果:

thread1  is started
event will be set to True
thread2  is started

多线程的生产者-消费者模式

生产者-消费者模式是一种常见的并发设计模式,用于解决生产者和消费者之间的数据交换问题。在这种模式中,生产者负责生成数据,并将数据放入共享的缓冲区中,而消费者则负责从缓冲区中获取数据并进行处理。

三大核心组件

  1. 生产者(Producer):生成数据并将其放入缓冲区的实体。
  2. 消费者(Consumer):从缓冲区中获取数据并进行处理的实体。
  3. 缓冲区(Buffer):用于存储生产者生成的数据,以便消费者可以获取。

模式特点

  • 生产者和消费者之间解耦:生产者和消费者可以独立运行,并且不需要知道彼此的存在。
  • 并发处理:生产者和消费者可以并发执行,提高系统的效率和性能。
  • 缓冲区控制:通过合理设计缓冲区大小和同步机制,可以避免生产者和消费者之间的竞态条件(Race Condition)问题。

模式实现方式

  1. 使用线程:生产者和消费者分别作为线程运行,通过线程安全的队列作为缓冲区。
  2. 使用进程:生产者和消费者分别作为进程运行,通过进程间通信机制(如管道、共享内存等)作为缓冲区。

线程Queue对象

Queue类是Python中用于实现队列(先进先出)数据结构的类。它可以在多线程编程中安全地实现线程间通信和同步。Python标准库中提供了两种队列类:queue.Queuemultiprocessing.Queue。这两种类的基本用法类似,但是multiprocessing.Queue专门用于多进程编程,而queue.Queue用于多线程编程。

其常见的方法如下:

方法描述
Queue(maxsize=0)创建一个新的队列。如果 maxsize 小于或等于零,则队列大小不受限制。如果 maxsize 大于零,则队列最多可以容纳 maxsize 个项目。
put(item, block=True, timeout=None)item 放入队列。如果 blockTrue(默认值),并且 timeoutNone(默认值),则如果必要,阻塞直到队列中有空间将 item 放入队列。如果 timeout 是一个正数,最多阻塞 timeout 秒,并且如果在此时间内仍然没有可用的空间,则引发 queue.Full 异常。如果 blockFalse,则如果立即有可用的空间,则放入 item 到队列中,否则引发 queue.Full 异常(在这种情况下,忽略 timeout)。
get(block=True, timeout=None)从队列中移除并返回一个项目。如果 blockTrue(默认值),并且 timeoutNone(默认值),则如果必要,阻塞直到队列中有项目可用。如果 timeout 是一个正数,最多阻塞 timeout 秒,并且如果在此时间内仍然没有可用的项目,则引发 queue.Empty 异常。如果 blockFalse,则如果立即有项目可用,则返回一个项目,否则引发 queue.Empty 异常(在这种情况下,忽略 timeout)。
put_nowait(item)等效于 put(item, block=False)
get_nowait()等效于 get(block=False)
empty()如果队列为空,则返回 True,否则返回 False
full()如果队列已满,则返回 True,否则返回 False
qsize()(或 qsize返回队列中的项目数量。注意,此方法不是线程安全的,因为在调用 qsize 方法后,队列大小可能会立即更改。
join()阻塞直到队列中的所有项目都已获取和处理。默认情况下,putget 方法在向队列中添加或移除项目时会更新一个内部计数器。join 方法阻塞直到这个计数器为零。

模式使用案例

from queue import Queue
import threading, time

def put_data(queue: Queue) -> None:
    for i in range(5):
        queue.put(i)
        print(f"Put {i} into the queue\r")

def get_data(queue: Queue) -> None:
    time.sleep(2)
    for i in range(5):
        data = queue.get()
        print(f"Get {data} from the queue\r")
     
queue = Queue()
put_thread = threading.Thread(target=put_data, args=(queue, ))
get_thread = threading.Thread(target=get_data, args=(queue, ))

put_thread.start()
get_thread.start()

put_thread.join()
get_thread.join()

方法包装的进程创建

from multiprocessing import Process
import os, time

def processing_func(content: str) -> None:
    # 除了os模块获取进程id 还可以通过multiprocessing.current_process().pid访问
    print("The sub processingID is: ", os.getpid())
    print("The parent processingID is: ", os.getppid())
    print(f"The content is: {content}")
    time.sleep(10)

if __name__ == "__main__":
    print("The main processingID is: ", os.getpid())
    content_one: str = 'Cause I got a crush on you who you'
    process_one: Process = Process(target=processing_func, args=(content_one, ))
    process_one.start()	# 启动进程
    process_one.join()	# 等待子进程结束

在Python中,可以使用内置的multiprocessing模块来创建多进程,它的创建与多线程是类似的。我们使用Process对象来包装多进程方法,使用start方法启动多进程,使用join来阻塞当前进程,用以等待子进程的结束,同样的它也有run方法用来保存执行的任务,其执行结果如下:

The main processingID is:  10668
The sub processingID is:  13112
The parent processingID is:  10668
The content is: Cause I got a crush on you who you

利用sleep,我们还可以查看进程是否真的创建了(如果你使用的是debug模式,可能会看到多个进程):

映像名称                       PID 会话名              会话#       内存使用
========================= ======== ================ =========== ============
python.exe                   12496 Console                    1     13,864 K
python.exe                    9328 Console                    1     13,628 K

在多进程中我们需要注意:在windows上,子进程会自动import启动它的py文件,而在import的时候将会自动执行这些语句

如果不加if __name__ == "__main__"限制的话,就会无限递归创建子进程,进而报错。将会在控制台看到以下内容:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()

参考:【Python官方Doc】For an explanation of why the if __name__ == '__main__' part is necessary, see Programming guidelines.

类继承包装Process对象

In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread.

from:【Python官方Doc】docs.python.org/3/library/m…

由于Process对象完全遵守Thread规范,所以我这里直接展示其构造方法:

def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                 *, daemon=None)

这里需要注意的是:Process类继承自BaseProcess类,并且只定义了两个静态方法,并且对父类中的同名方法进行了重写:

注意:Python中被标记为staticmethod的方法,仍然可以覆盖父类的同名普通方法,实质上staticmethod仅仅只是标记而已

class Process(process.BaseProcess):
    _start_method = None
    @staticmethod
    def _Popen(process_obj):
        return _default_context.get_context().Process._Popen(process_obj)

    @staticmethod
    def _after_fork():
        return _default_context.get_context().Process._after_fork()

而能够实现接受参数的实现,依赖于Python的隐式继承特性,这个特性也可以用于装饰器模式实现:

class BaseClass:
    def __init__(self, name):
        self.name = name

class SubClass(BaseClass):
    def get_name(self):
        return self.name

if __name__ == "__main__":
    # 隐式继承父类的构造方法
    obj = SubClass('kunkun')
    print(obj.get_name())

需要注意的是,构造方法中的group参数用于将线程或进程划分为组,不过现在基本不使用此参数,其他参数参照Thread即可

  • 使用案例
from multiprocessing import Process
import time

class HelloWorldProcess(Process):
    def __init__(self, times: int) -> None:
        super().__init__()
        self.times = times

    def run(self) -> None:
        if self.times > 0:
            for _ in range(self.times):
                print(f"Hello World. This is Python's {_ + 1} output")
                time.sleep(1)

if __name__ == "__main__":
    obj_of_hello = HelloWorldProcess(times=5)
    obj_of_hello.start()
    obj_of_hello.join()

全局变量的同步问题

需要强调的是:线程可以共享全局变量,进程不能够共享全局变量

线程间可以直接共享全局变量,因为线程共享同一个进程的内存空间。这意味着一个线程对全局变量的修改会立即反映到其他线程中,因为它们都在同一片内存空间中操作数据。这也就是为什么要关注线程安全,避免出现竞态条件(race condition)等问题。

进程间不能直接使用全局变量的主要原因是,每个进程都有自己独立的内存空间,这意味着一个进程中的全局变量在另一个进程中是不可见的。当一个进程修改全局变量时,这个修改只影响到了当前进程的内存空间,不会反映到其他进程中去。这导致了进程间无法直接共享全局变量的数据,传入进程任务中的数据将在进程的独立内存空间中,例如以下代码中子进程对父进程中的列表修改是失败的:

from multiprocessing import Process

def process_worker(op_list: list) -> None:
    op_list.append('Test Data')

if __name__ == "__main__":
    shared_list: list[str] = ['Hello World']
    process: Process = Process(target=process_worker, args=(shared_list, ))
    process.start()
    process.join()
    print(shared_list)

进程队列Queue使用

方法描述
qsize()返回队列中的元素数量。
empty()如果队列为空,返回True,否则返回False。
full()如果队列已满,返回True,否则返回False。
put(item, block=True, timeout=None)item放入队列。如果block为True且timeout不为None,则在队列已满时阻塞最多timeout秒。如果block为False,则在队列已满时立即抛出queue.Full异常。
put_nowait(item)相当于put(item, block=False)
get(block=True, timeout=None)从队列中获取一个元素。如果block为True且timeout不为None,则在队列为空时阻塞最多timeout秒。如果block为False,则在队列为空时立即抛出queue.Empty异常。
get_nowait()相当于get(block=False)
task_done()在完成队列中的一个任务时调用。
join()阻塞调用线程,直到队列中的所有元素都被获取和处理完。
  • 使用案例
from multiprocessing import Queue, Process
from random import randint
import os

def processing_option(option_queue: Queue, process_order: int, option_type: str='get') -> None:
    temp: int = None
    if option_type.lower() == 'get' and not option_queue.empty():
        temp = option_queue.get()
    if option_type.lower() == 'put':
        temp = randint(1, 5)
        option_queue.put(temp)
    
    print(f"The Process {process_order} ID is: {os.getpid()}")
    print(f"The elements being manipulated in the process {process_order} is: {temp}")


if __name__ == "__main__":
    option_table: list[str] = ['get', 'put', 'none', 'get', 'put']
    option_queue: Queue = Queue()
    process_list: list[Process] = []

    for i in range(len(option_table)):
        process: Process = Process(target=processing_option, 
                                   args=(option_queue, i, option_table.pop()))
        process_list.append(process)
        process.start()
    
    for process in process_list:
        process.join()
    

进程之间的管道通信

multiprocessing模块中的Pipe方法,它提供了一种在两个进程之间创建匿名管道的方式,用于在这两个进程之间进行通信。管道允许一个进程向管道写入数据,另一个进程从管道读取数据,实现了进程间的数据交换。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个参数是全双工模式,也就是说conn1conn2均可收发。若duplexFalseconn1只负责接收消息,conn2只负责发送消息。sendrecv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出OSError

def Pipe(self, duplex=True):
    '''Returns two connection object connected by a pipe'''
    from .connection import Pipe
    return Pipe(duplex)

我们直接上使用示例:

  • 父进程与子进程管道通信
from multiprocessing import Process, Pipe

parent_conn, child_conn = Pipe()

def send_data(conn) -> None:
    data: str = "Hello from child process!"
    conn.send(data)
    conn.close()

if __name__ == "__main__":
    child_process: Process = Process(target=send_data, args=(child_conn, ))
    child_process.start()

    data_received = parent_conn.recv()
    print("Received:", data_received)

    child_process.join()

  • 两进程管道通信
from multiprocessing import Process, Pipe

one_conn, two_conn = Pipe(duplex=False)

def send_data(conn) -> None:
    data: str = "Hello from send process!"
    conn.send(data)
    conn.close()

def recv_data(conn) -> None:
    print('Received:', conn.recv())

if __name__ == "__main__":
    first_process: Process = Process(target=recv_data, args=(one_conn, ))
    second_process: Process = Process(target=send_data, args=(two_conn, ))

    first_process.start()
    second_process.start()

    first_process.join()
    second_process.join()

进程数组Array使用

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

从共享内存中申请并返回一个具有ctypes类型的数组对象。默认情况下返回值实际上是被同步器包装过的数组对象。

typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。

如果 lockTrue (默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 LockRLock 对象则该对象将被用于同步对值的访问。 如果 lockFalse 则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。

请注意 lock 是一个仅限关键字参数。

请注意 ctypes.c_char 的数组具有 valueraw 属性,允许被用来保存和提取字符串。

from:【Python官方Doc】docs.python.org/zh-cn/3/lib…

正如上述文档,进程之间也是有锁这一对象来保证共享数据的同步,但是因为我们使用的共享数据基本上都是标准库中可以用于进程间的类型,它们出发点都实现了同步原语,所以进程方面,锁对象并不算是重点,但是其重要性是无可比拟的,这里使用Array是因为,它是可以关闭锁,来验证进程间的竞态的:

Array的第一个参数typecode用于指示参数类型,它有以下类型:

数据类型typecode
有符号字节'b'
无符号字节'B'
有符号短整型'h'
无符号短整型'H'
有符号整型'i'
无符号整型'I'
有符号长整型'l'
无符号长整型'L'
有符号长长整型'q'
无符号长长整型'Q'
单精度浮点数'f'
双精度浮点数'd'

我们编写以下代码来验证竞态:

import multiprocessing
import time

def process_worker_with_pause(shared_list, index) -> None:
    shared_list[index] += 1
    time.sleep(1)

def process_worker_not_pause(shared_list, index) -> None:
    shared_list[index] += 1

if __name__ == "__main__":
    shared_list = multiprocessing.Array('i', [0, 0, 0, 0], lock=False)

    processes: list[multiprocessing.Process] = []
    for i in range(4):
        p1 = multiprocessing.Process(target=process_worker_with_pause, args=(shared_list, i))
        p2 = multiprocessing.Process(target=process_worker_not_pause, args=(shared_list, -i))
        processes.append(p1)
        processes.append(p2)
        p1.start()
        p2.start()


    for p1 in processes:
        p1.join()

    print("Shared list after processing:", shared_list[:])

成功复现竞态情况:

Python编程-并发编程基础梳理与高级特性案例讲解

但是使用了默认的锁,它就一定安全吗?事实并非如此,Array实质上仍然不是进程安全的,使用bat脚本多此执行修改为默认锁的程序,你会发现仍然会出现竞态情况,原因在于multiprocessing.Array 内部有锁,但是锁只会保护对数组对象本身的修改,而不会保护数组内部元素的访问和修改。当上述的两个进程并发处理到同一个索引处,还是会导致数据损坏:

Python编程-并发编程基础梳理与高级特性案例讲解

正常使用还是建议加锁运行:

import multiprocessing
import time

def process_worker_with_pause(lock, shared_list, index) -> None:
    with lock:
        shared_list[index] += 1
        time.sleep(1)

def process_worker_not_pause(lock, shared_list, index) -> None:
    with lock:
        shared_list[index] += 1 

if __name__ == "__main__":
    shared_list = multiprocessing.Array('i', [0, 0, 0, 0])
    lock = multiprocessing.Lock()

    processes: list[multiprocessing.Process] = []
    for i in range(4):
        p1 = multiprocessing.Process(target=process_worker_with_pause, 
                                     args=(lock, shared_list, i))
        p2 = multiprocessing.Process(target=process_worker_not_pause, 
                                     args=(lock, shared_list, -i))
        processes.append(p1)
        processes.append(p2)
        p1.start()
        p2.start()


    for p1 in processes:
        p1.join()

    print("Shared list after processing:", shared_list[:])

进程的信号量使用

因为multiprocessing是基于threading的api实现的,所以它也是有信号量和事件的:

import multiprocessing
import time
from typing import List
from multiprocessing import Semaphore, Array

def worker(semaphore: Semaphore, shared_list: Array, index: int) -> None:
    with semaphore:
        shared_list[index] += 1
        print(f"Processed index {index}")

if __name__ == "__main__":
    semaphore = Semaphore(2)  # 允许同时访问的进程数量为2
    shared_list = Array('i', [0, 0, 0, 0])

    processes: List[multiprocessing.Process] = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(semaphore, shared_list, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("Shared list after processing:", shared_list[:])

进程的事件唤醒机制

import multiprocessing
from multiprocessing import Event
from typing import List

def wait_for_event(event: Event, event_name: str) -> None:
    print(f"Process waiting for {event_name} event")
    event.wait()
    print(f"Event {event_name} has been set")

def set_event(event: Event, event_name: str) -> None:
    print(f"Setting event {event_name}")
    event.set()

if __name__ == "__main__":
    event = Event()
    processes: List[multiprocessing.Process] = []

    p1 = multiprocessing.Process(target=wait_for_event, args=(event, "my_event"))
    p2 = multiprocessing.Process(target=set_event, args=(event, "my_event"))

    processes.extend([p1, p2])

    for p in processes:
        p.start()

    for p in processes:
        p.join()

使用Manager对象管理共享

multiprocessing模块中的Manager类提供了一种在多个进程之间共享Python对象的方式。它可以用来创建各种类型的共享对象,如listdictNamespace等,并确保这些对象在不同进程之间同步和安全地共享。

from multiprocessing import Process, current_process
from multiprocessing import Manager

def check_manager_option(option_list: list, option_dict: dict) -> None:
    print('The Process ID is:', current_process().pid)
    print('The element in list are:', option_list)
    print('The element in dict are:', option_dict)


if __name__ == "__main__":
    original_list: list = [1, 2, 3, 4]
    original_dict: dict = {'id': '2333', 'name': 'Miku'}

    with Manager() as manager:
        share_list: list = manager.list(original_list)
        share_dict: dict = manager.dict(original_dict)

        process: Process = Process(target=check_manager_option, 
                                   args=(share_list, share_dict))
        process.start()
        process.join()

进程池资源管理

进程池是一种管理和复用进程的技术,它通过预先创建一组进程并将它们保存在池中,以便在需要时重复使用这些进程,而不是频繁地创建和销毁进程,这样可以提高程序的性能和效率。

进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。

from multiprocessing import Pool
import time
import os
from random import randint

def print_current_process_id(int_value: int) -> None:
    print('The current_randint value:', int_value)
    print('The current_process id:', os.getpid())
    time.sleep(2)
    # 这里进行sleep的是因为pool实现了对进程资源的复用 
    # 要直观看出差异 所以我阻塞了进程


if __name__ == "__main__":
    pool = Pool(processes=5)
    for i in range(10):
        pool.apply(func=print_current_process_id, args=(randint(1, 5), ))
    pool.close()
    pool.join()

这里如果使用异步执行,将会看到更加快速的结果,不过可能会因为pool的进程复用而导致结果不那么直观,并且一定要注意进程池的资源回收,参见:

Worker processes within a Pool typically live for the complete duration of the Pool’s work queue. A frequent pattern found in other systems (such as Apache, mod_wsgi, etc) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before being exiting, being cleaned up and a new process spawned to replace the old one. The maxtasksperchild argument to the Pool exposes this ability to the end user.

from:【Python官方Doc】:docs.python.org/3/library/m…

常用方法如下:

方法描述
Pool(processes)Pool对象的构造器,其接受的参数为进程数
apply(func, args)同步执行函数func,并返回结果。
apply_async(func, args)异步执行函数func,并返回AsyncResult对象,可用于获取结果。
map(func, iterable)同步地将函数func应用于可迭代对象的每个元素,并返回结果列表。
map_async(func, iterable)异步地将函数func应用于可迭代对象的每个元素,返回AsyncResult对象。
close()关闭进程池,不再接受新的任务。
terminate()立即终止所有工作进程。
join()等待所有工作进程结束。

还有一点,我们还可以使用with来管理程序上下文:

from multiprocessing import Pool
import os
from random import randint

def print_current_process_id(int_value: int) -> None:
    print('The current_randint value:', int_value)
    print('The current_process id:', os.getpid())

if __name__ == "__main__":
    with Pool(processes=5) as pool:
        for i in range(5):
            pool.apply(func=print_current_process_id, args=(randint(1, 5), ))

with可以帮我们自动完成资源的回收与等待,不过这里需要强调的是,如果主线程中建立的进程池进行了异步执行,那么主进程将不会被阻塞,即继续执行下文,会导致你看不到执行的异步函数中的执行结果

进程池的返回值管理

在进程池中存在一个对象AsyncResult,它允许我们收集传入方法异步执行的返回值:

from multiprocessing import Pool
from collections.abc import Iterable
import time

def processing_task(int_var: int) -> int:
    return int_var ** 2

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # 返回AsyncResult对象         
        result = pool.apply_async(processing_task, (10, )) 
        print('Result of a single asynchronous execution:', result.get(timeout=1))        
        print('Result of a map asynchronous execution:', pool.map(processing_task, range(10)))       

        # 以可迭代对象方式返回对象
        iter_map_res = pool.imap(processing_task, range(10))
        if isinstance(iter_map_res, Iterable):
            print('Result of a iterable_map asynchronous execution:', end=' ')
            print(next(iter_map_res), end='  ')                     
            print(next(iter_map_res), end='  ')                    
            print(iter_map_res.next(timeout=1))    
                   
        # 陷入阻塞获取超时 抛出multiprocessing.context.TimeoutError
        result = pool.apply_async(time.sleep, (10, ))
        print(result.get(timeout=1))        
        

协程的实现与核心机制

避免了线程上下文切换所带来的性能损耗,简而言之就是一个线程中任务的异步,不过在实际情况中一般不常用

协程(Coroutine)是一种轻量级的线程,可以在单个线程内实现多个任务的并发执行。与传统的线程相比,协程更加轻量级,消耗的资源更少,因此可以更高效地实现并发。

在Python中,协程通常使用asyncawait关键字来定义和管理。通过async def定义一个协程函数,其中的await关键字用于挂起当前协程的执行,等待另一个协程或异步操作完成。当一个协程被挂起时,它不会阻塞整个线程,而是让出控制权给其他协程继续执行,从而实现并发执行。协程可以用于编写高效的异步程序,特别适用于I/O密集型的任务,比如网络请求和文件操作。通过使用协程,可以避免传统多线程程序中的线程切换和锁机制,通过协程实现轻量的由用户态调度的多任务模型,简化了并发编程的复杂性。

协程的核心机制在于对当前线程的控制流转移切换

  • 每个协程有自己的执行栈,可以保存自己的执行现场,可以由用户程序按需创建协程

  • 协程主动让出(yield)执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈)

  • 协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行

协程执行与多线程的对比

  • 在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须 等待,直到它完成之后它们才能依次执行。
  • 多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系 统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其 他线程得以继续执行。
  • 协程版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵 的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理 某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。

需要注意的是,由于协程自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到上下文的切换,这才是其轻量化的本质

由于在一个线程中切换,所以无需原子操作的锁定及同步的开销。使得单线程内就可以实现并发的效果,最大限度地利用了CPU,且可扩展性高,成本低(一个CPU支 持上万的协程都不是问题。所以很适合用于高并发处理)

asyncio协程是写爬虫比较好的方式。比多线程和多进程的效果都要好,因为开辟新的线程和进程带来的上下文切换是非常耗时的。

不过协程也有弊端,协程的本质是个单线程,它不能同时将单个CPU 的多核进行使用,协程需要和进程配合才能运行在多CPU上。 另外我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。

利用yield实现协程

上文提到协程是基于单线程的上下文切换,而在Python中生成器恰好也有这个特性,早期的协程设计实质上就是基于生成器来完成的,我们可以写一段代码来模拟异步:

import time

def one_task_processor(_range: int) -> None:
    for i in range(_range):
        print('The incoming value is:', i)
        time.sleep(1) # 模拟任务阻塞等待

def two_task_processor(_range: int) -> None:
    for i in range(_range):
        print('The incoming value is calculated as:', i**2)
        time.sleep(1)
    

if __name__ == "__main__":
    start_time = time.time()
    
    one_task_processor(2)
    two_task_processor(2)

    end_time = time.time()
    print('Runtime:', end_time-start_time)    

import time
from typing import Generator

def task_processor(_range: int) -> Generator:
    for i in range(_range):
        print('The incoming value is:', i)
        yield
        time.sleep(1) # 模拟任务阻塞等待

def async_task_processor(_range: int, tasks: tuple[Generator]) -> None:
    task_handles: list[Generator] = [task(_range) for task in tasks]
    
    for i in range(_range):
        print('The incoming value is calculated as:', i**2)
        for task_handle in task_handles:
            next(task_handle)
        time.sleep(1)
    

if __name__ == "__main__":
    start_time = time.time()
    async_task_processor(2, (task_processor, ))
    end_time = time.time()
    print('Runtime:', end_time-start_time)    

在第二段代码中,我在async_task_processor函数中使用了生成器来模拟异步任务的执行。在主循环中,每次循环都会调用所有任务的生成器的next方法,这样每个任务都会执行一次。由于生成器在执行yield语句时会暂停当前任务的执行,因此多个任务可以交替执行,从而实现了一种伪异步的效果。这样,虽然每个任务的执行时间仍然是1秒,但是任务之间可以同时执行,因此总的运行时间会比第一段代码少一些。当然,我们还可以头铁一点,把第二个模拟异步的函数做成一个装饰器。

在有关异步操作的关键字出现前,紧随上述模拟异步之后的便是@asyncio.coroutineyield from,他们是Python 3.4引入的关于协程的特性,用于异步编程,可以帮助简化使用asyncio模块进行异步编程的过程。

@asyncio.coroutine是一个装饰器,用于定义协程。协程是一种特殊的函数,可以在函数内部使用yield from语句来暂停执行,并在需要时恢复执行。使用@asyncio.coroutine装饰的函数可以通过yield from语句调用其他协程,实现协程之间的协作。

原生异步协程操作asyncio

asyncio是Python3.5后标准库中用于支持异步编程的模块,提供了编写协程(coroutines)、任务(tasks)和异步IO的工具。它的目标是提供一种高效的单线程异步编程方式,能够处理大量IO密集型操作而无需使用多线程或多进程。它主要包含以下内容:

  • async关键字: async用于定义异步函数,即协程(coroutines)。异步函数可以在函数内部使用await来挂起自己的执行,让出控制权给事件循环,从而实现非阻塞的异步操作。定义一个异步函数时,通常使用async def语法
  • await关键字: await用于在异步函数内部等待另一个异步操作的完成。它会暂停当前函数的执行,直到被等待的异步操作完成并返回结果。在使用await时,要求被等待的对象是一个awaitable对象(如异步函数、Future对象等)

asyncio的核心机制

  1. 事件循环(Event Loop): 事件循环是asyncio的核心机制,负责管理和调度所有的协程。它会在协程之间切换执行,处理IO事件和定时器事件。
  2. 任务(Tasks): 任务是协程的高层抽象,通过asyncio.create_task()函数创建。任务可以并发执行,可以等待其他任务完成,并且可以取消。
  3. Future对象: Future对象是一个占位符,代表将来会完成的操作的结果。在asyncio中,协程可以使用asyncio.Future类来等待其他任务的结果。
  4. 异步IO操作: asyncio提供了一组异步IO操作的函数,如asyncio.open_connection()asyncio.start_server()等,用于实现异步网络编程。
  5. 同步和异步代码混合: asyncio允许将同步代码和异步代码混合使用,通过loop.run_in_executor()函数将同步代码包装成异步操作。

asyncio的使用示例

我们修改yield协程中的代码:

import time
import asyncio

async def one_task_processor(_range: int) -> None:
    for i in range(_range):
        print('The incoming value is:', i)
        await asyncio.sleep(1)
    return 'task one is finished'

async def two_task_processor(_range: int) -> None:
    for i in range(_range):
        print('The incoming value is calculated as:', i**2)
        await asyncio.sleep(1)
    return 'task two is finished'

async def async_processor(op_num: int) -> None:
    execute_res: tuple[str, str] = (
        await asyncio.gather(one_task_processor(op_num), two_task_processor(op_num)))
    print(execute_res)

if __name__ == "__main__":
    start_time = time.time()
    
    asyncio.run(async_processor(2))

    end_time = time.time()
    print('Runtime:', end_time-start_time)    

asyncio的常用方法

方法/对象描述
asyncio.run(coro)运行一个协程,启动事件循环,直到协程运行完毕并返回结果。
asyncio.create_task(coro)创建一个任务(Task),用于并发执行协程。
asyncio.sleep(delay)创建一个暂停指定秒数的协程,用于模拟延迟或等待。
asyncio.wait(tasks)等待一组任务完成,返回已完成和未完成的任务集合。
asyncio.gather(*coros)并发运行多个协程,等待它们全部完成并返回结果。
asyncio.shield(coro)创建一个不会被取消的协程,用于保护重要的部分不受取消影响。
asyncio.ensure_future(coro)将协程包装成一个任务,用于将旧式的协程接口与新式的协程接口兼容。
asyncio.wait_for(coro, timeout)用来来设置协程异步的超时时间,在超时后抛出 asyncio.TimeoutError 异常
asyncio.Task.cancel()取消一个任务,导致该任务的await引发CancelledError(需要对应的任务响应)
asyncio.Queue异步队列,用于在协程之间进行通信。
asyncio.Lock异步锁,用于协程之间的互斥访问。
asyncio.Event异步事件,用于协程之间的信号通知。
asyncio.Condition异步条件变量,用于复杂的协程同步场景。
asyncio.Semaphore异步信号量,用于控制同时访问某个资源的协程数量。
asyncio.TimeoutError超时错误类型,用于指示异步操作超时。

asyncio的Queue对象

asyncio.Queue对象是一个用于在协程之间安全传递消息的异步队列。它的常用方法如下:

方法描述
asyncio.Queue(maxsize=0)创建一个异步队列对象,maxsize参数指定队列的最大容量,为0表示无限制。默认为0。
put_nowait(item)将项目放入队列,如果队列已满则立即引发asyncio.QueueFull异常。
get_nowait()从队列中获取一个项目,如果队列为空则立即引发asyncio.QueueEmpty异常。
task_done()通知队列项目已经被处理完毕。当使用join()方法时需要调用此方法来标记队列中的项目已完成处理。
join()阻塞直到队列中所有的项目都被处理完毕。在所有项目被处理完毕之前,调用task_done()方法来通知项目已完成处理。
asyncio.Queue.get()从队列中获取一个项目,如果队列为空,则等待直到有项目可用。
asyncio.Queue.put(item)将项目放入队列,如果队列已满,则等待直到有空间可用。
asyncio.Queue.join()阻塞直到队列中所有的项目都被处理完毕。在所有项目被处理完毕之前,调用task_done()方法来通知项目已完成处理。

我们可以使用生产者-消费者模式来编写一个例子:

import asyncio
ASQueue = asyncio.Queue  

async def async_producer(queue: ASQueue, num_produce) -> None:
    for i in range(num_produce):
        item: str = f'item-{i}'
        print(f'Producing {item}')
        await queue.put(item)
        await asyncio.sleep(1)  

async def async_consumer(queue: ASQueue):
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            print("Queue is empty. Exiting consumer.")
            break
        else:
            print(f'Consuming {item}')
            await asyncio.sleep(1)


async def async_processor() -> None:
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(async_producer(queue, 5))
    consumer_task = asyncio.create_task(async_consumer(queue))
    await asyncio.gather(producer_task, consumer_task)

if __name__ == "__main__":
    asyncio.run(async_processor())

这里需要着重强调的是,在使用Queue对象时,尤其要注意超时问题,在并发项目中,队列元素的加入与放回,往往是异步的,即获取元素时,队列中不一定有元素,装入元素时队列不一定有额外空间,建议进行完整的超时机制设置,或者访问计数器,亦或者异常处理,在编写上述代码时,如果这样写,将会陷入死锁,因为消费者将一直等待:

async def async_consumer(queue: ASQueue):
    while True:
        item = await queue.get()  
        print(f'Consuming {item}')
        await asyncio.sleep(1)  

那么我就必须选择一种异步处理方式,先看第一种,我们处理队列为空时直接返回:

async def async_consumer(queue: ASQueue):
    while True:
        try:
            item = queue.get_nowait()  
        except asyncio.QueueEmpty: 
            print("Queue is empty. Exiting consumer.")
            break
        else:
            print(f'Consuming {item}')

上述的解决方案存在的问题是,我要获取生产者的所有元素,但是因为异步处理,万一开始时执行的是消费者,那不是没有结果吗,所以我使用了超时的处理机制,在允许范围内超时返回。为什么要强调这点呢?因为协程队列的get实现是没有超时参数的

asyncio的Event对象

协程的事件与信号量在使用方面和threading是几乎没有差别的,方法也就那几个常用:

import asyncio

async def tash_coro(event) -> None:
    print("Waiting for event")
    await event.wait()
    print("Event is set")

async def action_event() -> None:
    event = asyncio.Event()
    asyncio.create_task(tash_coro(event))
    await asyncio.sleep(1)
    event.set()

if __name__ == "__main__":
    asyncio.run(action_event())

asyncio的Semaphore对象

import asyncio

async def task_worker(semaphore, id) -> None:
    async with semaphore:
        print(f'Worker {id} is working')
        await asyncio.sleep(1)
        print(f'Worker {id} is done')

async def async_controller() -> None:
    semaphore = asyncio.Semaphore(2)  
    tasks = []
    for i in range(5):
        tasks.append(task_worker(semaphore, i))
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(async_controller())

asyncio的Lock对象

import asyncio

async def task_worker(lock, id) -> None:
    async with lock:
        print(f'Worker {id} is working')
        await asyncio.sleep(1)
        print(f'Worker {id} is done')

async def async_controller() -> None:
    lock = asyncio.Lock()
    tasks = []
    for i in range(5):
        tasks.append(task_worker(lock, i))
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(async_controller())

concurrent.futures高级并发管理

concurrent.futures 是 Python 标准库中提供的一个模块,用于实现并行执行任务的功能。它提供了两个主要的类ThreadPoolExecutorProcessPoolExecutor,分别用于线程池和进程池的管理。这两个类都实现了 Executor 接口,提供了一种高级的编程界面,用于管理并发任务的执行。

concurrent.futures — Launching parallel tasks

New in version 3.2.

Source code: Lib/concurrent/futures/thread.py and Lib/concurrent/futures/process.py


The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

Availability: not Emscripten, not WASI.

This module does not work or is not available on WebAssembly platforms wasm32-emscripten and wasm32-wasi. See WebAssembly platforms for more information.

from:【Python官方Doc】docs.python.org/3/library/c…

Executor抽象类

class concurrent.futures.Executor

An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.

简而言之其有以下规范化特性:

  • Executor是一个抽象类,提供了异步执行调用的方法,不应直接使用,而是通过其具体的子类来使用。

  • submit(fn, /, *args, **kwargs):将可调用对象fnfn(*args, **kwargs)的形式安排在异步执行,并返回表示可调用对象执行的Future对象。

  • map(fn, *iterables, timeout=None, chunksize=1):类似于map(fn, *iterables),但是不同之处在于立即收集可迭代对象,而不是惰性地收集;fn会异步执行,并且可能会同时执行多个fn调用。

  • shutdown(wait=True, *, cancel_futures=False):通知执行器在当前待处理的所有Future执行完毕后释放任何正在使用的资源。在shutdown之后调用Executor.submit()Executor.map()会引发RuntimeError

    • 如果waitTrue,则该方法在所有待处理的Future执行完毕并释放与执行器关联的资源之前不会返回。
    • 如果waitFalse,该方法会立即返回,并在所有待处理的Future执行完毕时释放与执行器关联的资源。无论wait的值如何,整个Python程序在所有待处理的Future执行完毕之前不会退出。
    • 如果cancel_futuresTrue,该方法会取消所有未开始运行的待处理的Future。已完成或正在运行的Future不会被取消,不管cancel_futures的值是什么。
    • 如果cancel_futureswait都为True,所有执行器已开始运行的Future将在此方法返回之前完成。其余Future将被取消。

Future对象功能及特性

注意:这些内容只是表层实现,但是在Future的底层中还封装有_base,还有许多方法来自于它,比如as_completed

Future对象是一个异步执行可调用对象的封装。Future实例是通过Executor.submit()创建的,除了用于测试目的外,不应直接创建。

Future对象的主要特性包括:

  • cancel():尝试取消调用。如果调用当前正在执行或已经完成且无法取消,则该方法将返回False;否则将取消调用并返回True。
  • cancelled():如果调用成功取消,则返回True。
  • running():如果调用当前正在执行且无法取消,则返回True。
  • done():如果调用成功取消或已完成运行,则返回True。
  • result(timeout=None):返回调用的返回值。如果调用尚未完成,则此方法将等待最多timeout秒。如果调用在timeout秒内未完成,则将引发TimeoutError。
  • exception(timeout=None):返回调用引发的异常。如果调用尚未完成,则此方法将等待最多timeout秒。如果调用在timeout秒内未完成,则将引发TimeoutError。
  • add_done_callback(fn):将可调用对象fn附加到Future。当Future被取消或完成运行时,将调用fn,其唯一参数为Future
  • set_running_or_notify_cancel():该方法应由Executor实现在执行与Future相关联的工作之前调用,并且应由单元测试调用。如果方法返回False,则Future被取消。如果返回True,则Future未被取消并且已处于运行状态。
  • set_result(result):将与Future关联的工作的结果设置为result。
  • set_exception(exception):将与Future关联的工作的结果设置为异常exception。

ThreadPoolExecutor线程池

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

ThreadPoolExecutor 是 Python 中 concurrent.futures 模块提供的一个线程池实现,用于管理和执行多个线程任务。它提供了一种简单而强大的方式来处理并发任务,尤其适用于需要执行大量独立、异步的任务的场景。

方法描述
ThreadPoolExecutor(max_workers=N)创建一个具有最大工作线程数为N的线程池。
submit(fn, *args, **kwargs)提交一个可调用对象到线程池,并返回一个Future对象。可以传递额外的参数给函数,并且会自动新建线程或进程。
cancel()取消任务。
cancelled()判断任务是否已取消。
done()判断任务是否已完成。
result(timeout=None)获取任务的执行结果(可选设置超时时间)。
exception(timeout=None)获取任务的执行异常(可选设置超时时间)。
shutdown(wait=True)关闭线程池,不再接受新任务(可选等待所有任务完成)。
map(func, *iterables)类似于内置的 map 函数,将 func 应用于 iterables 的每个元素,并返回结果。
map_async(func, *iterables)map 类似,但返回一个 concurrent.futures.Future 对象,可以用于异步获取结果。
submit_to_executor(executor, fn, *args, **kwargs)将函数 fn 提交给指定的执行器执行,并返回一个 Future 对象。
shutdown_threadpoolThreadPoolExecutor 上调用 shutdown 方法,可用于关闭线程池。
wait_for(future, timeout=None)等待指定的 Future 对象完成,并返回其结果,超时时间为 timeout 秒。
wait(fs, timeout=None, return_when=ALL_COMPLETED)等待给定的 Future 对象列表 fs 中的所有任务完成,并根据 return_when 指定的条件返回。
  • 使用示例
import concurrent.futures
import urllib.request

urls: list[str] = ['https://www.baidu.com/',
        'http://www.qq.com/']

def visit_the_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(visit_the_url, url, 60): url for url in urls}

        for future in concurrent.futures.as_completed(futures):
            url = futures.get(future)
            try:
                data = future.result(timeout=2)
            except Exception as e:
                print(f'{url}generated an exception: {e}')
            else:
                print(f'{url} page is { len(data) } bytes')

concurrent.futures.as_completed(futures) 函数是 Python 中 concurrent.futures 模块提供的一个函数,用于迭代一个 Future 对象的迭代器,并在它们完成时产生 Future 对象。它返回一个迭代器,当其中的每个 Future 对象完成时,会产生一个值,而不会阻塞其他 Future 对象的完成。

ProcessPoolExecutor进程池

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

ProcessPoolExecutor 是 Python 中 concurrent.futures 模块提供的另一个并发执行任务的工具,与 ThreadPoolExecutor 不同的是,ProcessPoolExecutor 使用进程而不是线程来执行任务,因此更适用于 CPU 密集型的任务,它的使用与ThreadPoolExecutor是类似的。

import concurrent.futures
from typing import List

def square(n: int) -> int:
    return n * n

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(square, i) for i in range(10)]

        results: List[int] = [future.result() for future in futures]
        
    print(results)