likes
comments
collection
share

redis集群不允许操作多个key解决方案、redis key负载均衡方案

作者站长头像
站长
· 阅读数 29

在cluster redis 中进行同一个命令处理不同的key会报错:CROSSSLOT Keys in request don't hash to the same slot,例如:

redis集群不允许操作多个key解决方案、redis key负载均衡方案

此示例使用sdiff 命令对pool_1pool_2进行diff操作。

那么我们在业务场景中就需要将集群redis中的不同key进行操作,我们该如何处理呢?

本次的示例场景,也是我实际业务开发遇到的场景,希望可以给大家有一个借鉴地方。

场景介绍

我有五个数据池子存储在cluster redis中,(命名为:pool_1,pool_2,pool_3,pool_4,pool_5,类型为:Set集合),其中各个池子存储的数据会有重叠;

用户获取的数据是由pool_1->pool_2....->pool_5的顺序按照各个池子配置的数据比例获取对应数据个数,并且获取的数据是不允许出现相同数据的,并且不再返回之前返回过的数据。

好嘞,场景介绍完了,我说的可能比较绕...(写作小白,各位大佬请见谅)

redis集群不允许操作多个key解决方案、redis key负载均衡方案

我遇到的问题是什么呢?

我需要对不同的池子pool进行diff,那么就会遇到CROSSSLOT Keys in request don't hash to the same slot这个报错问题。

CROSSSLOT Keys in request don't hash to the same slot报错解释(知道的可以跳过了)

可能有些朋友对CROSSSLOT Keys in request don't hash to the same slot的报错不是很清楚。

那很好,我来帮大家问问GPT吧。。。。

算了,GPT太官方了,我简单的描述一下吧,这个报错翻译成中文的意思就是:请求中的crosslot键不能散列到同一个槽

为什么会出现这个问题呢?

首先Redis集群会将键空间分成16384个槽(slot)(区间:0~16383),然后每个节点分配一定区间的slot值,并且Redis集群只能在一个节点上处理一个槽的所有键(通常的讲就是说:执行多个key执行操作时,只能操作slot值相同的key), 而我们在使用执行涉及多个键的操作(如 SDIFF )时,这些键被分配到不同的槽(slot)上, 就会导致这个问题的出现。

我如何知道设计的key的slot值是多少?

通过 CLUSTER KEYSLOT {key} 命令可以得到对应的slot值,由此可见,pool_1,pool_2,pool_3,pool_4,pool_5的slot值都不相同

redis集群不允许操作多个key解决方案、redis key负载均衡方案

我如何知道各个节点对应的slot值区间?

通过CLUSTER SLOTS命令可以检查槽的分配情况(为什么会提出,需要知道各个节点slot区间呢?是为了负载均衡,后面会有解释)

redis集群不允许操作多个key解决方案、redis key负载均衡方案

解决这个问题其实很简单

使用 hashtag 机制:在添加键到 Redis 集群时,可以使用 {} 包围键名的部分来保证一致性哈希,使得相关的键都分配到同一个节点。例如: pool_1:{A} 和 pool_2:{A},这样可以确保它们存储在同一个节点上。

redis集群不允许操作多个key解决方案、redis key负载均衡方案

然后进行SDIFF操作,就不会报错

redis集群不允许操作多个key解决方案、redis key负载均衡方案

对于数据量少的情况下,这个解决方案确实没什么问题,但是在数据量大的情况下就会产生性能问题。毕竟目前的解决方案是让他们只在一个hashtag下,也就是说将压力全部给集群中的一个节点中,这个和单价版本redis有什么区别呢?所以下面就是如何解决负载均衡的问题。

redis集群 使用 hashtag 机制解决方案实现负载均衡问题

初始化操作:

第一步:获取各个节点对应的slot值区间

第二步:生成随机hashtag,让每个节点都有对应随机的hashtag

如图所示 redis集群不允许操作多个key解决方案、redis key负载均衡方案

核心代码

/**
 * init Cluster Nodes And Slot
 *
 * @author yunnuo
 * @date 2024-05-22
 */
@Slf4j
@Component
public class ClusterNodesAndSlotInitHandle {

    /**
     * node 与 slot 区间映射 mapping
     */
    public static Map<String, String> NODE_SLOT_MAPPING;

    /**
     * node 与 hashtag key 映射 mapping
     */
    public static Map<String, String> NODE_HASHTAG_KEY_MAPPING;

    @Resource
    private JedisCluster jedisCluster;

    private final GetNodeFunction getNodeFunction = new GetNodeFunction();

    @PostConstruct
    void init() {
        long start = System.currentTimeMillis();
        log.info("ClusterNodesAndSlotInitHandle init start!");
        Boolean flag = Boolean.TRUE;
        try {

            // 获取当前集群节点与 slot 区间映射
            Map<String, String> nodeSlotMapping = initNodeSlotMapping();

            // 获取之前存储在redis中的 集群节点与 pool key 映射
            Map<String, String> nodeSlotMappingCacheMap = jedisCluster.hgetAll(RedisKey.NODE_SLOT_MAPPING.getPrefix());
            Map<String, String> HashTagKeyMapMappingCacheMap = jedisCluster.hgetAll(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix());

            if (MapUtil.isEmpty(nodeSlotMappingCacheMap)) {
                Map<String, String> hashTagKeyMapMapping = initHashTagKeyMapMapping(nodeSlotMapping);
                setInitCache(nodeSlotMapping, hashTagKeyMapMapping);
                return;
            }

            // 判断集群节点是否有更新
            if (Objects.equals(nodeSlotMappingCacheMap, nodeSlotMapping)) {
                NODE_SLOT_MAPPING = nodeSlotMappingCacheMap;
                NODE_HASHTAG_KEY_MAPPING = HashTagKeyMapMappingCacheMap;
                // 更新缓存过期时间
                jedisCluster.expire(RedisKey.NODE_SLOT_MAPPING.getPrefix(), RedisKey.NODE_SLOT_MAPPING.expireTime());
                jedisCluster.expire(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix(), RedisKey.NODE_HASHTAG_KEY_MAPPING.expireTime());
            } else {
                Map<String, String> hashTagKeyMapMapping = initHashTagKeyMapMapping(nodeSlotMapping);
                setInitCache(nodeSlotMapping, hashTagKeyMapMapping);
                // todo: 通过发送消息进行异步处理,将老数据迁移到新的节点中,然后删除老数据,此处不再处理
            }
        } catch (Exception e) {
            flag = Boolean.FALSE;
            log.error("ClusterNodesAndSlotInitHandle init error! e:{}", e.getMessage(), e);
            throw e;
        } finally {
            log.info("ClusterNodesAndSlotInitHandle init end! flag:{}, NODE_SLOT_MAPPING:{}, NODE_POOL_KEY_MAPPING:{} cost:{}ms", flag, JSONObject.toJSONString(NODE_SLOT_MAPPING), JSONObject.toJSONString(NODE_HASHTAG_KEY_MAPPING), System.currentTimeMillis() - start);
        }
    }

    private void setInitCache(Map<String, String> nodeSlotMapping, Map<String, String> hashTagKeyMapMapping) {
        jedisCluster.hmset(RedisKey.NODE_SLOT_MAPPING.getPrefix(), nodeSlotMapping);
        jedisCluster.expire(RedisKey.NODE_SLOT_MAPPING.getPrefix(), RedisKey.NODE_SLOT_MAPPING.expireTime());

        jedisCluster.hmset(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix(), hashTagKeyMapMapping);
        jedisCluster.expire(RedisKey.NODE_HASHTAG_KEY_MAPPING.getPrefix(), RedisKey.NODE_HASHTAG_KEY_MAPPING.expireTime());

        NODE_SLOT_MAPPING = nodeSlotMapping;
        NODE_HASHTAG_KEY_MAPPING = hashTagKeyMapMapping;
    }

    Map<String, String> initNodeSlotMapping() {
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map<String, String> nodeSlotMapping = new HashMap<>();
        for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
            Jedis jedisNode = entry.getValue().getResource();
            List<Object> slots = jedisNode.clusterSlots();
            for (Object slotInfo : slots) {
                if (slotInfo == null) {
                    continue;
                }
                if (!(slotInfo instanceof List)){
                    log.error("initNodeSlotMapping 解析slotInfo失败! clusterNodes:{}, slotInfo:{}", JSONObject.toJSONString(clusterNodes), JSONObject.toJSONString(slotInfo));
                    throw new ServiceException(ErrorCode.SYSTEM_ERROR.getCode(), "解析slotInfo失败!");
                }
                List<Object> slot = (List<Object>) slotInfo;
                long startSlot = NumberUtil.parseLong(slot.get(0).toString());
                long endSlot = NumberUtil.parseLong(slot.get(1).toString());

                if (!(slot.get(2) instanceof List)){
                    log.error("initNodeSlotMapping 解析slotInfo[2]失败! clusterNodes:{}, slotInfo:{}", JSONObject.toJSONString(clusterNodes), JSONObject.toJSONString(slotInfo));
                    throw new ServiceException(ErrorCode.SYSTEM_ERROR.getCode(), "解析slotInfo失败!");
                }
                List<Object> nodeInfo = (List<Object>) slot.get(2);
                String nodeIp = new String((byte[]) nodeInfo.get(0));
                long nodePort = NumberUtil.parseLong(nodeInfo.get(1).toString());
                // key格式:nodeIp:nodePort, value 格式:startSlot,endSlot
                nodeSlotMapping.put(String.format("%s:%s", nodeIp, nodePort), String.format("%s,%s", startSlot, endSlot));
            }
        }
        log.info("ClusterNodesAndSlotInitHandle init nodeSlotMapping:{}", JSONObject.toJSONString(nodeSlotMapping));
        return nodeSlotMapping;
    }

    private Map<String, String> initHashTagKeyMapMapping(Map<String, String> nodeSlotMapping) {
        Map<String, String> hashTagKeyMap = randomNodeHashTagKey(nodeSlotMapping);
        log.info("ClusterNodesAndSlotInitHandle init hashTagKeyMap:{}, nodeSlotMapping:{}", JSONObject.toJSONString(hashTagKeyMap), JSONObject.toJSONString(hashTagKeyMap));
        NODE_HASHTAG_KEY_MAPPING = hashTagKeyMap;
        return hashTagKeyMap;
    }

    /**
     * 随机生成 分布在不同node 的 hashtag key
     *
     * @param nodeSlotMapping node slot mapping
     * @return node 与 hashtag key 映射
     */
    private Map<String, String> randomNodeHashTagKey(Map<String, String> nodeSlotMapping) {
        Map<String, String> nodeHashTagKey = new HashMap<>(nodeSlotMapping.size());
        for (int i = 1; i < 10000; i++) {
            String key = String.format("{%s}", i + "_" + RandomUtil.randomString(4));
            String node = getNodeFunction.apply(key, nodeSlotMapping);
            if (node == null) {
                continue;
            }
            if (!nodeHashTagKey.containsKey(node)) {
                nodeHashTagKey.put(node, key);
            }
            if (nodeHashTagKey.size() == nodeSlotMapping.size()) {
                break;
            }
        }
        if (nodeHashTagKey.size() != nodeSlotMapping.size()) {
            log.error("randomNodeHashTagKey failed 请调整随机函数! nodeSlotMapping:{}", JSONObject.toJSONString(nodeSlotMapping));
            throw new ServiceException(ErrorCode.SYSTEM_ERROR.getCode(), "nodeHashTagKey.size() != nodeSlotMapping.size()! 请调整随机函数!");
        }
        return nodeHashTagKey;
    }

}

存储redis数据

对pool池数据进行分层(分片sharding),避免大key问题,导致性能问题

大概意思就是,如我的pool_1池子有100万条数据,如果只存储在一个key中,那么对于redis来说是一个大key问题,会导致性能问题,我对pool_1池子中的100万条数据,更加分数计算,分片存储到(pool_1:A,pool_1:B,pool_1:C....)中,获取数据同理可得,挨着顺序获取或者如果没有分数要求,随机获取即可。

核心代码

使用策略+工厂设计模式实现

数据池类型枚举类

用于定义各个池子的名称、存储数量大小限制、过期时间等信息

/**
 * 数据池类型枚举
 *
 * @author yunnuo
 * @date 2024-05-22
 */
public enum PoolType {

    // 数据池(pool_1,pool_2,pool_3,pool_4,pool_5)
    POOL_1("pool_1", 8, 10000, 60 * 60L),

    POOL_2("pool_2", 8, 20000, 60 * 60L),

    POOL_3("pool_3", 8, 10000, 60 * 60L),

    POOL_4("pool_4", 6, 100000, 60 * 60L),

    POOL_5("pool_5", 6, 5000, 60 * 60L),
    ;

    /**
     * 名称
     */
    public final String name;

    /**
     * 获取数量限制
     */
    public final Integer pullNumberLimit;

    /**
     * 池子数据大小限制(== 防止池子过大导致性能问题 ==)
     */
    public final Integer saveSizeLimit;

    /**
     * 池子过期时间
     */
    public final long expire;


    PoolType(String name, Integer pullNumberLimit, Integer saveSizeLimit, long expire) {
        this.name = name;
        this.pullNumberLimit = pullNumberLimit;
        this.saveSizeLimit = saveSizeLimit;
        this.expire = expire;
    }
}
pool 数据池 分片策略类(实现负载均衡)

通过定义一个规范的接口,各个池子有不同的实现逻辑可以实现该接口,比如我这定义的方法有:获取对应池子sharding出来的对应redis key

/**
 * pool 数据池 分片策略(实现负载均衡)
 *
 * @author yunnuo
 * @date 2024-05-22
 */
public interface PoolShardingKeyStrategy {

    /**
     * 获取 分片 key
     * @return 返回分片 key
     */
    List<String> getShardingKeys();

    /**
     * 通过分数获取指定的数据池分片类型
     * @param score 分数
     * @return 数据池分片类型
     */
    String getShardingKey(Integer score);

}
示例:具体pool_1 池子 分片 策略实现类

POOL_1 分片枚举类,用于定义POOL_1池子的分片个数、分片到哪个key中、保存数量限制等信息

/**
 * POOL_1 分片枚举
 *
 * @author yunnuo
 * @date 2024-05-22
 */
public enum Pool1Sharding {

    POOL_1_A(PoolShardingType.A, 8000, 10000, 400000),

    POOL_1_B(PoolShardingType.B, 5000, 7999, 300000),

    POOL_1_C(PoolShardingType.C, 0, 4999, 200000),

    ;

    /**
     * 分片类型
     */
    public final PoolShardingType type;

    /**
     * 分片最小值
     */
    public final Integer limitMinScore;

    /**
     * 分片最大值
     */
    public final Integer limitMaxScore;

    /**
     * 保存数量限制
     */
    public final Integer saveSizeLimit;

    Pool1Sharding(PoolShardingType type, Integer limitMinScore, Integer limitMaxScore, Integer saveSizeLimit) {
        this.type = type;
        this.limitMinScore = limitMinScore;
        this.limitMaxScore = limitMaxScore;
        this.saveSizeLimit = saveSizeLimit;
    }

    public static PoolShardingType getSharding(Integer score) {
        for (Pool1Sharding poolSharding : Pool1Sharding.values()) {
            if (score >= poolSharding.limitMinScore && score <= poolSharding.limitMaxScore) {
                return poolSharding.type;
            }
        }
        return null;
    }

}

pool_1 池子 分片 策略实现

/**
 * pool_1 池子 分片 策略实现
 *
 * @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>
 * @date 2024-05-22
 */
public class Pool1ShardingKeyStrategy implements PoolShardingKeyStrategy {

    @Override
    public List<String> getShardingKeys() {
        return Arrays.stream(Pool1Sharding.values()).map(item ->
                RedisKey.DATA_POOL.makeRedisKey(PoolType.POOL_1.name, item.type.toString())).collect(Collectors.toList());
    }

    @Override
    public String getShardingKey(Integer score) {
        PoolShardingType sharding = Pool1Sharding.getSharding(score);
        if (sharding == null){
            return null;
        }
        return RedisKey.DATA_POOL.makeRedisKey(PoolType.POOL_1.name, sharding.toString());
    }
}
数据池 分片策略工厂

将所有实现对应分片策略的Strategy,封装到一个工厂中,外部访问统一通过工厂进行获取

/**
 * 数据池 分片策略工厂
 *
 * @author yunnuo
 * @date 2024-02-22
 */
@Slf4j
@Component
public class PoolShardingKeyFactory {

    private final Map<PoolType, PoolShardingKeyStrategy> POOL_MAPPING;

    public PoolShardingKeyFactory() {
        POOL_MAPPING = init();
    }

    public HashMap<PoolType, PoolShardingKeyStrategy> init() {
        HashMap<PoolType, PoolShardingKeyStrategy> map = new HashMap<>(PoolType.values().length);
        map.put(PoolType.POOL_1, new Pool1ShardingKeyStrategy());
        map.put(PoolType.POOL_2, new Pool2ShardingKeyStrategy());
        map.put(PoolType.POOL_3, new Pool3ShardingKeyStrategy());
        map.put(PoolType.POOL_4, new Pool4ShardingKeyStrategy());
        map.put(PoolType.POOL_5, new Pool5ShardingKeyStrategy());
        return map;
    }

    public PoolShardingKeyStrategy getPoolShardingKeyStrategy(PoolType poolType) {
        if (!POOL_MAPPING.containsKey(poolType)) {
            log.warn("PoolShardingKeyFactory getPoolShardingKeyStrategy not found mapping PoolType PoolShardingKeyStrategy! PoolType:{}", poolType);
            throw new ServiceException(ErrorCode.PARAM_ERROR.getCode(), "not found mapping PoolType PoolShardingKeyStrategy!");
        }
        return POOL_MAPPING.get(poolType);
    }

}

存储数据到redis时,同步数据到各个hashtag对应的pool sharding key

图示 redis集群不允许操作多个key解决方案、redis key负载均衡方案

Test 代码

/**
 * 模拟pool 数据存储到集群redis和同步到各个hashtag key 测试
 *
 * @author yunnuo
 * @date 2024-05-22
 */
@Slf4j
@Component
public class PoolDataPushDemo {

    @Resource
    private PoolShardingKeyFactory poolShardingKeyFactory;

    @Resource
    private JedisClusterCommands jedisClusterCommands;


    @PostConstruct
    public void exec() {
        log.info("PoolDataPushDemo start!");
        long start = System.currentTimeMillis();
        for (PoolType poolType : PoolType.values()) {
            PoolShardingKeyStrategy poolShardingKeyStrategy = poolShardingKeyFactory.getPoolShardingKeyStrategy(poolType);
            Map<String, Set<String>> map = new HashMap<>(5);
            for (int i = 0; i < 10000; i++) {
                // 模拟数据
                String member = RandomUtil.randomString(5);
                // 模拟分数(用于计算分片到那个sharding key中,实现负载均衡)
                int randomScore = RandomUtil.randomInt(0, 10000);
                String shardingKey = poolShardingKeyStrategy.getShardingKey(randomScore);
                if (shardingKey != null) {
                    Set<String> sets = Objects.isNull(map.get(shardingKey)) ? new HashSet<>() : map.get(shardingKey);
                    sets.add(member);
                    map.put(shardingKey, sets);
                }
                if (!map.isEmpty()) {
                    map.forEach((k, v) -> {
                        jedisClusterCommands.sadd(k, v.toArray(new String[0]));
                        jedisClusterCommands.expire(k, poolType.expire);
                        // 数据同步(fixme: 本示例的数据同步采用直接方法调用,仅供测试,实际同步可参考mq消息方案同步)
                        sync(poolType, k);
                    });
                }
            }
        }
        log.info("PoolDataPushDemo end! cost:{}ms", System.currentTimeMillis() - start);
    }

    public void sync(PoolType poolType, String poolShardingKey){
        // 各个节点对应的hashtag 映射
        Map<String, String> map = ClusterNodesAndSlotInitHandle.NODE_HASHTAG_KEY_MAPPING;
        List<String> hashTagRedisKeys = new ArrayList<>();
        // 将sharding key 同步到 各个 hashtag key中
        map.values().forEach(hashtag -> {
            String redisKey = PoolShardingKeyUtil.getHashtagPoolShardingKey(poolShardingKey, hashtag);
            String[] members = jedisClusterCommands.smembers(poolShardingKey).toArray(new String[0]);
            if (members.length == 0){
                return;
            }
            if (jedisClusterCommands.exists(redisKey)){
                jedisClusterCommands.del(redisKey);
                jedisClusterCommands.sadd(redisKey, members);
            }else {
                jedisClusterCommands.sadd(redisKey, members);
            }
            jedisClusterCommands.expire(redisKey, poolType.expire);
            hashTagRedisKeys.add(redisKey);
        });
        log.info("pool data sync poolShardingKey-> {}, poolType-> {}, 同步的对应hashtag keys:{}", poolShardingKey, poolType, JSONObject.toJSONString(hashTagRedisKeys));
    }
}

获取pool数据,通过用户的uid实现hashtag计算分片实现负载均衡

图示 redis集群不允许操作多个key解决方案、redis key负载均衡方案

核心代码

/**
 * pool demo 实现类
 *
 * @author yunnuo <a href="2552846359@qq.com">Email: 2552846359@qq.com</a>
 * @date 2024-05-22
 */
@Slf4j
@Service
public class PoolDemoServiceImpl implements PoolDemoService {


    @Resource
    private PoolShardingKeyFactory factory;

    @Resource
    private SlotHashTagHandler slotHashTagHandler;

    @Resource
    private JedisCluster jedisCluster;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private static final Integer USER_MAX_PULL_LIMIT = 20;


    @Override
    public Set<String> getPoolData(Long uid) {
        Set<String> poolData = new HashSet<>();

        // 通过uid计算slot 找到对应的hashtag
        String hashTagKey = slotHashTagHandler.getHashTag(uid);

        // 获取用户已获取的pool数据
        String userAcquiredPoolDataListKey = RedisKey.USER_ACQUIRED_POOL_DATA_LIST.makeRedisKey(uid, hashTagKey);

        try {
            // 对池子按照枚举类型顺序获取
            for (PoolType poolType : PoolType.values()) {
                PoolShardingKeyStrategy poolShardingKeyStrategy = factory.getPoolShardingKeyStrategy(poolType);
                Set<String> tempData = sdiffPoolData(poolType, poolShardingKeyStrategy, hashTagKey, userAcquiredPoolDataListKey, poolData.size());
                if (CollUtil.isNotEmpty(tempData)) {
                    poolData.addAll(tempData);
                }
                if (poolData.size() >= USER_MAX_PULL_LIMIT) {
                    break;
                }
            }
        } catch (Exception e) {
            log.error("getPoolData error! uid:{}, e:{}", uid, e.getMessage(), e);
            // 移除本次用户累计拉取数据
            if (CollUtil.isNotEmpty(poolData)) {
                jedisCluster.srem(userAcquiredPoolDataListKey, poolData.toArray(new String[0]));
            }
            throw e;
        }
        log.info("getPoolData uid:{}, hashTagKey:{}, res:{}", uid, hashTagKey, JSONObject.toJSONString(poolData));
        return poolData;
    }

    /**
     * 对数据池和用户累计拉取数据集合进行sdiff操作
     *
     * @param poolType 池子类型
     * @param poolShardingKeyStrategy 池子分片key策略
     * @param hashTagKey hashtag key
     * @param userAcquiredPoolDataListKey 用户累计拉取数据集合 redis key
     * @param size 当前用户累计拉取数据数量
     * @return 当前池子进行diff操作后的数据集合
     */
    private Set<String> sdiffPoolData(PoolType poolType, PoolShardingKeyStrategy poolShardingKeyStrategy, String hashTagKey, String userAcquiredPoolDataListKey, int size) {
        Set<String> res = new HashSet<>();

        // 按照各个池子进行分片后的redis存在的 hashTagKey
        List<String> redisKeys = poolShardingKeyStrategy.getShardingKeys().stream()
                .map(poolShardingKey -> PoolShardingKeyUtil.getHashtagPoolShardingKey(poolShardingKey, hashTagKey))
                .filter(jedisCluster::exists).collect(Collectors.toList());

        // 进行diff操作
        for (String redisKey : redisKeys) {
            Optional.ofNullable(jedisCluster.sdiff(redisKey, userAcquiredPoolDataListKey)).ifPresent(res::addAll);
            if (res.size() >= poolType.pullNumberLimit) {
                break;
            }
        }
        int limit = Math.min(poolType.pullNumberLimit, USER_MAX_PULL_LIMIT - size);

        // 随机获取指定个数的数据
        BiFunction<Set<String>, Integer, Set<String>> shuffleFunction = (set, numValues) -> set.stream().limit(Math.min(numValues, set.size()))
                .collect(Collectors.toSet());

        if (res.isEmpty() || res.size() <= limit) {
            return res;
        }

        return shuffleFunction.apply(res, limit);
    }
}

具体源码我已经放入到Git Hub和Gitee上了

Git Hub:github.com/ukayunnuo/s…

Gitee:gitee.com/linyunnuo/s…

转载自:https://juejin.cn/post/7371753324154519552
评论
请登录