likes
comments
collection
share

python:多进程的几种实现方式

作者站长头像
站长
· 阅读数 49

1/使用multiprocessing模块的Pool()类

from multiprocessing.pool import Pool
Pool()类中有很多的函数。

Pool类可以提供指定数量(由processes参数决定)的进程供用户调用,
如果用户不指定该参数,则系统会根据逻辑核心数(比物理核心数,只多不少),来决定并行的进程数量。

当有新的请求提交到Pool中时,如果进程池还没有满,就会创建一个新的进程来执行请求。
如果进程池已经满了,请求就会告知先等待,直到进程池中有进程结束,才会创建新的进程来执行这些请求。

<1>下面介绍一下multiprocessing模块下的Pool()类下的几个方法:

<1>apply() 同步执行模式
    同步,堵塞主进程
    函数原型:apply(func[, args=()[, kwds={}]])
    该函数用于传递不定参数,func参数必须要有的,其他参数是可选的。同python中的apply函数一致,
    主进程会被阻塞直到所有子进程执行结束(不建议使用,并且3.x以后不再出现)。
    需要用到for循环,然后把迭代序列中的各个元素传入函数进行处理。
    for i in range(10):
        pool.apply(main, args=(i,))

<2>apply_async() 异步执行模式
    异步(子进程可以同时执行),非堵塞主进程,需要使用pool.join()来堵塞主进程。
    支持使用回调函数。
    函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
    与apply用法一致,但它是非阻塞主进程的且支持结果返回后进行回调。
    需用到for循环,然后把迭代序列中的元素传入函数。
    for i in range(10):
        pool.apply_async(main, args=(i,), callback=f)

<3>map()
    同步(一个接一个的执行),堵塞主进程
    函数原型:map(func, iterable[, chunksize=None])
    Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
    注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
    不需for循环。map函数会把迭代序列中的每个元素传入函数。
    res = pool.map(main, list) # 把所有子进程的结果放在一个列表中
    res是列表类型

<4>map_async()
    异步(多个子进程同时执行),非堵塞主进程,需要.join()来堵塞主进程。
    有一个chunksize参数
    支持回调函数。
    函数原型:map_async(func, iterable[, chunksize[, callback]])
    与map用法一致,但是它是非阻塞的
    res = pool.map_async(main, list)
    res是map类型,需要用res.get()转化为list类型。


<5>close()
    关闭进程池(pool),使其不再接受新的任务。
    及开启的进程达到processes以后,就关闭进程池,新的进程需等待。
    当进程池有空闲的时候,再接收新的进程。

<6>terminal()
    结束工作进程,不再处理未处理的任务

<7>join()
    主进程阻塞,等待子进程的退出。
    也就是说,主进程在最后才执行,一直等到所有的子进程执行完毕。
    join()方法要在close()或terminate()之后使用
    
<8>map_async()和apply_async()的区别
    当想要提高一个任务的执行效率时,我们可以通过拆分任务,把整个大任务拆分成多个子任务,然后利用多进程进行异步处理,及同时处理,缩短整个的任务执行时间。
    在python中的multiprocessing模块中,有2个可以构造异步执行的进程任务方法,apply_async()和map_async()。
2个都可以分别添加任务,然后多进程异步同时执行,但是二者也有重要的区别。
    下面进行说明。
    (1)对于apply_async(func,args),func是要执行的函数,args是列表或者元组这样的对象,里面的元素是要传给func函数的参数。对于多个子任务,需要通过for循环分别多次调用apply_async()函数进行一一的添加,不过这可以通过列表解析实现,以让多个子进程的结果返回保存在一个列表中。
2)而对于map_async(func,iterable,chunksize)函数,如果多个子进程任务通过同一个函数执行,只是参数不同,那么可以把拆分后的参数以列表的形式通过iterable传入,并通过chunksize参数指定进程数。实际上这里的chunksize表示的是对iterable的拆分数,但是最好还是让其等于进程数。
3)以上只是二者比较轻微的差别,更重要的差别是:
       apply_async()是通过for循环手动指定进程并且添加任务,这样每个进程的执行结果之间是独立的,会分别保存。这样的好处是某个进程出现了问题,不会影响其他的进程。
       map_async()子进程并不是独立的,如果其中一个子进程出现了异常,则别的子进程也会受到影响。
    
4)因此apply_async()比map_async()更加靠谱
    

python:多进程的几种实现方式

<2>multiprocessing.Pool()类的示例

1)apply()和apply_async()

import os, time
import multiprocessing 

def worker(arg):
    print("子进程开始执行>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
    time.sleep(0.5)
    
    print("子进程终止>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
  
def main():
    print("主进程开始执行>>> pid={}".format(os.getpid()))
    
    process_pool = multiprocessing.Pool(processes=5)
    for i in range(10):
        #process_pool.apply(worker,args=(i,))      # 同步执行
        process_pool.apply_async(worker,args=(i,)) # 异步执行
        
    # 关闭进程池,停止接受其它进程
    process_pool.close()
    
    # 阻塞主进程
    process_pool.join()
    
    print("主进程终止")
    
    
# 测试一下
# 同步执行方式,apply(function,args=(i,))
# 结果如下所示,可以看出同步执行模式会堵塞,也就是说必须一个一个的来执行,并没有达到并行的目的。
# 这里说的堵塞,指的是子进程之间的堵塞,也就是说子进程之间是一个接着一个来执行的,并没有并行执行。
主进程开始执行>>> pid=6688

子进程开始执行>>> pid=6689,ppid=6688,编号0
子进程终止>>> pid=6689,ppid=6688,编号0
子进程开始执行>>> pid=6690,ppid=6688,编号1
子进程终止>>> pid=6690,ppid=6688,编号1
子进程开始执行>>> pid=6691,ppid=6688,编号2
子进程终止>>> pid=6691,ppid=6688,编号2
子进程开始执行>>> pid=6692,ppid=6688,编号3
子进程终止>>> pid=6692,ppid=6688,编号3
子进程开始执行>>> pid=6693,ppid=6688,编号4
子进程终止>>> pid=6693,ppid=6688,编号4
子进程开始执行>>> pid=6689,ppid=6688,编号5
子进程终止>>> pid=6689,ppid=6688,编号5
子进程开始执行>>> pid=6690,ppid=6688,编号6
子进程终止>>> pid=6690,ppid=6688,编号6
子进程开始执行>>> pid=6691,ppid=6688,编号7
子进程终止>>> pid=6691,ppid=6688,编号7
子进程开始执行>>> pid=6692,ppid=6688,编号8
子进程终止>>> pid=6692,ppid=6688,编号8
子进程开始执行>>> pid=6693,ppid=6688,编号9
子进程终止>>> pid=6693,ppid=6688,编号9

主进程终止


#######################
#######################
# 异步执行方式
# apply_async()
# 结果如下所示,可以看出异步执行模式不会堵塞,达到了并行的目的。
主进程开始执行>>> pid=8449

子进程开始执行>>> pid=8451,ppid=8449,编号0
子进程开始执行>>> pid=8452,ppid=8449,编号1
子进程开始执行>>> pid=8453,ppid=8449,编号2
子进程开始执行>>> pid=8454,ppid=8449,编号3
子进程开始执行>>> pid=8455,ppid=8449,编号4
子进程终止>>> pid=8451,ppid=8449,编号0
子进程开始执行>>> pid=8451,ppid=8449,编号5
子进程终止>>> pid=8454,ppid=8449,编号3
子进程终止>>> pid=8452,ppid=8449,编号1
子进程终止>>> pid=8453,ppid=8449,编号2子进程终止>>> pid=8455,ppid=8449,编号4

子进程开始执行>>> pid=8455,ppid=8449,编号6
子进程开始执行>>> pid=8454,ppid=8449,编号7
子进程开始执行>>> pid=8452,ppid=8449,编号8
子进程开始执行>>> pid=8453,ppid=8449,编号9
子进程终止>>> pid=8451,ppid=8449,编号5
子进程终止>>> pid=8454,ppid=8449,编号7
子进程终止>>> pid=8455,ppid=8449,编号6
子进程终止>>> pid=8453,ppid=8449,编号9
子进程终止>>> pid=8452,ppid=8449,编号8

主进程终止

2)map()和map_async()

import os,time
import multiprocessing

def worker(arg):
    print("子进程开始执行>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
    time.sleep(0.5)
    
    print("子进程终止>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
  
def main():
    print("主进程开始执行>>> pid={}".format(os.getpid()))
    
    process_pool = multiprocessing.Pool(processes=5)
    data = range(5)
    
    # process_pool.map(worker,data)     
    process_pool.map_async(worker,data)   
    # 这里没有返回结果,如果有返回结果,则需要写成res = pool.map_async(worker,data)
        
    # 关闭进程池,停止接受其它进程
    process_pool.close()
    
    # 阻塞进程
    process_pool.join()
    
    print("主进程终止")
    

2/concurrent.futures.ProcessPoolExecutor()进程池

标准库concurrent.futures模块,
它提供了ProcessPoolExecutor()和ThreadPoolExecutor()两个类,
实现了对threading和multiprocessing的进一步抽象.

2种提交任务的方式:同步提交,异步提交。

<1>同步提交任务方式

提交任务,原地等待任务执行结束,拿到任务返回结果.result(),再执行接下来的任务 
优点:可以解耦合 
缺点:速度慢,因为需要等结果,然后再执行下一个
   
  import datetime
  from concurrent.futures import ProcessPoolExecutor
  import time, random, os
  import requests

  def f(name):
      print('%s %s is running'%(name,os.getpid()))
      #print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
  if __name__ == '__main__':
    process_pool = ProcessPoolExecutor(max_workers=10) # 设置进程池内进程数
    
    for i in range(10):
        # 同步调用方式,调用和等值
        # 传参方式(任务名,参数),参数使用位置或者关键字参数
        obj = process_pool.submit(f,"进程pid:")
        
        # .result()函数,得到子进程的返回结果,及return的结果
        # 如果f()函数中没有return,则obj.result()是None
        res = obj.result()
        
    # 关闭进程池的入口,等待池内任务运行结束,再执行主进程
    process_pool.shutdown(wait=True) 
    
    print("主线程结束")

<2>异步提交任务方式

只调用函数f,不等值
优点:速度快
缺点:存在耦合
    import datetime
    from concurrent.futures import ProcessPoolExecutor
    import time, random, os
    import requests
    
    def f(name): 
        print("%s %s is running" %(name,os.getpid())) 
        time.sleep(random.randint(1,3)) 
    
    if __name__ == '__main__': 
        #设置进程池内进程 
        process_pool = ProcessPoolExecutor(max_workers=10)
        
        for i in range(10): 
            # 异步提交方式,只调用,不等值 
            process_pool.submit(f,'进程pid:') 
            # 传参方式(任务名,参数),参数使用位置参数或者关键字参数
        
        # 关闭进程池的入口,等待池内任务运行结束,再执行主进程
        process_pool.shutdown( wait=True ) 
        
        print('主线程')
转载自:https://juejin.cn/post/7026933546698670088
评论
请登录