通过redis stream结构实现一个消息队列
前言
子公司,用户规模不大,拿的母公司的架构写的服务,因为做海外项目,而且第三方服务换的比较多,不想与阿里云的产品牵扯的比较深,所以打算把目前的一些简单的业务从rocketmq迁移到redis stream。我先来试试水。
lettuce版
因为公司项目用的jedis 2.9版本暂时不支持stream,内部还有很多自己的逻辑,库暂时不好升级,我先用spring-boot-starter-data-redis试试。
xread 与xreadgroup
因为业务并不复杂,也只准备一个服务来做这事,我打算用xread来试的,后来想了想,万一突然加机器呢?我开了两个测试项目试了一下,使用xread其实类似pubsub,每个链接的客户端都能读到,这不符合我们的场景,所以就直接使用了xreadgroup
创建消费者群组
这个$
代表从队列0开始;>
代表群组模式从消费者群组上次记录的值之后开始读
// redis 命令
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
java代码如下
// test: stream key, readoffset.lastest: $,consumer: group key
redisTemplate.opsForStream().createGroup("test",ReadOffset.latest(),"consumer");
我试过,要么事先在redis先建立好群组,不然以群组模式读取的话,会报这个stream key没有配备group的异常。所以我们可以在程序里设置下
// redis 命令
XINFO GROUPS streamkey
java代码如下
StreamInfo.XInfoGroups info = redisTemplate.opsForStream().groups("test");
if(info!=null){
info.stream( x-> ??)
//在程序判断下需要的消费群组是否存在即可
}
从队列中读取数据
// 读取设置
StreamReadOptions options = StreamReadOptions.empty();
// 设置堵塞读取多少时间
options = options.block(Duration.ofHours(1));
// 设置读取数目
options = options.count(1);
// 设置自动提交
//options = options.autoAcknowledge();
// 定义消费者,第一个参数是消费者群组,第二个参数是群组里面的消费者
Consumer consumer = Consumer.from("consumer","consumer2");
// 定义从什么stream读,从stream的什么位置读,$是latest(),>是lastConsumerd() 且只在群组读取才能使用。
StreamOffset<String> offset = StreamOffset.create("test",ReadOffset.lastConsumed());
while(true) {
List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(consumer, options,offset);
if(list!=null){
// 处理方法
handle(list)
}
}
添加数据到stream中
Map<String,String> map =new HashMap(1);
map.put("xx","喜欢张硕");
redisTemplate.opsForStream().add("test",map);
效果
redis 配置
package com.zxs.redisconsumersecond.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
/**
* @author zh
* @date 2020/12/1 16:49
*/
@Slf4j
@Configuration
public class RedisConfig {
@Bean
@ConfigurationProperties(prefix = "spring.redis.test")
public RedisStandaloneConfiguration redisStandaloneConfiguration() {
return new RedisStandaloneConfiguration();
}
@Bean
public LettuceClientConfiguration lettuceClientConfiguration() {
return LettucePoolingClientConfiguration
.builder()
.poolConfig(new GenericObjectPoolConfig<>())
// **因为我设置的堵塞读取时一个小时(而默认的命令超时时间是1分钟,所以这里需要设置下,实际生产中需要考虑下)**
.commandTimeout(Duration.ZERO)
.build();
}
@Bean
public RedisTemplate<String, String> redisTemplate() {
RedisTemplate<String, String> template = getRedisTemplate();
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration(), lettuceClientConfiguration());
// 这里非常重要,因为不设置的话就会在redis获取nativeConnection的时候报空指针异常。原因如下
factory.afterPropertiesSet();
template.setConnectionFactory(factory);
return template;
}
private RedisTemplate<String, String> getRedisTemplate() {
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setKeySerializer(keySerializer());
template.setValueSerializer(keySerializer());
template.setHashKeySerializer(keySerializer());
template.setHashValueSerializer(valueSerializer());
return template;
}
private RedisSerializer<?> keySerializer() {
return new StringRedisSerializer();
}
private RedisSerializer<?> valueSerializer() {
return new StringRedisSerializer();
}
}
解释下上面的afterPropertiesSet();
获取redis连接需要一个对象LettuceConnectionProvider,不设置为空的话会报异常
而afterPropertiesSet()方法的源码如下
public void afterPropertiesSet() {
this.client = this.createClient();
this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC));
this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
if (this.isClusterAware()) {
this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION);
}
if (this.getEagerInitialization() && this.getShareNativeConnection()) {
this.initConnection();
}
}
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
// 配置lettuce连接池需要
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
转载自:https://juejin.cn/post/6901572339658391565