python multiprocess pipe 报错“管道已关闭”?

作者站长头像
站长
· 阅读数 21

问题描述

通过python mulitproccess 的 Pipe 方法,建立父子进程通信。发现一开始运行程序就会报"管道已关闭"错误

问题出现的环境背景及自己尝试过哪些方法

大概知道跟管道通信和进程join方法的调用有关系,尝试过只要 single.py里追加server.stop() 就不会报错

相关代码

service.py

import os
from multiprocessing import Process, Pipe


def start_child_process(child_conn):
    # here is a task will work for a long time
    # start_http_server_run_for_long_time()
    # now tell you which port listened
    child_conn.send({"port": 123456, "ret": 1, "pid": os.getpid()})
    # exit child process only after receive main process signal/message
    signal = child_conn.recv()
    if signal:
        child_conn.close()


class Server:
    def __init__(self):
        self.child_conn = None
        self.child = None
        self.parent_conn, self.child_conn = Pipe()

    def run(self):
        self.child = Process(target=start_child_process, name="my_child_process", args=(self.child_conn,))
        self.child.start()

        data = self.parent_conn.recv()
        result = {
            "endpoints": {
                "http": f"http://127.0.0.1:{data['port']}/cmd",
                "ws": f"ws://127.0.0.1:{data['port']}/api",
            }
        }
        return result

    """ stop方法是个函数接口,外部函数会调用来终止子进程 """
    def stop(self):
        self.parent_conn.send(True)
        self.child.join()
        self.child = None


if __name__ == "__main__":
    server = Server()
    r = server.run()
    print("r:", r)

single.py

from service import Server
import time


def main():
    server = Server()
    result = server.run()
    print("r:", result)
    # time.sleep(5)
    # server.stop() # 只要打开这行代码的注释,问题就会消失


if __name__ == "__main__":
    main()

运行python single.py就会报错python multiprocess pipe 报错“管道已关闭”?

但是只要将 single.py 里 time.sleep(5) 和 server.stop() 这两行注释打开,就会运行正常。

你期待的结果是什么?实际看到的错误信息又是什么?

希望子进程长期存活(因为子进程内会可能会启动一个web server),除非接收到主进程的退出通知。如显式调用server.stop()则退出子进程

回复
1个回答
avatar
test
2024-07-02

仔细想了下,对比了下官方文档answer image answer image大概理解了报错的原因点:因为子进程在在执行到 signal = child_conn.recv() 会进入到进程阻塞状态,等待主进程发送信息,然而,直到主进程栈空退出,对端关闭连接,子进程依然没有收到退出通知,自动触发了一次通道内数据的全量读取,然而此刻通道已经关闭,所以报出了最初的 "通道已关闭错误"

那么解决办法也好办了,只要对 signal = child_conn.recv() 加一个error catch, 无视错误就好,因为确实有可能主进程不会通知子进程退出就退出主进程自己的进程,此种情况也算合理。不知道有没有更科学的实现方法。修改好代码如下:

service.py

import os
from multiprocessing import Process, Pipe


def start_child_process(child_conn):
    # here is a task will work for a long time
    # run_server_for_long_time()
    child_conn.send({"port": 123, "ret": 1, "pid": os.getpid()})
    # exit child process only after receive main process signal/message

    # catch IOError when main process automatically exit
    try:
        signal = child_conn.recv()
        if signal:
            child_conn.close()
    except EOFError as err:
        print(err)


class Server:
    def __init__(self):
        self.child_conn = None
        self.child = None
        self.parent_conn, self.child_conn = Pipe()

    def run(self):
        self.child = Process(target=start_child_process, name="my_child_process", args=(self.child_conn,))
        self.child.start()

        data = self.parent_conn.recv()
        result = {
            "endpoints": {
                "http": f"http://127.0.0.1:{data['port']}/cmd",
                "ws": f"ws://127.0.0.1:{data['port']}/api",
            }
        }
        return result

    def stop(self):
        """ call it only when necessary """
        self.parent_conn.send(True)
        self.child.join()
        self.child = None


if __name__ == "__main__":
    server = Server()
    r = server.run()
    print("r:", r)

single.py

from service import Server
import time


def main():
    server = Server()
    result = server.run()
    print("r:", result)
    time.sleep(5)
    # server.stop() # 只要打开这行代码的注释,问题就会消失


if __name__ == "__main__":
    main()
回复
likes
适合作为回答的
  • 经过验证的有效解决办法
  • 自己的经验指引,对解决问题有帮助
  • 遵循 Markdown 语法排版,代码语义正确
不该作为回答的
  • 询问内容细节或回复楼层
  • 与题目无关的内容
  • “赞”“顶”“同问”“看手册”“解决了没”等毫无意义的内容