java批量操作数据库优化
一、背景
最近收到数据库CPU告警,跟踪后发现是有小伙伴在循环操作数据库,每个数据都要查询+更新,看着血压有点高~
伪代码:
Map<String, GetTaskListResponse> taskList = ; // 接口查询一个列表
taskList.forEach((taskId, task) -> {
Task originalTask = taskMapper
.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, taskId));
... // 存在判断+计算
taskMapper.updateById(originalTask);
});
分析cpu飙升过程:如果循环操作10万次 【查询+更新】,会有大量请求数据库的操作。其次,请求总时间会很久,在调用日志里面看到一次请求大于60s(这个慢请求是定时器发出的,所以一开始没有重视),触发一次超时重试,会再次发起请求,最终会导致数据库频繁请求,cpu升高。
与数据库的交互
应用连接到数据库之后,与数据库之间的交互基本时间开销有:连接、网络、io,如果操作的数据量很大,还会有数据库的计算开销。上面案例中的数据表数据不多,而连接是有池化的,所以开销主要是网络和io,减少交互的次数,是最简单的优化方式。
考虑分治:改成分批次更新,比如操作10万个数据,一次查1000个数据出来,计算好,一次提交1000个更新sql,循环100次即可。如果数据是没有依赖关系的,代码里面可以用多线程,异步查询、计算及更新。当然,异步发送请求太多太快,小心数据库会撑不住,这个要观察。
sql 批量操作方法
常见的数据库,比如mysql,可以使用的批量技术有2种,ORM框架用的机制也是基于这2个:
- 多个sql一次提交;
- 多个sql合成单个sql。
(1)多个sql一次提交
用";"分割的多个sql:将多次提交变成一次提交。比如下面的sql,数据表为user:
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT "",
`age` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
批量sql:
-- 插入
INSERT INTO user(name, id) VALUES('hello',1);
INSERT INTO user(name, id) VALUES('world',2);
-- 更新
update user set name='hello1' where id=1;
update user set name='world1' where id=2;
(2)合成sql
多个sql合成一个提交:
-- 插入: 这里用自增id做主键
INSERT INTO user(name, age) VALUES
('hello',10),
('world',20);
-- 更新: 利用 case-when-then 合成
update user set
name= case
when id=1 then 'hello1'
when id=2 then 'world1'
end
where id in (1,2); -- 这里也可以用or,但是可能会索引失效。
二、常见ORM的批量操作
1.JPA
批量更新、批量插入都是saveAll,demo:
List<User> userList = // 获取用户列表
userRepository.saveAll(userList);
源码:
// org.springframework.data.jpa.repository.support.SimpleJpaRepository
@Transactional
public <S extends T> List<S> saveAll(Iterable<S> entities) {
Assert.notNull(entities, "Entities must not be null!");
List<S> result = new ArrayList();
Iterator var3 = entities.iterator();
while(var3.hasNext()) {
S entity = var3.next();
result.add(this.save(entity));
}
return result;
}
这里可以看到这个saveAll是"假"的,实现方式是循环调用save(),大量数据提交的话,会有性能问题。
2.mybatis-plus 批量更新
(1)手动批量flush
List<Task> taskList = // 获取任务列表
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);
for (Task task : taskList) {
taskMapper.updateById(task);
}
sqlSession.flushStatements();// 提交一批sql
sqlSession.commit();
}
处理方式是多个sql一次提交。
(2)updateBatchById
来自mybatis-plus-extension-3.2.0.jar,下是调用demo:
public interface UserService extends IService<User> {
void updateBatchUserById(List<User> list);
}
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
@Override
public void updateBatchUserById(List<User> list) {
this.updateBatchById(list);
}
}
ServiceImpl 里的 this.updateBatchById源码如下:
@Transactional(
rollbackFor = {Exception.class}
)
public boolean updateBatchById(Collection<T> entityList, int batchSize) {
...
String sqlStatement = this.sqlStatement(SqlMethod.UPDATE_BY_ID);
// UPDATE_BY_ID("updateById", "根据ID 选择修改数据", "<script>\nUPDATE %s %s WHERE %s=#{%s} %s\n</script>"),
SqlSession batchSqlSession = this.sqlSessionBatch();
try {
int i = 0;
// 分批
for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) {
T anEntityList = var7.next();
ParamMap<T> param = new ParamMap();
param.put("et", anEntityList); // 这里有坑 anEntityList不能为null
batchSqlSession.update(sqlStatement, param);
if (i >= 1 && i % batchSize == 0) {
batchSqlSession.flushStatements(); //提交sql
}
}
batchSqlSession.flushStatements();//提交最后一批sql
return true;
} catch (Throwable var17) {
// 异常转换
} finally {
// 关闭会话
}
}
思路其实和上面(1)的批量flush是一样的。也可以自定义sql:
<update id="batchUpdate" parameterType="java.util.List">
<foreach collection="list" item="item" separator=";">
update user set name = #{item.name} where id = #{item.id}
</foreach>
</update>
(3)更进一步:合成一个sql
提交的一批sql还能合并吗?使用case-when-then
<update id="updateBatch" parameterType="java.util.List" >
update user
<trim prefix="set" suffixOverrides=",">
<trim prefix="name=case" suffix="end,">
<foreach collection="list" item="i" index="index">
<if test="i.name != null and i.name != ''">
when id=#{i.id} then #{i.name}
</if>
</foreach>
</trim>
</trim>
where
<foreach collection="list" separator="or" item="i" index="index" >
id = #{i.id}
</foreach>
</update>
如果直接提交10万个更新的sql,也是可以的。有可能出的问题:
- (1)jvm 字符串太大,OOM。
- (2)数据库限制条数或者提交sql的数据大小。
参考:Mybatis中进行批量更新(updateBatch)
3.mybatis-plus 批量插入
(1)对象操作saveBatch
demo:
List<User> userList = new ArrayList<>();
userList.add(new User(1L, "Tom"));
userList.add(new User(2L, "Jerry"));
userList.add(new User(3L, "Mike"));
int result = userMapper.saveBatch(userList);
源码:
@Transactional(
rollbackFor = {Exception.class}
)
public boolean saveBatch(Collection<T> entityList, int batchSize) {
String sqlStatement = this.sqlStatement(SqlMethod.INSERT_ONE);//INSERT_ONE("insert", "插入一条数据(选择字段插入)", "<script>\nINSERT INTO %s %s VALUES %s\n</script>"),
SqlSession batchSqlSession = this.sqlSessionBatch();
Throwable var5 = null;
try {
int i = 0;
// 分批
for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) {
T anEntityList = var7.next();
batchSqlSession.insert(sqlStatement, anEntityList);
if (i >= 1 && i % batchSize == 0) {
batchSqlSession.flushStatements();// 提交sql
}
}
batchSqlSession.flushStatements();// //提交最后一批sql
return true;
} catch (Throwable var16) {
// 异常处理
} finally {
// 关闭会话
}
}
这里比较遗憾,mybatis-plus的插入不会合成一个sql,只是一次提交多个插入的sql。
#####(2)合成一个sql 脚本如下:
<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id" flushCache="true">
insert into user (name) values
<foreach item="item" index="index" collection="list" separator=",">
(#{item.name})
</foreach>
</insert>
参考: 数据库批量插入这么讲究的么? MyBatis批量插入的五种方式,哪种最强
4.jdbcTemplate
(1)手动拼接
这种方式比较原始,需要把每个更新的insert/update sql语句写好,拼成一个大的sql提交。
(2)批量操作
demo如下:
List<User> userList = // 获取用户列表
String sql = "update user set age=? where id=?";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
User user = userList.get(i);
ps.setInt(1, user.gee());
ps.setLong(2, user.getId());
}
@Override
public int getBatchSize() {
return userList.size();
}
});
源码:
public int[] batchUpdate(String sql, BatchPreparedStatementSetter pss) throws DataAccessException {
int[] result = (int[])this.execute(sql, (ps) -> { // execute执行sql
try {
...
if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
int ixx = 0;
while(true) {
if (ixx < batchSize) {
pss.setValues(ps, ixx);
if (ipss == null || !ipss.isBatchExhausted(ixx)) {
ps.addBatch(); // 添加
++ixx;
continue;
}
}
// 获取预编译参数
int[] var11 = ps.executeBatch();
return var11;
}
} else {
List<Integer> rowsAffected = new ArrayList();
for(int i = 0; i < batchSize; ++i) {
pss.setValues(ps, i);
if (ipss != null && ipss.isBatchExhausted(i)) {
break;
}
rowsAffected.add(ps.executeUpdate()); // 添加
}
int[] rowsAffectedArray = new int[rowsAffected.size()];
for(int ix = 0; ix < rowsAffectedArray.length; ++ix) {
rowsAffectedArray[ix] = (Integer)rowsAffected.get(ix);
}
// 获取预编译参数
int[] var13 = rowsAffectedArray;
return var13;
}
} finally {
// 清理
}
});
Assert.state(result != null, "No result array");
return result;
}
看源码,原理一样的~最后都是拼接一批sql,JdbcTemplate只是提供了辅助拼接的工具。
三、总结
- 时刻记住,尽量不要循环操作数据库。大量操作数据库,要分批处理。
- 功能上线后要关注接口的性能,考虑把时间过长的请求纳入告警。
- 要加强代码review,代码静态分析工作。
转载自:https://juejin.cn/post/7247787521392377911