
[Mastering Python, from 1 to ∞] 7. Pipe and Queue: Inter-Process Communication

Author: 站长

Original article: [Hard Python] [Chapter 1: Multiprocessing] 2. Pipe and Queue: Inter-Process Communication

The first installment covered in detail how a new `Process` is created. Next, let's look at the ways processes can communicate with each other.

The most direct way to implement inter-process communication in `multiprocessing` is to use a `Pipe` or a `Queue`. Usage looks like this:

from multiprocessing import Process, Pipe, Queue
import time
from mp_module import log, seg


def _test_queue(q):
    while True:
        msg = q.get()
        if msg == 'quit':
            break
        else:
            log(f'recv: {msg}')
    log('child process end~')


def test_queue():
    seg('test queue start')
    q = Queue()
    p = Process(target=_test_queue, args=(q,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        q.put(cmd)
        time.sleep(1)
    assert not p.is_alive()
    seg('test queue end')


def _test_pipe(r):
    while True:
        msg = r.recv()
        if msg == 'quit':
            break
        else:
            log(f'recv: {msg}')
    log('child process end~')


def test_pipe():
    seg('test pipe start')
    r, w = Pipe()
    p = Process(target=_test_pipe, args=(r,))
    p.start()
    cmds = ['helloworld', 'testmsg', 'quit']
    for cmd in cmds:
        log(f'send: {cmd}')
        w.send(cmd)
        time.sleep(1)
    assert not p.is_alive()
    seg('test pipe end')

The usage is very simple. `Pipe` creates a reader/writer pair: pass the reader into the child process, and whatever the main process writes to the writer, the child can read through the reader. `Queue` is even more convenient, since its instance can be passed straight into the child process: the main process calls `put` to write data, and the child calls `get` to read it.

Let's first look at how `Pipe` is implemented on Windows:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

`h1` and `h2` stand in a server/client relationship. `h1` is created with `CreateNamedPipe`, after which `h2` connects to `h1`'s named pipe via `CreateFile`. Both handles are then wrapped in `PipeConnection` objects and returned as the reader and writer ends. Whether the two ends are duplex is also an option.
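
For contrast, the POSIX implementation is far simpler: a duplex pipe is a connected socket pair, and a half-duplex pipe is a plain OS pipe. Below is a minimal sketch of that logic (the real code wraps each descriptor in a `Connection` object, which this sketch omits; `unix_pipe` is an illustrative name):

```python
import os
import socket

def unix_pipe(duplex=True):
    """Simplified sketch of what multiprocessing.connection.Pipe does on POSIX."""
    if duplex:
        # A connected AF_UNIX socket pair: both ends are readable and writable.
        s1, s2 = socket.socketpair()
        return s1.detach(), s2.detach()   # hand back the raw file descriptors
    # os.pipe() returns (read_fd, write_fd): strictly one-directional.
    return os.pipe()
```

Compare this with the Windows version above: there is no named-pipe address and no overlapped connect handshake; the kernel hands back two already-connected endpoints.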

Wrapped in `PipeConnection`, the pipe gains methods for sending and receiving Python objects. Serialization/deserialization of Python objects is handled by the built-in `pickle` library. A pickled object retains specific metadata: for example, when a function defined in some module is pickled, the name of the module it belongs to is stored along with the function reference. If that module cannot be found during unpickling, an error is raised. So when passing Python objects between processes, pay attention to how object values and module environments survive the serialization round trip. The official `pickle` documentation gives us enough information to understand these mechanisms.
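
This by-reference behavior is easy to observe directly. The sketch below (the function name `greet` is just an illustration) shows that pickling a module-level function stores only its module and qualified name, and that unpickling resolves that reference again on the receiving side:

```python
import pickle

def greet(name):
    return f'hello {name}'

# Functions pickle *by reference*: the payload records the defining module
# and the qualified name, not the function's code object.
data = pickle.dumps(greet)
assert b'greet' in data

# Unpickling re-imports the module and looks the name up again; if either
# is missing on the receiving side, pickle raises during load.
restored = pickle.loads(data)
assert restored('world') == 'hello world'
```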

Now let's turn to the implementation of `Queue`. `Queue` is a wrapper around `Pipe` that provides more functionality. Here is the key code of `Queue`, listed in full:

class Queue(object):
    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        self._ignore_epipe = False
        self._reset()
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _reset(self, after_fork=False):
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        return _ForkingPickler.loads(res)

    def close(self):
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def _start_thread(self):
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True
        self._thread.start()
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    queue_sem.release()
                    onerror(e, obj)

In `Queue`'s `__init__`, the following objects are constructed:

  • `self._maxsize`: the maximum length of the queue
  • `self._reader`, `self._writer`: the two ends of a pipe
  • `self._rlock`: read lock guarding `self._reader`
  • `self._wlock`: write lock guarding `self._writer`
  • `self._sem`: a semaphore bounded by `self._maxsize`, tracking how full the queue is
  • `self._notempty`: condition variable for "the queue is non-empty"; waited on while the queue is empty
  • `self._buffer`: a `deque` holding the buffered Python objects
  • `self._thread`: the feeder thread, which drains `self._buffer` and sends its contents into the pipe
  • `self._jointhread`: finalizer that shuts down the feeder thread
  • `self._close`: finalizer that closes the pipe

One more thing to note: the `Queue` instance itself is passed to the `Process` instance and deserialized once in the other process. To ensure that certain state (such as the pipe ends) survives this serialization round trip, the `Queue` class implements the `__getstate__` and `__setstate__` hooks to control how instance state is saved and restored. This feature is described in detail in the official `pickle` documentation.
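
The pattern is easy to reproduce in miniature. The toy class below (a hypothetical `Channel`, not part of `multiprocessing`) ships only its transferable payload through pickle and rebuilds its process-local state on arrival, mirroring how `Queue.__setstate__` ends with `self._reset()`; the real `Queue` additionally calls `context.assert_spawning` to refuse pickling outside of process creation:

```python
import pickle

class Channel:
    """Toy illustration of the __getstate__/__setstate__ pattern Queue uses."""

    def __init__(self):
        self.payload = [1, 2, 3]   # state that should survive the transfer
        self._reset()

    def _reset(self):
        # Process-local state: rebuilt after transfer instead of transferred.
        self.cache = {}

    def __getstate__(self):
        return (self.payload,)     # pickle only the transferable part

    def __setstate__(self, state):
        (self.payload,) = state
        self._reset()              # rebuild local state, like Queue does

clone = pickle.loads(pickle.dumps(Channel()))
assert clone.payload == [1, 2, 3] and clone.cache == {}
```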

With the meaning of these objects in mind, here is `Queue`'s workflow in detail:

  • The user calls `put`; the semaphore `self._sem` takes one slot, and finding the feeder thread not yet started, `put` launches `Queue._feed` via `self._start_thread`
  • The feeder thread enters its loop, finds the buffer `self._buffer` empty, and waits on the condition variable `self._notempty`
  • After `self._start_thread`, `put` appends the Python object to `self._buffer` and notifies `self._notempty`
    • This step may well happen before the previous one, but that's fine either way
  • `_feed` wakes from its wait, pops the Python object, pickles it, and calls `self._writer.send_bytes` to write the serialized data into the pipe
    • Note that the feeder compares each object by identity (`obj is sentinel`) against a module-level `_sentinel` instance, so only the sentinel appended by `close()` triggers `self._writer`'s close; a fresh `object()` sent from user code will not match it
  • The user calls `get`, reads the data from the pipe through `self._reader`, and releases one slot of the semaphore `self._sem`; the data is then deserialized back into the object that was sent
  • The user calls `close`, which first closes `self._reader` and then appends the sentinel to the buffer; `_feed` keeps draining the queue until it detects the final sentinel, which at last triggers the close of `self._writer`. At that point both ends of the pipe are closed and the buffer holds no further data
  • The user calls `join_thread`, which triggers `self._thread.join()` and shuts down the feeder thread
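
The put-side of this workflow can be sketched in a single process. The `MiniQueue` below is a deliberately simplified toy (eager thread start, no locks around the pipe, no maxsize semaphore) that keeps only the buffer / condition-variable / feeder-thread / sentinel skeleton described above:

```python
import collections
import pickle
import threading
from multiprocessing import Pipe

class MiniQueue:
    _sentinel = object()   # compared by identity, like multiprocessing's _sentinel

    def __init__(self):
        self._reader, self._writer = Pipe(duplex=False)
        self._buffer = collections.deque()
        self._notempty = threading.Condition()
        # The real Queue starts this thread lazily, on the first put().
        self._thread = threading.Thread(target=self._feed, daemon=True)
        self._thread.start()

    def _feed(self):
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()     # buffer empty: block until notify
                obj = self._buffer.popleft()
            if obj is MiniQueue._sentinel:    # pushed by close(): shut down
                self._writer.close()
                return
            self._writer.send_bytes(pickle.dumps(obj))

    def put(self, obj):
        with self._notempty:
            self._buffer.append(obj)          # only touches the in-memory deque
            self._notempty.notify()           # wake the feeder thread

    def get(self):
        return pickle.loads(self._reader.recv_bytes())

    def close(self):
        self.put(MiniQueue._sentinel)
        self._thread.join()                   # feeder exits after the sentinel
```

Notice that `put` never touches the pipe itself; it returns as soon as the object is in the deque, and the feeder thread does the pickling and writing in the background.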

multiprocessing当中,Queue还有两种变体,分别为SimpleQueueJoinableQueueSimpleQueue没有提供blockingtimeout的功能,只是简单创建一对pipe交换序列化的数据。JoinableQueue则是在Queue的基础上增加了join的功能,其实现上是增加了一个初始值0的信号量_unfinished_tasks以及一个条件变量_condJoinableQueue在调用join时,如果_unfinished_tasks信号量不为0会进入_cond.wait,这是因为每次put的时候_unfinished_tasks信号量会release一次,只有用户每次get之后显式调用JoinableQueue.task_done才能acquire一次信号量,最终使得_unfinished_tasks信号量归零并notify_all所有join的调用。

Finally, when it comes to inter-process communication, beyond `Pipe` and `Queue` it is also perfectly fine to share state through a `Manager` (or shared memory), or to communicate directly over sockets. That said, on a single node, within a cohesive Python project, `Queue` is the natural choice.
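
As a taste of the `Manager` alternative: a `Manager` starts a server process, and `m.dict()` returns a proxy whose method calls are forwarded to that server over a connection (so it is proxy-based message passing rather than literal shared memory). A minimal sketch with illustrative `fill`/`shared_dict_demo` names:

```python
from multiprocessing import Manager, Process

def fill(shared):
    shared['answer'] = 42          # proxied call, executed in the manager process

def shared_dict_demo():
    with Manager() as m:           # starts the manager server process
        d = m.dict()
        p = Process(target=fill, args=(d,))
        p.start()
        p.join()
        return dict(d)             # snapshot the proxy into a plain dict
```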

Reposted from: https://juejin.cn/post/7240371866288209979