Mysql-SQL 切面监控Mysql-SQL 切面监控 前言 前一阵子,公司的同事写了一个含bug的代码:当某个查询参
Mysql-SQL 切面监控
前言
前一阵子,公司的同事写了一个含bug的代码:当某个查询参数缺失时,会导致全表查询,数据量过大导致直接挤爆了java进程内存,从而导致了线上故障。
基于这个事故,我们准备全面排查各个系统中的sql执行情况:
- 查询的返回量不能超过某个阈值
- 查询的执行速度
从而去推动存在问题的sql改造。
再说一下背景,我们系统使用Mysql作为关系型数据库,持久层框架使用:
- Mybatis
- Spring JPA
接下来会分别在不同层次上介绍如何在切面上实现我们想要的监控效果。
基于 Mysql Connector 的QueryInterceptor
使用方法:
- 自定义 QueryInterceptor 的实现类
- mysql连接的url中添加该实现类,相当于是注册该拦截器
自定义拦截器
Mysql Connector 提供了一个QueryInterceptor接口(connector8.0版本以上),只要实现该接口后,就能实现SQL查询操作的拦截处理。
connector 5-6版本提供的类似接口叫 StatementInterceptor。
CustomQueryInterceptor
/**
* 拦截mysql jdbc query查询语句
* @author tunan
*/
public class CustomQueryInterceptor implements QueryInterceptor {
private static final Logger logger = LoggerFactory.getLogger(MybatisInterceptor.class);
@Override
public QueryInterceptor init(MysqlConnection mysqlConnection, Properties properties, Log log) {
return this;
}
@Override
public <T extends Resultset> T preProcess(Supplier<String> supplier, Query query) {
Transaction transaction = new Transaction();
transaction.setStartTime(System.currentTimeMillis());
transaction.setSql(supplier.get());
TransactionHolder.add(transaction);
return null;
}
@Override
public boolean executeTopLevelOnly() {
return false;
}
@Override
public void destroy() {
}
@Override
public <T extends Resultset> T postProcess(Supplier<String> supplier, Query query, T t, ServerSession serverSession) {
TransactionHolder.get().setEndTime(System.currentTimeMillis());
handle(TransactionHolder.get(), t);
TransactionHolder.remove();
return t;
}
// 处理查询的Transaction
private <T extends Resultset> void handle(Transaction transaction, T t) {
// 处理查询结果
if (t != null) {
try {
long cost = transaction.getEndTime() - transaction.getStartTime();
logger.info("SQL查询耗时: {}ms,SQL: {}", cost, transaction.getSql());
if (t.hasRows()) {
int size = t.getRows().size();
// 判断结果数量是否超过 1000
if (size > 1000) {
logger.warn("SQL查询结果数量超过 1000 条,SQL: {},结果数量: {}", transaction.getSql(), size);
}
}
} catch (Exception e) {
logger.error("CustomQueryInterceptor >> handle error", e);
}
}
}
}
TransactionHolder
需要使用 ThreadLocal 来记录同一次sql查询的preProcess和postProcess的时间戳。
public class TransactionHolder {
private static final ThreadLocal<Transaction> HOLDER = new ThreadLocal<>();
public static void add(Transaction transaction) {
HOLDER.set(transaction);
}
public static void remove() {
HOLDER.remove();
}
public static Transaction get() {
return HOLDER.get();
}
}
url 中添加该拦截器
我是在spring直接配置Datasource的。
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxxxx?queryInterceptors=com.fourportun.spring.boot.dao.mysql.CustomQueryInterceptor
username: xxx
password: xxx
效果
会打印出一堆非业务相关的sql语句,并且只能打印出sql语句,不知道是哪个方法执行的该sql。
基于 Mybatis 的 Inteceptor
Mybatis 框架提供了类似于上文中的拦截接口,叫做Interceptor,通过实现该接口并注册到Spring容器中就可以使用了。
自定义拦截器
MybatisInterceptor
除了需要实现该接口外,还需要在类上声明一个@Intercept的注解,并配合@Signature一起使用。
@Component
@ConditionalOnProperty(name = "mybatis.interceptor", havingValue = "true")
@Intercepts({@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class,
RowBounds.class, ResultHandler.class})})
public class MybatisInterceptor implements Interceptor {
@Value("${mybatis.query-limit:1000}")
private Integer queryLimit;
@Resource
private ApplicationContext applicationContext;
private static final Logger logger = LoggerFactory.getLogger(MybatisInterceptor.class);
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object[] args = invocation.getArgs();
MappedStatement ms = (MappedStatement) args[0];
Object parameter = args[1];
// 获取 SQL 语句
String sql = ms.getBoundSql(parameter).getSql();
// 获取 Mapper 类和方法名
String mapperId = ms.getId();
// 执行查询
Object result = invocation.proceed();
// 获取查询结果数量
int resultCount = ((List<?>) result).size();
// 判断结果数量是否超过 1000
if (resultCount > queryLimit) {
// 记录 SQL 语句
logger.warn("查询结果数量超过 {},mapper: {}, SQL 语句:{}", queryLimit, mapperId, sql);
try {
QueryLimitExceededHandler handler = applicationContext.getBean(QueryLimitExceededHandler.class);
handler.handle(mapperId, sql, resultCount);
} catch (NoSuchBeanDefinitionException e) {
// logger.error("未配置 QueryLimitExceededHandler.");
} catch (Exception e) {
logger.error("QueryLimitExceededHandler 处理异常:", e);
}
}
return result;
}
@Override
public Object plugin(Object target) {
return Interceptor.super.plugin(target);
}
@Override
public void setProperties(Properties properties) {
Interceptor.super.setProperties(properties);
}
}
这个类将会放在第三方库中供多个业务方使用,如果业务方想开启该Inteceptor只要在配置文件中声明:
mybatis:
interceptor: true
这是利用@ConditionalOnProperty实现的。
此外,如果业务方还需要额外进行处理,可以实现我们自己定义好的QueryLimitExceededHandler接口并注册到Spring容器中。
QueryLimitExceededHandler
public interface QueryLimitExceededHandler {
void handle(String mapperName, String sql, int limit);
}
@Component
@Slf4j
public class CustomQueryLimitExceedHandler implements QueryLimitExceededHandler {
@Override
public void handle(String mapperName, String sql, int limit) {
log.warn("Query limit exceeded, mapperName: {}, sql: {}, limit: {}", mapperName, sql, limit);
}
}
效果
基于 Spring AOP 的JPA切面
确认公共切面位置
Spring JPA
框架没有提供直接的拦截和切面来实现我们想要的功能,所以我们只能基于Spring AOP
提供的动态代理的AOP
的方式来实现。
如何来设计这个切面呢?首先我们先看Spring JPA
下的接口都是什么样子的:
public interface UserDao extends JpaSpecificationExecutor<UserEntity>, JpaRepository<UserEntity, Long> {
}
一般来说,我们自定义的接口会继承JpaRepository
。所以我们只要围绕该接口做切面即可。
当然,不继承JpaRepository也就可以实现JPA,具体要围绕哪个公共的接口根据自己的使用情况来定,也可以同时对多个公共接口作切面。
做切面处理
@Aspect
@Component
public class JpaAspect {
@Value("${mybatis.query-limit:1000}")
private Integer queryLimit;
@Value("${spring.datasource.url:no}")
private String url;
@Resource
private ApplicationContext applicationContext;
private static final Logger logger = LoggerFactory.getLogger(JpaAspect.class);
@Pointcut("execution(* org.springframework.data.jpa.repository.JpaRepository+.*(..))")
public void jpaRepositoryMethod() {
}
@Around("jpaRepositoryMethod()")
public Object logJpaRepositoryMethodExecution(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取目标对象的真实类名
String className = JpaUtil.getClassName(joinPoint.getSignature());
String methodName = JpaUtil.getMethodName(joinPoint.getSignature());
Object[] args = joinPoint.getArgs();
long startTime = System.currentTimeMillis();
try {
Object result = joinPoint.proceed();
// 获取查询结果数量
int resultCount = 0;
if (result instanceof List) {
resultCount = ((List<?>) result).size();
}
if (resultCount > queryLimit) {
// dosomething
}
return result;
} catch (Exception e) {
throw e;
} finally {
// dosomething
}
}
}
通过joinPoint的签名拿到实现类的类名和方法名
public class JpaUtil {
public static String getClassName(Signature signature) {
try {
String className = signature.getDeclaringTypeName();
String[] split = className.split("\.");
className = split[split.length - 1];
return className;
} catch (Exception e) {
return "";
}
}
public static String getMethodName(Signature signature) {
try {
return signature.getName();
} catch (Exception e) {
return "";
}
}
public static String getHost(String url) {
Pattern pattern = Pattern.compile("jdbc:mysql://([^:]+):(\d+)/([^/]+)");
Matcher matcher = pattern.matcher(url);
if (matcher.find()) {
return matcher.group(1);
} else {
return "";
}
}
public static String getSchema(String url) {
Pattern pattern = Pattern.compile("jdbc:mysql://([^:]+):(\d+)/([^/]+)");
Matcher matcher = pattern.matcher(url);
if (matcher.find()) {
return matcher.group(3);
} else {
return "";
}
}
}
效果
略
转载自:https://juejin.cn/post/7409872370138595378