likes
comments
collection
share

Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数

作者站长头像
站长
· 阅读数 4

事件原因

这个问题是我最近在使用aliyun SDK遇到的问题。因为要对云上资源sls日志服务做采集。在采集logstores时大概有800多个。为了加快拉取速度我使用了async来请求。在aliyun SDK提供了async请求。于是便有了下面的代码🤔

# 初始化request
list_log_stores_request = sls_20201230_models.ListLogStoresRequest()
runtime = util_models.RuntimeOptions()
headers = {}
# 获取project
projects = SlsUtil.get_sls_project_name_list()

try:
    # 并发获取 logstore详情
    tasks = [sls.client.sls.client.get_log_store_with_options_async(project, logstore, headers, runtime) for project in projects]
    results = await asyncio.gather(
            *tasks,
            return_exceptions=True
        )
    logstore_desc = [result.body.to_map() for index, result in enumerate(results)]

except Exception as error:
    log.error(error)

异常报错

没错我直接把所有的 coroutine 放到一个list中然后使用asyncio.gather并发请求。😂然后就不出意外的报错了。

Traceback (most recent call last):
  File "C:\Python310\lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Python310\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Logn\Documents\python-project\Sre-CMDB\venv\lib\site-packages\uvicorn\_subprocess.py", line 76, in subprocess_started
    target(sockets=sockets)
  File "C:\Users\Logn\Documents\python-project\Sre-CMDB\venv\lib\site-packages\uvicorn\server.py", line 61, in run
    return asyncio.run(self.serve(sockets=sockets))
  File "C:\Python310\lib\asyncio\runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "C:\Python310\lib\asyncio\runners.py", line 63, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  File "C:\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "C:\Python310\lib\asyncio\base_events.py", line 603, in run_forever
    self._run_once()
  File "C:\Python310\lib\asyncio\base_events.py", line 1863, in _run_once
    event_list = self._selector.select(timeout)
  File "C:\Python310\lib\selectors.py", line 324, in select
    r, w, _ = self._select(self._readers, self._writers, [], timeout)
  File "C:\Python310\lib\selectors.py", line 315, in _select
    r, w, x = select.select(r, w, w, timeout)
ValueError: too many file descriptors in select()

问题解决

作为一名运维工程师虽然平时用的都是Linux 但是在Windows上报这个错误还是下意识的大概明白应该是并发请求数过多导致文件描述符超额。🤔下面是GPT给出的解释 Windows最大文件打开数目限制在509。而我们的请求数在800+ 于是就出现了上述异常。 Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数 为了解决这个问题我就去找了我工位对面的python大佬 他和我说使用redis setex可以解决这问题。我在想我就拉去个数据用redis这种高大上太浪费了(重点是我也不会)🐶。于是乎我想起在写go的时候用到了sync.semaphore基于信号量来显示go协程的并发请求数。查了一下python官方文档确实是存在semaphore这个原语。

Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数 便有了下面的代码。

...
#初始化一个 信号量实例
sem = asyncio.Semaphore(30)

try:
    # 并发获取 logstore详情
    async with sem:
        results = await asyncio.gather(
            *tasks,
            return_exceptions=True
        )
...

执行还是报错。🤔眉头一皱看来问题没有那么简单。经过一番折腾,通过修改代码如下。正常运行并未报错可以正常拉取。😋

#定义一个信号量函数
async def worker_with_semaphore(semaphore, tasks: Any):
    async with semaphore:
        return await tasks

 # 初始化一个信号量实例
 sem = asyncio.Semaphore(20)
 
try:
    # 并发获取 logstore详情
    tasks = [worker_with_semaphore(sem,sls.client.sls.client.get_log_store_with_options_async(project, logstore, headers, runtime)) for project in projects]
    async with sem:
        results = await asyncio.gather(
                *tasks,
                return_exceptions=True
            )
    logstore_desc = [result.body.to_map() for index, result in enumerate(results)]

except Exception as error:
    log.error(error)

问题分析

当然事情到这里并没有结束,我们为啥需要创建这么一个 worker_with_semaphore 函数。协程这块参考了b站码农高天关于async的视频。 这是他视频的链接

【python】asyncio的理解与入门,搞不明白协程?看这个视频就够了。_哔哩哔哩_bilibili

首先我们分析asyncio.gather 以下是python中文文档的描述。 协程与任务 — Python 3.10.13 文档

Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数

在aliyun SDK中返回的是一个 coroutine对象 通过列表解析式生成一个list gather会把该list遍历转化为一个Future对象予以执行。

Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数

查看 gather函数源码 遍历协程列表获取传入的 coros_or_futures 并通过装饰器封装为一个 coro_or_future并执行。 Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数

Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数

因此使用如下代码测试。

async with sem:
    log.info(sem)
    log.info(sem.locked())
    results = await asyncio.gather(
        *tasks,
        return_exceptions=True
    )
    log.info("exec gather")
    log.info(sem)
    log.info(sem.locked())

日志打印如下 信号量并只有在初次执行gather函数时被使用了一次。

2024-01-07 15:58:05.729 | INFO     | cmdb.aliyun.sls.sync:sync_sls_log:140 - <asyncio.locks.Semaphore object at 0x000001A0825F78B0 [unlocked, value:29]>
2024-01-07 15:58:05.729 | INFO     | cmdb.aliyun.sls.sync:sync_sls_log:141 - False
2024-01-07 15:58:07.189 | INFO     | cmdb.aliyun.sls.sync:sync_sls_log:146 - exec gather
2024-01-07 15:58:07.189 | INFO     | cmdb.aliyun.sls.sync:sync_sls_log:147 - <asyncio.locks.Semaphore object at 0x000001A0825F78B0 [unlocked, value:29]>
2024-01-07 15:58:07.189 | INFO     | cmdb.aliyun.sls.sync:sync_sls_log:148 - False

对应信号量的使用我们应该在每个 coroutine 通过并发竞争来获取计数器拿到计数器就执行 否则就同步阻塞。以下是python对信号量的描述。

Aliyun SDK 使用Python同步原语Semaphore信号量限制协程并发请求数 因为我们通过一个 worker_with_semaphore 包装一下 coroutine 在执行时使用上下文管理来获取信号 执行完了再释放。修改一下 worker_with_semaphore如下打印一下在执行时信号量情况。

async def worker_with_semaphore(semaphore, tasks: Any):
    async with semaphore:
        log.info(semaphore.locked())
        log.info(semaphore)
        return await tasks

可以看到在执行一段时间后 开始出现locked等待信号的情况说明限制开始生效了。

2024-01-07 16:17:29.444 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [unlocked, value:4]>
2024-01-07 16:17:29.452 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - False
2024-01-07 16:17:29.453 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [unlocked, value:3]>
2024-01-07 16:17:29.462 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - False
2024-01-07 16:17:29.462 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [unlocked, value:2]>
2024-01-07 16:17:29.474 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - False
2024-01-07 16:17:29.474 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [unlocked, value:1]>
2024-01-07 16:17:29.483 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - True
2024-01-07 16:17:29.484 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [locked]>
2024-01-07 16:17:29.602 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - True
2024-01-07 16:17:29.602 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [locked, waiters:69]>
2024-01-07 16:17:29.611 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - True
2024-01-07 16:17:29.611 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [locked, waiters:68]>
2024-01-07 16:17:30.326 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - True
2024-01-07 16:17:30.326 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:27 - <asyncio.locks.Semaphore object at 0x000002079FDB77C0 [locked, waiters:67]>
2024-01-07 16:17:30.335 | INFO     | cmdb.aliyun.sls.sls_util:worker_with_semaphore:26 - True

以上就是对python 同步原语Semaphore的使用和原理分析。因为自己也只是一个python萌新有错误的话也请指出。😊