一起来学kafka之整合SpringBoot深入使用(一)
前言
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka整合SpringBoot
中如何进行消息应答以及@SendTo 和 @KafkaListener
的讲解~
好了, 废话不多说直接开整吧~
消息应答
有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate
, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController
@Slf4j
@RestController
public class ReceiveCustomerController {
private static final String topic = "hello3";
private static final String topicCroup = "hello3Group";
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer(topic + "_replies");
repliesContainer.getContainerProperties().setGroupId(topicCroup);
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
}
@Bean
public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate(producerFactory);
}
@Autowired
private ReplyingKafkaTemplate kafkaReplyTemplate;
@GetMapping("/send/{msg}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMsg(@PathVariable String msg) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record);
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
log.info("customer reply >>>> {}: ", consumerRecord.value()); // customer reply >>>>listen: I do it >>> 1:
}
@KafkaListener(id = topicCroup, topics = topic)
@SendTo
public String listen(String msg) {
log.info("listen receive msg >>> {}", msg); // listen receive msg >>> 1
return "listen: I do it >>> " + msg;
}
}
启动应用,测试一下,观察控制台的变化~
// listen receive msg >>> 1
// customer reply >>>>listen: I do it >>> 1:
@SendTo
在 Spring Kafka
中,@SendTo
注解可以用于指定消息被发送到的目标 Topic
。当消费者成功消费一个消息后,可以将结果发送到指定的目标 Topic
,以供其他消费者进一步处理。
@SendTo
注解可以应用于 Kafka
消费者方法上,以指定消息的处理结果将被发送到哪个 Topic
。下面通过一个例子来演示一下如何进行消息的转发~
@Slf4j
@RestController
public class SendToController {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
// 接收消息
@Transactional(rollbackFor = Exception.class)
@KafkaListener(id = "input", topics = "inputTopic")
@SendTo("outputTopic")
public String processMessage(String message) {
// 处理消息并返回结果
log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1
return "2";
}
@KafkaListener(id = "output", topics = "outputTopic")
public String process1Message(String message) {
// 处理消息并返回结果
String result = "Processed message: " + message;
log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2
return result;
}
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello")
public String hello() {
// 发送消息
kafkaTemplate.send("inputTopic", "1");
return "hello";
}
}
观察控制台的日志信息
inputTopic >>>> 1
outputTopic >>>> Processed message: 2
可以看到消息被转发到outputTopic
并且被output
消费者成功消费
@KafkaListener
@KafkaListener
是一个注解
,用于标记一个方法作为 Kafka 消费者
。在 Spring Boot
应用程序中,使用该注解可以方便地处理Kafka
消息。
@KafkaListener
注解可以添加到类级别或方法级别。在类级别添加注解,将指定默认的 Topic
和消费者组 ID
。在方法级别添加注解,则可以使用不同的 Topic
和消费者组 ID
。
在前面的几个例子中,带大家已经体验过了,但都是监听一个topic
,那么如何去监听多个topic
呢? 其实很简单,下面通过一个例子来体验下
@Slf4j
@RestController
public class ListenerController {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello")
public String hello() {
// 发送消息
kafkaTemplate.send("topic1", "1");
kafkaTemplate.send("topic2", "2");
return "hello";
}
/**
* 监听多个topic
* @param message
*/
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")
public void listen(String message) {
log.info("Received message >>> {}", message);
// Received message >>> 1
// Received message >>> 2
}
}
如上,我们发送了两条不同的消息topic1
和topic2
,通过指定topics = {"topic1", "topic2"}
成功消费两条消息
下面一起看一下,@KafkaListener
还支持哪些参数?
@KafkaListener
注解支持许多参数,以满足不同的使用场景。以下是常用参数的列表:
topics
:指定要消费的Topic
的名称,可以是字符串或字符串数组。必填参数。groupId
:指定消费者组ID
。消费者组是一组共享相同Topic
的消费者的集合。默认值为""
,表示使用默认的消费者组ID
。containerFactory
:指定要使用的KafkaListenerContainerFactory
实例的名称。如果没有指定,将使用默认的KafkaListenerContainerFactory
实例。concurrency
:指定要创建的并发消费者的数量。默认值为1
。autoStartup
:指定容器是否在应用启动时自动启动。默认值为true
。id
:指定监听器的唯一标识符。默认值为""
。errorHandler
:指定在处理消息时出现异常时要使用的ErrorHandler
实例。properties
:指定传递给消费者工厂的Kafka
消费者配置属性的Map
。partitionOffsets
: 是一个Map
类型的参数,该参数用于指定要从Topic
的每个分区
的哪个偏移量
开始消费消息
有几个参数很好理解,没啥好讲的,我们主要看一下containerFactory
,errorHandler
, partitionOffsets
containerFactory
前面我们使用的都是默认
的消息监听器
,在 Spring Kafka
中,Kafka 消费者
可以使用不同的消息监听器容器
,例如 ConcurrentKafkaListenerContainerFactory
、KafkaMessageListenerContainer
等。每个容器都提供了不同的功能和配置选项,可以根据实际需求进行选择和配置。
如果你需要自定义 Kafka 消费者
的配置选项,可以通过在 Spring Boot
配置文件中设置属性来实现。另外一种方法是通过创建 KafkaListenerContainerFactory bean
并配置其属性来实现。
下面通过一个例子来体验一下ConcurrentKafkaListenerContainerFactory
~
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
//也可以设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
//factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
在上面的示例中,我们创建了一个 ConcurrentKafkaListenerContainerFactory
的 bean
,并配置了一些属性,例如:
- 使用
DefaultKafkaConsumerFactory
作为消费者工厂
; - 设置并发消费者数量为
3
; - 设置轮询超时时间为
3000 毫秒
。
那么如何使用呢?很简单,只需之指定containerFactory
就好了
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello1")
public String hello1() {
// 发送消息
kafkaTemplate.send("topic3", "3");
return "hello1";
}
@KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String message) {
// 处理消息
log.info("hello1 >>>>> {}", message);
}
我们可以通过containerFactory
应用于不同的消费者方法和主题,以满足不同的需求
errorHandler
在 ·Spring Kafka· 中,·ErrorHandler· 接口定义了在处理 Kafka
消息时发生错误时如何处理异常。如果不配置 ErrorHandler
,则默认使用 LoggingErrorHandler
将异常记录到日志中。
如果你需要自定义异常处理逻辑,可以通过实现 ErrorHandler
接口并配置其 bean
来实现。
下面通过一个例子来体验一下:
@Slf4j
@Component
public class MyErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
// 处理异常,例如打印错误日志、发送错误消息等,自定义逻辑处理
log.error("Error occurred while processing message: {}", e.getMessage());
return null;
}
}
使用错误处理器:
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello2")
public String hello2() {
// 发送消息
kafkaTemplate.send("topic4", "4");
return "hello2";
}
@KafkaListener(id = "topic4Group", topics = "topic4", errorHandler = "myErrorHandler")
public void processMessage1(String message) {
// 处理消息
if(true)
throw new RuntimeException("消息处理异常");
log.info("hello2 >>>>> {}", message);
}
观察控制台:
Error occurred while processing message: Listener method 'public void com.kafka.study.controller.ListenerController.processMessage1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 消息处理异常
结束语
本节还遗留一个partitionOffsets
这个我们放到下节给大家讲,涉及到分区 partition
和偏移量 offset
的概念,不是很好理解,下节给大家好好理一下这个概念~
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注
鼓励一下呗~
相关文章
项目源码(源码已更新 欢迎star⭐️)
ElasticSearch 专题学习
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
博客(阅读体验较佳)
转载自:https://juejin.cn/post/7210584884322549797