likes
comments
collection
share

Redission 分布式对象下篇

作者站长头像
站长
· 阅读数 40

Redission 分布式对象 下篇

Redission 分布式对象

Redission 分布式对象 上篇讲解了:

  1. 通用对象桶(Object Bucket)
  2. 二进制流(Binary Stream)
  3. 地理空间对象桶(Geospatial Bucket)
  4. BitSet
  5. 原子整长形(AtomicLong)
  6. 原子双精度浮点(AtomicDouble)

的基本使用方法.我们将继续讲解Redission 的分布式对象.

话题(订阅分发)

RTopic

RTopic 对象实现了发布、订阅的机制。并且支持自动重新订阅。

  • getTopic 获取话题
  • addListener(消息对象,消息回调函数) 添加订阅者
  • publish 发布消息 返回值 接收消息的客户端数
@Test
public void test9() throws IOException {
    // getTopic 获取话题
    RTopic topic = redisson.getTopic("anyTopic");

    // addListener(消息对象,消息回调函数) 添加订阅者
    topic.addListener(Object.class, new MessageListener<Object>() {
        @Override
        public void onMessage(CharSequence channel, Object msg) {
            System.out.println(channel+"-"+msg);
        }
    });

    // 在其他线程或JVM节点
    RTopic topicPub = redisson.getTopic("anyTopic");
    // publish 发布新消息 返回值 接收消息的客户端数
    long clientsReceivedMessage = topicPub.publish(new String("新消息"));
    System.out.println(topicPub);
}

在Redis节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅

RPatternTopic

模糊话题 RPatternTopic 对象可以通过正式表达式来订阅多个话题。

创建3个发布者 any.1,any.2,any.3,然后订阅所有满足any.*表达式的话题,分别发布消息,查看结果,订阅者都收到了来自3个话题的消息.

  • getPatternTopic 订阅所有满足表达式的话题
@Test
public void test10() throws IOException {
    // 创建3个发布者
    RTopic v1 = redisson.getTopic("any.1");
    RTopic v2 = redisson.getTopic("any.2");
    RTopic v3 = redisson.getTopic("any.3");


    // 订阅所有满足`any.*`表达式的话题
    RPatternTopic topic1 = redisson.getPatternTopic("any.*");
    int listenerId = topic1.addListener(String.class, new PatternMessageListener<String>() {
        @Override
        public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
            System.out.println(pattern+"-"+channel+"-"+msg);
            // any.*-any.1-我是v1,发送消息
            // any.*-any.2-我是v2,发送消息
            // any.*-any.3-我是v3,发送消息
        }
    });

    // 分别发布消息
    v1.publish("我是v1,发送消息");
    v2.publish("我是v2,发送消息");
    v3.publish("我是v3,发送消息");
}

布隆过滤器(Bloom Filter)

分布式布隆过滤器(Bloom Filter。所含最大比特数量为2^32

  • getBloomFilter 获取布隆过滤器
  • tryInit 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
  • add 添加对象
  • contains 判断过滤器中是否包含对象
@Test
public void test11() throws IOException {
    // getBloomFilter 获取布隆过滤器
    RBloomFilter<User> bloomFilter = redisson.getBloomFilter("sample");

    // tryInit 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
    bloomFilter.tryInit(55000000L, 0.03);

    // add 添加对象
    bloomFilter.add(new User("0001", "张三"));
    bloomFilter.add(new User("0002", "李四"));

    // contains 判断过滤器中是否包含对象
    System.out.println(bloomFilter.contains(new User("0001", "张三"))); // true
    System.out.println(bloomFilter.contains(new User("0001", "李四"))); // false
}

@Data
@AllArgsConstructor
public class User implements Serializable {
    private String uid;
    private String uname;
}

基数估计算法(HyperLogLog)

基数估计算法(HyperLogLog)对象。该对象可以在有限的空间内通过概率算法统计大量的数据。

  • add 如果已添加对象,则为true;如果已添加,则为false
  • count 添加到此结构中的唯一元素的近似数量(个数)
  • ddAll 将对象集合中包含的所有元素添加到此结构,如果至少添加了一个对象,则为true;如果所有对象都已添加,则为false
  • countWith 返回添加到此实例和其他通过otherLogNames定义的实例中的唯一元素的近似数量。
  • mergeWith 将多个实例合并到此实例中
@Test
public void test12() throws IOException {
    RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");

    // add 如果已添加对象,则为true;如果已添加,则为false
    log.add(1);
    log.add(-1);
    log.add(-1);

    // count 添加到此结构中的唯一元素的近似数量(个数)
    System.out.println(log.count()); // 2
    // ===========================================

    // addAll 将对象集合中包含的所有元素添加到此结构,如果至少添加了一个对象,则为true;如果所有对象都已添加,则为false
    log.addAll(Arrays.asList(1,2));
    // ===========================================
    
    RHyperLogLog<Integer> log2 = redisson.getHyperLogLog("log2");
    log2.addAll(Arrays.asList(3,4));
    // countWith 返回添加到此实例和其他通过otherLogNames定义的实例中的唯一元素的近似数量。
    System.out.println(log.countWith("log2")); // 5
    // ===========================================

    RHyperLogLog<Integer> log3 = redisson.getHyperLogLog("log3");
    log3.addAll(Arrays.asList(5,6));
    // mergeWith 将多个实例合并到此实例中
    log.mergeWith("log3");
    System.out.println(log.count()); // 5
}

整长型累加器(LongAdder)

整长型累加器(LongAdder)采用了与java.util.concurrent.atomic.LongAdder类似的接口。通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicLong对象快 12000 倍。完美适用于分布式统计计量场景。

  • getLongAdder 获取整长型累加器 默认从 0 开始
  • add 增加
  • increment 累加
  • decrement 累减
  • sum 累计值(当前值)
  • destroy 手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁
@Test
public void test13() throws IOException {
    // getLongAdder 获取整长型累加器 默认从 0 开始
    RLongAdder atomicLong = redisson.getLongAdder("ids");
    // add 增加
    atomicLong.add(12);
    // increment 累加
    atomicLong.increment();
    // decrement 累减
    atomicLong.decrement();
    // sum 累计值(当前值)
    System.out.println(atomicLong.sum());

    // destroy 手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
    atomicLong.destroy();
}

双精度浮点累加器(DoubleAdder)

双精度浮点累加器(DoubleAdder)采用了与java.util.concurrent.atomic.DoubleAdder类似的接口。通过利用客户端内置的DoubleAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicDouble对象快 12000 倍。完美适用于分布式统计计量场景。

  • getLongAdder 获取整长型累加器 默认从 0 开始
  • add 增加
  • increment 累加
  • decrement 累减
  • sum 累计值(当前值)
  • destroy 手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁
@Test
public void test14() throws IOException {
    // getDoubleAdder 获取整长型累加器 默认从 0 开始
    RDoubleAdder ids = redisson.getDoubleAdder("ids");
    // add 增加
    ids.add(12.3);
    // increment 累加
    ids.increment();
    // decrement 累减
    ids.decrement();
    // sum 累计值(当前值)
    System.out.println(ids.sum());


    // destroy 手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
    ids.destroy();
}

限流器(RateLimiter)

限流器(RateLimiter)可以用来在分布式环境下现在请求方的调用频率。既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。该算法不保证公平性。

  • getRateLimiter 获取限流器
  • acquire 要获取的许可证数量,没有获取到会阻塞当前线程
@Test
public void test15() throws IOException, InterruptedException {
    // getRateLimiter 获取限流器
    RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
    // trySetRate 初始化,最大流速 = 每1秒钟产生10个令牌
    rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);

    RRateLimiter limiter = redisson.getRateLimiter("myRateLimiter");
    // acquire 要获取的许可证数量,没有获取到会阻塞当前线程
    limiter.acquire(3);

    Thread t = new Thread(() -> {
        while (true){
            // 每秒10个令牌,一次取5个,大约1s打印两次
            limiter.acquire(5);
            System.out.println("轮询-"+new SimpleDateFormat("HH:mm:ss S").format(new Date()));
            // 轮询-23:12:07 501
            // 轮询-23:12:08 504
            // 轮询-23:12:08 510
            // 轮询-23:12:09 511
            // 轮询-23:12:09 516
        }
    });
    t.start();

    TimeUnit.SECONDS.sleep(10);
}
转载自:https://juejin.cn/post/7157726812250832932
评论
请登录