
FastApi (Personal Scaffold) + Snowy Admin Backend (4): Scaffold -- loguru Logging Plugin


Preface

Logging is a must-have for any application. Exactly what gets recorded depends on the application's own requirements; commonly recorded information includes:

  • Request payload information
  • Response payload information

A single request may also produce other logs that we need to record, for example:

  • Third-party API request logs
  • Database operation logs
  • Custom operation logs

To tie together all the logs produced within one request context, especially when they are written as scattered records, we need a trace ID that links them, which in turn requires a request context object that carries that trace ID.

The logging plugin below supports both of the following recording styles.

[Centralized] All of a request's log events are aggregated into a single record, in the following format:

INFO  |2023-02-02 12:48:57 |P:SpawnProcess-1 |T:15732:MainThread | - Application startup complete.
INFO  |2023-02-02 12:48:58 | ip:127.0.0.1 |P:SpawnProcess-1 |T:15732:MainThread | reqId:6NzRH5yfYaLmdVaggBVZqW | - [{"trace_index": 1, "event_name": "request", "msg": {"useragent": {"os": "Windows 10", "browser": "QQ Browser 11.5.5240", "device": {"family": "Other", "brand": null, "model": null}}, "url": "/auth/b/getPicCaptcha2/", "method": "GET", "params": {}, "ts": "2023-02-02 12:48:58"}, "cost_time": "0.00", "ts": "2023-02-02 12:48:58"},{"trace_index": 2, "event_name": "response", "msg": "{"msg":"\u5904\u7406\u6210\u529f","code":200,"data":{"validCodeBase64":"xxxxxxxx"}}", "cost_time": "0.03", "ts": "2023-02-02 12:48:58"}]

[Scattered] Each related log event is written as its own record, in the following format:

INFO  |2023-02-02 12:50:46 |P:SpawnProcess-1 |T:2648:MainThread | - Application startup complete.
INFO  |2023-02-02 12:50:47 | ip:127.0.0.1 |P:SpawnProcess-1 |T:2648:MainThread | reqId:hqterUhnCGXraywSiiWU79 index:1 | event:request | cost_time:0.00 | - {'useragent': {'os': 'Windows 10', 'browser': 'QQ Browser 11.5.5240', 'device': {'family': 'Other', 'brand': None, 'model': None}}, 'url': '/auth/b/getPicCaptcha2/', 'method': 'GET', 'params': {}, 'ts': '2023-02-02 12:50:47'}
INFO  |2023-02-02 12:50:47 | ip:127.0.0.1 |P:SpawnProcess-1 |T:2648:MainThread | reqId:hqterUhnCGXraywSiiWU79 index:2 | event:response | cost_time:0.03 | - {"msg":"处理成功","code":200,"data":{"validCodeBase64":"XXXXXX","validCodeReqNo":"QBLMwF5NT66fNzeLDZni8z"}}
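The recording style is selected through the plugin's MODEL setting, which is typed as a RecordModel enum. enums.py is not shown in this article; a minimal sketch consistent with how the enum is used below might be:

from enum import Enum


class RecordModel(str, Enum):
    # Aggregate every event of a request into a single log record
    CENTRALIZED = "centralized"
    # Write each event of a request as its own log record
    SCATTERED = "scattered"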

As the log formats above show, the custom fields we need to support when recording are:

  • Log level (INFO)
  • Log timestamp (2023-02-02 12:50:47)
  • Source IP of the request (ip:127.0.0.1)
  • Name of the process handling the request (P:SpawnProcess-1)
  • Name of the thread handling the request (T:2648:MainThread)
  • Trace ID of the request and the per-request event index (reqId:hqterUhnCGXraywSiiWU79 index:1 index:2)
  • Event name of the log entry, event_name (event:request | event:response)
  • Elapsed time up to the moment the entry is generated, in scattered mode (cost_time:0.03)
  • The message body carrying the detailed log content ({"msg": ...})

The middleware behind the logging plugin

If you have read my earlier posts, you may recall a route-based approach to request logging; that approach is somewhat inelegant, while a naive middleware-based approach has its own limitations when you need to record the request payload (especially the body) and the response payload at the same time.

So a middleware that both records the request payload (body included) and captures the response payload needs some special handling. I previously shared middleware on my WeChat public account that works around the repeated consumption of the request body, but it still had limitations. Below is middleware that solves all of the problems above fairly cleanly:

from contextvars import ContextVar
from time import perf_counter
from starlette.requests import Request
import typing
from starlette.datastructures import Headers
from starlette.types import ASGIApp, Message, Receive, Scope, Send
import shortuuid
from starlette.responses import Response

from .contextvar import log_request_var, logrequest
from .enums import RecordModel
from afastcore.plugins.loguru import logger

import json
from typing import Optional
from pydantic import BaseModel, Field


class ResponseInfo(BaseModel):
    headers: Optional[Headers] = Field(default=None, title="Response headers")
    body: str = Field(default="", title="Response body")
    status_code: Optional[int] = Field(default=None, title="Status code")

    class Config:
        arbitrary_types_allowed = True


# Context variable that stores the log message payload for the current request
log_msg_var: ContextVar[dict] = ContextVar("log_msg_var", default={})


class LoguruPluginClientMiddleware:
    '''
    If the request body needs to be read inside the application, this
    middleware must be registered last, i.e. after all other middleware,
    so that it runs first in the request chain.
    If the body is consumed downstream via await self.request.body(),
    is_proxy=True must be enabled.
    '''

    def __init__(self, *, app: ASGIApp, is_proxy=True, client) -> None:
        self.app = app
        self.is_proxy = is_proxy
        self.client = client
        self.request: typing.Optional[Request] = None

    async def get_body(self):
        """Read the request body through the proxied receive(), avoiding the
        problem of fetching the body inside middleware."""
        body = await self.request.body()
        return body

    async def get_json(self):
        """Read the request body and parse it as JSON."""
        return json.loads(await self.get_body())

    async def before_request(self, request: Request) -> Optional[Response]:
        """Override this method if the request needs to be modified."""
        # Trace ID and per-request event index
        request.state.traceid = str(shortuuid.uuid())
        request.state.traceindex = 0
        request.state.close_record = False
        # Start time, used later to compute elapsed time
        request.state.start_time = perf_counter()

    async def after_request(self, request: Request, token=None, response: Response = None) -> Optional[Response]:
        """Post-request processing (record elapsed time etc.; note that the
        response itself can no longer be modified here)."""
        # Record the response body
        log_msg = log_msg_var.get()
        if self.client.settings.IS_RECORD_RESPONSE and log_msg and response.status_code != 404:
            logger.info(str(response.body, 'utf-8'), event_name='response')

        try:
            request.state.traceindex = None
            request.state.traceid = None
            request.state.trace_logs_record = None
            log_request_var.reset(token)
        except Exception:
            pass

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Only plain HTTP requests are logged
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        # Solve the problem of reading the body more than once: consume the
        # first receive() message and replay it to every later consumer
        if self.is_proxy:
            receive_ = await receive()

            async def receive():
                return receive_

        # Build a Request object from the (possibly proxied) receive channel
        self.request = Request(scope, receive=receive)
        # URLs the client is configured not to log are skipped outright
        if self.client.filter_request_url(request=self.request):
            return await self.app(scope, receive, send)

        # Container for the parsed response information
        response_info = ResponseInfo()
        # Callback hook; override it to implement custom business logic
        await self.before_request(self.request)
        # Publish the current request into the context variable
        token = log_request_var.set(self.request)

        if self.client.settings.MODEL == RecordModel.SCATTERED:
            # Scattered recording mode
            logrequest.state.record_model = RecordModel.SCATTERED
            log_msg = await self.client.make_request_log_msg(self.request)
            log_msg_var.set(log_msg)
            # If the URL was filtered, log_msg is None and nothing is recorded
            if log_msg:
                logger.info(log_msg, event_name='request')
        else:
            # Centralized recording mode: create the per-request record list
            logrequest.state.record_model = RecordModel.CENTRALIZED
            self.request.state.trace_logs_record = []
            log_msg = await self.client.make_request_log_msg(self.request)
            log_msg_var.set(log_msg)
            logger.info(log_msg, event_name='request')

        # Wrapped send() that captures the response as it streams out
        async def _next_send(message: Message) -> None:
            if message.get("type") == "http.response.start":
                response_info.headers = Headers(raw=message.get("headers"))
                response_info.status_code = message.get("status")
            # Collect the response body and rebuild a Response for logging
            elif message.get("type") == "http.response.body":
                if body := message.get("body"):
                    response_info.body += body.decode("utf8")
                response = Response(content=response_info.body,
                                    status_code=response_info.status_code,
                                    headers=dict(response_info.headers)
                                    )
                await self.after_request(request=self.request, token=token, response=response)

            await send(message)

        try:
            await self.app(scope, receive, _next_send)
        finally:
            pass

1. Solving repeated consumption of request.body() in middleware

In the middleware above, the repeated-consumption problem of request.body() is solved with the following code:

        # Solve the problem of reading the body more than once
        if self.is_proxy:
            receive_ = await receive()

            async def receive():
                return receive_

        # The idea is to read the body once and replay the cached message

        # Build a Request object from the (possibly proxied) receive channel
        self.request = Request(scope, receive=receive)
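The trick is that the first receive() message is awaited once and cached, and the shadowing receive() closure replays that cached message to every later consumer, so request.body() can be read in the middleware and then again in the route handler. Note that this caches only a single http.request message, so it assumes the body arrives in one message rather than as a chunked stream sent with more_body=True.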

2. Reading the response payload in middleware

Reading the response payload is solved with the following code:

        # Wrapped send() that captures the response as it streams out
        async def _next_send(message: Message) -> None:
            if message.get("type") == "http.response.start":
                response_info.headers = Headers(raw=message.get("headers"))
                response_info.status_code = message.get("status")
            # Collect the response body and rebuild a Response for logging
            elif message.get("type") == "http.response.body":
                if body := message.get("body"):
                    response_info.body += body.decode("utf8")
                response = Response(content=response_info.body,
                                    status_code=response_info.status_code,
                                    headers=dict(response_info.headers)
                                    )
                await self.after_request(request=self.request, token=token, response=response)

            await send(message)

The key to reading the response payload is the check for message.get("type") == "http.response.body": the body is parsed from that message and a Response is reconstructed from the accumulated data. One caveat: _next_send invokes after_request on every http.response.body message, so for streamed responses sent in multiple chunks it would fire more than once; for ordinary single-chunk responses this is not an issue. With that, the middleware resolves the main obstacles to our request logging.

3. A global request object for the logging middleware

In addition, the middleware relies on something like a global request object, so we also define a request context object:


from contextvars import ContextVar
from .bind_ import bind_contextvar
from starlette.requests import Request

# If a ContextVar is get() before any set(), a LookupError is raised;
# this can be avoided by giving contextvars.ContextVar a default value.
log_request_var: ContextVar[Request] = ContextVar("logrequest")
logrequest: Request = bind_contextvar(log_request_var)
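The bind_ module is not shown in this article. Judging from how logrequest is used (attribute access like logrequest.state.traceid), bind_contextvar presumably returns a proxy that forwards attribute access to whatever Request is currently stored in the ContextVar. A minimal sketch under that assumption:

from contextvars import ContextVar
from typing import Any


def bind_contextvar(contextvar: ContextVar) -> Any:
    """Proxy object that delegates attribute access to the ContextVar's current value."""

    class ContextVarBind:
        __slots__ = ()

        def __getattr__(self, name):
            return getattr(contextvar.get(), name)

        def __setattr__(self, name, value):
            setattr(contextvar.get(), name, value)

    return ContextVarBind()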

The middleware then sets it for each request (token = log_request_var.set(self.request)) and resets it when the request ends, as in after_request below:

async def after_request(self, request: Request, token=None, response: Response = None) -> Optional[Response]:
    """Post-request processing (record elapsed time etc.; note that the
    response itself can no longer be modified here)."""
    # Record the response body
    log_msg = log_msg_var.get()
    if self.client.settings.IS_RECORD_RESPONSE and log_msg and response.status_code != 404:
        logger.info(str(response.body, 'utf-8'), event_name='response')

    try:
        request.state.traceindex = None
        request.state.traceid = None
        request.state.trace_logs_record = None
        log_request_var.reset(token)
    except Exception:
        pass

The extension package for custom log fields

To record the extra fields through the loguru library, we need some further extension work. The structure of the logger package under libs is shown below:

[Figure: structure of the logger package under libs]

The content of v1.py under our logger package is as follows:

"""Configure snowy_controller and formats for application loggers."""
import logging
import os
import platform
import sys
from datetime import datetime, timezone
from pathlib import Path
from pprint import pformat
from loguru import logger


# from app.configs.configs import app_config


def set_log_extras(record):
    """set_log_extras [summary].

    [extended_summary]
    Args:
        record ([type]): [description]
    """
    record["extra"]["datetime"] = datetime.now(
        timezone.utc
    )  # Log datetime in UTC time zone, even if server is using another timezone
    record["extra"]["host"] = os.getenv( "HOSTNAME", os.getenv("COMPUTERNAME", platform.node())).split(".")[0]
    record["extra"]["pid"] = os.getpid()
    # record["extra"]["traceid"] = correlation_id.get()



class InterceptHandler(logging.Handler):
    """
    Default snowy_controller from examples in loguru documentaion.
    See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
    """
    def emit(self, record: logging.LogRecord):
        # Get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find caller from where originated the logged message
        frame, depth = logging.currentframe(), 2
        while frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1


        logger.opt(
            depth=depth,
            exception=record.exc_info,
        ).log(level, record.getMessage())



def format_record(record: dict) -> str:
    """
    Custom format for loguru loggers.
    Uses pformat for log any data like request/response body during debug.
    Works with logging if loguru handles it.
    Example:
    >>> payload = [{"users":[{"name": "Nick", "age": 87, "is_active": True}, {"name": "Alex", "age": 27, "is_active": True}], "count": 2}]
    >>> logger.bind(payload=).debug("users payload")
    >>> [   {   'count': 2,
    >>>         'users': [   {'age': 87, 'is_active': True, 'name': 'Nick'},
    >>>                      {'age': 27, 'is_active': True, 'name': 'Alex'}]}]
    """


    # 等级
    format_string = "<level>{level: <5}</level> |"
    # 时间
    format_string += "<green>{time:YYYY-MM-DD HH:mm:ss}</green> |"
    # IP
    if record["extra"].get("ip") is not None:
        format_string += " <cyan>ip:{extra[ip]} </cyan>|"
        # 记录进程和线程信息
    format_string += "<green>P:{process.name}</green> |"
    format_string += "<green>T:{thread.id}:{thread.name}</green> |"
    if record["extra"].get("traceid") is not None:
        if record["extra"].get("traceindex") is not None:
            format_string += " reqId:{extra[traceid]} index:{extra[traceindex]} |"
        else:
            format_string += " reqId:{extra[traceid]} |"

    if record["extra"].get("event_name") is not None:
        format_string += " event:{extra[event_name]} |"

    if record["extra"].get("cost_time") is not None:
        format_string += " cost_time:{extra[cost_time]} |"
    # 正式的日志内容
    format_string += " - <level>{message}</level>"
    # 绑定时候是否包含有payload,有的话则换行
    if record["extra"].get("payload") is not None:
        record["extra"]["payload"] = pformat(
            record["extra"]["payload"], indent=4, compact=True, width=88
        )
        format_string += "\n<level>{extra[payload]}</level>"

    format_string += "{exception}\n"
    # format_string += "<cyan>{module}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan>  |"

    # 扩展自定义字段--需要使用 logger.bind(ip=get_client_ip(request))来设置扩展字段信息的值

    if record["extra"].get("traceid") is not None:
        if record["extra"].get("traceindex") is not None:
            format_string += " reqId:{extra[traceid]} index:{extra[traceindex]} |"
        else:
            format_string += " reqId:{extra[traceid]} |"

    if record["extra"].get("event_name") is not None:
        format_string += " event:{extra[event_name]} |"

    if record["extra"].get("cost_time") is not None:
        format_string += " cost_time:{extra[cost_time]} |"


    # 正式的日志内容
    format_string += " - <level>{message}</level>"

    # 绑定时候是否包含有payload,有的话则换行
    if record["extra"].get("payload") is not None:
        record["extra"]["payload"] = pformat(
            record["extra"]["payload"], indent=4, compact=True, width=88
        )
        format_string += "\n<level>{extra[payload]}</level>"

    format_string += "{exception}\n"

    return format_string


def init_logging(app_config):
    """
    Replaces logging snowy_controller with a snowy_controller for using the custom snowy_controller.

    WARNING!
    if you call the init_logging in startup event function,
    then the first logs before the application start will be in the old format
    >>> app.add_event_handler("startup", init_logging)
    stdout:
    INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
    INFO:     Started reloader process [11528] using statreload
    INFO:     Started server process [6036]
    INFO:     Waiting for application startup.
    2020-07-25 02:19:21.357 | INFO     | uvicorn.lifespan.on:startup:34 - Application startup complete.

    """

    # disable snowy_controller for specific uvicorn loggers
    # to redirect their output to the default uvicorn logger
    # snowy_tasks with uvicorn==0.11.6
    loggers = (
        logging.getLogger(name)
        for name in logging.root.manager.loggerDict
        if name.startswith("uvicorn.")
    )

    for uvicorn_logger in loggers:
        uvicorn_logger.handlers = []

    # change snowy_controller for default uvicorn logger
    # 加上这段会导致日志输出断层或只是偶尔记录
    intercept_handler = InterceptHandler()
    logging.getLogger("uvicorn").handlers = [intercept_handler]
    # logging.getLogger("uvicorn").snowy_controller = []
    logging.getLogger("rocketry").handlers = []
    # set logs output, level and format
    logger.configure(
        handlers=[{"sink": sys.stdout, "level": logging.DEBUG, "format": format_record}]
    )
    #   logger.add参数:
    #
    # sink:要添加的日志记录器的句柄。它可以是一个函数,一个可调用的对象,或者一个字符串(用于表示内置的日志记录器)。
    #
    # backtrace:是一个布尔值,用于指定是否应在每条日志消息中包含执行跟踪。默认情况下,它被设置为False。
    #
    # colorize:是一个布尔值,用于指定是否应将日志消息中的消息类型颜色化。默认情况下,它被设置为True。
    #
    # format:表示将要用于格式化日志消息的字符串。默认情况下,它被设置为"{time} | {level} | {message}({file}:{line})"。
    #
    # enqueue:是一个布尔值,用于指定是否应将日志消息发送到内部消息队列,以便异步处理。默认情况下,它被设置为False。
    #
    # level:表示要记录的最低日志级别的字符串。默认情况下,它被设置为"INFO"。
    #
    # filter:表示要过滤的日志消息的字符串,只有包含该字符串的日志消息才会被记录。默认情况下,它被设置为None。
    #
    # close_atexit:是一个布尔值,用于指定是否应在程序退出时关闭日志记录器。默认情况下,它被设置为True。
    logger.add(
        Path(app_config.LOG_FILE_PATH) / f"{app_config.PROJECT_SLUG}.log",
        rotation=app_config.LOG_FILE_ROTATION,
        retention=app_config.LOG_FILE_RETENTION,
        compression=app_config.LOG_FILE_COMPRESSION,
        enqueue=True,
        backtrace=True,
        # serialize=True, # the record is provided as a JSON string to the snowy_controller
        level=app_config.LOG_FILE_LEVEL,
        format=format_record,
    )

    return logger.bind(traceid=None, method=None)

The extra request fields are added in the format_record function, and the logger itself is set up in init_logging.
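To illustrate how the extension fields flow through (values taken from the sample logs earlier): once init_logging has run, extras supplied via logger.bind() are picked up by format_record and rendered as the ip/reqId/index/event/cost_time segments:

from loguru import logger

# Produces a line of the form:
# INFO  |2023-02-02 12:48:58 | ip:127.0.0.1 |P:... |T:... | reqId:6NzRH5yfYaLmdVaggBVZqW index:1 | event:request | cost_time:0.00 | - ...
logger.bind(ip="127.0.0.1",
            traceid="6NzRH5yfYaLmdVaggBVZqW",
            traceindex=1,
            event_name="request",
            cost_time="0.00").info("request payload ...")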

Implementing the plugin

So far I have covered the base libraries the plugin depends on and the middleware it needs. Next we build the plugin itself, starting with the concrete plugin instance.

[Figure: structure of the plugin package]

1. Defining the plugin client

The client code, client.py, is as follows:

from datetime import datetime
from typing import Union
from urllib.parse import parse_qs
from user_agents import parse
from afastcore.libs.logger.v1 import init_logging
from pydantic import BaseSettings as Settings
from fastapi import FastAPI
from .enums import RecordModel
from .middleware import LoguruPluginClientMiddleware
from ..pluginbase import IBasePlugin as BasePlugin

from starlette.requests import Request


class LoguruPluginClient(BasePlugin):
    '''
    Note:
    Since this plugin handles logging, it should normally be registered last.
    ------------------
    Usage example:
    log = LoguruPluginClient(app=app, settings=LoguruPluginClient.LoguruConfig(
        MODEL=RecordModel.CENTRALIZED
    ))
    from afastcore.plugins.log import logger

    @app.get("/stream12")
    def stream():
        logger.info('log message')
        return {
            "dasd": 2323
        }
    '''

    name = 'Logging plugin'

    def filter_request_url(self, request: Request):
        """Filter out request URLs that should not be logged."""
        path_info = request.url.path
        if request.method == 'OPTIONS':
            return True
        if 'websocket' in path_info:
            return True
        return path_info in self.settings.FLITER_REQUEST_URL

    class LoguruConfig(Settings):

        # ========================= Recording mode
        MODEL = RecordModel.CENTRALIZED

        # =========================
        # Log file configuration
        PROJECT_SLUG: str = "info"
        # Directory of the log files; defaults to where the plugin is initialized
        LOG_FILE_PATH: str = "."
        # Rotation time: rotation='00:00' creates a new log file at midnight
        LOG_FILE_ROTATION: str = "00:00"
        # Retention of log files, e.g. retention="7 days" cleans up automatically
        LOG_FILE_RETENTION: Union[int, str] = 8
        # Compression applied to rotated log files
        LOG_FILE_COMPRESSION: str = "gz"
        # Minimum level written to the log file
        LOG_FILE_LEVEL: str = "INFO"
        # =========================
        # Recording options
        NESS_ACCESS_HEADS_KEYS = []
        # Whether to record the response body (not recommended for large bodies)
        IS_RECORD_RESPONSE: bool = True
        # Whether to record the request headers
        IS_RECORD_HEADERS: bool = True
        # Whether to record the client User-Agent information
        IS_RECORD_UA: bool = True
        # Request URL paths excluded from logging
        FLITER_REQUEST_URL = ['/favicon.ico', '/docs', '/', '/openapi.json', '/health_checks']

    async def make_request_log_msg(self, request: Request):
        log_msg = None
        if self.filter_request_url(request):
            request.state.close_record = True
        else:
            ip, method, url = request.client.host, request.method, request.url.path
            # Parse the submitted form data, if any
            try:
                body_form = await request.form()
            except Exception:
                body_form = None

            # Parse the submitted body, if any
            body = None
            try:
                body_bytes = await request.body()
                if body_bytes:
                    try:
                        body = await request.json()
                    except Exception:
                        try:
                            body = body_bytes.decode('utf-8')
                        except Exception:
                            body = body_bytes.decode('gb2312')
            except Exception as e:
                raise e
            # Save the submitted body on the request state for later use
            request.state.body = body
            # Extract client device information from the request headers
            try:
                user_agent = parse(request.headers["user-agent"])
                log_msg = {
                    # Request headers: record only the configured keys, if any
                    'headers': None if not self.settings.IS_RECORD_HEADERS else
                    [request.headers.get(i, '') for i in
                     self.settings.NESS_ACCESS_HEADS_KEYS] if self.settings.NESS_ACCESS_HEADS_KEYS else None,
                    # Client User-Agent information
                    "useragent": None if not self.settings.IS_RECORD_UA else
                    {
                        "os": "{} {}".format(user_agent.os.family, user_agent.os.version_string),
                        'browser': "{} {}".format(user_agent.browser.family, user_agent.browser.version_string),
                        "device": {
                            "family": user_agent.device.family,
                            "brand": user_agent.device.brand,
                            "model": user_agent.device.model,
                        }
                    },
                    'url': url,
                    # Request method
                    'method': method,
                    # Submitted parameters
                    'params': {
                        'query_params': parse_qs(str(request.query_params)),
                        'from': body_form,
                        'body': body
                    },
                    # Start time of the request
                    "ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
                }
            except Exception:
                # Fall back to a record without the useragent information
                log_msg = {
                    'headers': None if not self.settings.IS_RECORD_HEADERS else
                    [request.headers.get(i, '') for i in
                     self.settings.NESS_ACCESS_HEADS_KEYS] if self.settings.NESS_ACCESS_HEADS_KEYS else None,
                    'url': url,
                    'method': method,
                    'params': {
                        'query_params': parse_qs(str(request.query_params)),
                        'from': body_form,
                        'body': body
                    },
                    "ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
                }

            # Drop empty entries
            if not log_msg['headers']:
                log_msg.pop('headers')
            if not log_msg['params']['query_params']:
                log_msg['params'].pop('query_params')
            if not log_msg['params']['from']:
                log_msg['params'].pop('from')
            if not log_msg['params']['body']:
                log_msg['params'].pop('body')
        return log_msg

    def setup(self, app: FastAPI, name: str = None, settings=None, *args, **kwargs):
        """Plugin initialization."""
        # An alternative would be a startup event callback:
        # def init_logging_ex():
        #     return init_logging(settings)
        # app.add_event_handler("startup", init_logging_ex)

        # Call init_logging directly instead; going through the event
        # callback would delay initialization
        init_logging(settings)

        app.add_middleware(LoguruPluginClientMiddleware, is_proxy=True, client=self)

In the plugin setup above, the first step imports the init_logging function from the libs package and calls it to create the logger. Note that it is invoked directly rather than registered as a startup event callback (the alternative shown below), because the callback would delay initialization:

app.add_event_handler("startup", init_logging_ex)

Once the logger is initialized, the plugin registers the middleware it needs:

app.add_middleware(LoguruPluginClientMiddleware, is_proxy=True, client=self)

Within the plugin, the request payload is parsed by the make_request_log_msg method, which extracts the request parameters, the request body, the headers, and so on. The filter_request_url method filters out request URLs that do not need to be logged.
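For the sample request shown at the beginning, the dict produced by make_request_log_msg looks like this (reproduced from the scattered-mode sample log; the empty params entries have already been dropped):

{
    'useragent': {
        'os': 'Windows 10',
        'browser': 'QQ Browser 11.5.5240',
        'device': {'family': 'Other', 'brand': None, 'model': None},
    },
    'url': '/auth/b/getPicCaptcha2/',
    'method': 'GET',
    'params': {},
    'ts': '2023-02-02 12:50:47',
}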

2. Plugin client configuration options

When the plugin is initialized, the following configuration defaults apply:

class LoguruConfig(Settings):

    # ========================= Recording mode
    MODEL = RecordModel.CENTRALIZED

    # =========================
    # Log file configuration
    PROJECT_SLUG: str = "info"
    # Directory of the log files; defaults to where the plugin is initialized
    LOG_FILE_PATH: str = "."
    # Rotation time: rotation='00:00' creates a new log file at midnight
    LOG_FILE_ROTATION: str = "00:00"
    # Retention of log files, e.g. retention="7 days" cleans up automatically
    LOG_FILE_RETENTION: Union[int, str] = 8
    # Compression applied to rotated log files
    LOG_FILE_COMPRESSION: str = "gz"
    # Minimum level written to the log file
    LOG_FILE_LEVEL: str = "INFO"
    # =========================
    # Recording options
    NESS_ACCESS_HEADS_KEYS = []
    # Whether to record the response body (not recommended for large bodies)
    IS_RECORD_RESPONSE: bool = True
    # Whether to record the request headers
    IS_RECORD_HEADERS: bool = True
    # Whether to record the client User-Agent information
    IS_RECORD_UA: bool = True
    # Request URL paths excluded from logging
    FLITER_REQUEST_URL = ['/favicon.ico', '/docs', '/', '/openapi.json', '/health_checks']

3. Defining the logging function

At this point the plugin's initialization work is basically complete. To enable it, we initialize it inside the registration function defined in the previous article of this series:

def _register_loguru_log_client(self, app: FastAPI) -> None:
    # Registered last, since this plugin handles logging
    LoguruPluginClient(app=app,
                       settings=LoguruPluginClient.LoguruConfig(PROJECT_SLUG=self.settings.LOG_PROJECT_SLUG,
                                                                FLITER_REQUEST_URL=self.settings.FLITER_REQUEST_URL,
                                                                LOG_FILE_PATH=self.settings.LOG_FILE_PATH,
                                                                MODEL=self.settings.LOG_MODEL)
                       )
    logger.info("LoguruPluginClient installed successfully")


With initialization done, the next step is to define the function that actually writes log entries. It lives in the file shown below.

[Figure: location of the log-writing module]

The detailed code is shown below:

from datetime import datetime
from time import perf_counter

from loguru import logger as log
from fastapi import Request

from afastcore.plugins.loguru.contextvar import logrequest
from afastcore.plugins.loguru.enums import RecordModel
from afastcore.utils.json_helper import dict_to_json


def get_client_ip(request: Request):
    """
    Get the real client IP.
    :param request:
    :return:
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0]
    return request.client.host


def info(msg, event_name='logic', model=RecordModel.SCATTERED):
    try:
        assert hasattr(logrequest.state, 'traceid'), 'the logging plugin must be initialized first'
        if hasattr(logrequest.state, 'close_record') and not logrequest.state.close_record:
            traceid = logrequest.state.traceid
            # Increment the per-request event index
            logrequest.state.traceindex = traceindex = logrequest.state.traceindex + 1
            start_time = getattr(logrequest.state, 'start_time')
            end_time = f'{(perf_counter() - start_time):.2f}'
            # Every entry records the elapsed time so far
            if logrequest.state.record_model == model:
                # Scattered mode: write the entry immediately
                try:
                    log.bind(traceid=traceid, event_name=event_name, cost_time=end_time,
                             traceindex=traceindex, ip=get_client_ip(request=logrequest)).info(msg)
                except Exception:
                    return None
            else:
                # Centralized mode: accumulate entries on the request state
                logmsg = {
                    # Index of this event within the trace
                    'trace_index': traceindex,
                    # Event type description
                    'event_name': event_name,
                    # Log message details
                    'msg': msg,
                    'cost_time': end_time,
                    "ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
                }
                logrequest.state.trace_logs_record.append(dict_to_json(logmsg))
                # The 'response' event marks the end: write out the whole trace
                if event_name == 'response':
                    try:
                        log.bind(traceid=traceid, ip=get_client_ip(request=logrequest)).info(
                            f"[{','.join(logrequest.state.trace_logs_record)}]")
                    except Exception:
                        return None
    except Exception:
        # No request context (plugin not initialized): log without trace fields
        log.bind(event_name=event_name).info(msg)
    else:
        # Any other log calls in this request chain were skipped or already handled
        pass

The core of the code above is the info function, which all subsequent log calls go through. It first checks whether a trace ID exists, i.e. whether the logging plugin has been initialized; if it has, it checks whether recording has been turned off for this request; if not, it records according to the configured mode. In centralized mode, the accumulated entries are only written out when the event type is 'response' (the if event_name == 'response' branch); in scattered mode, every call writes a record immediately.
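One small dependency above, dict_to_json from afastcore.utils.json_helper, is not shown in this article; presumably it is a thin wrapper over json.dumps, something along these lines:

import json


def dict_to_json(data: dict) -> str:
    # Assumed implementation: serialize, falling back to str() for
    # values (such as form objects) that are not JSON-serializable
    return json.dumps(data, ensure_ascii=False, default=str)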

The key to recording the custom extension fields is this line:

log.bind(traceid=traceid, event_name=event_name, cost_time=end_time,
         traceindex=traceindex, ip=get_client_ip(request=logrequest)).info(msg)

Custom extension fields must be supplied through log.bind.

With all of that in place, we simply call the function wherever we need to write a log. For example, after registering the plugin:

LoguruPluginClient(app=app,
                   settings=LoguruPluginClient.LoguruConfig(PROJECT_SLUG=self.settings.LOG_PROJECT_SLUG,
                                                            FLITER_REQUEST_URL=self.settings.FLITER_REQUEST_URL,
                                                            LOG_FILE_PATH=self.settings.LOG_FILE_PATH,
                                                            MODEL=self.settings.LOG_MODEL)
                   )
logger.info("LoguruPluginClient installed successfully")

Everything above is purely my personal experience and is for reference only. My writing skills are limited, so if there are any slips or mistakes, corrections are welcome. Thanks, everyone!

Closing

END

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

WeChat public account: search for 程序员小钟同学

Written by 小钟同学 | Learning and discussion welcome | QQ: 308711822