Python 多进程报错【无法 Pickle】
报错详情:
Exception iterating responses: Can't pickle local object 'Servicer.Build.<locals>.run.<locals>.t'
调用堆栈
File "/home/rtx2080ti/code/xhspark/src/knowledge/servicer/views.py", line 337, in Build
run()
File "/home/rtx2080ti/code/xhspark/src/knowledge/servicer/views.py", line 317, in run
p.apply(func=t, args=['a',])
File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/pool.py", line 360, in apply
return self.apply_async(func, args, kwds).get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/pool.py", line 774, in get
raise self._value
File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/pool.py", line 540, in _handle_tasks
put(task)
File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/connection.py", line 205, in send
self._send_bytes(_ForkingPickler.dumps(obj))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'Servicer.Build.<locals>.run.<locals>.t'
代码复现
'''
Author: BeYoung
Date: 2024-03-08 14:05:20
LastEditTime: 2024-03-30 18:07:43
'''
import multiprocessing
import time
import asyncio
async def sleep2(nums):
    """Sleep for two seconds, then return *nums* unchanged."""
    await asyncio.sleep(2)
    return nums


class A:
    """Reproduction case: hands a locally defined function to a process pool."""

    @staticmethod
    def main(result_queue=None):
        """Submit 50 tasks to a ``multiprocessing.Pool`` and report via a queue.

        The task function ``func`` is intentionally defined *inside* this
        method.  Its qualified name therefore contains ``<locals>``, so the
        pool cannot pickle it and every submission fails with
        "Can't pickle local object".  That is the error being reproduced.

        Args:
            result_queue: Queue that receives each task result followed by
                the sentinel string ``"OK"``.  When omitted, the module-level
                ``q`` is used (only available if the launcher's globals were
                inherited by this process, e.g. with the ``fork`` start
                method).
        """
        if result_queue is None:
            result_queue = q

        def callback(num):
            result_queue.put(num)
            print(f"callback {num}")

        def on_error(exc):
            # apply_async() stores a failure on the AsyncResult; because
            # .get() is never called here, the pickling error would pass
            # unnoticed without an error_callback.
            print(f"error {exc!r}")

        def func(num):
            # Deliberately nested -- this is what makes the task unpicklable.
            # asyncio.run() creates and closes its own event loop, so no
            # explicit new_event_loop()/set_event_loop() call is needed.
            nums = list(range(num))
            res = asyncio.run(sleep2(nums))
            print(res)
            return num

        try:
            with multiprocessing.Pool() as pool:
                for i in range(50):
                    pool.apply_async(
                        func,
                        args=(i,),
                        callback=callback,
                        error_callback=on_error,
                    )
                # Leaving the ``with`` block terminates the pool, so wait for
                # the submitted tasks before that happens.
                pool.close()
                pool.join()
        finally:
            # Always send the sentinel so the launcher never blocks forever.
            result_queue.put("OK")


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=A.main, args=(q,))
    p.start()
    # Drain the queue until the sentinel arrives, then reap the child.
    while True:
        r = q.get()
        print(f"main {r}")
        if r == "OK":
            break
    p.join()
    p.close()
原因
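func 函数是定义在类方法内部的局部(嵌套)函数,无法被 pickle 序列化。multiprocessing 在把任务发送给子进程之前,需要先用 pickle 序列化任务函数;而 pickle 对函数只保存“模块名 + 限定名”的引用,再由子进程按名称重新查找。局部函数的限定名中带有 `<locals>`(例如 `Servicer.Build.<locals>.run.<locals>.t`),它只在外层方法执行期间存在,无法从模块顶层按名称找到,因此抛出 Can't pickle local object。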
解决方式
将 func 移出 main 的闭包,改为定义在类(或模块)级别,使 pickle 可以按限定名找到它
'''
Author: BeYoung
Date: 2024-03-08 14:05:20
LastEditTime: 2024-03-30 18:22:55
'''
import multiprocessing
import time
import asyncio
async def sleep2(nums):
    """Sleep for two seconds, then return *nums* unchanged."""
    await asyncio.sleep(2)
    return nums


class A:
    """Runs tasks in a process pool using a class-level, picklable function."""

    @staticmethod
    def func(num):
        """Build ``list(range(num))``, pass it through ``sleep2``, return *num*.

        Defined at class level (not nested inside ``main``) so that the pool
        can pickle it by its qualified name ``A.func``.
        """
        # asyncio.run() creates and closes its own event loop, so no explicit
        # new_event_loop()/set_event_loop() call is needed.
        nums = list(range(num))
        res = asyncio.run(sleep2(nums))
        print(res)
        return num

    @staticmethod
    def main(result_queue=None):
        """Submit 50 ``A.func`` tasks to a pool and report via a queue.

        Args:
            result_queue: Queue that receives each task result followed by
                the sentinel string ``"OK"``.  When omitted, the module-level
                ``q`` is used (only available if the launcher's globals were
                inherited by this process, e.g. with the ``fork`` start
                method).
        """
        if result_queue is None:
            result_queue = q

        def callback(num):
            result_queue.put(num)
            print(f"callback {num}")

        def on_error(exc):
            # apply_async() stores a failure on the AsyncResult; because
            # .get() is never called here, errors would pass unnoticed
            # without an error_callback.
            print(f"error {exc!r}")

        try:
            with multiprocessing.Pool() as pool:
                for i in range(50):
                    pool.apply_async(
                        A.func,
                        args=(i,),
                        callback=callback,
                        error_callback=on_error,
                    )
                # Leaving the ``with`` block terminates the pool, so wait for
                # the submitted tasks before that happens.
                pool.close()
                pool.join()
        finally:
            # Always send the sentinel so the launcher never blocks forever.
            result_queue.put("OK")


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=A.main, args=(q,))
    p.start()
    # Drain the queue until the sentinel arrives, then reap the child.
    while True:
        r = q.get()
        print(f"main {r}")
        if r == "OK":
            break
    p.join()
    p.close()
转载自:https://juejin.cn/post/7351734240851247119