【从1到∞精通Python】7、Pipe和Queue,进程间通信
原文链接:【Hard Python】【第一章-多进程】2、Pipe和Queue,进程间通信
第一话详细讲解了Process新进程是如何被创建的,接下来就来讲一下进程之间有什么通信的方法。
要在multiprocessing中实现进程间通信,最直接的方法是采用Pipe或者Queue。其用法如下:
from multiprocessing import Process, Pipe, Queue
import time
from mp_module import log, seg
def _test_queue(q):
    """Child-process worker: log every message taken from ``q`` until 'quit'.

    Blocks on ``q.get()`` for each message. The first message equal to
    ``'quit'`` ends the loop without being logged; a closing line is logged
    before the function returns.
    """
    # iter(callable, sentinel) keeps calling q.get() and stops as soon as the
    # value it returns compares equal to 'quit'.
    for received in iter(q.get, 'quit'):
        log(f'recv: {received}')
    log('child process end~')
def test_queue():
    """Demo: drive a child process through a ``multiprocessing.Queue``.

    Starts ``_test_queue`` in a child process, puts each command on the
    queue (the final ``'quit'`` tells the child to stop), then waits for the
    child to exit.

    Raises:
        AssertionError: if the child is still alive after the join timeout.
    """
    seg('test queue start')
    q = Queue()
    p = Process(target=_test_queue, args=(q,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        q.put(cmd)
    # Wait for the child explicitly instead of sleeping a fixed second: a
    # fixed sleep makes the liveness check below racy on a slow or loaded
    # machine, while join() returns as soon as the child has exited (or the
    # timeout elapses) and also reaps the finished process.
    p.join(timeout=5)
    assert not p.is_alive(), 'child process did not exit after quit'
    seg('test queue end')
def _test_pipe(r):
    """Child-process worker: log every message received on ``r`` until 'quit'.

    Blocks on ``r.recv()`` for each message. The first message equal to
    ``'quit'`` ends the loop without being logged; a closing line is logged
    before the function returns.
    """
    # iter(callable, sentinel) keeps calling r.recv() and stops as soon as
    # the value it returns compares equal to 'quit'.
    for received in iter(r.recv, 'quit'):
        log(f'recv: {received}')
    log('child process end~')
def test_pipe():
    """Demo: drive a child process through a ``multiprocessing.Pipe``.

    Creates a connection pair, hands one end to ``_test_pipe`` running in a
    child process, sends each command on the other end (the final ``'quit'``
    tells the child to stop), then waits for the child to exit.

    Raises:
        AssertionError: if the child is still alive after the join timeout.
    """
    seg('test pipe start')
    r, w = Pipe()
    p = Process(target=_test_pipe, args=(r,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        w.send(cmd)
    # Wait for the child explicitly instead of sleeping a fixed second: a
    # fixed sleep makes the liveness check below racy on a slow or loaded
    # machine, while join() returns as soon as the child has exited (or the
    # timeout elapses) and also reaps the finished process.
    p.join(timeout=5)
    assert not p.is_alive(), 'child process did not exit after quit'
    seg('test pipe end')
形式上非常简单。Pipe创建了一对reader跟writer,将reader传入子进程,主进程在writer写入数据,子进程即能通过reader读取到;Queue则更为方便,其实例能够直接传入子进程。主进程调用put即可写入数据,子进程调用get即可获取数据。
首先我们先看Pipe在windows下的实现:
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe

    Windows implementation built on a named pipe: one handle is created as
    the pipe's server end, a second handle is opened on the same address as
    the client end, and each handle is wrapped in a PipeConnection.

    duplex -- when true, both handles are opened for reading and writing and
    both buffers are BUFSIZE. When false, the server end is opened with
    PIPE_ACCESS_INBOUND and wrapped with writable=False, and the client end
    is opened with GENERIC_WRITE and wrapped with readable=False.

    Returns the tuple (c1, c2): c1 wraps the server handle, c2 the client
    handle.
    '''
    # Address both handles will share.
    # NOTE(review): presumably a fresh, unique pipe name -- confirm against
    # arbitrary_address.
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = _winapi.PIPE_ACCESS_DUPLEX
        access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        # One-directional pipe: data flows into the server end (h1) only, so
        # the client end (h2) needs write access only and no output buffer
        # is requested for the server end.
        openmode = _winapi.PIPE_ACCESS_INBOUND
        access = _winapi.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    # Server end: overlapped I/O, message-type pipe read in message mode,
    # with the maximum number of instances set to 1.
    h1 = _winapi.CreateNamedPipe(
        address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
        _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
        _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
        _winapi.PIPE_WAIT,
        1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
        # default security descriptor: the handle cannot be inherited
        _winapi.NULL
        )
    # Client end: open the pipe that was just created (OPEN_EXISTING) at the
    # same address, also for overlapped I/O.
    h2 = _winapi.CreateFile(
        address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
        _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
        )
    # Put the client handle into message read mode as well.
    _winapi.SetNamedPipeHandleState(
        h2, _winapi.PIPE_READMODE_MESSAGE, None, None
        )

    # Complete the server-side connection; GetOverlappedResult(True) waits
    # for the overlapped operation and the error code is expected to be 0.
    overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
    _, err = overlapped.GetOverlappedResult(True)
    assert err == 0

    # With duplex=False, c1 is created with writable=False and c2 with
    # readable=False; with duplex=True both flags are True.
    c1 = PipeConnection(h1, writable=duplex)
    c2 = PipeConnection(h2, readable=duplex)

    return c1, c2
h1与h2互为服务端/客户端的关系。h1通过CreateNamedPipe创建,之后h2通过CreateFile连接到h1的NamedPipe上。之后用PipeConnection封装h1和h2两端,返回reader跟writer。当然,两个管道入口是否双工也是可选项。
经过PipeConnection的封装,管道即拥有了发送或接收python对象的方法。python对象的序列化/反序列化会用内置库pickle来支持。被pickle的对象会保留特定的信息,比如某个模块def的函数,在pickle时除了对函数本身进行序列化外,也会封存函数所属模块的信息。在unpickle时,如果找不到对应模块的信息,就会报错。因此多进程之间通信python对象时,需要留意序列化/反序列化后对应对象的取值/模块环境等情况。pickle的官方文档给到了我们足够的信息去了解这些机制。
接下来我们转向Queue的实现。相对于Pipe,Queue是对其的封装,并提供了更多的功能。这里我们完整列举一下Queue的关键代码:
class Queue(object):
    """Queue for passing Python objects between processes over a one-way pipe.

    put() appends objects to an in-process deque; a feeder thread started on
    the first put() pickles them and writes the bytes to the pipe's writer
    end. get() reads bytes from the reader end and unpickles them. A bounded
    semaphore sized by ``maxsize`` is acquired on put() and released on
    get().

    This excerpt references helpers that are not shown here:
    ``_after_fork``, ``_on_queue_feeder_error``, ``_finalize_join`` and
    ``_finalize_close``.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, the locks and the semaphore backing the queue.

        maxsize -- capacity of the queue; values <= 0 are replaced by
            SEM_VALUE_MAX.
        ctx -- keyword-only object that supplies Lock() and
            BoundedSemaphore().
        """
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        # One-directional pipe: data is written to _writer and read from
        # _reader.
        self._reader, self._writer = connection.Pipe(duplex=False)
        # Lock held while reading from the pipe in get().
        self._rlock = ctx.Lock()
        # PID of the process that created the queue.
        self._opid = os.getpid()
        # No write lock is created on win32; elsewhere the feeder thread
        # holds _wlock around every send.
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        # Acquired once per put() and released once per get(), which bounds
        # the number of outstanding items to maxsize.
        self._sem = ctx.BoundedSemaphore(maxsize)
        self._ignore_epipe = False
        self._reset()
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the state tuple to pickle when the queue is sent to a child.

        Only the pipe ends, locks, semaphore, flags and creator PID are
        included; the per-process state built by _reset() is not.
        """
        # NOTE(review): presumably rejects pickling outside of process
        # spawning -- confirm against context.assert_spawning.
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the pickled state, then rebuild per-process state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _reset(self, after_fork=False):
        """(Re)initialise the per-process state: buffer, thread, flags.

        after_fork -- when true, the existing condition variable is
            reinitialised in place instead of being replaced by a new one.
        """
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        # Objects queued by put() that the feeder thread has not sent yet.
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        # Cache the bound pipe methods used by the feeder thread and get().
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Queue ``obj`` for sending to the pipe by the feeder thread.

        Raises ValueError if the queue is closed, and Full if the semaphore
        cannot be acquired with the given block/timeout.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            # The feeder thread is started lazily, on the first put().
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            # Wake the feeder thread if it is waiting on an empty buffer.
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one pickled object from the pipe and return it unpickled.

        Raises ValueError if the queue is closed, and Empty if the read lock
        or the data cannot be obtained with the given block/timeout.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            # Fully blocking case: wait for the lock and for data.
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Acquiring the lock may have used part of the timeout;
                    # poll only for the time left until the deadline.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # Unpickling happens after the read lock has been released.
        return _ForkingPickler.loads(res)

    def close(self):
        """Mark the queue closed, close the reader end and run ``_close``."""
        self._closed = True
        try:
            self._reader.close()
        finally:
            # _close is only set once the feeder thread has been started;
            # clear it before calling so it is invoked at most once here.
            close = self._close
            if close:
                self._close = None
                close()

    def _start_thread(self):
        """Start the daemon feeder thread and register its finalizers."""
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True
        self._thread.start()

        # Finalizer bound to the thread object; it is given only a weak
        # reference to the thread. Skipped when _joincancelled is set.
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Finalizer bound to the queue itself; close() invokes it through
        # self._close. It receives the buffer and the condition variable.
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        """Feeder-thread body: move objects from ``buffer`` into the pipe.

        Loops forever: waits on ``notempty`` while the buffer is empty, then
        drains the buffer, pickling each object and passing the bytes to
        ``send_bytes``. Popping the module-level ``_sentinel`` object calls
        ``close()`` and ends the thread.
        """
        debug('starting thread to feed data to pipe')
        # Bind the frequently used methods to local names once.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            # No write lock is used on win32 (writelock is None there).
            wacquire = None

        while 1:
            try:
                # Sleep until put() notifies that the buffer has data.
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; popleft() raises IndexError once it
                    # is empty, which ends this inner loop.
                    while 1:
                        obj = bpopleft()
                        # Identity check: only the sentinel object itself
                        # stops the thread.
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # Pickle before taking the write lock, so the lock is
                        # held only for the actual send.
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                # EPIPE ends the thread silently when ignore_epipe is set.
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # While the process is exiting, the error is only logged and
                # the thread stops.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # The object was not delivered: give back the semaphore
                    # slot taken by put(), then report the error.
                    queue_sem.release()
                    onerror(e, obj)
在Queue的__init__函数中,构造了这么些对象:
- self._maxsize:Queue队列的最大长度
- self._reader,self._writer:pipe的两端
- self._rlock:self._reader的读锁
- self._wlock:self._writer的写锁(在win32下为None)
- self._sem:以self._maxsize为基准的信号量,用来记录队列填满情况
- self._notempty:队列非空的条件变量。如果队列为空,需要wait
- self._buffer:存储python对象的队列deque
- self._thread:消费线程,用于消费self._buffer中的内容并发送到pipe中
- self._jointhread:关闭消费线程的finalizer
- self._close:关闭pipe的finalizer
其中还要留意一点是,Queue实例本身是要传给Process实例,并在另一个进程被反序列化一次。因此为了保证序列化/反序列化之后部分状态得到保留(比如pipe),Queue的类定义中采用了__getstate__和__setstate__两个钩子去实现实例内部状态的存储与读取。这个特性在pickle的官方文档内有详细的说明。
大致了解了这些对象的含义后,接下来,就详细把Queue的工作流程列一下:
- 用户调用put,信号量self._sem增加一个占位,之后发现消费线程未启动,通过self._start_thread启动消费线程Queue._feed
- 消费线程进入循环,发现队列self._buffer为空,条件变量self._notempty进入wait
- self._start_thread之后,将python对象推入self._buffer,并notify条件变量self._notempty
  - 这一步很有概率发生在上一步之前,不过无所谓了
- _feed退出wait状态,pop出python对象,然后将其pickle,最后调用self._writer.send_bytes发送序列化之后的数据到pipe内
  - 这里注意,如果pop出的对象是模块内部的哨兵对象_sentinel,会触发self._writer.close并结束消费线程。判断时用的是is(身份比较),因此只有这个哨兵对象本身才会触发关闭,用户自己put一个新建的object()并不会
- 用户调用get,通过self._reader读取pipe中的数据,并且让信号量self._sem释放一个占位。之后对数据进行反序列化,得到发送过来的对象
- 用户调用close,首先关闭self._reader,然后向self._buffer中放入哨兵对象_sentinel。_feed会一直消费队列,直到检测到最后的哨兵对象,终于触发self._writer的关闭。这样pipe的两端就都关闭,并且buffer里也没有任何其它数据了。
- 用户调用join_thread,触发self._thread.join(),关闭消费线程
在multiprocessing当中,Queue还有两种变体,分别为SimpleQueue和JoinableQueue。SimpleQueue没有提供blocking或timeout的功能,只是简单创建一对pipe交换序列化的数据。JoinableQueue则是在Queue的基础上增加了join的功能,其实现上是增加了一个初始值0的信号量_unfinished_tasks以及一个条件变量_cond。JoinableQueue在调用join时,如果_unfinished_tasks信号量不为0会进入_cond.wait,这是因为每次put的时候_unfinished_tasks信号量会release一次,只有用户每次get之后显式调用JoinableQueue.task_done才能acquire一次信号量,最终使得_unfinished_tasks信号量归零并notify_all所有join的调用。
最后,进程间通信的方法说到底,除了Pipe跟Queue外,采用Manager共享内存或者直接用socket网络通信都是ok的方式。当然,如果是在单节点上面,并且是一个内聚的python项目的话,Queue是不二选择。
转载自:https://juejin.cn/post/7240371866288209979