likes
comments
collection
share

SpringCloud 网关实现线程池异步批量保存请求日志 【SpringCloud系列16】

作者站长头像
站长
· 阅读数 16

SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。

本文章是系列文章中的一篇

本文章实现的是 线程池异步批量保存请求日志,实现的是数据库中保存日志数据

日志过滤器添加

首先是在 网关服务中添加 日志过滤器


@Log4j2
public class LogFilter implements GlobalFilter, Ordered {
    private static final String START_TIME = "startTime";
    private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();

    @Resource
    VisitRecordService visitRecordService;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest request = exchange.getRequest();
        // 请求方法
        HttpMethod method = request.getMethod();
        // 请求头
        HttpHeaders headers = request.getHeaders();
        // 设置startTime 用来计算响应的时间
        exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
        // 构建日志记录
        AccessRecord accessRecord = visitRecordService.build(exchange);

        if (method != null) {
            //设置请求方法
            accessRecord.setMethod(method.name());
            if (method == HttpMethod.GET) {
                //获取get请求参数
                MultiValueMap<String, String> formData = request.getQueryParams();
                if (!formData.isEmpty()) {
                    //保存请求参数
                    accessRecord.setFormData(JSON.toJSONString(formData));
                }
            } else if (method == HttpMethod.POST) {
                MediaType contentType = headers.getContentType();
                if (contentType != null) {
                    Mono<Void> voidMono = null;
                    if (contentType.equals(MediaType.APPLICATION_JSON)) {
                        // JSON
                        voidMono = readBody(exchange, chain, accessRecord);
                    }
                    if (voidMono != null) {
                        //计算请求时间
                        cacueConsumTime(exchange);

                        return voidMono;
                    }
                }
            }
        }

        visitRecordService.put(exchange, accessRecord);
        // 请求后执行保存
        return chain.filter(exchange).then(saveRecord(exchange));
    }

    private Mono<Void> saveRecord(ServerWebExchange exchange) {
        return Mono.fromRunnable(() -> {
            cacueConsumTime(exchange);
        });

    }

    /**
     * 计算访问时间
     *
     * @param exchange
     */
    private void cacueConsumTime(ServerWebExchange exchange) {
        //请求开始时设置的自定义属性标识
        Long startTime = exchange.getAttribute(START_TIME);
        Long consumingTime = 0L;
        if (startTime != null) {
            consumingTime = System.currentTimeMillis() - startTime;
            log.info(exchange.getRequest().getURI().getRawPath() + ": 耗时 " + consumingTime + "ms");
        }
        visitRecordService.add(exchange, consumingTime);
    }


    private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {

        return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            DataBufferUtils.release(dataBuffer);
            Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                DataBufferUtils.retain(buffer);
                return Mono.just(buffer);
            });


            // 重写请求体,因为请求体数据只能被消费一次
            ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                @Override
                public Flux<DataBuffer> getBody() {
                    return cachedFlux;
                }
            };

            ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

            return ServerRequest.create(mutatedExchange, messageReaders)
                    .bodyToMono(String.class)
                    .doOnNext(objectValue -> {
                        accessRecord.setBody(objectValue);
                        visitRecordService.put(exchange, accessRecord);
                    }).then(
                            chain.filter(mutatedExchange)
                    );
        });
    }

    @Override
    public int getOrder() {
        return 2;
    }
    
}

HttpMessageReader是用于读取HTTP消息的类。 ServerRequest.create方法创建一个新的ServerRequest对象,该对象表示一个HTTP请求。 AccessRecord 是自定义的数据模型用来保存访问日志,代码如下:


import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.net.URI;
import java.time.LocalDateTime;

@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "access_recoder_log")
public class AccessRecord implements Serializable {
    private String formData;
    private URI targetUri;
    private String method;
    private String scheme;
    private String path;
    private String body;
    private String ip;
    private Integer status;
    private Long userId;
    private Long consumingTime;
    private LocalDateTime createTime;
}

VisitRecordService 就是我这里定义的异步保存日志的实现类

VisitRecordService 异步保存日志

ServerWebExchange是Spring WebFlux中的一个接口,用于表示HTTP请求和响应的交换。它提供了访问请求和响应的方法,以及访问请求属性和响应属性的方法。可以使用它来处理HTTP请求和响应,例如修改请求头或响应体,或者将请求转发到另一个处理程序。

在过滤器的 filter 方法中获取到对应的 ServerWebExchange,再从其中读取访问信息

@Slf4j
@Service
public class VisitRecordService {
    //自定义的一个标识
    private final String attributeKey = "visitRecord";
    /**
     * 构建一个 VisitRecord 实体类,但仅适用于获取 request 信息
     *
     * @param exchange gateway访问
     * @return 访问信息
     */
    public AccessRecord build(ServerWebExchange exchange) {
        // 获取请求信息
        ServerHttpRequest request = exchange.getRequest();
        String ip = RequestUtils.getIpAddress(request);
        // 请求路径
        String path = request.getPath().pathWithinApplication().value();
        // 请求schema: http/https
        String scheme = request.getURI().getScheme();
        // 请求方法
        HttpMethod method = request.getMethod();
        // 路由服务地址
        URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        // 请求头
        HttpHeaders headers = request.getHeaders();
        // 获取请求地址
        InetSocketAddress remoteAddress = request.getRemoteAddress();

        AccessRecord accessRecord = new AccessRecord();
        accessRecord.setPath(path);
        accessRecord.setScheme(scheme);
        accessRecord.setTargetUri(targetUri);
        accessRecord.setIp(ip);
        accessRecord.setCreateTime(LocalDateTime.now());
        return accessRecord;
    }
    /**
     * 将访问信息存入 ServerWebExchange 当中,将会与当前请求关联起来,
     * 以便于后续在任何地方均可获得
     *
     * @param exchange    gateway访问合同
     * @param visitRecord 访问信息
     */
    public void put(ServerWebExchange exchange, AccessRecord visitRecord) {
        Map<String, Object> attributes = exchange.getAttributes();
        attributes.put(attributeKey, visitRecord);
    }
 }

然后在请求结束的时候保存一下日志

@Slf4j
@Service
public class VisitRecordService {
    /**
     * 缓存,在插入数据库前先存入此。
     * 为防止数据被重复插入,故使用Set,但不能确保100%不会被重复存储。
     */
    private HashSet<AccessRecord> visitCache = new HashSet<>();
    /**
     * 保存访问记录
     *
     * @param exchange      gateway访问
     * @param consumingTime  访问耗时
     */
    public void add(ServerWebExchange exchange, Long consumingTime) {
        // 获取数据
        ServerHttpResponse response = exchange.getResponse();
        ServerHttpRequest request = exchange.getRequest();
        //获取保存的日志记录体
        AccessRecord visitRecord = getOrBuild(exchange);
        //设置访问时间 单位毫秒
        visitRecord.setConsumingTime(consumingTime);

        // 设置访问状态
        if (response.getStatusCode() != null) {
            visitRecord.setStatus(response.getStatusCode().value());
        }
        //设置访问的用户ID 我这里是保存在请求头中
        String userId = request.getHeaders().getFirst("userId");
        if(StringUtils.isNotEmpty(userId)) {
            visitRecord.setUserId(Long.parseLong(userId));
        }
        // 打印访问情况
        log.info(visitRecord.toString());
        // 添加记录到缓存中
        visitCache.add(visitRecord);
        // 执行任务,保存数据
        doTask();
    }
}

doTask 在这里是使用线程池异步执行日志保存

    /**
     * 信号量,用于标记当前是否有任务正在执行,{@code true}表示当前无任务进行。
     */
    private volatile boolean taskFinish = true;
    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    /**
     * 单次批量插入的数据量
     */
    private final int BATCH_SIZE = 500;
    
    private void doTask() {
        if (taskFinish) {
            // 当前没有任务的情况下,加锁并执行任务
            synchronized (this) {
                if (taskFinish) {
                    taskFinish = false;
                    threadPool.execute(() -> {
                        try {
                            // 当数据量较小时,则等待一段时间再插入数据,从而做到将数据尽可能的批量插入数据库
                            if (visitCache.size() <= BATCH_SIZE) {
                                Thread.sleep(500);
                            }
                            //批量保存
                            batchSave();
                        } catch (InterruptedException e) {
                            log.error("睡眠时发生了异常: {}", e.getMessage());
                        } finally {
                            // 任务执行完毕后修改标志位
                            taskFinish = true;
                        }
                    });
                }
            }
        }
    }

ThreadPoolExecutor是Java中的一个线程池实现,用于管理和复用线程,以提高应用程序的性能和响应能力。

它可以控制线程的数量,避免线程过多导致的资源浪费和性能下降,同时也可以避免线程不足导致的任务等待和响应延迟。

通过ThreadPoolExecutor,我们可以将任务提交给线程池,由线程池中的线程来执行任务,从而实现任务的异步执行和并发处理。

然后 batchSave() 方法就是具体的实现数据保存

@Slf4j
@Service
public class VisitRecordService {
    @Resource
    VisitLogService visitLogService;
    /**
     * 缩减因子,每次更新缓存Set时缩小的倍数,对应HashSet的扩容倍数
     */
    private final float REDUCE_FACTOR = 0.5f;

    private void batchSave() {
        log.debug("访问记录准备插入数据库,当前数据量:{}", visitCache.size());
        if (visitCache.size() == 0) {
            return;
        }
        // 构造新对象来存储数据,旧对象保存到数据库后不再使用
        HashSet<AccessRecord> oldCache = visitCache;
        visitCache = new HashSet<>((int) (oldCache.size() * REDUCE_FACTOR));
        boolean isSave = false;
        try {
            //批量保存
            isSave = visitLogService.saveBatch(oldCache, BATCH_SIZE);
        } finally {
            if (!isSave) {
                // 如果插入失败,则重新添加所有数据
                visitCache.addAll(oldCache);
            }
        }
    }
 }

VisitLogService 就是mybatis 的正常的数据增删改查范畴,我这里的定义如下:

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.biglead.gateway.pojo.AccessRecord;
import com.biglead.gateway.mapper.VisitLogMapper;
import org.springframework.stereotype.Service;

/**
 * 访问日志Service类
 */
@Service
public class VisitLogService extends ServiceImpl<VisitLogMapper, AccessRecord> {

}

VisitLogMapper 定义如下

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.biglead.gateway.pojo.AccessRecord;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface VisitLogMapper extends BaseMapper<AccessRecord> {
}

然后启动项目,访问接口数据,就可以自动记录到数据库中 SpringCloud 网关实现线程池异步批量保存请求日志 【SpringCloud系列16】 对应的 sql

CREATE TABLE `access_recoder_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `form_data` varchar(500) DEFAULT NULL,
  `body` varchar(500) DEFAULT NULL,
  `path` varchar(255) DEFAULT NULL,
  `ip` varchar(255) DEFAULT NULL,
  `status` int(11) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  `scheme` varchar(255) DEFAULT NULL,
  `method` varchar(255) DEFAULT NULL,
  `consuming_time` bigint(20) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8 COMMENT='日志记录表';

最后就是源码了: 本项目 SpringCloud 源码 gitee.com/android.lon…

本项目 管理后台web 源码gitee.com/android.lon…

本项目 小程序 源码 gitee.com/android.lon…

如果有兴趣可以关注一下公众号 biglead ,每周都会有 java、Flutter、小程序、js 、英语相关的内容分享

转载自:https://juejin.cn/post/7221093591295311930
评论
请登录