likes
comments
collection
share

通过redis stream结构实现一个消息队列

作者站长头像
站长
· 阅读数 59

前言

子公司,用户规模不大,拿的母公司的架构写的服务,因为做海外项目,而且第三方服务换的比较多,不想与阿里云的产品牵扯的比较深,所以打算把目前的一些简单的业务从rocketmq迁移到redis stream。我先来试试水。

lettuce版

因为公司项目用的jedis 2.9版本暂时不支持stream,内部还有很多自己的逻辑,库暂时不好升级,我先用spring-boot-starter-data-redis试试。

xread 与xreadgroup

因为业务并不复杂,也只准备一个服务来做这事,我打算用xread来试的,后来想了想,万一突然加机器呢?我开了两个测试项目试了一下,使用xread其实类似pubsub,每个链接的客户端都能读到,这不符合我们的场景,所以就直接使用了xreadgroup

创建消费者群组

这个$代表从队列0开始;>代表群组模式从消费者群组上次记录的值之后开始读

// redis 命令
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

java代码如下

// test: stream key, readoffset.lastest: $,consumer: group key
redisTemplate.opsForStream().createGroup("test",ReadOffset.latest(),"consumer");

我试过,要么事先在redis先建立好群组,不然以群组模式读取的话,会报这个stream key没有配备group的异常。所以我们可以在程序里设置下

// redis 命令
XINFO GROUPS streamkey

java代码如下

StreamInfo.XInfoGroups info = redisTemplate.opsForStream().groups("test");
if(info!=null){
  info.stream( x-> ??)
  //在程序判断下需要的消费群组是否存在即可
}

从队列中读取数据

  // 读取设置
  StreamReadOptions options = StreamReadOptions.empty();
  // 设置堵塞读取多少时间
  options = options.block(Duration.ofHours(1));
  // 设置读取数目
  options = options.count(1);
  // 设置自动提交
  //options = options.autoAcknowledge();
  // 定义消费者,第一个参数是消费者群组,第二个参数是群组里面的消费者
  Consumer consumer = Consumer.from("consumer","consumer2");
  // 定义从什么stream读,从stream的什么位置读,$是latest(),>是lastConsumerd() 且只在群组读取才能使用。
  StreamOffset<String> offset = StreamOffset.create("test",ReadOffset.lastConsumed());
  while(true) {
      List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(consumer, options,offset);
      if(list!=null){
      // 处理方法
          handle(list)
      }
  }       

添加数据到stream中

Map<String,String> map =new HashMap(1);
map.put("xx","喜欢张硕");
redisTemplate.opsForStream().add("test",map);

效果

通过redis stream结构实现一个消息队列 通过redis stream结构实现一个消息队列

redis 配置

package com.zxs.redisconsumersecond.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * @author zh
 * @date 2020/12/1 16:49
 */
@Slf4j
@Configuration
public class RedisConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.redis.test")
    public RedisStandaloneConfiguration redisStandaloneConfiguration() {
        return new RedisStandaloneConfiguration();
    }

    @Bean
    public LettuceClientConfiguration lettuceClientConfiguration() {
        return LettucePoolingClientConfiguration
                .builder()
                .poolConfig(new GenericObjectPoolConfig<>())
                // **因为我设置的堵塞读取时一个小时(而默认的命令超时时间是1分钟,所以这里需要设置下,实际生产中需要考虑下)**
                .commandTimeout(Duration.ZERO)
                .build();
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate() {
        RedisTemplate<String, String> template = getRedisTemplate();
        LettuceConnectionFactory factory =  new LettuceConnectionFactory(redisStandaloneConfiguration(), lettuceClientConfiguration());
        // 这里非常重要,因为不设置的话就会在redis获取nativeConnection的时候报空指针异常。原因如下
        factory.afterPropertiesSet();
        template.setConnectionFactory(factory);
        return template;
    }

    private RedisTemplate<String, String> getRedisTemplate() {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setKeySerializer(keySerializer());
        template.setValueSerializer(keySerializer());
        template.setHashKeySerializer(keySerializer());
        template.setHashValueSerializer(valueSerializer());
        return template;
    }

    private RedisSerializer<?> keySerializer() {
        return new StringRedisSerializer();
    }

    private RedisSerializer<?> valueSerializer() {
        return new StringRedisSerializer();
    }
}

解释下上面的afterPropertiesSet(); 获取redis连接需要一个对象LettuceConnectionProvider,不设置为空的话会报异常 通过redis stream结构实现一个消息队列 而afterPropertiesSet()方法的源码如下

 public void afterPropertiesSet() {
        this.client = this.createClient();
        this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC));
        this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
        if (this.isClusterAware()) {
            this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION);
        }

        if (this.getEagerInitialization() && this.getShareNativeConnection()) {
            this.initConnection();
        }

    }

pom.xml

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
        // 配置lettuce连接池需要
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
			<version>2.9.0</version>
		</dependency>
	</dependencies>
转载自:https://juejin.cn/post/6901572339658391565
评论
请登录