likes
comments
collection

Canal和Kafka整合方案——解决Canal写入Kafka并发消费问题

作者站长头像
站长
· 阅读数 10

一、问题描述

在使用Canal读取binlog来对数据库增量进行同步时遇到一下几个问题

  • 首先是在使用Canal自带客户端进行同步时需要自己手动调用get()或者getWithoutAck()进行拉取
  • 拉取日志后进行同步只能一条一条处理,效率比较低
  • 如果处理日志过慢或者其他原因容易导致日志堆积在服务器

为了解决上面的问题打算在日志同步过程中引入MQ来作为中间同步,Canal支持RocketMQ和Kafka两种,最终选用Kafka来进行

二、引入Kafka

1.Canal整合Kafka及项目初步搭建

Kafka与Canal整合在Canal搭建中已经介绍过,整体搭建过程不变。

  • 这次以一组房间数据作为示例,数据库:demo
  • 表名:pricing_house_info
  • Kafka的topic:house-topic

完成Canal搭建和整合Kafka之后,启动Canal,然后往pricing_house_info表中插入数据,并在Kafka消费数据,搭建SpringBoot项目并整合Kafka(SpringBoot整合Kafka在之前的博文中有描述:SpringBoot整合Kafka),然后完成相关配置后启动项目在Kafka控制台中看到Canal同步过来的数据表示搭建成功,并且在项目中也成功消费。

2.整合Kafka后引出新问题

此时如果按照这样的方式,确实能够缓解Canal服务器压力以及不再手动拉取日志的问题,但是这样的方式依然是在一条一条的消费消息,性能并未得到提升。

如何解决这样的问题?首先肯定想到的是多线程并发消费,如果我们单纯地用多线程并发消费的话并不能保证消息的有序性,这种binlog日志同步是需要严格有序性的,否则会导致数据错乱。那有没有办法能够保证顺序的情况下并发消费呢?答案是有的,即将指定数据发送到指定分区当中,然后起多个消费者消费不同分区的数据即可,并且Canal提供写入指定分区的配置

三、最终方案

1.修改Canal配置文件

在Canal配置文件中的mq config里面配置如下

# mq config
canal.mq.topic=house-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=demo\\.pricing_house_info:house_code

这里面主要配置了canal.mq.partitionsNum和canal.mq.partitionHash两个参数,他们的意思如下:

  • canal.mq.partitionsNum:指定当前topic的分区数
  • canal.mq.partitionHash:指定到分区的分区规则,可以细化到字段

这里我们指定topic有3个分区,并且使用pricing_house_info表中的house_code字段来进行划分,即让相同house_code的数据全部到一个分区当中

2.修改项目代码

原先项目中只有一个消费,现在再添加两个消费的方法,让三个消费者能够消费不同分区的数据,通过@TopicPartition注解指定topic和对应的分区,并且可以同时消费多个分区的数据,三个消费者的groupId一定要保持一致,因为Kafka指定在一个group里面一条partition的消息只能被一个消费者消费

@Component
public class MessageListener {

    @KafkaListener(topicPartitions = {@TopicPartition(topic = "house-topic",partitions = {"0"})}, groupId = "house-consumer-group")
    public void partition1(ConsumerRecord<?, ?> record){
        receive(record);
    }

    @KafkaListener(topicPartitions = {@TopicPartition(topic = "house-topic",partitions = {"1"})}, groupId = "house-consumer-group")
    public void partition2(ConsumerRecord<?, ?> record){
        receive(record);
    }

    @KafkaListener(topicPartitions = {@TopicPartition(topic = "house-topic",partitions = {"2"})}, groupId = "house-consumer-group")
    public void partition3(ConsumerRecord<?, ?> record){
        receive(record);
    }
    private void receive(ConsumerRecord<?, ?> record){
        final String value = (String) record.value();
        FlatMessage flatMessage = JSONObject.parseObject(value, FlatMessage.class);
        final String houseCode = flatMessage.getData().get(0).get("house_code");
        System.out.println("分区:"+record.partition()+"\t接收到数据的code:"+houseCode+"\t操作类别:"+flatMessage.getType());
    }
}

通过这样的方式我们可以确保相同house_code的数据到同一个分区被同一个消费者有序消费且只消费一次,这样即可达到目的

3.整体架构

Canal和Kafka整合方案——解决Canal写入Kafka并发消费问题

4.结果验证

全部配置修改好后重启Canal和项目,然后往数据库里写数据,控制台结果如下:

分区:0	接收到数据的code:CD000001	操作类别:INSERT
分区:1	接收到数据的code:CD000002	操作类别:INSERT
分区:0	接收到数据的code:CD000003	操作类别:INSERT
分区:1	接收到数据的code:CD000004	操作类别:INSERT
分区:2	接收到数据的code:CD000005	操作类别:INSERT
分区:0	接收到数据的code:CD000006	操作类别:INSERT
分区:2	接收到数据的code:CD000007	操作类别:INSERT
分区:0	接收到数据的code:CD000008	操作类别:INSERT
分区:1	接收到数据的code:CD000009	操作类别:INSERT
分区:0	接收到数据的code:CD000010	操作类别:INSERT

可以看到不同house_code的数据被写入了不同的分区,并且三个消费者消费到了来自三个分区的数据,此时如果在house_code为CD000010的数据上进行修改,得到如下结果:

分区:0	接收到数据的code:CD000010	操作类别:UPDATE
分区:0	接收到数据的code:CD000010	操作类别:UPDATE
分区:0	接收到数据的code:CD000010	操作类别:UPDATE
分区:0	接收到数据的code:CD000010	操作类别:UPDATE

此时更进一步证明了同一个partition只存固定的house_code的数据,保证了单个partition和消费者的消息有序性。

四、总结思考

利用Canal将数据根据字段写入不同分区且消费者消费指定分区数据,增加了消费的吞吐量,并且保证了单个消费者的消息有序性以及单条记录(同一house_code的数据)的处理有序性,本方案是在单行数据基础上来进行分区匹配的,还可以在表和数据库的基础上进行分区匹配,修改Canal参数即可。

这个方案是通过增加消费者来提高MQ的吞吐量,使数据处理更快实时性更高,如果要更进一步提高吞吐量还有什么方法呢?这里我能够想到的是在Consumer这里接收到消息后将消息放入队列中而不是直接处理,然后再根据单个Consumer中该队列中的house_code并发消费,结构如下:

Canal和Kafka整合方案——解决Canal写入Kafka并发消费问题

但是就目前而言上面的方案已经足以解决问题,就不需要再将程序搞复杂了,待以后确实需要性能优化再考虑这样的方案吧

五、参考

Canal官方文档提供的相关配置

canal.mq.partitionHash 表达式说明:

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.\..:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.\..:pkpkpk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.\.. ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\.test:id,.\..* , 针对test的表按照id散列,其余的表按照table散列 注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意