🐳 一文学会 《RabbitMQ》 的消息限流、消息超时
一、什么是消息限流
RabbitMQ的消息限流是对其应用程序或服务中传递的消息数量加以约束的措施。此类限制旨在调控消息流量,避免出现发送过量或滥用的情况,从而确保整体系统的稳健性和工作效率。
实现消息限流的方法多种多样,举例如下:
- 时间间隔限制:通过设定具体的时间段,对该时段内可发送的消息数量进行限制。
- 速率限制策略:以每秒或每分钟为基准,对发送的消息数量进行把控。
- 总量控制限制:在特定的时间框架内,对发送的消息总数进行限制。
这些限制措施可根据实际需求和系统结构进行灵活配置与调整。消息限流的核心目的在于均衡系统资源的使用,防止因消息过多而导致的拥堵或资源过度消耗,同时确保其他用户或服务能够正常运作。通过这种方式,可以有效提升系统的整体稳定性和服务质量。
此外,在进行消息限流的同时,也需要注意如何妥善处理因限流而产生的潜在问题,如消息的排队、延迟或丢弃等,以确保系统的顺畅运行和用户体验的保障。
二、什么是消息超时
RabbitMQ的消息超时,是指为队列中的消息设定一个具体的存活时间(Time-To-Live,简称TTL)。这一机制在消息队列管理中起着至关重要的作用。通过设定TTL,RabbitMQ能够在消息达到指定的存活时间后自动将其从队列中移除,从而确保消息的时效性和系统的资源利用率。
消息超时的功能在多种场景下具有广泛的应用价值。例如,在某些业务场景中,过时的消息可能不再具有处理价值,甚至可能导致信息误导。通过设定合理的TTL,可以及时清理这些无效消息,防止其堆积在队列中占用宝贵的系统资源。此外,TTL还可用于实现消息的延时处理,满足特定业务需求,如定时任务、缓存更新等。
在RabbitMQ中,消息超时的设置非常灵活。用户既可以为整个队列设置统一的TTL,以便为队列中的所有消息定义默认的过期时间;也可以为单条消息单独设置TTL,以实现更精细化的控制。这种灵活性使得RabbitMQ能够轻松应对各种复杂的业务场景,提升系统的整体性能和可靠性。
三、代码实现
3.1 解决消息限流
① 导入pom文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
② 编写yaml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.118.130
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
#prefetch: 1 # 每次从队列中取回消息的数量
logging:
level:
com.bottom.mq.listener.ConsumerListener: info
```ottom.mq.listener.ConsumerListener: info
③ 新建ConsumerListener消费者监听器
@Component
@Slf4j
public class ConsumerListener {
public static final String EXCHANGE_Name = "limit.direct";
public static final String ROUTING_KEY = "limit";
public static final String QUEUE_NAME = "limit";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_Name),
key = {ROUTING_KEY}))
public void processMessage(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
TimeUnit.SECONDS.sleep(1);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消费端接收到了消息:" + dataString);
}
}
④ 新建另一个spirngboot项目,改启动端口8081
⑤ 在新建的springboot新建LimitingController
@RestController
@RequestMapping(value="/limiting")
public class LimitingController {
public static final String EXCHANGE_DIRECT = "limit.direct";
public static final String ROUTING_KEY = "limit";
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping( "/without")
public String send(){
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "测试无消息限流" + i);
}
return "";
}
}
⑥ 用IDEA自带的API工具发送这个请求
启动springboot:8081项目
⑦启动8080端口,观察队列消费情况
⑧ 修改8080端口的yaml,重启8081发送100条消息,然后再重启8080,观察队列消费情况
3.2 解决消息超时
① 给队列层面:在队列层面设定消息的过期时间
并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
②消息本身:给具体的某个消息设定过期时间
@RestController
@RequestMapping(value="/limiting")
public class LimitingController {
public static final String EXCHANGE_DIRECT = "limit.direct";
public static final String ROUTING_KEY = "limit";
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping( "/without")
public String send(){
for (int i = 0; i < 100; i++) {
// 创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
// 设置消息的过期时间,单位是毫秒
message.getMessageProperties().setExpiration("9000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "测试设置消息过期时间" + i,postProcessor);
}
return "";
}
}
如果两个层面都做了设置,那么哪个时间短,哪个生效
四、总结
本文深入探讨了RabbitMQ中的消息限流与消息超时两大关键机制。消息限流旨在通过时间间隔、速率、总量等多种策略,合理调控队列中的消息流量,确保系统稳健高效运行。另一方面,消息超时则是为队列中的消息设定存活时间,自动清理过期消息,以优化资源利用与保障消息时效性。文章还结合具体代码实现,详细展示了如何在实际应用中配置与使用这两大机制,从而帮助开发者更好地应对复杂业务场景,提升系统的整体性能与可靠性。
转载自:https://juejin.cn/post/7374226072781144074