1/使用multiprocessing模块的Pool()类
from multiprocessing.pool import Pool
Pool()类中有很多的函数。
Pool类可以提供指定数量(由processes参数决定)的进程供用户调用,
如果用户不指定该参数,则系统会根据逻辑核心数(比物理核心数,只多不少),来决定并行的进程数量。
当有新的请求提交到Pool中时,如果进程池还没有满,就会创建一个新的进程来执行请求。
如果进程池已经满了,请求就会告知先等待,直到进程池中有进程结束,才会创建新的进程来执行这些请求。
<1>下面介绍一下multiprocessing模块下的Pool()类下的几个方法:
<1>apply() 同步执行模式
同步,堵塞主进程
函数原型:apply(func[, args=()[, kwds={}]])
该函数用于传递不定参数,func参数必须要有的,其他参数是可选的。同python中的apply函数一致,
主进程会被阻塞直到所有子进程执行结束(不建议使用,并且3.x以后不再出现)。
需要用到for循环,然后把迭代序列中的各个元素传入函数进行处理。
for i in range(10):
pool.apply(main, args=(i,))
<2>apply_async() 异步执行模式
异步(子进程可以同时执行),非堵塞主进程,需要使用pool.join()来堵塞主进程。
支持使用回调函数。
函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
与apply用法一致,但它是非阻塞主进程的且支持结果返回后进行回调。
需用到for循环,然后把迭代序列中的元素传入函数。
for i in range(10):
pool.apply_async(main, args=(i,), callback=f)
<3>map()
同步(一个接一个的执行),堵塞主进程
函数原型:map(func, iterable[, chunksize=None])
Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
不需for循环。map函数会把迭代序列中的每个元素传入函数。
res = pool.map(main, list) # 把所有子进程的结果放在一个列表中
res是列表类型
<4>map_async()
异步(多个子进程同时执行),非堵塞主进程,需要.join()来堵塞主进程。
有一个chunksize参数
支持回调函数。
函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的
res = pool.map_async(main, list)
res是map类型,需要用res.get()转化为list类型。
<5>close()
关闭进程池(pool),使其不再接受新的任务。
及开启的进程达到processes以后,就关闭进程池,新的进程需等待。
当进程池有空闲的时候,再接收新的进程。
<6>terminal()
结束工作进程,不再处理未处理的任务
<7>join()
主进程阻塞,等待子进程的退出。
也就是说,主进程在最后才执行,一直等到所有的子进程执行完毕。
join()方法要在close()或terminate()之后使用
<8>map_async()和apply_async()的区别
当想要提高一个任务的执行效率时,我们可以通过拆分任务,把整个大任务拆分成多个子任务,然后利用多进程进行异步处理,及同时处理,缩短整个的任务执行时间。
在python中的multiprocessing模块中,有2个可以构造异步执行的进程任务方法,apply_async()和map_async()。
这2个都可以分别添加任务,然后多进程异步同时执行,但是二者也有重要的区别。
下面进行说明。
(1)对于apply_async(func,args),func是要执行的函数,args是列表或者元组这样的对象,里面的元素是要传给func函数的参数。对于多个子任务,需要通过for循环分别多次调用apply_async()函数进行一一的添加,不过这可以通过列表解析实现,以让多个子进程的结果返回保存在一个列表中。
(2)而对于map_async(func,iterable,chunksize)函数,如果多个子进程任务通过同一个函数执行,只是参数不同,那么可以把拆分后的参数以列表的形式通过iterable传入,并通过chunksize参数指定进程数。实际上这里的chunksize表示的是对iterable的拆分数,但是最好还是让其等于进程数。
(3)以上只是二者比较轻微的差别,更重要的差别是:
apply_async()是通过for循环手动指定进程并且添加任务,这样每个进程的执行结果之间是独立的,会分别保存。这样的好处是某个进程出现了问题,不会影响其他的进程。
map_async()子进程并不是独立的,如果其中一个子进程出现了异常,则别的子进程也会受到影响。
(4)因此apply_async()比map_async()更加靠谱

<2>multiprocessing.Pool()类的示例
1)apply()和apply_async()
import os, time
import multiprocessing
def worker(arg):
print("子进程开始执行>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
time.sleep(0.5)
print("子进程终止>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
def main():
print("主进程开始执行>>> pid={}".format(os.getpid()))
process_pool = multiprocessing.Pool(processes=5)
for i in range(10):
#process_pool.apply(worker,args=(i,)) # 同步执行
process_pool.apply_async(worker,args=(i,)) # 异步执行
# 关闭进程池,停止接受其它进程
process_pool.close()
# 阻塞主进程
process_pool.join()
print("主进程终止")
# 测试一下
# 同步执行方式,apply(function,args=(i,))
# 结果如下所示,可以看出同步执行模式会堵塞,也就是说必须一个一个的来执行,并没有达到并行的目的。
# 这里说的堵塞,指的是子进程之间的堵塞,也就是说子进程之间是一个接着一个来执行的,并没有并行执行。
主进程开始执行>>> pid=6688
子进程开始执行>>> pid=6689,ppid=6688,编号0
子进程终止>>> pid=6689,ppid=6688,编号0
子进程开始执行>>> pid=6690,ppid=6688,编号1
子进程终止>>> pid=6690,ppid=6688,编号1
子进程开始执行>>> pid=6691,ppid=6688,编号2
子进程终止>>> pid=6691,ppid=6688,编号2
子进程开始执行>>> pid=6692,ppid=6688,编号3
子进程终止>>> pid=6692,ppid=6688,编号3
子进程开始执行>>> pid=6693,ppid=6688,编号4
子进程终止>>> pid=6693,ppid=6688,编号4
子进程开始执行>>> pid=6689,ppid=6688,编号5
子进程终止>>> pid=6689,ppid=6688,编号5
子进程开始执行>>> pid=6690,ppid=6688,编号6
子进程终止>>> pid=6690,ppid=6688,编号6
子进程开始执行>>> pid=6691,ppid=6688,编号7
子进程终止>>> pid=6691,ppid=6688,编号7
子进程开始执行>>> pid=6692,ppid=6688,编号8
子进程终止>>> pid=6692,ppid=6688,编号8
子进程开始执行>>> pid=6693,ppid=6688,编号9
子进程终止>>> pid=6693,ppid=6688,编号9
主进程终止
#######################
#######################
# 异步执行方式
# apply_async()
# 结果如下所示,可以看出异步执行模式不会堵塞,达到了并行的目的。
主进程开始执行>>> pid=8449
子进程开始执行>>> pid=8451,ppid=8449,编号0
子进程开始执行>>> pid=8452,ppid=8449,编号1
子进程开始执行>>> pid=8453,ppid=8449,编号2
子进程开始执行>>> pid=8454,ppid=8449,编号3
子进程开始执行>>> pid=8455,ppid=8449,编号4
子进程终止>>> pid=8451,ppid=8449,编号0
子进程开始执行>>> pid=8451,ppid=8449,编号5
子进程终止>>> pid=8454,ppid=8449,编号3
子进程终止>>> pid=8452,ppid=8449,编号1
子进程终止>>> pid=8453,ppid=8449,编号2子进程终止>>> pid=8455,ppid=8449,编号4
子进程开始执行>>> pid=8455,ppid=8449,编号6
子进程开始执行>>> pid=8454,ppid=8449,编号7
子进程开始执行>>> pid=8452,ppid=8449,编号8
子进程开始执行>>> pid=8453,ppid=8449,编号9
子进程终止>>> pid=8451,ppid=8449,编号5
子进程终止>>> pid=8454,ppid=8449,编号7
子进程终止>>> pid=8455,ppid=8449,编号6
子进程终止>>> pid=8453,ppid=8449,编号9
子进程终止>>> pid=8452,ppid=8449,编号8
主进程终止
2)map()和map_async()
import os,time
import multiprocessing
def worker(arg):
print("子进程开始执行>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
time.sleep(0.5)
print("子进程终止>>> pid={},ppid={},编号{}".format(os.getpid(),os.getppid(),arg))
def main():
print("主进程开始执行>>> pid={}".format(os.getpid()))
process_pool = multiprocessing.Pool(processes=5)
data = range(5)
# process_pool.map(worker,data)
process_pool.map_async(worker,data)
# 这里没有返回结果,如果有返回结果,则需要写成res = pool.map_async(worker,data)
# 关闭进程池,停止接受其它进程
process_pool.close()
# 阻塞进程
process_pool.join()
print("主进程终止")
2/concurrent.futures.ProcessPoolExecutor()进程池
标准库concurrent.futures模块,
它提供了ProcessPoolExecutor()和ThreadPoolExecutor()两个类,
实现了对threading和multiprocessing的进一步抽象.
有2种提交任务的方式:同步提交,异步提交。
<1>同步提交任务方式
提交任务,原地等待任务执行结束,拿到任务返回结果.result(),再执行接下来的任务
优点:可以解耦合
缺点:速度慢,因为需要等结果,然后再执行下一个
import datetime
from concurrent.futures import ProcessPoolExecutor
import time, random, os
import requests
def f(name):
print('%s %s is running'%(name,os.getpid()))
#print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
process_pool = ProcessPoolExecutor(max_workers=10) # 设置进程池内进程数
for i in range(10):
# 同步调用方式,调用和等值
# 传参方式(任务名,参数),参数使用位置或者关键字参数
obj = process_pool.submit(f,"进程pid:")
# .result()函数,得到子进程的返回结果,及return的结果
# 如果f()函数中没有return,则obj.result()是None
res = obj.result()
# 关闭进程池的入口,等待池内任务运行结束,再执行主进程
process_pool.shutdown(wait=True)
print("主线程结束")
<2>异步提交任务方式
只调用函数f,不等值
优点:速度快
缺点:存在耦合
import datetime
from concurrent.futures import ProcessPoolExecutor
import time, random, os
import requests
def f(name):
print("%s %s is running" %(name,os.getpid()))
time.sleep(random.randint(1,3))
if __name__ == '__main__':
#设置进程池内进程
process_pool = ProcessPoolExecutor(max_workers=10)
for i in range(10):
# 异步提交方式,只调用,不等值
process_pool.submit(f,'进程pid:')
# 传参方式(任务名,参数),参数使用位置参数或者关键字参数
# 关闭进程池的入口,等待池内任务运行结束,再执行主进程
process_pool.shutdown( wait=True )
print('主线程')