【从1到∞精通Python】7、Pipe和Queue,进程间通信
原文链接:【Hard Python】【第一章-多进程】2、Pipe和Queue,进程间通信
第一话详细讲解了Process新进程是如何被创建的,接下来就来讲一下进程之间有什么通信的方法。
要在multiprocessing
中实现进程间通信,最直接的方法是采用Pipe
或者Queue
。其用法如下:
from multiprocessing import Process, Pipe, Queue
import time
from mp_module import log, seg
def _test_queue(q):
    """Child-process loop: log messages from the queue until 'quit' arrives."""
    # iter() with a sentinel keeps calling q.get() until it returns 'quit',
    # which replaces the manual while/break loop.
    for msg in iter(q.get, 'quit'):
        log(f'recv: {msg}')
    log('child process end~')
def test_queue():
    """Demonstrate parent->child communication through a multiprocessing.Queue.

    Sends a few messages followed by the 'quit' command, then waits for
    the child process to exit.
    """
    seg('test queue start')
    q = Queue()
    p = Process(target=_test_queue, args=(q,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        q.put(cmd)
    # Fix: sleeping a fixed second and then asserting liveness is a race
    # (a slow machine can fail it spuriously). join() waits for the child
    # deterministically; the timeout only bounds a genuinely hung child.
    p.join(timeout=10)
    assert not p.is_alive()
    seg('test queue end')
def _test_pipe(r):
    """Child-process loop: log messages from the pipe until 'quit' arrives."""
    # iter() with a sentinel keeps calling r.recv() until it returns 'quit'.
    for msg in iter(r.recv, 'quit'):
        log(f'recv: {msg}')
    log('child process end~')
def test_pipe():
    """Demonstrate parent->child communication through a multiprocessing.Pipe.

    Writes a few messages on one connection end while the child reads from
    the other, then waits for the child process to exit.
    """
    seg('test pipe start')
    r, w = Pipe()
    p = Process(target=_test_pipe, args=(r,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        w.send(cmd)
    # Fix: join() instead of sleep(1) + liveness assert -- deterministic,
    # no race on a slow machine.
    p.join(timeout=10)
    assert not p.is_alive()
    # Fix: close the parent's copies of both connection ends so the OS
    # handles are released (the child has its own inherited copy of r).
    w.close()
    r.close()
    seg('test pipe end')
形式上非常简单。Pipe
创建了一对reader
跟writer
,将reader
传入子进程,主进程在writer
写入数据,子进程即能通过reader
读取到;Queue
则更为方便,其实例能够直接传入子进程。主进程调用put
即可写入数据,子进程调用get
即可获取数据。
首先我们先看Pipe
在windows下的实现:
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    # Windows implementation: a named pipe is created at an arbitrary
    # AF_PIPE address; h1 acts as the server end, h2 as the client end.
    address = arbitrary_address('AF_PIPE')
    if duplex:
        # Both ends readable and writable; buffers in both directions.
        openmode = _winapi.PIPE_ACCESS_DUPLEX
        access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        # One-way: h1 only reads (INBOUND), h2 only writes.
        openmode = _winapi.PIPE_ACCESS_INBOUND
        access = _winapi.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    # Server end: message-oriented named pipe with overlapped (async) I/O,
    # restricted to a single pipe instance.
    h1 = _winapi.CreateNamedPipe(
        address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
        _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
        _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
        _winapi.PIPE_WAIT,
        1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
        # default security descriptor: the handle cannot be inherited
        _winapi.NULL
        )
    # Client end: connect to the named pipe h1 just created.
    h2 = _winapi.CreateFile(
        address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
        _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
        )
    # Switch the client handle into message read mode to match h1.
    _winapi.SetNamedPipeHandleState(
        h2, _winapi.PIPE_READMODE_MESSAGE, None, None
        )

    # Complete the server-side connection handshake and verify success.
    overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
    _, err = overlapped.GetOverlappedResult(True)
    assert err == 0

    # Wrap the raw handles; readability/writability of each end depends
    # on whether the pipe is duplex.
    c1 = PipeConnection(h1, writable=duplex)
    c2 = PipeConnection(h2, readable=duplex)

    return c1, c2
h1
与h2
互为服务端/客户端的关系。h1
通过CreateNamedPipe
创建,之后h2
通过CreateFile
连接到h1
的NamedPipe
上。之后用PipeConnection
封装h1
和h2
两端,返回reader
跟writer
。当然,两个管道入口是否双工也是可选项。
经过PipeConnection
的封装,管道即拥有了发送或接收python
对象的方法。python
对象的序列化/反序列化会用内置库pickle
来支持。被pickle
的对象会保留特定的信息,比如某个模块def
的函数,在pickle
时除了对函数本身进行序列化外,也会封存函数所属模块的信息。在unpickle
时,如果找不到对应模块的信息,就会报错。因此多进程之间通信python对象时,需要留意序列化/反序列化后对应对象的取值/模块环境等情况。pickle的官方文档给到了我们足够的信息去了解这些机制。
接下来我们转向Queue
的实现。相对于Pipe
,Queue
是对其的封装,并提供了更多的功能。这里我们完整列举一下Queue
的关键代码:
class Queue(object):
    """Multi-producer, multi-consumer queue built on a one-way Pipe.

    put() appends to an in-process deque that a lazily-started daemon
    feeder thread pickles and writes into the pipe; get() reads and
    unpickles from the reader end in the consuming process.
    """

    def __init__(self, maxsize=0, *, ctx):
        # maxsize <= 0 means "unbounded", capped by the platform's
        # maximum semaphore value.
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()   # serialises concurrent readers
        self._opid = os.getpid()   # pid of the process that created the queue
        if sys.platform == 'win32':
            # No write lock on Windows -- presumably pipe message writes
            # are atomic there; verify against CPython's queues.py notes.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        # Counts free slots: acquired by put(), released by get().
        self._sem = ctx.BoundedSemaphore(maxsize)
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        # Only picklable while being handed to a spawning child process
        # (assert_spawning raises otherwise); shared IPC state travels,
        # per-process state (_buffer, _thread, ...) is rebuilt by _reset().
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _reset(self, after_fork=False):
        # (Re)initialise per-process state: condition variable, local
        # buffer, feeder-thread bookkeeping, and bound pipe methods.
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None        # feeder thread, started on first put()
        self._jointhread = None    # finalizer that joins the feeder thread
        self._joincancelled = False
        self._closed = False
        self._close = None         # finalizer that flushes and closes writer
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Buffer obj locally; the feeder thread pickles it into the pipe."""
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        # Reserve a slot; raises Full if the queue is at maxsize.
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                # Feeder thread is started lazily on the first put().
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one pickled message from the pipe and return the object."""
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            # Simple case: block indefinitely for the next message.
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()   # free a slot for producers
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Spend only the time remaining after lock acquisition.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # Unpickle outside the read lock.
        return _ForkingPickler.loads(res)

    def close(self):
        self._closed = True
        try:
            self._reader.close()
        finally:
            # _close is the Finalize installed by _start_thread(); calling
            # it enqueues the sentinel so the feeder drains and closes the
            # writer end.
            close = self._close
            if close:
                self._close = None
                close()

    def _start_thread(self):
        """Start the daemon feeder thread that drains _buffer into the pipe."""
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True
        self._thread.start()

        if not self._joincancelled:
            # Finalizer that joins the feeder thread at interpreter exit;
            # holds only a weakref so it does not keep the thread alive.
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Finalizer that pushes the sentinel into the buffer so the feeder
        # thread flushes remaining items and closes the writer.
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        """Feeder-thread main loop: pickle buffered objects into the pipe."""
        debug('starting thread to feed data to pipe')
        # Bind frequently-used bound methods to locals for the hot loop.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        # Sleep until put() notifies us of new data.
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            # Identity check: only the module-level
                            # _sentinel instance terminates the thread,
                            # not arbitrary user objects.
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # Pickle before taking the write lock.
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained (popleft on empty deque); wait again.
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # During interpreter shutdown just log and bail out.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Undo the slot reservation of the failed put() before
                    # reporting the error.
                    queue_sem.release()
                    onerror(e, obj)
在Queue
的__init__
函数中,构造了这么些对象:
self._maxsize
:Queue
队列的最大长度self._reader
,self._writer
:pipe
的两端self._rlock
:self._reader
的读锁self._wlock
:self._writer
的写锁self._sem
:以self._maxsize
为基准的信号量,用来记录队列填满情况self._notempty
:队列非空的条件变量。如果队列为空,需要wait
self._buffer
:存储python
对象的队列deque
self._thread
:消费线程,用于消费self._buffer
中的内容并发送到pipe
中self._jointhread
:关闭消费线程的finalizer
self._close
:关闭pipe
的finalizer
其中还要留意一点是,Queue
实例本身是要传给Process
实例,并在另一个进程被反序列化一次。因此为了保证序列化/反序列化之后部分状态得到保留(比如pipe
),Queue
的类定义中采用了__getstate__
和__setstate__
两个钩子去实现实例内部状态的存储与读取。这个特性在pickle
的官方文档内有详细的说明。
大致了解了这些对象的含义后,接下来,就详细把Queue
的工作流程列一下:
- 用户调用
put
,信号量self._sem
增加一个占位,之后发现消费线程未启动,通过self._start_thread
启动消费线程Queue._feed
- 消费线程进入循环,发现队列
self._buffer
为空,条件变量self._notempty
进入wait
self._start_thread
之后,将python
对象推入self._buffer
,并notify
条件变量self._notempty
- 这一步很有概率发生在上一步之前,不过无所谓了
_feed
退出wait
状态,pop
出python
对象,然后将其pickle
,最后调用self._writer._send_bytes
发送序列化之后的数据到pipe
内。
- 这里注意:终止消费线程用的哨兵是模块级的 `_sentinel = object()` 这个特定实例,`_feed` 中通过 `obj is sentinel` 的身份(identity)比较来识别它。因此业务代码发送普通的 `object()` 实例并不会误触发 `self._writer.close`,只有队列关闭时内部放入的那个 `_sentinel` 才会结束消费线程。
- 用户调用
get
,通过self._reader
读取pipe
中的数据,并且让信号量self._sem
释放一个占位。之后对数据进行反序列化,得到发送过来的对象 - 用户调用
close
,首先关闭self._reader
,然后由finalizer向self._buffer
中放入哨兵对象_sentinel
。_feed
会一直消费队列,直到检测到最后的_sentinel
,终于触发self._writer
的关闭。这样pipe
的两端就都关闭,并且buffer
里也没有任何其它数据了。 - 用户调用
join_thread
,触发self._thread.join()
,关闭消费线程
在multiprocessing
当中,Queue
还有两种变体,分别为SimpleQueue
和JoinableQueue
。SimpleQueue
没有提供blocking
或timeout
的功能,只是简单创建一对pipe
交换序列化的数据。JoinableQueue
则是在Queue
的基础上增加了join
的功能,其实现上是增加了一个初始值0的信号量_unfinished_tasks
以及一个条件变量_cond
。JoinableQueue
在调用join
时,如果_unfinished_tasks
信号量不为0会进入_cond.wait
,这是因为每次put
的时候_unfinished_tasks
信号量会release
一次,只有用户每次get
之后显式调用JoinableQueue.task_done
才能acquire
一次信号量,最终使得_unfinished_tasks
信号量归零并notify_all
所有join
的调用。
最后,进程间通信的方法说到底,除了Pipe
跟Queue
外,采用Manager
共享内存或者直接用socket
网络通信都是ok的方式。当然,如果是在单节点上面,并且是一个内聚的python
项目的话,Queue
是不二选择。
转载自:https://juejin.cn/post/7240371866288209979