likes
comments
collection
share

批处理 :从ElasticSearch看批处理的性能优势

作者站长头像
站长
· 阅读数 18

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜 文章合集 : 🎁 juejin.cn/post/694164… Github : 👉 github.com/black-ant CASE 备份 : 👉 gitee.com/antblack/ca…

一. 前言

ES 优化方案网上能找到很多,生产中也会直接按照方案进行配置。但是,还是有必要搞清楚这样优化的原理的。

最常见的优化方案包括 : 批量保存修改 refresh 刷新间隔。这一篇先来全方面看看批量保存。

二. 性能直观对比

基本代码

public void addSingle() {
    logger.info("测试创建");
    for (int i = 0; i < 3000; i++) {
        AOrder order = createOrderA(i);
        aorderRepository.save(order);
        logger.info("------> 保存 :{} <-------", order);
    }
}

public void addBatch() {
    logger.info("测试创建");
    List<AOrder> orderList = new ArrayList<>();
    for (int i = 0; i < 3000; i++) {
        AOrder order = createOrderA(i);
        orderList.add(order);
    }
    List<List<AOrder>> splitList = CollectionUtil.split(orderList, 10);
    splitList.forEach(itemList -> {
        aorderRepository.saveAll(itemList);
        logger.info("------> 保存 :{} <-------", itemList.size());
    });

}

两次执行的时间差 :

c.g.s.e.demo.service.TestService         : ------> 结束 :171276 <-------
c.g.s.e.demo.service.TestService         : ------> 结束 :23253 <-------

// BulkProcessor 的会更快,但是考虑到这是一个异步操作,不作为参考
// PS :这里我执行完就去刷新了数量,多也不会多多少
c.g.s.e.demo.service.TestService         : ------> 结束 :1149 <-------

同样是3000条数据,相同的环境相同的服务器,这完全不是一个量级的性能

再看看服务器性能损耗

批处理 :从ElasticSearch看批处理的性能优势

前面一次是单条写入,后面一个波段是批量写入,从这个图不能得出几个结论:

  • 带宽使用率低,说明网络损耗在建立连接上,连接创建慢
  • CPU 明显使用更高,峰值更大

三. 批量保存的方式

上面看的是 Spring Repository 批量保存的方式,其原理还是对ES底层API进行了封装,ES 主要有2种批量保存的方式 :

  • Bulk API : 使用 Bulk API 向 Elasticsearch 发送一个包含多个索引、更新或删除操作的请求,以减少网络开销和提高性能
  • Bulk Processor : 批量处理器将操作缓冲在内存中,然后根据指定的条件(如文档数量、大小或时间间隔)自动触发批量提交

Bulk API 和 Bulk Processor 的区别

Bulk Processor:

  • Bulk Processor 是 Elasticsearch 提供的一个高级客户端库中的组件,用于处理大量的索引、更新或删除操作。
  • Bulk Processor 通过将操作缓冲在内存中,并根据指定的条件(如文档数量、大小或时间间隔)自动触发批量提交,以控制批量操作的执行。
  • Bulk Processor 提供了一些额外的功能,如并发控制、重试机制和错误处理
  • 使用 Bulk Processor 可以更方便地管理和监控批量操作的执行过程,并且可以根据需要进行动态调整。

Bulk API:

  • Bulk API 是 Elasticsearch 提供的一种原生的接口,用于执行批量操作
  • Bulk API 允许你将多个索引、更新或删除操作组合到一个请求中,以减少与 Elasticsearch 的通信次数。
  • 使用 Bulk API,你可以发送一个包含多个操作的 JSON 请求,其中每个操作都包含操作类型(索引、更新或删除)和对应的文档数据。
  • Bulk API 适用于简单的批量操作需求,对于更复杂的操作和控制,可能需要使用更高级的客户端库或自定义逻辑。

Bulk API 原理

Bulk API 主要是基于请求的批处理,可以理解为在一个请求中写入一个数组性质的数据。以ES 为例,在使用SpringDataElasticSearch 组件时,最终调用的其实就是 BulkAPI :

public class BulkRequest {
    // 最重要的关键就是这里构建的List
    final List<DocWriteRequest<?>> requests = new ArrayList<>();
}

// 后面发送就平平无奇了,就是个简单的 HTTP 请求
private final CloseableHttpAsyncClient client;
httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();

Bulk Processor 原理

public void addProcessor() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("81.69.59.111", 9200, "http")));

    BulkProcessor bulkProcessor = BulkProcessor.builder(
                    (request, bulkListener) ->
                            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    new BulkProcessor.Listener() {
                        @Override
                        public void beforeBulk(long executionId, BulkRequest request) {
                            // 在执行批量操作之前调用
                        }

                        @Override
                        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                            // 在执行批量操作之后调用,可以处理响应结果
                        }

                        @Override
                        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                            // 在执行批量操作出现错误时调用
                        }
                    })
            .setBulkActions(1000) // 设置触发批量操作的文档数量
            .setConcurrentRequests(1) // 设置并发请求数量
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // 设置自动刷新间隔
            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) // 设置重试策略
            .build();

    for (int i = 0; i < 3000; i++) {
        AOrder order = createOrderA(i);
        String json = JSONObject.toJSONString(order);
        IndexRequest request = new IndexRequest("order").source(json, XContentType.JSON);
        bulkProcessor.add(request);
        logger.info("------> 保存 :{} <-------", order);
    }

    bulkProcessor.close();
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

}

和 API 不同是 ,BulkProcessor 属于缓存+批量处理。会根据配置的触发数量进行批量触发

// 传入 Consumer
 client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),

// 判断条件后,构建 BulkRequest
private Tuple<BulkRequest,Long> newBulkRequestIfNeeded(){
    ensureOpen();
    if (!isOverTheLimit()) {
        return null;
    }
    // 内部还是构建的 bulkConsumer
    final BulkRequest bulkRequest = this.bulkRequest;
    this.bulkRequest = bulkRequestSupplier.get();
    return new Tuple<>(bulkRequest,executionIdGen.incrementAndGet()) ;
}


// 判断是否到了处理的条件
private boolean isOverTheLimit() {
    if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
        return true;
    }
    if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
        return true;
    }
    return false;
}

流程很简单,主要是回调,BulkPorcessor 更像一个批量处理的工具类,其实调用和回调都是基于 consumer 实现的。

四. 深度思考

4.1 批处理节约了哪些消耗

网络消耗和 IO

IO 和 网络这2点其实是性能层面消耗最大的,通常我们在代码逻辑优化到一定地步后,这2座大山是一定会面对的.

  • 首先节约的就是建立连接消耗的资源 ,为什么建立连接要消耗资源,想想连接池存在的原因就清楚了。
  • 其次主要是磁盘IO, 将多个操作合并成同一个,在一些特定操作中可以减少磁盘IO操作的次数

CPU 节省的点

ES 中 CPU 消耗次要的就是一些网络,IO 这些重复操作带来的CPU损耗,这个损耗有,但是不多。

而批处理场景下节省的应该是上下文的切换,因为是一次批处理,读写应该是顺序的。 同时在ID的生成上也是顺序的。

顺序IO的性能是远大于随机IO的。(个人认为减少的原因大概率是这点)

同时批量处理在合并段和索引创建上应该都会有好处

总结

主要是借着 ES 的优化好好思考了一下批处理带来的好处。其实不针对于 ES ,在很多其他的场景中,批处理产生作用的原理都是一致的 ,无非就是 :

  • IO 和 网络消耗
  • CPU 损耗及上下文切换
  • 磁盘损耗和顺序IO
  • 原子处理及内存复用,共享了内存,减少了重复操作

当然还有些杂七杂八的节约:

  • 例如一次事务和多次事务带来的性能差距,对资源的锁定范围等
  • 额外的并发消耗,这涉及到多线程情况下的切换,也属于上下文切换,但是和CPU的那个维度不同
  • 重复的刷新操作,这种就是代码维度的节省了

这一篇主要的目的是说清楚批处理的好处 ,然后在高性能的系统中,要养成批处理的思维。

因为等你真的写多了这种系统,就能明确感知到不同处理方式带来的性能差距!!!