python多进程共享一个可操作的变量, 如何保证原子操作?

作者站长头像
站长
· 阅读数 5

python多进程共享一个可操作的变量, 如何保证原子操作?

我的需求目前有多组数据需要执行计算型任务, 每执行完一组数据就要给java通知并传递生成的结果文件, 并且我要在把所有任务都执行完的时候(也就是最后一组数据执行完的时候)告诉java本轮次数据全部执行完毕, java那边就要去整体做数据库入库的操作。

我的设想多进程之间维护一个整型数值, 在执行完一组数据的时候自增1, 并在通知java的方法中去比较这个整型数值和总任务数量, 相等就代表全部完毕。(这中间不考虑计算任务执行失败的情况)

面临的情况我通过 multiprocessing 模块中的 Manager 去声明了一个整型变量, 然后给每个进程传递了过去, 在执行自增的时候, 出现了多个进程读取到同一个值的情况(详细可以看下面的输出, 或者自行执行下面的demo), 无法满足数值的读写原子, 我尝试了加锁, 但是没有生效。

这是我抽象出来的demo

from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager, Lock
from multiprocessing.managers import ValueProxy
import os


m = Manager().Value(ctypes.c_int, 0)


def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int):
    """模拟耗时任务函数"""

    # 模拟耗时计算
    res = x**y

    # with Lock(): 加锁也不管用...
    # 多进程共享变量, 用于比较总任务数量
    with Lock():
        _m.value += 1

    # 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了
    if _m.value == total_tasks:
        print(True)

    print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")


def main():
    # 以下假设有8组任务
    t1 = (100, 200, 300, 400, 500, 600, 700, 800)
    t2 = (80, 70, 60, 50, 40, 30, 20, 10)

    len_t = len(t1)

    # 多进程执行cpu耗时性任务
    with ProcessPoolExecutor(max_workers=len_t) as executor:
        {executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)}


if __name__ == "__main__":
    main()

这是我目前的demo,从我的业务代码中抽象出来的。

这是代码的输出打印(很明显是错误的):

m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000
m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000
m_value: 6, p_id: 14885, res: 107374182400000000000000000000

正确的输出应该 m_value1,2,3,4,5,6,7,8 以此打印出来的。

希望对此颇有研究的大佬指点迷津, 或者给出其他可行方案。最好贴出代码, 不胜感激。


问题已解决: 锁多次创建没有保证是同一把锁是主因

回复
1个回答
avatar
test
2024-06-24
from concurrent.futures import ProcessPoolExecutor
import ctypes
from multiprocessing import Manager, Lock
import os

# 创建 Manager 和 Lock
manager = Manager()
m = manager.Value(ctypes.c_int, 0)
lock = manager.Lock()

def calc_number(x: int, y: int, _m, total_tasks: int, _lock):
    """模拟耗时任务函数"""
    # 模拟耗时计算
    res = x ** y

    # 用锁来保证原子操作
    with _lock:
        _m.value += 1
        current_value = _m.value

    # 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了
    if current_value == total_tasks:
        print(True)

    print(f"m_value: {current_value}, p_id: {os.getpid()}, res: {res}")

def main():
    # 任务参数
    t1 = (100, 200, 300, 400, 500, 600, 700, 800)
    t2 = (80, 70, 60, 50, 40, 30, 20, 10)

    len_t = len(t1)

    # 多进程执行任务
    with ProcessPoolExecutor(max_workers=len_t) as executor:
        for x, y in zip(t1, t2):
            executor.submit(calc_number, x, y, m, len_t, lock)

if __name__ == "__main__":
    main()
回复
likes
适合作为回答的
  • 经过验证的有效解决办法
  • 自己的经验指引,对解决问题有帮助
  • 遵循 Markdown 语法排版,代码语义正确
不该作为回答的
  • 询问内容细节或回复楼层
  • 与题目无关的内容
  • “赞”“顶”“同问”“看手册”“解决了没”等毫无意义的内容