python多进程共享一个可操作的变量, 如何保证原子操作?
python多进程共享一个可操作的变量, 如何保证原子操作?
我的需求目前有多组数据需要执行计算型任务, 每执行完一组数据就要给java通知并传递生成的结果文件, 并且我要在把所有任务都执行完的时候(也就是最后一组数据执行完的时候)告诉java本轮次数据全部执行完毕, java那边就要去整体做数据库入库的操作。
我的设想多进程之间维护一个整型数值, 在执行完一组数据的时候自增1, 并在通知java的方法中去比较这个整型数值和总任务数量, 相等就代表全部完毕。(这中间不考虑计算任务执行失败的情况)
面临的情况我通过 multiprocessing
模块中的 Manager
去声明了一个整型变量, 然后给每个进程传递了过去, 在执行自增的时候, 出现了多个进程读取到同一个值的情况(详细可以看下面的输出, 或者自行执行下面的demo), 无法满足数值的读写原子, 我尝试了加锁, 但是没有生效。
这是我抽象出来的demo
from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager, Lock
from multiprocessing.managers import ValueProxy
import os
m = Manager().Value(ctypes.c_int, 0)
def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int):
"""模拟耗时任务函数"""
# 模拟耗时计算
res = x**y
# with Lock(): 加锁也不管用...
# 多进程共享变量, 用于比较总任务数量
with Lock():
_m.value += 1
# 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了
if _m.value == total_tasks:
print(True)
print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")
def main():
# 以下假设有8组任务
t1 = (100, 200, 300, 400, 500, 600, 700, 800)
t2 = (80, 70, 60, 50, 40, 30, 20, 10)
len_t = len(t1)
# 多进程执行cpu耗时性任务
with ProcessPoolExecutor(max_workers=len_t) as executor:
{executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)}
if __name__ == "__main__":
main()
这是我目前的demo,从我的业务代码中抽象出来的。
这是代码的输出打印(很明显是错误的):
m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000
m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000
m_value: 6, p_id: 14885, res: 107374182400000000000000000000
正确的输出应该 m_value
是 1,2,3,4,5,6,7,8
以此打印出来的。
希望对此颇有研究的大佬指点迷津, 或者给出其他可行方案。最好贴出代码, 不胜感激。
问题已解决: 锁多次创建没有保证是同一把锁是主因
回复
1个回答
test
2024-06-24
from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager, Lock
import os
# 创建 Manager 和 Lock
manager = Manager()
m = manager.Value(ctypes.c_int, 0)
lock = manager.Lock()
def calc_number(x: int, y: int, _m, total_tasks: int, _lock):
"""模拟耗时任务函数"""
# 模拟耗时计算
res = x ** y
# 用锁来保证原子操作
with _lock:
_m.value += 1
current_value = _m.value
# 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了
if current_value == total_tasks:
print(True)
print(f"m_value: {current_value}, p_id: {os.getpid()}, res: {res}")
def main():
# 任务参数
t1 = (100, 200, 300, 400, 500, 600, 700, 800)
t2 = (80, 70, 60, 50, 40, 30, 20, 10)
len_t = len(t1)
# 多进程执行任务
with ProcessPoolExecutor(max_workers=len_t) as executor:
for x, y in zip(t1, t2):
executor.submit(calc_number, x, y, m, len_t, lock)
if __name__ == "__main__":
main()
回复
适合作为回答的
- 经过验证的有效解决办法
- 自己的经验指引,对解决问题有帮助
- 遵循 Markdown 语法排版,代码语义正确
不该作为回答的
- 询问内容细节或回复楼层
- 与题目无关的内容
- “赞”“顶”“同问”“看手册”“解决了没”等毫无意义的内容