likes
comments
collection
share

java批量操作数据库优化

作者站长头像
站长
· 阅读数 42

一、背景

最近收到数据库CPU告警,跟踪后发现是有小伙伴在循环操作数据库,每个数据都要查询+更新,看着血压有点高~

伪代码:

Map<String, GetTaskListResponse> taskList = ; // 接口查询一个列表
  taskList.forEach((taskId, task) -> {
  Task originalTask = taskMapper
    .selectOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, taskId));
  ... // 存在判断+计算
  taskMapper.updateById(originalTask);
});

分析cpu飙升过程:如果循环操作10万次 【查询+更新】,会有大量请求数据库的操作。其次,请求总时间会很久,在调用日志里面看到一次请求大于60s(这个慢请求是定时器发出的,所以一开始没有重视),触发一次超时重试,会再次发起请求,最终会导致数据库频繁请求,cpu升高。

与数据库的交互

应用连接到数据库之后,与数据库之间的交互基本时间开销有:连接、网络、io,如果操作的数据量很大,还会有数据库的计算开销。上面案例中的数据表数据不多,而连接是有池化的,所以开销主要是网络和io,减少交互的次数,是最简单的优化方式。

考虑分治:改成分批次更新,比如操作10万个数据,一次查1000个数据出来,计算好,一次提交1000个更新sql,循环100次即可。如果数据是没有依赖关系的,代码里面可以用多线程,异步查询、计算及更新。当然,异步发送请求太多太快,小心数据库会撑不住,这个要观察。

sql 批量操作方法

常见的数据库,比如mysql,可以使用的批量技术有2种,ORM框架用的机制也是基于这2个:

  • 多个sql一次提交;
  • 多个sql合成单个sql。
(1)多个sql一次提交

用";"分割的多个sql:将多次提交变成一次提交。比如下面的sql,数据表为user:

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT "",
  `age` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

批量sql:

-- 插入
INSERT INTO user(name, id) VALUES('hello',1);
INSERT INTO user(name, id) VALUES('world',2);

-- 更新
update user set name='hello1' where id=1;
update user set name='world1' where id=2;
(2)合成sql

多个sql合成一个提交:

-- 插入: 这里用自增id做主键
INSERT INTO user(name, age) VALUES
('hello',10),
('world',20);

-- 更新: 利用 case-when-then 合成 
update user set 
  name= case 
    when id=1 then 'hello1' 
    when id=2 then 'world1'
  end
  where id in (1,2); -- 这里也可以用or,但是可能会索引失效。

二、常见ORM的批量操作

1.JPA

批量更新、批量插入都是saveAll,demo:

List<User> userList = // 获取用户列表
userRepository.saveAll(userList);

源码:

// org.springframework.data.jpa.repository.support.SimpleJpaRepository
@Transactional
public <S extends T> List<S> saveAll(Iterable<S> entities) {
    Assert.notNull(entities, "Entities must not be null!");
    List<S> result = new ArrayList();
    Iterator var3 = entities.iterator();

    while(var3.hasNext()) {
        S entity = var3.next();
        result.add(this.save(entity));
    }

    return result;
}

这里可以看到这个saveAll是"假"的,实现方式是循环调用save(),大量数据提交的话,会有性能问题。

2.mybatis-plus 批量更新

(1)手动批量flush
List<Task> taskList = // 获取任务列表

try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
  TaskMapper taskMapper = sqlSession.getMapper(TaskMapper.class);

  for (Task task : taskList) {
    taskMapper.updateById(task);
  }

  sqlSession.flushStatements();// 提交一批sql
  sqlSession.commit();
}

处理方式是多个sql一次提交。

(2)updateBatchById

来自mybatis-plus-extension-3.2.0.jar,下是调用demo:

public interface UserService extends IService<User> {
    void updateBatchUserById(List<User> list);
}

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    @Override
    public void updateBatchUserById(List<User> list) {
        this.updateBatchById(list);
    }
}

ServiceImpl 里的 this.updateBatchById源码如下:

@Transactional(
    rollbackFor = {Exception.class}
)
public boolean updateBatchById(Collection<T> entityList, int batchSize) {
    ...
    String sqlStatement = this.sqlStatement(SqlMethod.UPDATE_BY_ID);
    // UPDATE_BY_ID("updateById", "根据ID 选择修改数据", "<script>\nUPDATE %s %s WHERE %s=#{%s} %s\n</script>"),    
    SqlSession batchSqlSession = this.sqlSessionBatch();

    try {
        int i = 0;
        // 分批
        for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) {
            T anEntityList = var7.next();
            ParamMap<T> param = new ParamMap();
            param.put("et", anEntityList);  // 这里有坑 anEntityList不能为null
            batchSqlSession.update(sqlStatement, param);
            if (i >= 1 && i % batchSize == 0) {
                batchSqlSession.flushStatements(); //提交sql
            }
        }
        batchSqlSession.flushStatements();//提交最后一批sql
        return true;
    } catch (Throwable var17) {
        // 异常转换
    } finally {
        // 关闭会话
    }
}

思路其实和上面(1)的批量flush是一样的。也可以自定义sql:

<update id="batchUpdate" parameterType="java.util.List">
    <foreach collection="list" item="item" separator=";">
        update user set name = #{item.name} where id = #{item.id}
    </foreach>
</update>
(3)更进一步:合成一个sql

提交的一批sql还能合并吗?使用case-when-then

<update id="updateBatch" parameterType="java.util.List" >
    update user
    <trim prefix="set" suffixOverrides=",">
        <trim prefix="name=case" suffix="end,">
            <foreach collection="list" item="i" index="index">
                <if test="i.name != null and i.name != ''">
                    when id=#{i.id} then #{i.name}
                </if>
            </foreach>
        </trim>
    </trim>
    where 
    <foreach collection="list" separator="or" item="i" index="index" >
        id = #{i.id}
    </foreach>
</update>

如果直接提交10万个更新的sql,也是可以的。有可能出的问题:

  • (1)jvm 字符串太大,OOM。
  • (2)数据库限制条数或者提交sql的数据大小。

参考:Mybatis中进行批量更新(updateBatch)

3.mybatis-plus 批量插入

(1)对象操作saveBatch

demo:

List<User> userList = new ArrayList<>();
userList.add(new User(1L, "Tom"));
userList.add(new User(2L, "Jerry"));
userList.add(new User(3L, "Mike"));
int result = userMapper.saveBatch(userList);

源码:

@Transactional(
    rollbackFor = {Exception.class}
)
public boolean saveBatch(Collection<T> entityList, int batchSize) {
    String sqlStatement = this.sqlStatement(SqlMethod.INSERT_ONE);//INSERT_ONE("insert", "插入一条数据(选择字段插入)", "<script>\nINSERT INTO %s %s VALUES %s\n</script>"),    
    SqlSession batchSqlSession = this.sqlSessionBatch();
    Throwable var5 = null;
    try {
        int i = 0;
        // 分批
        for(Iterator var7 = entityList.iterator(); var7.hasNext(); ++i) {
            T anEntityList = var7.next();
            batchSqlSession.insert(sqlStatement, anEntityList);
            if (i >= 1 && i % batchSize == 0) {
                batchSqlSession.flushStatements();// 提交sql
            }
        }
        batchSqlSession.flushStatements();// //提交最后一批sql
        return true;
    } catch (Throwable var16) {
        // 异常处理
    } finally {
        // 关闭会话
    }
}

这里比较遗憾,mybatis-plus的插入不会合成一个sql,只是一次提交多个插入的sql。

#####(2)合成一个sql 脚本如下:

<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id" flushCache="true">    
    insert into user (name) values
    <foreach item="item" index="index" collection="list" separator=",">
        (#{item.name})
    </foreach>
</insert>

参考: 数据库批量插入这么讲究的么? MyBatis批量插入的五种方式,哪种最强

4.jdbcTemplate

(1)手动拼接

这种方式比较原始,需要把每个更新的insert/update sql语句写好,拼成一个大的sql提交。

(2)批量操作

demo如下:

List<User> userList = // 获取用户列表

String sql = "update user set age=? where id=?";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
    @Override
    public void setValues(PreparedStatement ps, int i) throws SQLException {
        User user = userList.get(i);
        ps.setInt(1, user.gee());
        ps.setLong(2, user.getId());
    }

    @Override
    public int getBatchSize() {
        return userList.size();
    }
});

源码:

public int[] batchUpdate(String sql, BatchPreparedStatementSetter pss) throws DataAccessException {
    int[] result = (int[])this.execute(sql, (ps) -> { // execute执行sql
        try {
            ...
            if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
                int ixx = 0;
                while(true) {
                    if (ixx < batchSize) {
                        pss.setValues(ps, ixx);
                        if (ipss == null || !ipss.isBatchExhausted(ixx)) {
                            ps.addBatch(); // 添加
                            ++ixx;
                            continue;
                        }
                    }
                    // 获取预编译参数
                    int[] var11 = ps.executeBatch();
                    return var11;
                }
            } else {
                List<Integer> rowsAffected = new ArrayList();
                for(int i = 0; i < batchSize; ++i) {
                    pss.setValues(ps, i);
                    if (ipss != null && ipss.isBatchExhausted(i)) {
                        break;
                    }
                    rowsAffected.add(ps.executeUpdate()); // 添加
                }
                int[] rowsAffectedArray = new int[rowsAffected.size()];
                for(int ix = 0; ix < rowsAffectedArray.length; ++ix) {
                    rowsAffectedArray[ix] = (Integer)rowsAffected.get(ix);
                }
                // 获取预编译参数
                int[] var13 = rowsAffectedArray;
                return var13;
            }
        } finally {
            // 清理
        }
    });
    Assert.state(result != null, "No result array");
    return result;
}

看源码,原理一样的~最后都是拼接一批sql,JdbcTemplate只是提供了辅助拼接的工具。

三、总结

  • 时刻记住,尽量不要循环操作数据库。大量操作数据库,要分批处理。
  • 功能上线后要关注接口的性能,考虑把时间过长的请求纳入告警。
  • 要加强代码review,代码静态分析工作。
转载自:https://juejin.cn/post/7247787521392377911
评论
请登录