likes
comments
collection
share

http异步请求(1)---工具封装

作者站长头像
站长
· 阅读数 4

一、需求引入

在开发某个性能测试工具过程中,需要采集后端的数据变更到前台刷新的时间。

由此需要在一个时间段内循环请求后端获取最新的数据,以校验数据是否已刷新为变更后的值,并记录下时间。 在循环请求中,我开始使用的是requests库同步请求接口,虽然也能用,但可能请求本身可能还是比较耗时,增加了采集的数据刷新时间的误差,所以我决定使用异步请求实现一下同样的功能。

二、功能分析

需要实现一个支持保持长连接与每次请求时重新建立连接两种方式,支持所有http请求方法的工具类。 根据实际需求,通过设置参数来选择是否启用长连接。

三、技术方案

基于python的aiohttp

四、工具实现

  1. 导入所需库 确保已安装 aiohttp 库。如果没有安装,通过 pip install aiohttp 进行安装。在代码中导入所需模块:
import aiohttp
from typing import Optional, Dict, Union, List
from aiohttp.client_exceptions import ClientError
from loguru import logger
  1. 定义配置类 创建一个用于存储全局配置的类,如超时时间、重试次数、是否开启长连接等
class HttpClientConfig:
   def __init__(self,
                    timeout: float = 30.0,
                    max_retries: int = 3,
                    use_cookies: bool = False,
                    keep_alive: bool = True):
       self.timeout = timeout
       self.max_retries = max_retries
       self.use_cookies = use_cookies
       self.keep_alive = keep_alive

       # 可以添加其他必要的配置项

  1. 创建请求工具类 定义一个名为 AsyncHttpClient 的异步请求工具类,包含构造函数、基本请求方法(如 get、post、patch)以及通用请求方法 _request。根据配置决定是否启用长连接
class AsyncHttpClient:
    def __init__(self, config: HttpClientConfig):
        self.config = config
        self.session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(keepalive_timeout=config.timeout) if config.keep_alive else None,
            timeout=aiohttp.ClientTimeout(total=config.timeout),
            cookie_jar=aiohttp.CookieJar() if self.config.use_cookies else None,
        )

    async def __aenter__(self):
        return self

    # async def __aenter__(self):
    #     if not self.session or not self.config.keep_alive:
    #         self.session = aiohttp.ClientSession(
    #             timeout=aiohttp.ClientTimeout(total=self.config.timeout),
    #             cookie_jar=aiohttp.CookieJar() if self.config.use_cookies else None,
    #             connector=aiohttp.TCPConnector(keepalive_timeout=60) if self.config.keep_alive else None)
    #     return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session and not self.config.keep_alive:
            await self.session.close()

    async def _request(self, method: str, url: str, *, params: Optional[Dict[str, str]] = None,
                       data: Optional[Union[Dict[str, str], List[Dict[str, str]], bytes]] = None,
                       json: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None,
                       retry_count: int = 0) -> Optional[aiohttp.ClientResponse]:
        request_info = {
            "method": method,
            "url": url,
            "params": params,
            "data": data,
            "json": json,
            "headers": headers,
        }
        logger.debug(f"Sending HTTP request: {request_info}")

        try:
            async with self.session.request(method, url, params=params, data=data, json=json, headers=headers) as resp:
                response_info = {
                    "status": resp.status,
                    "reason": resp.reason,
                    "headers": dict(resp.headers),
                }
                logger.debug(f"Received HTTP response: {response_info}")
                if resp.status < 400:
                    return resp
                else:
                    raise ClientError(f"HTTP error {resp.status}: {await resp.text()}")
        except ClientError as e:
            if retry_count < self.config.max_retries:
                await asyncio.sleep(1 << retry_count)  # Exponential backoff
                return await self._request(method, url, params=params, data=data, json=json, headers=headers,
                                           retry_count=retry_count + 1)
            else:
                raise e

    async def get(self, url: str, params: Optional[Dict[str, str]] = None,
                  headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("GET", url, params=params, headers=headers)

    async def post(self, url: str, data: Optional[Union[Dict[str, Any], List[Dict[str, Any]], bytes]],
                   json: Optional[Dict[str, Any]] = None,
                   headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("POST", url, data=data, json=json, headers=headers)

    async def patch(self, url: str, data: Optional[Union[Dict[str, Any], List[Dict[str, Any]], bytes]],
                    json: Optional[Dict[str, Any]] = None,
                    headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("PATCH", url, data=data, json=json, headers=headers)

    async def put(self, url: str, data: Optional[Union[Dict[str, Any], List[Dict[str, Any]], bytes]],
                  json: Optional[Dict[str, Any]] = None,
                  headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("PUT", url, data=data, json=json, headers=headers)

    async def delete(self, url: str, params: Optional[Dict[str, str]] = None,
                     headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("DELETE", url, params=params, headers=headers)

    async def options(self, url: str, params: Optional[Dict[str, str]] = None,
                      headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("OPTIONS", url, params=params, headers=headers)

    async def head(self, url: str, params: Optional[Dict[str, str]] = None,
                   headers: Optional[Dict[str, str]] = None) -> Optional[aiohttp.ClientResponse]:
        return await self._request("HEAD", url, params=params, headers=headers)

    async def close(self):
        if self.session:
            await self.session.close()

五、使用示例与适用场景

1. 长连接(多次http请求复用tcp连接)

使用长连接示例:

import asyncio

async def main():
    # 配置并创建一个启用长连接的 HTTP 客户端
    config = HttpClientConfig(keep_alive=True)
    client = AsyncHttpClient(config)

    async with client:
        # 使用长连接发送多个请求
        for i in range(5):
            response = await client.get("https://example.com/api/data")
            assert response.status == 200
            print(await response.json())

asyncio.run(main())

在这个示例中,我们首先创建了一个启用长连接的 AsyncHttpClient 实例,并使用 async with 语句管理其上下文。在循环中,我们连续发送了五个 GET 请求到同一服务器。由于启用了长连接,这些请求将复用同一个 TCP 连接,从而减少网络开销和提高性能。

2. 短连接(每次http请求重新建立tcp连接)

每次重新创建连接示例:

import asyncio

async def main():
    # 配置并创建一个每次请求重新创建连接的 HTTP 客户端
    config = HttpClientConfig(keep_alive=False)
    client = AsyncHttpClient(config)

    for i in range(5):
        async with client:
            # 每次请求前都会重新创建连接
            response = await client.get("https://example.com/api/data")
            assert response.status == 200
            print(await response.json())

asyncio.run(main())

这个示例与上一个示例的主要区别在于,我们为 HttpClientConfig 设置了 keep_alive=False,表示每次请求时应重新创建连接。在循环中,每次执行 async with client: 语句时,都会创建一个新的 aiohttp.ClientSession,并在请求完成后关闭它。因此,尽管这五个 GET 请求的目标服务器相同,但它们会通过不同的 TCP 连接进行通信。

不使用with

不使用with的话,则在使用完毕后,需要手动关闭 ClientSession 以释放资源

async def perform_request(url: str, query_params: Dict[str, Any] = None) -> Dict[str, Any]:
    client = AsyncHttpClient()

    try:
        response = await client.get(url, query_params)
        if response.status != 200:
            raise aiohttp.ClientResponseError(response.status, await response.text())

        response_data = await response.json()
        return response_data
    except aiohttp.ClientResponseError as e:
        print(f"Request failed with status {e.status}: {e.message}")
        raise
    except Exception as e:
        print(f"Unexpected error occurred: {str(e)}")
        raise
    finally:
        await client.close()  # Make sure to close the client in the finally block

# 使用示例

长连接与短连接的适用场景

实际使用时应根据业务场景和性能需求选择合适的连接策略。 长连接通常适用于频繁向同一服务器发送请求的情况。 每次重新创建连接则适用于请求之间间隔较长或目标服务器分散的情况。

六、AsyncHttpClient与直接使用aiohttp

1. AsyncHttpClient(HttpClientConfig(keep_alive=True)).get与aiohttp.get

工具是封装完成了,但其实我一直以来都是直接使用aiohttp.get/post等方法,所以就想搞明白两者有啥区别。 以get请求为例,使用 aiohttp.get() 直接发起请求与通过新建 ClientSession 后再调用其 get() 方法有以下区别:

  1. 生命周期管理:
    • 直接使用 aiohttp.get(): 当您直接调用 aiohttp.get() 函数时,它会在内部自动创建一个新的 ClientSession 对象,执行完请求后立即关闭这个会话。这意味着每个独立的 get() 调用都会伴随着一次会话的创建和销毁。这种方式适合一次性、独立的请求,无需关心会话的复用和清理。
    • 新建 ClientSession 后调用 get(): 如果您先创建一个 ClientSession 实例,然后多次调用其 get() 方法,那么所有的请求都将共享同一个会话对象。您可以控制 ClientSession 的生命周期,例如使用 async with 语句来确保会话在使用完毕后被正确关闭。这种做法有利于复用连接,特别是当您需要向同一主机发送多个请求或者希望利用持久连接(Keep-Alive)特性时。
  2. 连接复用与 Keep-Alive:
    • 直接使用 aiohttp.get(): 每次单独的 get() 调用都会创建一个新的 TCP 连接。即使请求的目标主机相同,也不会复用之前的连接。这可能导致额外的网络延迟和资源消耗,特别是在高并发或频繁请求同一服务器的情况下。
    • 新建 ClientSession 后调用 get(): 同一 ClientSession 内的请求可以复用已有的 TCP 连接,充分利用 Keep-Alive 功能。这有助于减少建立新连接的开销,提高请求效率,尤其对于那些需要多次交互的服务接口更为有利。
  3. Cookie 管理与状态共享:
    • 直接使用 aiohttp.get(): 每个独立的 get() 调用不会共享 Cookie 或其他会话状态。如果您需要处理涉及登录状态、购物车等依赖于 Cookie 或 Session 数据的场景,直接使用 aiohttp.get() 可能会导致状态丢失或不一致。
    • 新建 ClientSession 后调用 get(): 通过同一个 ClientSession 发起的所有请求会共享相同的 Cookie jar,自动处理 Cookie 的发送和接收。这对于需要维持会话状态的交互特别有用,因为相关的状态信息将在所有请求之间保持一致。

总结来说,直接使用 aiohttp.get() 适用于单次、独立的请求,无需关注会话管理和连接复用。而新建 ClientSession 后调用 get() 更适合批量请求、需要保持会话状态或利用连接复用提升性能的场景。在实际应用中,根据具体需求选择合适的方式至关重要。

2. AsyncHttpClient(HttpClientConfig(keep_alive=False)).get与aiohttp.get

说到这儿,又引申出来另一个问题,都是基于短连接的AsyncHttpClient,传入的HttpClientConfig(keep_alive=False),再调用get方法,与直接调用aiohttp.get又有什么区别?

AsyncHttpClient 类在传入 HttpClientConfig(keep_alive=False) 时,其行为与直接调用 aiohttp.get() 在某些方面类似,但在另一些方面有所不同。下面详细对比两者的相似之处和差异: 相似之处:

  • 连接管理: 当 AsyncHttpClient 使用 HttpClientConfig(keep_alive=False) 时,每次请求都会创建一个新的 ClientSession,并在请求完成后关闭该会话。这一点与直接调用 aiohttp.get() 相似,即每个请求都伴随着一个独立的 ClientSession 的生命周期,不复用任何连接。
  • 资源清理: 两者都确保了请求完成后相关资源(如 TCP 连接、会话对象等)得到及时释放,避免资源泄漏。无论是 AsyncHttpClient 在每次请求后关闭 ClientSession,还是 aiohttp.get() 自动完成这一过程,都确保了请求的自我封闭性。 差异之处:
  • API 使用体验: 使用 AsyncHttpClient 类封装后,提供了统一的接口(如 get()、post() 等方法)供用户调用,隐藏了底层 aiohttp 库的细节,简化了代码结构,增强了代码可读性和维护性。相比之下,直接调用 aiohttp.get() 需要在代码中显式处理更多细节。
  • 错误处理与重试机制: AsyncHttpClient 示例中包含了错误处理逻辑(如重试机制)和配置选项(如超时时间),这些都是直接调用 aiohttp.get() 所不具备的。使用 AsyncHttpClient 可以更方便地定制和扩展请求行为,如添加重试策略、统一处理异常等。
  • 代码组织与复用: AsyncHttpClient 类作为一个独立的组件,易于在整个项目中复用,便于集中管理 HTTP 请求相关的配置和行为。而直接调用 aiohttp.get() 的代码可能散布在各个文件或函数中,不利于整体协调和优化。

综上所述,虽然在连接管理(不复用连接)这一点上,使用 AsyncHttpClient 传入 HttpClientConfig(keep_alive=False) 与直接调用 aiohttp.get() 行为相似,但前者在 API 使用体验、错误处理、代码组织与复用等方面具有优势。选择哪种方式取决于项目的具体需求和偏好。如果您看重代码的简洁性、一致性以及易于管理和扩展,推荐使用封装后的 AsyncHttpClient 类。