Redis Cluster multi-key operations: working around the CROSSSLOT error, plus a key load-balancing scheme
Running a single command against multiple keys in a clustered Redis fails with: CROSSSLOT Keys in request don't hash to the same slot.
For example, using the SDIFF command to diff pool_1 against pool_2 triggers exactly this error.
So when a business scenario requires operating across different keys in a Redis Cluster, how do we handle it?
The example below comes from a real scenario I hit in my own development work; I hope it gives you something to borrow from.
The scenario
I have five data pools stored in a clustered Redis (named pool_1, pool_2, pool_3, pool_4, pool_5, all of type Set), and the pools' contents overlap.
A user pulls data in the order pool_1 -> pool_2 -> ... -> pool_5, taking from each pool the number of items its configured ratio allows. The returned data must contain no duplicates, and items that have already been returned to the user must never be returned again.
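To make the requirement concrete, here is a minimal in-memory sketch of the pull logic, using plain Java sets as a stand-in for Redis (the pool contents, quotas, and cap are made-up numbers, not the article's real configuration):

```java
import java.util.*;

public class PullSketch {
    /**
     * Illustrative only: pools are visited in order, each contributes at most
     * its quota, and duplicates plus previously delivered members are skipped.
     */
    static List<String> pull(List<Set<String>> pools, Set<String> delivered,
                             int[] quotas, int maxTotal) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < pools.size() && result.size() < maxTotal; i++) {
            int taken = 0;
            for (String member : pools.get(i)) {
                if (taken >= quotas[i] || result.size() >= maxTotal) {
                    break;
                }
                // skip overlap with earlier pools and with past pulls
                if (delivered.contains(member) || result.contains(member)) {
                    continue;
                }
                result.add(member);
                taken++;
            }
        }
        delivered.addAll(result); // remember what the user has now seen
        return result;
    }
}
```

In Redis terms, the "skip what was already delivered" step is a set difference, which is exactly where SDIFF (and the CROSSSLOT problem) enters the picture.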
Alright, that's the whole scenario. My description may be a bit roundabout... (I'm new to writing; please bear with me.)

So what problem did I hit?
I need to diff the different pools, which immediately runs into the CROSSSLOT Keys in request don't hash to the same slot error.
The CROSSSLOT Keys in request don't hash to the same slot error, explained (skip ahead if you already know)
Some readers may not be entirely clear on what CROSSSLOT Keys in request don't hash to the same slot means.
I could go ask GPT for you... but GPT is too formal, so here it is in plain words: the keys in your request do not hash to the same slot.
Why does this happen?
A Redis Cluster divides the key space into 16384 slots (numbered 0-16383) and assigns each node a range of them. The cluster can only process all the keys of one slot on one node; put plainly, a command that operates on several keys at once only works when those keys share the same slot value. When a multi-key operation such as SDIFF touches keys that were assigned to different slots, you get this error.
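You can reproduce the slot calculation locally. Redis computes HASH_SLOT = CRC16(key) mod 16384 using the CRC16-CCITT (XModem) checksum, and if the key contains a non-empty {...} section, only that section is hashed. A self-contained sketch:

```java
import java.nio.charset.StandardCharsets;

public class KeySlot {
    // CRC16-CCITT (XModem): poly 0x1021, init 0, no reflection —
    // the checksum Redis Cluster uses for key slots
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Same rule as CLUSTER KEYSLOT: if the key has a non-empty {...}
    // section, only that section is hashed
    static int keySlot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }
}
```

With this you can check that keySlot("pool_1:{A}") equals keySlot("pool_2:{A}") (and keySlot("A")), which is exactly why the hashtag fix discussed later works.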
How do I find a key's slot value?
The CLUSTER KEYSLOT {key} command returns it. Running it on our keys shows that pool_1, pool_2, pool_3, pool_4, and pool_5 all have different slot values.
How do I find each node's slot range?
The CLUSTER SLOTS command shows the slot assignment. (Why bring up per-node slot ranges at all? They are needed for load balancing, as explained later.)
The fix itself is simple
Use the hashtag mechanism: when adding keys to a Redis Cluster, wrap part of the key name in {} so that only that part is hashed. Related keys then land in the same slot and therefore on the same node. For example, pool_1:{A} and pool_2:{A} are guaranteed to be stored on the same node.
Then the SDIFF operation no longer errors.
For small data volumes this solution is fine, but at scale it creates a performance problem: every key lives under one hashtag, which dumps the entire load onto a single node of the cluster. At that point, how is it any different from standalone Redis? So what follows is how to solve the load-balancing problem.
Load balancing on a Redis Cluster using the hashtag mechanism
Initialization:
Step 1: fetch each node's slot range.
Step 2: generate a random hashtag for each node, so that every node has its own.
As shown in the figure.
Core code
/**
 * Initializes the cluster node -> slot-range and node -> hashtag mappings.
 *
 * @author yunnuo
 * @date 2024-05-22
 */
@Slf4j
@Component
public class ClusterNodesAndSlotInitHandle {

    /**
     * node -> slot range mapping
     */
    public static Map<String, String> NODE_SLOT_MAPPING;

    /**
     * node -> hashtag key mapping
     */
    public static Map<String, String> NODE_HASHTAG_KEY_MAPPING;

    @Resource
    private JedisCluster jedisCluster;

    private final GetNodeFunction getNodeFunction = new GetNodeFunction();

    @PostConstruct
    void init() {
        long start = System.currentTimeMillis();
        log.info("ClusterNodesAndSlotInitHandle init start!");
        boolean flag = true;
        try {
            // current cluster node -> slot-range mapping
            Map<String, String> nodeSlotMapping = initNodeSlotMapping();
            // node -> slot and node -> hashtag mappings previously cached in Redis
            Map<String, String> nodeSlotMappingCacheMap = jedisCluster.hgetAll(RedisKey.NODE_SLOT_MAPPING.getPrefix());
            Map<String, String> hashTagKeyMappingCacheMap = jedisCluster.hgetAll(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix());
            if (MapUtil.isEmpty(nodeSlotMappingCacheMap)) {
                Map<String, String> hashTagKeyMapMapping = initHashTagKeyMapMapping(nodeSlotMapping);
                setInitCache(nodeSlotMapping, hashTagKeyMapMapping);
                return;
            }
            // check whether the cluster topology has changed
            if (Objects.equals(nodeSlotMappingCacheMap, nodeSlotMapping)) {
                NODE_SLOT_MAPPING = nodeSlotMappingCacheMap;
                NODE_HASHTAG_KEY_MAPPING = hashTagKeyMappingCacheMap;
                // refresh the cache expiration
                jedisCluster.expire(RedisKey.NODE_SLOT_MAPPING.getPrefix(), RedisKey.NODE_SLOT_MAPPING.expireTime());
                jedisCluster.expire(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix(), RedisKey.NODE_HASHTAG_KEY_MAPPING.expireTime());
            } else {
                Map<String, String> hashTagKeyMapMapping = initHashTagKeyMapMapping(nodeSlotMapping);
                setInitCache(nodeSlotMapping, hashTagKeyMapMapping);
                // todo: migrate old data to the new nodes asynchronously (e.g. via a message), then delete it; not handled here
            }
        } catch (Exception e) {
            flag = false;
            log.error("ClusterNodesAndSlotInitHandle init error! e:{}", e.getMessage(), e);
            throw e;
        } finally {
            log.info("ClusterNodesAndSlotInitHandle init end! flag:{}, NODE_SLOT_MAPPING:{}, NODE_HASHTAG_KEY_MAPPING:{} cost:{}ms",
                    flag, JSONObject.toJSONString(NODE_SLOT_MAPPING), JSONObject.toJSONString(NODE_HASHTAG_KEY_MAPPING), System.currentTimeMillis() - start);
        }
    }

    private void setInitCache(Map<String, String> nodeSlotMapping, Map<String, String> hashTagKeyMapMapping) {
        jedisCluster.hmset(RedisKey.NODE_SLOT_MAPPING.getPrefix(), nodeSlotMapping);
        jedisCluster.expire(RedisKey.NODE_SLOT_MAPPING.getPrefix(), RedisKey.NODE_SLOT_MAPPING.expireTime());
        jedisCluster.hmset(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix(), hashTagKeyMapMapping);
        jedisCluster.expire(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix(), RedisKey.NODE_HASHTAG_KEY_MAPPING.expireTime());
        NODE_SLOT_MAPPING = nodeSlotMapping;
        NODE_HASHTAG_KEY_MAPPING = hashTagKeyMapMapping;
    }

    Map<String, String> initNodeSlotMapping() {
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map<String, String> nodeSlotMapping = new HashMap<>();
        for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
            Jedis jedisNode = entry.getValue().getResource();
            List<Object> slots = jedisNode.clusterSlots();
            for (Object slotInfo : slots) {
                if (slotInfo == null) {
                    continue;
                }
                if (!(slotInfo instanceof List)) {
                    log.error("initNodeSlotMapping failed to parse slotInfo! clusterNodes:{}, slotInfo:{}", JSONObject.toJSONString(clusterNodes), JSONObject.toJSONString(slotInfo));
                    throw new ServiceException(ErrorCode.SYSTEM_ERROR.getCode(), "failed to parse slotInfo!");
                }
                List<Object> slot = (List<Object>) slotInfo;
                long startSlot = NumberUtil.parseLong(slot.get(0).toString());
                long endSlot = NumberUtil.parseLong(slot.get(1).toString());
                if (!(slot.get(2) instanceof List)) {
                    log.error("initNodeSlotMapping failed to parse slotInfo[2]! clusterNodes:{}, slotInfo:{}", JSONObject.toJSONString(clusterNodes), JSONObject.toJSONString(slotInfo));
                    throw new ServiceException(ErrorCode.SYSTEM_ERROR.getCode(), "failed to parse slotInfo!");
                }
                List<Object> nodeInfo = (List<Object>) slot.get(2);
                String nodeIp = new String((byte[]) nodeInfo.get(0));
                long nodePort = NumberUtil.parseLong(nodeInfo.get(1).toString());
                // key format: nodeIp:nodePort, value format: startSlot,endSlot
                nodeSlotMapping.put(String.format("%s:%s", nodeIp, nodePort), String.format("%s,%s", startSlot, endSlot));
            }
        }
        log.info("ClusterNodesAndSlotInitHandle init nodeSlotMapping:{}", JSONObject.toJSONString(nodeSlotMapping));
        return nodeSlotMapping;
    }

    private Map<String, String> initHashTagKeyMapMapping(Map<String, String> nodeSlotMapping) {
        Map<String, String> hashTagKeyMap = randomNodeHashTagKey(nodeSlotMapping);
        log.info("ClusterNodesAndSlotInitHandle init hashTagKeyMap:{}, nodeSlotMapping:{}", JSONObject.toJSONString(hashTagKeyMap), JSONObject.toJSONString(nodeSlotMapping));
        NODE_HASHTAG_KEY_MAPPING = hashTagKeyMap;
        return hashTagKeyMap;
    }

    /**
     * Randomly generates hashtag keys that land on the different nodes.
     *
     * @param nodeSlotMapping node slot mapping
     * @return node -> hashtag key mapping
     */
    private Map<String, String> randomNodeHashTagKey(Map<String, String> nodeSlotMapping) {
        Map<String, String> nodeHashTagKey = new HashMap<>(nodeSlotMapping.size());
        for (int i = 1; i < 10000; i++) {
            String key = String.format("{%s}", i + "_" + RandomUtil.randomString(4));
            String node = getNodeFunction.apply(key, nodeSlotMapping);
            if (node == null) {
                continue;
            }
            if (!nodeHashTagKey.containsKey(node)) {
                nodeHashTagKey.put(node, key);
            }
            if (nodeHashTagKey.size() == nodeSlotMapping.size()) {
                break;
            }
        }
        if (nodeHashTagKey.size() != nodeSlotMapping.size()) {
            log.error("randomNodeHashTagKey failed, please adjust the random function! nodeSlotMapping:{}", JSONObject.toJSONString(nodeSlotMapping));
            throw new ServiceException(ErrorCode.SYSTEM_ERROR.getCode(), "nodeHashTagKey.size() != nodeSlotMapping.size()! please adjust the random function!");
        }
        return nodeHashTagKey;
    }
}
Storing the data in Redis
Shard each pool's data to avoid the big-key problem and the performance issues it brings.
The idea is this: my pool_1 pool holds 1,000,000 members. If they all sit under a single key, that is a big key as far as Redis is concerned and a performance hazard. So I split pool_1's 1,000,000 members by score and shard them across pool_1:A, pool_1:B, pool_1:C, and so on. Reads work the same way: walk the shards in order, or if score does not matter, pick shards at random.
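The routing idea can be sketched in a few lines. The thresholds below are illustrative only; the real ones live in the Pool1Sharding-style enums shown next:

```java
public class ShardRouter {
    // Illustrative thresholds; each pool's sharding enum defines the real ranges.
    static String shardKey(String pool, int score) {
        if (score >= 8000) {
            return pool + ":A"; // highest-scoring members
        }
        if (score >= 5000) {
            return pool + ":B";
        }
        return pool + ":C";
    }
}
```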
Core code
Implemented with the strategy + factory design patterns.
Pool type enum
Defines each pool's name, storage size limit, expiration time, and so on.
/**
 * Data pool type enum.
 *
 * @author yunnuo
 * @date 2024-05-22
 */
public enum PoolType {
    // data pools (pool_1, pool_2, pool_3, pool_4, pool_5)
    POOL_1("pool_1", 8, 10000, 60 * 60L),
    POOL_2("pool_2", 8, 20000, 60 * 60L),
    POOL_3("pool_3", 8, 10000, 60 * 60L),
    POOL_4("pool_4", 6, 100000, 60 * 60L),
    POOL_5("pool_5", 6, 5000, 60 * 60L),
    ;

    /**
     * pool name
     */
    public final String name;

    /**
     * limit on the number of items pulled per request
     */
    public final Integer pullNumberLimit;

    /**
     * limit on the pool's stored size (guards against an oversized pool hurting performance)
     */
    public final Integer saveSizeLimit;

    /**
     * pool expiration time
     */
    public final long expire;

    PoolType(String name, Integer pullNumberLimit, Integer saveSizeLimit, long expire) {
        this.name = name;
        this.pullNumberLimit = pullNumberLimit;
        this.saveSizeLimit = saveSizeLimit;
        this.expire = expire;
    }
}
Pool sharding strategy interface (for load balancing)
Define one standard interface, and let each pool implement it with its own logic. For example, the methods defined here return the Redis keys a pool has been sharded into.
/**
 * Data pool sharding strategy (for load balancing).
 *
 * @author yunnuo
 * @date 2024-05-22
 */
public interface PoolShardingKeyStrategy {

    /**
     * Returns all sharding keys of this pool.
     *
     * @return the sharding keys
     */
    List<String> getShardingKeys();

    /**
     * Returns the pool sharding key for a given score.
     *
     * @param score the score
     * @return the pool sharding key
     */
    String getShardingKey(Integer score);
}
Example: the sharding strategy implementation for the pool_1 pool
The Pool1Sharding enum defines pool_1's shards: how many there are, which key each shard maps to, its save-size limit, and so on.
/**
 * POOL_1 sharding enum.
 *
 * @author yunnuo
 * @date 2024-05-22
 */
public enum Pool1Sharding {
    POOL_1_A(PoolShardingType.A, 8000, 10000, 400000),
    POOL_1_B(PoolShardingType.B, 5000, 7999, 300000),
    POOL_1_C(PoolShardingType.C, 0, 4999, 200000),
    ;

    /**
     * shard type
     */
    public final PoolShardingType type;

    /**
     * shard minimum score (inclusive)
     */
    public final Integer limitMinScore;

    /**
     * shard maximum score (inclusive)
     */
    public final Integer limitMaxScore;

    /**
     * save size limit
     */
    public final Integer saveSizeLimit;

    Pool1Sharding(PoolShardingType type, Integer limitMinScore, Integer limitMaxScore, Integer saveSizeLimit) {
        this.type = type;
        this.limitMinScore = limitMinScore;
        this.limitMaxScore = limitMaxScore;
        this.saveSizeLimit = saveSizeLimit;
    }

    public static PoolShardingType getSharding(Integer score) {
        for (Pool1Sharding poolSharding : Pool1Sharding.values()) {
            if (score >= poolSharding.limitMinScore && score <= poolSharding.limitMaxScore) {
                return poolSharding.type;
            }
        }
        return null;
    }
}
pool_1 sharding strategy implementation
/**
 * pool_1 sharding strategy implementation.
 *
 * @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>
 * @date 2024-05-22
 */
public class Pool1ShardingKeyStrategy implements PoolShardingKeyStrategy {

    @Override
    public List<String> getShardingKeys() {
        return Arrays.stream(Pool1Sharding.values())
                .map(item -> RedisKey.DATA_POOL.makeRedisKey(PoolType.POOL_1.name, item.type.toString()))
                .collect(Collectors.toList());
    }

    @Override
    public String getShardingKey(Integer score) {
        PoolShardingType sharding = Pool1Sharding.getSharding(score);
        if (sharding == null) {
            return null;
        }
        return RedisKey.DATA_POOL.makeRedisKey(PoolType.POOL_1.name, sharding.toString());
    }
}
Data pool sharding strategy factory
All the strategy implementations are registered in one factory; outside callers always obtain a strategy through it.
/**
 * Data pool sharding strategy factory.
 *
 * @author yunnuo
 * @date 2024-02-22
 */
@Slf4j
@Component
public class PoolShardingKeyFactory {

    private final Map<PoolType, PoolShardingKeyStrategy> POOL_MAPPING;

    public PoolShardingKeyFactory() {
        POOL_MAPPING = init();
    }

    public HashMap<PoolType, PoolShardingKeyStrategy> init() {
        HashMap<PoolType, PoolShardingKeyStrategy> map = new HashMap<>(PoolType.values().length);
        map.put(PoolType.POOL_1, new Pool1ShardingKeyStrategy());
        map.put(PoolType.POOL_2, new Pool2ShardingKeyStrategy());
        map.put(PoolType.POOL_3, new Pool3ShardingKeyStrategy());
        map.put(PoolType.POOL_4, new Pool4ShardingKeyStrategy());
        map.put(PoolType.POOL_5, new Pool5ShardingKeyStrategy());
        return map;
    }

    public PoolShardingKeyStrategy getPoolShardingKeyStrategy(PoolType poolType) {
        if (!POOL_MAPPING.containsKey(poolType)) {
            log.warn("PoolShardingKeyFactory getPoolShardingKeyStrategy not found mapping PoolType PoolShardingKeyStrategy! PoolType:{}", poolType);
            throw new ServiceException(ErrorCode.PARAM_ERROR.getCode(), "not found mapping PoolType PoolShardingKeyStrategy!");
        }
        return POOL_MAPPING.get(poolType);
    }
}
存储数据到redis时,同步数据到各个hashtag对应的pool sharding key
图示
Test 代码
/**
 * Demo: store pool data in the clustered Redis and sync it to each hashtag key.
 *
 * @author yunnuo
 * @date 2024-05-22
 */
@Slf4j
@Component
public class PoolDataPushDemo {

    @Resource
    private PoolShardingKeyFactory poolShardingKeyFactory;

    @Resource
    private JedisClusterCommands jedisClusterCommands;

    @PostConstruct
    public void exec() {
        log.info("PoolDataPushDemo start!");
        long start = System.currentTimeMillis();
        for (PoolType poolType : PoolType.values()) {
            PoolShardingKeyStrategy poolShardingKeyStrategy = poolShardingKeyFactory.getPoolShardingKeyStrategy(poolType);
            Map<String, Set<String>> map = new HashMap<>(5);
            for (int i = 0; i < 10000; i++) {
                // mock member
                String member = RandomUtil.randomString(5);
                // mock score (used to route the member to a sharding key, which is what spreads the load)
                int randomScore = RandomUtil.randomInt(0, 10000);
                String shardingKey = poolShardingKeyStrategy.getShardingKey(randomScore);
                if (shardingKey != null) {
                    map.computeIfAbsent(shardingKey, k -> new HashSet<>()).add(member);
                }
            }
            // write each shard once per pool, then sync it to the hashtag keys
            if (!map.isEmpty()) {
                map.forEach((k, v) -> {
                    jedisClusterCommands.sadd(k, v.toArray(new String[0]));
                    jedisClusterCommands.expire(k, poolType.expire);
                    // data sync (fixme: a direct method call for this demo only; in production, sync via e.g. MQ messages)
                    sync(poolType, k);
                });
            }
        }
        log.info("PoolDataPushDemo end! cost:{}ms", System.currentTimeMillis() - start);
    }

    public void sync(PoolType poolType, String poolShardingKey) {
        // per-node hashtag mapping
        Map<String, String> map = ClusterNodesAndSlotInitHandle.NODE_HASHTAG_KEY_MAPPING;
        List<String> hashTagRedisKeys = new ArrayList<>();
        // replicate the sharding key's members into each node's hashtag key
        map.values().forEach(hashtag -> {
            String redisKey = PoolShardingKeyUtil.getHashtagPoolShardingKey(poolShardingKey, hashtag);
            String[] members = jedisClusterCommands.smembers(poolShardingKey).toArray(new String[0]);
            if (members.length == 0) {
                return;
            }
            // overwrite any previous copy
            if (jedisClusterCommands.exists(redisKey)) {
                jedisClusterCommands.del(redisKey);
            }
            jedisClusterCommands.sadd(redisKey, members);
            jedisClusterCommands.expire(redisKey, poolType.expire);
            hashTagRedisKeys.add(redisKey);
        });
        log.info("pool data sync poolShardingKey-> {}, poolType-> {}, synced hashtag keys:{}", poolShardingKey, poolType, JSONObject.toJSONString(hashTagRedisKeys));
    }
}
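The PoolShardingKeyUtil.getHashtagPoolShardingKey helper used above is not shown in the article. A plausible reconstruction, assuming it simply joins the node's hashtag (already wrapped in braces) with the sharding key so the copy hashes to that node (the real helper may differ):

```java
public class PoolShardingKeyUtil {
    // Hypothetical: prefix the sharding key with the node's hashtag,
    // e.g. ("pool_1:A", "{123_abcd}") -> "{123_abcd}:pool_1:A".
    // Only the {...} section is hashed, so the key lands on that node.
    public static String getHashtagPoolShardingKey(String poolShardingKey, String hashtag) {
        return hashtag + ":" + poolShardingKey;
    }
}
```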
Reading pool data: the user's uid selects the hashtag shard, which balances the read load
Diagram
Core code
/**
 * Pool demo implementation.
 *
 * @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>
 * @date 2024-05-22
 */
@Slf4j
@Service
public class PoolDemoServiceImpl implements PoolDemoService {

    @Resource
    private PoolShardingKeyFactory factory;

    @Resource
    private SlotHashTagHandler slotHashTagHandler;

    @Resource
    private JedisCluster jedisCluster;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private static final Integer USER_MAX_PULL_LIMIT = 20;

    @Override
    public Set<String> getPoolData(Long uid) {
        Set<String> poolData = new HashSet<>();
        // compute the uid's slot and find the matching hashtag
        String hashTagKey = slotHashTagHandler.getHashTag(uid);
        // key of the set holding the data this user has already received
        String userAcquiredPoolDataListKey = RedisKey.USER_ACQUIRED_POOL_DATA_LIST.makeRedisKey(uid, hashTagKey);
        try {
            // walk the pools in enum order
            for (PoolType poolType : PoolType.values()) {
                PoolShardingKeyStrategy poolShardingKeyStrategy = factory.getPoolShardingKeyStrategy(poolType);
                Set<String> tempData = sdiffPoolData(poolType, poolShardingKeyStrategy, hashTagKey, userAcquiredPoolDataListKey, poolData.size());
                if (CollUtil.isNotEmpty(tempData)) {
                    poolData.addAll(tempData);
                }
                if (poolData.size() >= USER_MAX_PULL_LIMIT) {
                    break;
                }
            }
        } catch (Exception e) {
            log.error("getPoolData error! uid:{}, e:{}", uid, e.getMessage(), e);
            // roll back the data accumulated during this pull
            if (CollUtil.isNotEmpty(poolData)) {
                jedisCluster.srem(userAcquiredPoolDataListKey, poolData.toArray(new String[0]));
            }
            throw e;
        }
        log.info("getPoolData uid:{}, hashTagKey:{}, res:{}", uid, hashTagKey, JSONObject.toJSONString(poolData));
        return poolData;
    }

    /**
     * Runs SDIFF between a pool's shards and the user's already-received set.
     *
     * @param poolType                    pool type
     * @param poolShardingKeyStrategy     pool sharding key strategy
     * @param hashTagKey                  hashtag key
     * @param userAcquiredPoolDataListKey Redis key of the user's already-received set
     * @param size                        number of items already accumulated in this pull
     * @return the diffed data for this pool
     */
    private Set<String> sdiffPoolData(PoolType poolType, PoolShardingKeyStrategy poolShardingKeyStrategy, String hashTagKey, String userAcquiredPoolDataListKey, int size) {
        Set<String> res = new HashSet<>();
        // the hashtag-scoped shard keys of this pool that actually exist in Redis
        List<String> redisKeys = poolShardingKeyStrategy.getShardingKeys().stream()
                .map(poolShardingKey -> PoolShardingKeyUtil.getHashtagPoolShardingKey(poolShardingKey, hashTagKey))
                .filter(jedisCluster::exists)
                .collect(Collectors.toList());
        // run the diff; both keys share the hashtag, so no CROSSSLOT error
        for (String redisKey : redisKeys) {
            Optional.ofNullable(jedisCluster.sdiff(redisKey, userAcquiredPoolDataListKey)).ifPresent(res::addAll);
            if (res.size() >= poolType.pullNumberLimit) {
                break;
            }
        }
        int limit = Math.min(poolType.pullNumberLimit, USER_MAX_PULL_LIMIT - size);
        // take up to the limit (HashSet iteration order is arbitrary, which serves as the "random" pick here)
        BiFunction<Set<String>, Integer, Set<String>> shuffleFunction = (set, numValues) -> set.stream()
                .limit(Math.min(numValues, set.size()))
                .collect(Collectors.toSet());
        if (res.isEmpty() || res.size() <= limit) {
            return res;
        }
        return shuffleFunction.apply(res, limit);
    }
}
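The SlotHashTagHandler.getHashTag(uid) used above is not shown in the article. Conceptually it computes the uid's slot, finds the node whose slot range contains it, and returns that node's pre-generated hashtag. A hedged, self-contained sketch; the slot function here is a deterministic stand-in rather than Redis's real CRC16, and the map formats follow ClusterNodesAndSlotInitHandle ("ip:port" -> "startSlot,endSlot" and "ip:port" -> "{hashtag}"):

```java
import java.util.Map;

public class SlotHashTagLookup {
    // Stand-in for the real CRC16-based CLUSTER KEYSLOT calculation;
    // any deterministic mapping into 0..16383 illustrates the routing idea.
    static int slotOf(long uid) {
        return Math.floorMod(Long.hashCode(uid), 16384);
    }

    // nodeSlotMapping:    "ip:port" -> "startSlot,endSlot"
    // nodeHashTagMapping: "ip:port" -> "{hashtag}"
    static String getHashTag(long uid, Map<String, String> nodeSlotMapping,
                             Map<String, String> nodeHashTagMapping) {
        int slot = slotOf(uid);
        for (Map.Entry<String, String> e : nodeSlotMapping.entrySet()) {
            String[] range = e.getValue().split(",");
            long start = Long.parseLong(range[0]);
            long end = Long.parseLong(range[1]);
            // the node whose slot range contains the uid's slot wins
            if (slot >= start && slot <= end) {
                return nodeHashTagMapping.get(e.getKey());
            }
        }
        return null; // topology changed or mappings are stale
    }
}
```

Because different uids hash to different slots, their reads (and their user-acquired sets) are spread across all the nodes' hashtag keys, which is what restores load balancing.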
The full source code is on GitHub and Gitee:
GitHub: github.com/ukayunnuo/s…
Gitee: gitee.com/linyunnuo/s…
Source: https://juejin.cn/post/7371753324154519552