Redisson使用笔记
集群方案
主从复制
1、简介
主从复制模式中,Master持有一个replication ID(用于标记一个给定的数据集)和一个偏移量Offset(可简单理解为该数据集的版本)。在Slave主从连接断开时,Slave能够自动进行重连,当连接上Master时,Slave使用PSYNC命令发送它们记录的旧的Replication ID 与Offset,Master以此来判断需要同步的数据量(增量或全量),尽力使Slave成为Master的精确副本。
2、读写分离
Slave默认开启只读模式,只读模式下Slave将会拒绝所有写入命令,可通过redis.conf配置文件中的slave-read-only变量控制,如果Slave也允许写操作,则需要额外的开销来保证主从一致。
由于复制动作是异步进行,所以可能Master执行写操作后尚未同步到Salve中,主从服可能存在一个数据不一致的中间状态
复制过程的绝大部分步骤不会阻塞Slave,但在载入Master发来的RDB文件时,仍然会造成Slave在一段时间内不能处理命令请求。
3、可用性分析
当Master宕机时,Redis本身并不能自动实现主从切换,即无法再提供服务。
需要在Slave端执行slaveof no one ,使之成为新的Master,其他节点成为新Master的从节点,该操作需要人工干预,无法实现高可用。
4、redisson配置
---
masterSlaveServersConfig:
idleConnectionTimeout: 10000
pingTimeout: 1000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
reconnectionTimeout: 3000
failedAttempts: 3
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
slaveSubscriptionConnectionMinimumIdleSize: 1
slaveSubscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 32
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 32
masterConnectionPoolSize: 64
readMode: "SLAVE"
slaveAddresses:
- "redis://127.0.0.1:6381"
- "redis://127.0.0.1:6380"
masterAddress: "redis://127.0.0.1:6379"
database: 0
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
"transportMode":"NIO"
哨兵(Sentinel)
1、简介
上面提到当Master中断服务后,需要手动将一个Slave提升为Master以继续提供服务,为此Redis提供了Sentinel系统来实现自动化的系统监控和故障恢复(自动将Slave提升为Master),一个架构中可运行多个Sentinel进程,这些进程间使用流言协议(gossip protocols)来接收关于Master是否下线的信息,并投票决定是否执行故障迁移
2、配置形式
port 27000
#master
sentinel monitor master 127.0.0.1 6379 2
sentinel down-after-milliseconds master 5000
sentinel auth-pass master 123
sentinel config-epoch master 1
sentinel leader-epoch master 1
- port :当前Sentinel服务运行的端口
- sentinel monitor mymaster 127.0.0.1 6379 2 :Sentinel去监视一个名为mymaster的主redis实例,这个主实例的IP地址为本机地址127.0.0.1,端口号为6379,而将这个主实例判断为失效至少需要2个 Sentinel进程的同意,只要同意Sentinel的数量不达标,自动故障迁移就不会执行
- 3entinel down-after-milliseconds mymaster 5000:指定了Sentinel认为Redis实例已经失效所需的毫秒数。当 实例超过该时间没有返回PING,或者直接返回错误,那么Sentinel将这个实例标记为主观下线。只有一个 Sentinel进程将实例标记为主观下线并不一定会引起实例的自动故障迁移:只有在足够数量的Sentinel都将一个实例标记为主观下线之后,实例才会被标记为客观下线,这时自动故障迁移才会执行
- sentinel parallel-syncs mymaster 1:指定了在执行故障转移时,最多可以有多少个Slave同时对新的Master进行同步,这个数字越小,完成故障转移所需的时间就越长
- sentinel failover-timeout mymaster 15000:如果在该时间(ms)内未能完成故障迁移操作,则认为该故障迁移失败
3、可用性分析
解决了“Master"宕机的问题,一般来说,实现”高可用“至少需要建立三个sentinel构成集群,即一主两从三哨兵。在数据量不大的情况下足以满足需求。
4、redisson配置
---
sentinelServersConfig:
idleConnectionTimeout: 10000
pingTimeout: 1000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
reconnectionTimeout: 3000
failedAttempts: 3
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
slaveSubscriptionConnectionMinimumIdleSize: 1
slaveSubscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 32
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 32
masterConnectionPoolSize: 64
readMode: "SLAVE"
sentinelAddresses:
- "redis://127.0.0.1:26379"
- "redis://127.0.0.1:26389"
masterName: "mymaster"
database: 0
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
"transportMode":"NIO"
集群
1、简介
Redis集群是一个提供在多个Redis间共享数据的程序集,将键空间分割为16384个槽(slot),通过 HASH_SLOT=CRC16(key) mod 16384, 将不同的键均匀分配到16384个槽中,每个哈希槽只由一个节点控制, 所有的Redis节点彼此通过TCP连接(PING-PONG机制)和一个二进制协议建立通信。
2、备份迁移
Redis集群引入了主从模式,在Master宕机时在其对应的从节点中选举出新的Master,但在主从节点的映射关系固定的情况下,久而久之会积累很多单一节点独立的问题,为此,Redis集群实现了一个叫做备份迁移的概念,即在集群创建中创建不对称性,将一个从节点从一个拥有多个从节点的Master节点迁移到一个孤立的主节点。备份迁移算法可通过参数 cluster-migration-barrier进行控制,表示一个Master在拥有多少个好的从节点时需要割让出一个从节点。
3、可用性分析
哨兵模式基本已经实现高可用,读写分离。但这种模式下每台redis服务器都存储相同数据,存储能力受限于单台主机内存大小。
集群通过数据分片,利用多台机子的内存承担更大量的数据,适用数据量过大的场景。
集群的可用性体现在能容忍集群中少数节点的出错,在节点出错而进行的故障转移可能会丢失部分写操作,应用时需考虑业务的数据安全。
4、redisson配置
---
clusterServersConfig:
idleConnectionTimeout: 10000
pingTimeout: 1000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
reconnectionTimeout: 3000
failedAttempts: 3
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
slaveSubscriptionConnectionMinimumIdleSize: 1
slaveSubscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 32
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 32
masterConnectionPoolSize: 64
readMode: "SLAVE"
nodeAddresses:
- "redis://127.0.0.1:7004"
- "redis://127.0.0.1:7001"
- "redis://127.0.0.1:7000"
scanInterval: 1000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
"transportMode":"NIO
数据分片
集群模式下,客户端可自由地向集群中所有节点发送请求,Redis节点接收任何键相关的命令时首先计算键对应的槽,根据槽找出对应的节点,若是属于该节点,则自行处理,否则进行MOVED重定向。
CRC16
键空间被分割为 16384 槽(slot),事实上集群的最大节点数量是 16384 个。(然而建议最大节点数量设置在1000这个数量级上)
所有的主节点都负责 16384 个哈希槽中的一部分。当集群处于稳定状态时,集群中没有在执行重配置(reconfiguration)操作,每个哈希槽都只由一个节点进行处理(不过主节点可以有一个或多个从节点,可以在网络断线或节点失效时替换掉主节点)。
以下是用来把键映射到哈希槽的算法:
HASH_SLOT = CRC16(key) mod 16384
键哈希标签
哈希标签是确保两个键都在同一个哈希槽里的一种方式。
为了实现哈希标签,哈希槽是用另一种不同的方式计算的。基本来说,如果一个键包含一个 “{…}” 这样的模式,只有 { } 之间的字符串会被用来做哈希以获取哈希槽。
例子:
比如这两个键 {user1000}.following 和 {user1000}.followers 会被哈希到同一个哈希槽里,因为只有 user1000 这个子串会被用来计算哈希值。 对于 foo{}{bar} 这个键,整个键都会被用来计算哈希值,因为第一个出现的 { 和它右边第一个出现的 } 之间没有任何字符。 对于 foo{bar}{zap} 这个键,用来计算哈希值的是 bar 这个子串,因为算法会在第一次有效或无效(比如中间没有任何字节)地匹配到 { 和 } 的时候停止。 按照这个算法,如果一个键是以 {} 开头的话,那么就当作整个键会被用来计算哈希值。
Redisson的节点选择
Redisson在进行命令写入前,需要先计算出key值对应的节点:
/获取节点
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry, slot);
}
而在calcSlot(key)
这一步,Redisson将上面提到的CRC16 及 键哈希标签 进行了结合
//集群模式的槽计算
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{'); // 截取{}内的字符作为key
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
在对key值进行CRC16算法之前,Redisson先截取了{}中的部分作为计算的key,因此,若是需要将某一部分的数据放在同一个槽,则可以在生成key值时指定 {相应的标识}
单个集合分片
Redisson Pro版本为单个数据提供的自动分片功能。
通过自身的分片算法,将一个大集合拆分为若干个片段(默认231个,分片数量范围是3 -16834),然后将拆分后的片段均匀的分布到集群里各个节点里,保证每个节点分配到的片段数量大体相同。比如在默认情况下231个片段分到含有4个主节点的集群里,每个主节点将会分配到大约57个片段,同样的道理如果有5个主节点,每个节点会分配到大约46个片段。
目前支持集(Set)、映射(Map)、BitSet、布隆过滤器(Bloom Filter)等等,详情见redisson官方文档
数据安全与持久化
RDB
RDB 是 Redis 默认的持久化方案。在指定的时间间隔内,执行指定次数的写操作,则会将内存中的数据写入到磁盘中。即在指定目录下生成一个dump.rdb文件。Redis 重启会通过加载dump.rdb文件恢复数据。
数据恢复
将dump.rdb 文件拷贝到redis的安装目录的bin目录下,重启redis服务即可。在实际开发中,一般会考虑到物理机硬盘损坏情况,选择备份dump.rdb 。
特点及适用场景
-
适合大规模的数据恢复。
-
数据的完整性和一致性不高,因为RDB可能在最后一次备份时宕机了。
-
备份时占用内存,因为Redis 在备份时会独立创建一个子进程,将数据写入到一个临时文件(此时内存中的数据是原来的两倍),最后再将临时文件替换之前的备份文件。
所以Redis 的持久化和数据的恢复要选择在夜深人静的时候执行是比较合理的。
AOF
Redis 默认不开启。它的出现是为了弥补RDB的不足(数据的不一致性),所以它采用日志的形式来记录每个写操作,并追加到文件中。Redis 重启的会根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
数据恢复
正常情况下,将appendonly.aof 文件拷贝到redis的安装目录的bin目录下,重启redis服务即可。但在实际开发中,可能因为某些原因导致appendonly.aof 文件格式异常,从而导致数据还原失败,可以通过命令redis-check-aof --fix appendonly.aof 进行修复 。
Redis中AOF日志并不相同与MySQL(及其他基于日志的DMBS)写日志的目的。虽然两者都是为了崩溃恢复服务的,但MySQL必须保证崩溃恢复的原子性与持久性,那就必须保证每一个操作(事务)的成功完成,必须是以在日志中写入了该操作的完整的信息为标志。而Redis中的AOF日志是为了服务崩溃之后,尽可能重演历史来恢复现场,但并不强求绝对精确地恢复每一个已经进行的修改操作,即并不会保证持久性。这是因为哪怕将AOF设置为always,也不能保证绝对不出现写完数据之后,写日志之前不会出现崩溃。但也正因为操作是优先于日志来完成的,才让Redis有更高的性能上限。
AOF的重写机制
前面也说到了,AOF的工作原理是将写操作追加到文件中,文件的冗余内容会越来越多。所以 Redis 新增了重写机制。当AOF文件的大小超过所设定的阈值时,Redis就会对AOF文件的内容压缩。
重写的原理:Redis 会fork出一条新进程,读取内存中的数据,并重新写到一个临时文件中。最后替换旧的aof文件。
触发机制:当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发。这里的“一倍”和“64M” 可以通过配置文件修改。
特点及适用场景
-
数据的完整性和一致性更高
-
因为AOF记录的内容多,文件会越来越大,数据恢复也会越来越慢
Redisson部分API
分布式实时对象
1、简介
分布式实时对象(Live Object)可以理解为一个功能强化后的Java对象,可被多个位于不同JVM里的线程同时引用。Redisson Live Object 运用即时生成的代理类,将一个普通的Java对象的所有字段,以及针对这些字段的全部操作(getter/setter)全部映射到一个Redis Hash的数据结构,每个get和set方法最终被转译为针对同一个Redis Hash的hget和hset命令。
通过使用RLO,可以把Redis当做一个允许被多个JVM同时操作且不受GC影响的共享堆(Heap Space)。
2、使用方法
@REntity 仅适用于类。通过指定@REntity的各个参数,可以详细的对每个RLO类实现特殊定制,以达到改变RLO对象的行为。
namingScheme - 命名方案。命名方案规定了每个实例在Redis中对应key的名称。它不仅被用来与已存在的RLO建立关联,还被用来储存新建的RLO实例。默认采用Redisson自带的DefaultNamingScheme对象。
codec - 编码解码器。在运行当中,Redisson用编码解码器来对RLO中的每个字段进行编码解码。Redisson内部采用了实例池管理不同类型的编码解码器实例。Redisson提供了多种不同的编码解码器,默认使用JsonJacksonCodec。
fieldTransformation - 字段转换模式。如上所述,为了尽可能的保证RLO的用法和普通Java对象一致,Redisson会自动将常用的普通Java对象转换成与其匹配的Redisson分布式对象。这是由于字段转换模式的默认值是ANNOTATION_BASED,修改为IMPLEMENTATION_BASED就可以不转换。
@RId 仅适用于字段。@RId注解只能用在具备区分实例的字段上,这类字段可以理解为一个类的id字段或主键字段。这个字段的值将被命名方案namingScheme用来与事先存在的RLO建立引用。加了该注解的字段是唯一在本地JVM里同时保存赋值的字段。一个类只能有一个字段包含@RId注解。
可以通过指定一个生成器generator策略来实现自动生成这个字段的值。默认不提供生成器。
@RIndex 仅适用于字段。用来指定可用于搜索的字段。可以通过RLiveObjectService.find方法来根据条件精细查找分布式实时对象。查询条件可以是含(IN),或(OR),和(AND)或相等(EQ)以及它们的任意组合。
@RObjectField 仅适用于字段。允许通过该注解中的namingScheme或codec来改变该字段的命名或编码方式,用来区别于
@REntity中指定的预设方式。
@RCascade 仅适用于字段。用来指定包含于分布式实时对象字段内其它对象的级联操作方式。
可选的级联操作方式为如下:
RCascadeType.ALL - 执行所有级联操作RCascadeType.PERSIST - 仅在执行RLiveObjectService.persist()方法时进行级联操作RCascadeType.DETACH - 仅在执行RLiveObjectService.detach()方法时进行级联操作RCascadeType.MERGE - 仅在执行RLiveObjectService.merge()方法时进行级联操作RCascadeType.DELETE - 仅在执行RLiveObjectService.delete()方法时进行级联操作
3、使用示例
RedissonClient client = RedisService.getInstance().getClient();
RLiveObjectService liveObjectService = client.getLiveObjectService();
TestLiveObject myObject = new TestLiveObject();
TopDragonRankElement topDragonRankElement = TopDragonRankElement.createTset(Context.getUUIDService().makeUniqueGuildId());
List<Integer> testList = new ArrayList<>();
Set<Integer> testSet = new HashSet<>();
for (int i = 0 ; i<10;i++){
testList.add(i);
testSet.add(i);
}
myObject.setPlayerId("10086");
myObject.setElement(topDragonRankElement);
myObject.setTestList(testList);
myObject.setTestSet(testSet);
liveObjectService.persist(myObject);
实际redis执行的命令 (monitor):
"HSET" "TestLiveObject:playerId:10086" "\"redisson_live_object\"" "\"1\""
1600871378.202749 [0 127.0.0.1:56608] "HSET" "TestLiveObject:playerId:10086" "\"testList\"" "{\"@class\":\"org.redisson.RedissonReference\",\"codec\":\"org.redisson.codec.FstCodec\",\"keyName\":\"TestLiveObject:testList\",\"type\":\"org.redisson.RedissonList\"}"
1600871378.216239 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x00"
1600871378.217025 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x01"
1600871378.217574 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x02"
1600871378.218068 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x03"
1600871378.218599 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x04"
1600871378.219218 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x05"
1600871378.219721 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\x06"
1600871378.220185 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\a"
1600871378.220843 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\b"
1600871378.221328 [0 127.0.0.1:56608] "RPUSH" "TestLiveObject:testList" "\xf7\t"
1600871378.223225 [0 127.0.0.1:56608] "HSET" "TestLiveObject:playerId:10086" "\"testSet\"" "{\"@class\":\"org.redisson.RedissonReference\",\"codec\":\"org.redisson.codec.FstCodec\",\"keyName\":\"TestLiveObject:testSet\",\"type\":\"org.redisson.RedissonSet\"}"
1600871378.225008 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x00"
1600871378.225649 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x01"
1600871378.226245 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x02"
1600871378.226964 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x03"
1600871378.227558 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x04"
1600871378.228029 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x05"
1600871378.228655 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\x06"
1600871378.229381 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\a"
1600871378.229804 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\b"
1600871378.230210 [0 127.0.0.1:56608] "SADD" "TestLiveObject:testSet" "\xf7\t"
1600871378.392826 [0 127.0.0.1:56608] "HSET" "TestLiveObject:playerId:10086" "\"element\"" "{\"@class\":\"com.mmorpg.newlogic.modules.crossRanking.domain.redis.TopDragonRankElement\",\"activeFightScore\":13985,\"curSeasonScoreSum\":7673,\"guildName\":\"\",\"jiaOccupyTimes\":6,\"rankIndex\":0,\"rankTime\":1600871377672,\"retries\":0,\"serverId\":2,\"topOccupyTimes\":0,\"uuid\":995060000019}"
1600871396.179511 [0 127.0.0.1:56608] "HSET" "TestLiveObject:playerId:10086" "\"redisson_live_object\"" "\"1\""
4、使用注意点
实时对象要求每个字段必需实现get/set方法,在生成代理对象后,非get/set操作为无效操作,进行持久化时,要对每个需要更新的字段调用set方法。
@REntity(codec = JsonJacksonCodec.class, namingScheme = SimpleNamingScheme.class)
public class TestLiveEntity {
@RId
private long playerId;
private Set<Integer> testSet ;
public TestLiveEntity() { //需定义无参构造,生成代理类时用到
this.testSet = new HashSet<>();
}
public TestLiveEntity(long playerId) {
this.playerId = playerId;
this.testSet = new HashSet<>();
}
public void add(int i){
testSet.add(i);
}
public Set<Integer> getTestSet() {
return testSet;
}
public void setTestSet(Set<Integer> testSet) {
this.testSet = testSet;
}
}
// 测试1
TestLiveEntity object = new TestLiveEntity(1); //此时是原生对象
for (int i = 1 ; i<=10;i++){
object.add(i); //有效
}
TestLiveEntity persist = liveObjectService.persist(object); //生成代理对象
System.out.println(persist.getTestSet().size()); //10
persist.add(11); //无效操作,只针对getter/setter做转译
System.out.println(persist.getTestSet().size()); //10
persist.setTestSet(new HashSet<>()); // 转译为Redis del命令
System.out.println(persist.getTestSet().size()); //0
// 测试2
TestLiveEntity object = new TestLiveEntity(1); //此时仍是原生对象
for (int i = 1 ; i<=10;i++){
object.add(i); //有效
}
TestLiveEntity persist = liveObjectService.persist(object); //生成代理对象
Set<Integer> testSet = persist.getTestSet(); //转换为 RedissonSet.class 对象
System.out.println(testSet.size()); //10
persist.add(11); //无效操作
System.out.println(testSet.size()); //10
testSet.add(11); //有效操作,对应Redis sadd命令
System.out.println(testSet.size()); //11
persist.setTestSet(new HashSet<>()); //转译为Redis del命令
System.out.println(testSet.size()); //0
5、关键源码
RedissonLiveObjectService
@Override
public <T> T attach(T detachedObject) {
validateDetached(detachedObject);
Class<T> entityClass = (Class<T>) detachedObject.getClass();
String idFieldName = getRIdFieldName(detachedObject.getClass());
Object id = ClassUtils.getField(detachedObject, idFieldName);
return createLiveObject(entityClass, id);
}
attache方法传入被关联对象,及原生数据类型,校验后调用createLiveObject创建代理对象。
private <T> Class<? extends T> createProxy(Class<T> entityClass, CommandAsyncExecutor commandExecutor) {
DynamicType.Builder<T> builder = new ByteBuddy()
.subclass(entityClass);
for (FieldDescription.InDefinedShape field
: Introspectior.getTypeDescription(LiveObjectTemplate.class)
.getDeclaredFields()) {
builder = builder.define(field);
}
Class<? extends T> proxied = builder.method(ElementMatchers.isDeclaredBy(
ElementMatchers.anyOf(RLiveObject.class, RExpirable.class, RObject.class))
.and(ElementMatchers.isGetter().or(ElementMatchers.isSetter())
.or(ElementMatchers.named("isPhantom"))
.or(ElementMatchers.named("delete"))))
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(FieldProxy.Binder
.install(LiveObjectInterceptor.Getter.class,
LiveObjectInterceptor.Setter.class))
.to(new LiveObjectInterceptor(commandExecutor, this, entityClass, getRIdFieldName(entityClass))))
.implement(RLiveObject.class)
.method(ElementMatchers.isAnnotatedWith(RFieldAccessor.class)
.and(ElementMatchers.named("get")
.or(ElementMatchers.named("set"))))
.intercept(MethodDelegation.to(FieldAccessorInterceptor.class))
.method(ElementMatchers.isDeclaredBy(RExpirable.class)
.or(ElementMatchers.isDeclaredBy(RExpirableAsync.class)))
.intercept(MethodDelegation.to(RExpirableInterceptor.class))
.implement(RExpirable.class)
.method(ElementMatchers.isDeclaredBy(Map.class)
.or(ElementMatchers.isDeclaredBy(ConcurrentMap.class))
.or(ElementMatchers.isDeclaredBy(RMapAsync.class))
.or(ElementMatchers.isDeclaredBy(RMap.class)))
.intercept(MethodDelegation.to(RMapInterceptor.class))
.implement(RMap.class)
.method(ElementMatchers.not(ElementMatchers.isDeclaredBy(Object.class))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RLiveObject.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RExpirable.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RExpirableAsync.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RObject.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(RObjectAsync.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(ConcurrentMap.class)))
.and(ElementMatchers.not(ElementMatchers.isDeclaredBy(Map.class)))
.and(ElementMatchers.isGetter()
.or(ElementMatchers.isSetter()))
.and(ElementMatchers.isPublic()
.or(ElementMatchers.isProtected()))
)
.intercept(MethodDelegation.to(new AccessorInterceptor(commandExecutor)))
.make().load(entityClass.getClassLoader(),
ClassLoadingStrategy.Default.WRAPPER)
.getLoaded();
return proxied;
}
这里使用到了ByteBuddy这个类增强工具来生成代理类,对指定类型的指定方法进行拦截并增强,具体增强逻辑在各个Interceptor中。这里有个优化点,我们可以在启服时预先生成代理类放入classCache,即调用registerClass(Class<?> cls)
,降低运行时的消耗。
AccessorInterceptor
@RuntimeType
@SuppressWarnings("NestedIfDepth")
public Object intercept(@Origin Method method,
@SuperCall Callable<?> superMethod,
@AllArguments Object[] args,
@This Object me,
@FieldValue("liveObjectLiveMap") RMap<String, Object> liveMap) throws Exception {
...省略
String fieldName = getFieldName(me.getClass().getSuperclass(), method);
Field field = ClassUtils.getDeclaredField(me.getClass().getSuperclass(), fieldName);
Class<?> fieldType = field.getType();
if (isGetter(method, fieldName)) {
Object result = liveMap.get(fieldName);
if (result == null) {
RObject ar = commandExecutor.getObjectBuilder().createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), fieldType, fieldName);
if (ar != null) {
commandExecutor.getObjectBuilder().store(ar, fieldName, liveMap);
return ar;
}
}
if (result != null && fieldType.isEnum()) {
if (result instanceof String) {
return Enum.valueOf((Class) fieldType, (String) result);
}
return result;
}
if (result instanceof RedissonReference) {
return commandExecutor.getObjectBuilder().fromReference((RedissonReference) result, RedissonObjectBuilder.ReferenceType.DEFAULT);
}
return result;
}
...省略
return superMethod.call();
}
RedissonObjectBuilder
static {
SUPPORTED_CLASS_MAPPING.put(SortedSet.class, RedissonSortedSet.class);
SUPPORTED_CLASS_MAPPING.put(Set.class, RedissonSet.class);
SUPPORTED_CLASS_MAPPING.put(ConcurrentMap.class, RedissonMap.class);
SUPPORTED_CLASS_MAPPING.put(Map.class, RedissonMap.class);
SUPPORTED_CLASS_MAPPING.put(BlockingDeque.class, RedissonBlockingDeque.class);
SUPPORTED_CLASS_MAPPING.put(Deque.class, RedissonDeque.class);
SUPPORTED_CLASS_MAPPING.put(BlockingQueue.class, RedissonBlockingQueue.class);
SUPPORTED_CLASS_MAPPING.put(Queue.class, RedissonQueue.class);
SUPPORTED_CLASS_MAPPING.put(List.class, RedissonList.class);
fillCodecMethods(REFERENCES, RedissonClient.class, RObject.class);
fillCodecMethods(REFERENCES, RedissonReactiveClient.class, RObjectReactive.class);
fillCodecMethods(REFERENCES, RedissonRxClient.class, RObjectRx.class);
}
建立原生数据类型与Redisson数据类型的映射关系并放入Codec方法。
public RObject createObject(Object id, Class<?> clazz, Class<?> fieldType, String fieldName) {
Class<? extends RObject> mappedClass = getMappedClass(fieldType);//从SUPPORTED_CLASS_MAPPING中拿到对应的Redisson数据类型
try {
if (mappedClass != null) {
Codec fieldCodec = getFieldCodec(clazz, mappedClass, fieldName);
NamingScheme fieldNamingScheme = getNamingScheme(clazz, fieldCodec);
String referenceName = fieldNamingScheme.getFieldReferenceName(clazz, id, mappedClass, fieldName);
return createRObject(redisson, mappedClass, referenceName, fieldCodec);
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
return null;
}
private <T extends RObject, K extends Codec> T createRObject(RedissonClient redisson, Class<T> expectedType, String name, K codec) throws ReflectiveOperationException {
Class<?>[] interfaces = expectedType.getInterfaces();
for (Class<?> iType : interfaces) {
if (REFERENCES.containsKey(iType)) {// user cache to speed up things a little.
Method builder = REFERENCES.get(iType).get(codec != null);
if (codec != null) {
return (T) builder.invoke(redisson, name);
}
return (T) builder.invoke(redisson, name, codec); //调用codec完成RedissonObject创建
}
}
String codecName = null;
if (codec != null) {
codecName = codec.getClass().getName();
}
throw new ClassNotFoundException("No RObject is found to match class type of " + expectedType.getName() + " with codec type of " + codecName);
}
RSortedSet
实现了java.util.SortedSet接口,通过比较器(Comparator)接口实现了对元素的排序
private RedissonList<V> list;
private RBucket<String> comparatorHolder;
private RedissonClient redisson;
@Override
public RFuture<List<V>> readAllAsync() {
return commandExecutor.readAsync(getName(), codec, LRANGE, getName(), 0, -1);
}
RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>());
虽然叫set,但实际其redis底层数据结构用的是list,查询操作对应的命令为"LRANGE"
@Override
public boolean add(V value) {
lock.lock();
try {
checkComparator();
BinarySearchResult<V> res = binarySearch(value, codec);
if (res.getIndex() < 0) {
int index = -(res.getIndex() + 1);
ByteBuf encodedValue = encode(value);
commandExecutor.get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "if tonumber(ARGV[1]) < len then "
+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"
+ "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);"
+ "return;"
+ "end;"
+ "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.<Object>asList(getRawName()), index, encodedValue));
return true;
} else {
return false;
}
} finally {
lock.unlock();
}
}
add操作需要先通过二分查找到对应的元素下标,并最终调用linsert进行插入
public BinarySearchResult<V> binarySearch(V value, Codec codec) {
int size = list.size();
int upperIndex = size - 1;
int lowerIndex = 0;
while (lowerIndex <= upperIndex) {
int index = lowerIndex + (upperIndex - lowerIndex) / 2;
V res = list.getValue(index);
if (res == null) {
return new BinarySearchResult<V>();
}
int cmp = comparator.compare(value, res);
if (cmp == 0) {
BinarySearchResult<V> indexRes = new BinarySearchResult<V>();
indexRes.setIndex(index);
return indexRes;
} else if (cmp < 0) {
upperIndex = index - 1;
} else {
lowerIndex = index + 1;
}
}
BinarySearchResult<V> indexRes = new BinarySearchResult<V>();
indexRes.setIndex(-(lowerIndex + 1));
return indexRes;
}
其二分查找的逻辑依赖于comparator进行定位,如果比较的值等于0,则认为找到了对应元素的下标。
因此,我们需要自定义其元素比较器,如果我们希望某个对象其值改变后可以覆盖掉旧的值,那么则不能讲该值纳入比较器的计算逻辑,否则会出现重复元素。
转载自:https://juejin.cn/post/7001789479920861220