基于Redission 实战之大Key处理
原因:
目前基于项目新的问题,一次性获取60多万数据,每秒执行该查询大约 0.66 次 ,三天下来I/O量达到恐怖的60多G,严重影响项目正常运行。以下是解决步骤,请勿喷。。。。
public List<String> loadSOrigin(List<String> c, List<Integer> w, List<Integer> l) {
return shelfService.getShelfCodes(c, w, l);
}
基于此我这边的设计想通过Redis 进行缓存数据,减少数据库请求,应该该数据变化量不大,有一定的容错空间,但是其返回值来看,如果存储到redis 也将达到12M数据,形成一个大key.故此则得想办法进行解决该大key问题。
原理:
在 Redis 中,所谓的 "大 key" 通常指的是单个 key 所存储的数据量非常大,它可能会导致性能问题,比如在执行某些操作时(例如删除、备份、迁移)可能会阻塞 Redis 服务器。对于是否算作 "大 key" 并没有一个严格的界限,这取决于具体的使用场景和 Redis 服务器的配置。然而,一般来说,一个超过 1 MB 的 key 可以被认为是一个较大的 key。
在目前情况下,12.58 MB 的数据存储在一个 List 类型的 key 中,可以认为是一个大 key。处理大 key 的策略包括:
1. 分页加载:不要一次性加载整个大 key 的所有元素,而是使用 LRANGE
命令分页加载,每次只获取部分元素。
2. 分割 key:可以将一个大的 List 分割成多个小的 List,例如根据数据的时间戳、ID 范围或其他逻辑进行分割,每个小 List 作为一个独立的 key 存储。
3. 数据压缩:如果数据是可以压缩的,可以在客户端对数据进行压缩后再存储到 Redis,这样可以减少内存的使用量。
4. 数据存储策略优化:评估是否所有数据都需要存储在 Redis 中,或者是否可以通过其他方式(如数据库)来存储部分数据。
5. 使用其他数据结构:如果适用,可以考虑使用其他更内存高效的数据结构,比如 zset(有序集合)
6. 定期清理:定期检查并清理不再需要的数据,以释放内存。
7. Redis 集群:如果单个 Redis 实例的内存不足以处理大量数据,可以考虑使用 Redis 集群来分散数据和负载。
8. 监控和警报:使用 Redis 的 INFO MEMORY
命令或其他监控工具来监控内存使用情况,并设置警报机制。
在实际操作之前,建议先分析你的应用场景和数据访问模式,以便选择最适合你需求的优化策略。如果需要进一步的帮助,可以考虑咨询 Redis 性能优化方面的专家。
实战解决:
设计思路是为了有效地处理大量数据的加载和缓存,同时确保数据的一致性和高效的缓存管理。基于此该基于以下设计思路进行设计:
1. 分布式环境下的数据一致性:
- 使用分布式锁(RLock
)来确保在更新缓存时,只有一个进程/线程可以操作缓存,从而避免并发写入导致的数据不一致问题。
- 使用缓存标记(cacheMarker)来指示缓存的状态,确保在更新数据时,可以先使缓存无效,然后再进行缓存更新操作。
2. 缓存失效和更新策略: - 设置缓存的过期时间(TTL),确保缓存数据不会永久存储,从而避免潜在的陈旧数据问题。 - 在更新缓存数据之前,先将缓存标记设置为无效,更新完成后再重新将其设置为有效,并更新过期时间,这样可以减少读取陈旧缓存数据的风险。
3. 处理大规模数据集: - 由于单个 Redis 值可能会非常大,设计中采用了分批存储数据的方法。这样可以避免单个 Redis 值的大小超过处理能力或导致性能问题。 - 通过使用子列表(sublist)的方式,将大数据集分解为更小的批次,并为每个批次生成单独的缓存键。
4. 优化读取性能: - 在读取数据时,先检查缓存是否存在数据。如果存在,则直接从缓存中读取,避免了不必要的数据库访问,从而提高了性能。 - 如果缓存中没有数据,则从数据库加载数据,并将结果存储到缓存中,以便后续请求可以快速获取。
5. 键名生成:
- 为了确保缓存的唯一性和可查询性,设计了一个方法(generateKey
)来根据输入参数生成一个唯一的缓存键。这保证了相同的查询参数将会引用相同的缓存数据。
具体实现代码:
先根据对应参数生成对应key,后续获取生成都将使用到:
生成Key:
/**
* 生成key
*
* @param c
* @param w
* @param l
* @return
*/
private String generateKey(List<String> c, List<Integer> w, List<Integer> l) {
// Implement a method to generate a unique key based on the method parameters
// This could be a concatenation of the parameters' hashCodes or a more complex logic
return "shelfCodes:" + c.hashCode() + ":" + w.hashCode() + ":" + l.hashCode();
}
简单版:
public List<String> loadSv(List<String> c, List<Integer> w, List<Integer> l) throws ExecutionException, InterruptedException {
String key = generateKey(c, w, l);
List<String> shelfCodes = new ArrayList<>();
boolean dataFoundInRedis = false;
// 尝试从 Redis 分批获取数据
int index = 0;
while (true) {
String sublistKey = key + ":sublist:" + index;
RList<String> sublist = redissonClient.getList(sublistKey, StringCodec.INSTANCE);
if (sublist.isExists()) {
shelfCodes.addAll(sublist.readAll());
dataFoundInRedis = true;
index++;
} else {
break;
}
}
// 如果 Redis 中没有数据,则从数据库加载并存储到 Redis
if (!dataFoundInRedis) {
shelfCodes = shelfService.getShelfCodes(c, w, l);
storeS(c, w, l, shelfCodes, ttl, timeUnit); // 设置过期时间
}
return shelfCodes;
}
加锁版:
public List<String> loadS(List<String> c, List<Integer> w, List<Integer> l) {
String key = generateKey(c, w, l);
List<String> shelfCodes = new ArrayList<>();
boolean dataFoundInRedis = false;
// 获取缓存标记
RBucket<Boolean> cacheMarker = redissonClient.getBucket(key + ":marker", StringCodec.INSTANCE);
if (Boolean.TRUE.equals(cacheMarker.get())) {
// 尝试从 Redis 分批获取数据
int index = 0;
while (true) {
String sublistKey = key + ":sublist:" + index;
RList<String> sublist = redissonClient.getList(sublistKey, StringCodec.INSTANCE);
if (!sublist.isEmpty()) {
shelfCodes.addAll(sublist);
dataFoundInRedis = true;
index++;
} else {
break;
}
}
}
// 如果 Redis 中没有数据,则从数据库加载并存储到 Redis
if (!dataFoundInRedis) {
// 使用分布式锁确保一致性
RLock lock = redissonClient.getLock(key + ":lock");
lock.lock();
try {
// 双重检查,确保数据没有被其他线程加载
if (Boolean.TRUE.equals(cacheMarker.get())) {
shelfCodes.clear();
int index = 0;
while (true) {
String sublistKey = key + ":sublist:" + index;
RList<String> sublist = redissonClient.getList(sublistKey, StringCodec.INSTANCE);
if (!sublist.isEmpty()) {
shelfCodes.addAll(sublist);
index++;
} else {
break;
}
}
} else {
shelfCodes = shelfService.getShelfCodes(c,w,l);
// 先使缓存标记无效
cacheMarker.set(false);
// 存储新的数据到缓存
storeS(c,w,l, shelfCodes, ttl, timeUnit);
// 重新标记缓存为有效
cacheMarker.set(true, ttl, timeUnit);
}
} finally {
lock.unlock();
}
}
return shelfCodes;
}
流程图:
解读:
1. 客户端 (Client
) 请求 LargeDataStorageService
加载货架代码 (loadShelf
)。
2. 服务首先检查 Redis 中的缓存标记 (cacheMarker
)。
3. 如果缓存标记为真,表示缓存中有数据,服务将循环获取缓存中的所有子列表,直到没有更多子列表。
4. 如果缓存标记不存在或为假,服务将尝试获取 Redis 的分布式锁。
5. 在锁内部,服务再次检查缓存标记,以确保在等待锁的期间没有其他进程已经加载了数据并更新了缓存。
6. 如果缓存标记仍然为假,服务将从数据库 (ShelfService
) 加载货架代码。
7. 服务将数据库中获取的货架代码存储到 Redis 缓存中,并将缓存标记设置为真。
8. 服务释放 Redis 的分布式锁。
9. 服务返回货架代码给客户端。
10.
分片存储:
/**
* 分片存储
*
* @param c
* @param w
* @param l
* @param s
* @param ttl
* @param timeUnit
*/
public void storeS(List<String> c, List<Integer> w, List<Integer> l, List<String> s, long ttl, TimeUnit timeUnit) {
// Generate a unique key for the given parameters
String key = generateKey(c, w, l);
// Batch operation for atomic execution
RBatch batch = redissonClient.createBatch();
// Assuming each sublist has a reasonable size that Redis can handle efficiently
int sublistSize = 1000; // Adjust this size as needed
for (int i = 0; i < s.size(); i += sublistSize) {
int end = Math.min(s.size(), i + sublistSize);
List<String> sublist = s.subList(i, end);
// Store each sublist in a separate key with a suffix
String sublistKey = key + ":sublist:" + (i / sublistSize);
RListAsync<String> listAsync = batch.getList(sublistKey, StringCodec.INSTANCE);
listAsync.addAllAsync(sublist);
listAsync.expireAsync(ttl, timeUnit);
}
// Execute batch commands in one round trip
batch.execute();
}
解读上述代码:
-
loadS 方法:
- 接受c(
c
)、w (w
)和 l(l
)作为参数。 - 使用这些参数生成一个唯一的缓存键。
- 尝试从 Redis 缓存中分批获取数据,如果找到数据则返回。
- 如果缓存中没有数据,使用分布式锁来防止多个进程同时加载和缓存数据。
- 在锁内部,再次检查缓存标记以确认数据是否已被其他进程缓存。
- 如果数据仍未被缓存,则调用
shelfService.getShelfCodes
方法从数据库中加载数据。 - 使用
storeShelfCodes
方法将数据存储到 Redis,并设置缓存标记,标记数据现在可用。 - 最后释放锁并返回加载的数据。
- 接受c(
-
storeS 方法:
- 接受相同的参数以及要存储的货架代码列表和过期时间设置。
- 使用批处理操作将数据分批存储到 Redis 中,每个批次是一个子列表,这有助于避免单个 Redis 值过大。
- 设置每个子列表的过期时间。
-
generateKey 方法:
- 生成基于输入参数的唯一键,用于在 Redis 中标识特定的数据集。
这个类的实现具有以下特点:
- 数据一致性:通过使用分布式锁和缓存标记来确保数据一致性,防止缓存污染。
- 缓存失效策略:通过设置过期时间来确保缓存数据最终会被刷新,允许新的数据被加载和缓存。
- 性能优化:通过分批处理大量数据来优化性能,避免单个大型 Redis 值可能导致的性能问题。
这个类的设计考虑了大规模数据处理的需求,并通过缓存机制来提高数据检索的效率。同时,它还考虑了缓存数据的一致性和时效性,以确保系统的稳定性和数据的准确性。
总结:
总体而言,LargeDataStorageService
的设计考虑到了分布式系统中数据一致性的重要性,缓存数据的有效管理,以及如何有效地处理和存储大量数据。这个设计通过减少对数据库的依赖和优化缓存的使用,提高了数据检索的效率和系统的整体性能。
转载自:https://juejin.cn/post/7366070642046189583