Redission 分布式对象下篇
Redission 分布式对象 下篇
Redission 分布式对象
- 通用对象桶(Object Bucket)
- 二进制流(Binary Stream)
- 地理空间对象桶(Geospatial Bucket)
- BitSet
- 原子整长形(AtomicLong)
- 原子双精度浮点(AtomicDouble)
的基本使用方法.我们将继续讲解Redission 的分布式对象.
话题(订阅分发)
RTopic
RTopic 对象实现了发布、订阅的机制。并且支持自动重新订阅。
getTopic
获取话题addListener(消息对象,消息回调函数)
添加订阅者publish
发布消息 返回值 接收消息的客户端数
@Test
public void test9() throws IOException {
// getTopic 获取话题
RTopic topic = redisson.getTopic("anyTopic");
// addListener(消息对象,消息回调函数) 添加订阅者
topic.addListener(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
System.out.println(channel+"-"+msg);
}
});
// 在其他线程或JVM节点
RTopic topicPub = redisson.getTopic("anyTopic");
// publish 发布新消息 返回值 接收消息的客户端数
long clientsReceivedMessage = topicPub.publish(new String("新消息"));
System.out.println(topicPub);
}
在Redis节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅。
RPatternTopic
模糊话题 RPatternTopic 对象可以通过正式表达式来订阅多个话题。
创建3个发布者 any.1,any.2,any.3,然后订阅所有满足
any.*
表达式的话题,分别发布消息,查看结果,订阅者都收到了来自3个话题的消息.
getPatternTopic
订阅所有满足表达式的话题
@Test
public void test10() throws IOException {
// 创建3个发布者
RTopic v1 = redisson.getTopic("any.1");
RTopic v2 = redisson.getTopic("any.2");
RTopic v3 = redisson.getTopic("any.3");
// 订阅所有满足`any.*`表达式的话题
RPatternTopic topic1 = redisson.getPatternTopic("any.*");
int listenerId = topic1.addListener(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
System.out.println(pattern+"-"+channel+"-"+msg);
// any.*-any.1-我是v1,发送消息
// any.*-any.2-我是v2,发送消息
// any.*-any.3-我是v3,发送消息
}
});
// 分别发布消息
v1.publish("我是v1,发送消息");
v2.publish("我是v2,发送消息");
v3.publish("我是v3,发送消息");
}
布隆过滤器(Bloom Filter)
分布式布隆过滤器(Bloom Filter。所含最大比特数量为2^32
。
getBloomFilter
获取布隆过滤器tryInit
初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03add
添加对象contains
判断过滤器中是否包含对象
@Test
public void test11() throws IOException {
// getBloomFilter 获取布隆过滤器
RBloomFilter<User> bloomFilter = redisson.getBloomFilter("sample");
// tryInit 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
bloomFilter.tryInit(55000000L, 0.03);
// add 添加对象
bloomFilter.add(new User("0001", "张三"));
bloomFilter.add(new User("0002", "李四"));
// contains 判断过滤器中是否包含对象
System.out.println(bloomFilter.contains(new User("0001", "张三"))); // true
System.out.println(bloomFilter.contains(new User("0001", "李四"))); // false
}
@Data
@AllArgsConstructor
public class User implements Serializable {
private String uid;
private String uname;
}
基数估计算法(HyperLogLog)
基数估计算法(HyperLogLog)对象。该对象可以在有限的空间内通过概率算法统计大量的数据。
add
如果已添加对象,则为true;如果已添加,则为falsecount
添加到此结构中的唯一元素的近似数量(个数)ddAll
将对象集合中包含的所有元素添加到此结构,如果至少添加了一个对象,则为true;如果所有对象都已添加,则为falsecountWith
返回添加到此实例和其他通过otherLogNames定义的实例中的唯一元素的近似数量。mergeWith
将多个实例合并到此实例中
@Test
public void test12() throws IOException {
RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
// add 如果已添加对象,则为true;如果已添加,则为false
log.add(1);
log.add(-1);
log.add(-1);
// count 添加到此结构中的唯一元素的近似数量(个数)
System.out.println(log.count()); // 2
// ===========================================
// addAll 将对象集合中包含的所有元素添加到此结构,如果至少添加了一个对象,则为true;如果所有对象都已添加,则为false
log.addAll(Arrays.asList(1,2));
// ===========================================
RHyperLogLog<Integer> log2 = redisson.getHyperLogLog("log2");
log2.addAll(Arrays.asList(3,4));
// countWith 返回添加到此实例和其他通过otherLogNames定义的实例中的唯一元素的近似数量。
System.out.println(log.countWith("log2")); // 5
// ===========================================
RHyperLogLog<Integer> log3 = redisson.getHyperLogLog("log3");
log3.addAll(Arrays.asList(5,6));
// mergeWith 将多个实例合并到此实例中
log.mergeWith("log3");
System.out.println(log.count()); // 5
}
整长型累加器(LongAdder)
整长型累加器(LongAdder)采用了与java.util.concurrent.atomic.LongAdder
类似的接口。通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicLong
对象快 12000 倍。完美适用于分布式统计计量场景。
getLongAdder
获取整长型累加器 默认从 0 开始add
增加increment
累加decrement
累减sum
累计值(当前值)destroy
手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
@Test
public void test13() throws IOException {
// getLongAdder 获取整长型累加器 默认从 0 开始
RLongAdder atomicLong = redisson.getLongAdder("ids");
// add 增加
atomicLong.add(12);
// increment 累加
atomicLong.increment();
// decrement 累减
atomicLong.decrement();
// sum 累计值(当前值)
System.out.println(atomicLong.sum());
// destroy 手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
atomicLong.destroy();
}
双精度浮点累加器(DoubleAdder)
双精度浮点累加器(DoubleAdder)采用了与java.util.concurrent.atomic.DoubleAdder
类似的接口。通过利用客户端内置的DoubleAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicDouble
对象快 12000 倍。完美适用于分布式统计计量场景。
getLongAdder
获取整长型累加器 默认从 0 开始add
增加increment
累加decrement
累减sum
累计值(当前值)destroy
手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
@Test
public void test14() throws IOException {
// getDoubleAdder 获取整长型累加器 默认从 0 开始
RDoubleAdder ids = redisson.getDoubleAdder("ids");
// add 增加
ids.add(12.3);
// increment 累加
ids.increment();
// decrement 累减
ids.decrement();
// sum 累计值(当前值)
System.out.println(ids.sum());
// destroy 手动销毁,当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
ids.destroy();
}
限流器(RateLimiter)
限流器(RateLimiter)可以用来在分布式环境下现在请求方的调用频率。既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。该算法不保证公平性。
getRateLimiter
获取限流器acquire
要获取的许可证数量,没有获取到会阻塞当前线程
@Test
public void test15() throws IOException, InterruptedException {
// getRateLimiter 获取限流器
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// trySetRate 初始化,最大流速 = 每1秒钟产生10个令牌
rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
RRateLimiter limiter = redisson.getRateLimiter("myRateLimiter");
// acquire 要获取的许可证数量,没有获取到会阻塞当前线程
limiter.acquire(3);
Thread t = new Thread(() -> {
while (true){
// 每秒10个令牌,一次取5个,大约1s打印两次
limiter.acquire(5);
System.out.println("轮询-"+new SimpleDateFormat("HH:mm:ss S").format(new Date()));
// 轮询-23:12:07 501
// 轮询-23:12:08 504
// 轮询-23:12:08 510
// 轮询-23:12:09 511
// 轮询-23:12:09 516
}
});
t.start();
TimeUnit.SECONDS.sleep(10);
}
转载自:https://juejin.cn/post/7157726812250832932