基于Redission 之Caffeine和Redis集成工具类及使用示例
概述:
要将Caffeine本地缓存和Redisson分布式缓存结合起来使用,可以创建一个工具类,它首先尝试从本地Caffeine缓存中获取数据,如果本地缓存中没有找到,则从Redisson分布式缓存中获取,并在获取后将数据回填到本地缓存中。
注意点:
-
并发处理:Caffeine 已经是线程安全的,所以本地缓存的并发访问不是问题。对于 Redisson,客户端本身也是线程安全的,但是在处理写回策略和缓存穿透时,可能需要额外的并发控制。
-
写回策略(Write-Back / Write-Behind):在这种策略下,数据首先写入本地缓存,然后异步地写入后端存储(例如 Redis)。这可以通过一个队列和后台线程来实现,该线程定期将更改写入后端存储。
-
缓存穿透保护:缓存穿透是指查询不存在的数据,导致请求直接打到数据库上。为了防止缓存穿透,可以使用空对象模式或布隆过滤器。空对象模式是指即使值不存在也在缓存中存储一个特殊的空对象,而布隆过滤器可以在请求到达缓存之前过滤掉不存在的键。
-
同步写入Redis和本地缓存:当本地缓存被写入时,同时将数据同步写入Redis。这样可以确保两者的数据一致性。但这种方法会增加每次写入操作的延迟。
-
使用锁或同步机制:在更新本地缓存的同时,使用锁或其他同步机制来确保数据也被写入Redis。如果本地缓存失效,可以通过锁来保证在读取Redis之前数据已经被写入。
-
设置合理的过期时间:在Redis中为缓存数据设置一个比本地缓存更长的过期时间,这样即使本地缓存失效,数据仍然可以从Redis中获取。
-
延迟本地缓存的过期时间:可以在本地缓存的基础上添加一个短暂的延迟时间,以确保Redis中的数据在本地缓存失效前已经更新。
-
使用缓存刷新策略:定期或在本地缓存即将失效时,异步刷新本地缓存的数据。这样可以确保本地缓存中的数据在大多数时间都是最新的。
工具类:
以下是这样一个工具类的简单示例:
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.github.benmanes.caffeine.cache.RemovalListener;
/**
* @Author derek_smart
* @Date 202/4/24 14:55
* @Description caffeine和redis 缓存组合工具类
* <p>
* 写回策略(Write-Back / Write-Behind):在这种策略下,数据首先写入本地缓存,然后异步地写入后端存储(例如 Redis)。这可以通过一个队列和后台线程来实现,该线程定期将更改写入后端存储。
* 一个特殊的空对象 `NULL_PLACEHOLDER` 存储到本地缓存和 Redis 中,这样下次查询相同的键时就能直接从缓存中获取到空对象,从而防止缓存穿透。
*/
public class HybridCache<K, V> {
private final Cache<K, V> localCache;
private final RedissonClient redissonClient;
private final ExecutorService writeBehindExecutor;
private final long redisExpiration; // Redis 缓存过期时间,单位:秒
private static final Object NULL_PLACEHOLDER = new Object();
// 使用 ConcurrentHashMap 来存储锁
private final ConcurrentHashMap<K, Lock> locks = new ConcurrentHashMap<>();
public HybridCache(RedissonClient redissonClient, long maxSize, long expireAfterWrite, TimeUnit timeUnit, long redisExpiration) {
this.redissonClient = redissonClient;
this.redisExpiration = redisExpiration;
this.writeBehindExecutor = Executors.newSingleThreadExecutor(); // 用于写回策略的单线程执行器
RemovalListener<K, V> writeBehindRemovalListener = (K key, V value, RemovalCause cause) -> {
if (cause.wasEvicted()) {
writeBehindExecutor.submit(() -> redissonClient.getBucket(key.toString()).set(value, redisExpiration, TimeUnit.SECONDS));
}
};
this.localCache = Caffeine.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(expireAfterWrite, timeUnit)
.removalListener((K key, V value, RemovalCause cause) -> {
System.out.printf("Key %s was removed (%s)%n", key, cause);
})
.recordStats()
.build();
}
public V get(K key) {
// 尝试从本地缓存获取数据
V value = localCache.getIfPresent(key);
if (value != null) {
return value;
}
// 本地缓存没有找到,尝试从 Redis 获取
RBucket<V> bucket = redissonClient.getBucket(key.toString());
value = bucket.get();
if (value != null) {
// 将数据回填到本地缓存
localCache.put(key, value);
}
return value;
}
/**
* 添加了一个`ConcurrentHashMap`来存储锁对象,并在获取数据时使用了一个双重检查锁定模式。
* 当本地缓存中没有数据时,首先获取一个锁,然后再次检查本地缓存以确保数据在获取锁的过程中没有被其他线程填充。
* 如果本地缓存仍然没有数据,会从Redis获取数据,如果Redis也没有数据,则从数据源加载数据并更新Redis和本地缓存。
* 这样的策略可以减少缓存击穿的风险。
*
* @param key
* @param mappingFunction
* @return
*/
public V get4Consistency(K key, Function<? super K, ? extends V> mappingFunction) {
// 先尝试从本地缓存获取数据
V value = localCache.getIfPresent(key);
if (value != null && value != NULL_PLACEHOLDER) {
return value;
}
// 获取锁对象,如果不存在则创建一个新的
Lock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
try {
// 锁定当前键,以便同步更新操作
lock.lock();
// 再次检查本地缓存,以防在获取锁的过程中数据被更新
value = localCache.getIfPresent(key);
if (value != null && value != NULL_PLACEHOLDER) {
return value;
}
// 尝试从 Redis 获取
RBucket<V> bucket = redissonClient.getBucket(key.toString());
V redisValue = bucket.get();
if (redisValue != null) {
// 如果在 Redis 中找到了,将其回填到本地缓存
localCache.put(key, redisValue);
return redisValue;
}
// 从数据源加载数据
V loadedValue = mappingFunction.apply(key);
if (loadedValue == null) {
// 存储空对象到本地缓存和 Redis 防止缓存穿透
localCache.put(key, (V) NULL_PLACEHOLDER);
bucket.set((V) NULL_PLACEHOLDER, redisExpiration, TimeUnit.SECONDS);
} else {
// 先将加载的数据写入 Redis
bucket.set(loadedValue, redisExpiration, TimeUnit.SECONDS);
// 然后回填到本地缓存
localCache.put(key, loadedValue);
}
return loadedValue;
} finally {
// 释放锁
lock.unlock();
// 移除锁对象,避免内存泄漏
locks.remove(key);
}
}
public V get(K key, Function<? super K, ? extends V> mappingFunction) {
// 尝试从本地缓存获取数据
V value = localCache.get(key, k -> {
// 尝试从 Redis 获取
RBucket<V> bucket = redissonClient.getBucket(k.toString());
V redisValue = bucket.get();
if (redisValue != null) {
return redisValue;
}
// 从数据源加载数据
V loadedValue = mappingFunction.apply(k);
if (loadedValue == null) {
// 存储空对象到本地缓存和 Redis 防止缓存穿透
localCache.put(k, (V) NULL_PLACEHOLDER);
bucket.set((V) NULL_PLACEHOLDER, redisExpiration, TimeUnit.SECONDS);
return null;
}
// 将加载的数据写入 Redis
bucket.set(loadedValue, redisExpiration, TimeUnit.SECONDS);
return loadedValue;
});
return value == NULL_PLACEHOLDER ? null : value;
}
public void put(K key, V value) {
// 同时更新本地缓存和 Redis 缓存
localCache.put(key, value);
redissonClient.getBucket(key.toString()).set(value, redisExpiration, TimeUnit.SECONDS);
}
public void invalidate(K key) {
// 同时移除本地缓存和 Redis 缓存中的数据
localCache.invalidate(key);
redissonClient.getBucket(key.toString()).delete();
}
public void refresh(K key, Function<? super K, ? extends V> mappingFunction) {
// 从数据源重新加载数据并更新缓存
V value = mappingFunction.apply(key);
if (value != null) {
put(key, value);
}
}
// 批量获取数据
public Map<K, V> getAll(Iterable<? extends K> keys) {
Map<K, V> allValues = localCache.getAllPresent(keys);
if (allValues.size() == keys.spliterator().getExactSizeIfKnown()) {
return allValues;
}
// 获取缺失的键
List<K> missingKeys = StreamSupport.stream(keys.spliterator(), false)
.filter(key -> !allValues.containsKey(key))
.collect(Collectors.toList());
// 从 Redis 批量获取缺失的键
Map<K, V> redisValues = missingKeys.stream()
.collect(Collectors.toMap(Function.identity(), key -> redissonClient.<V>getBucket(key.toString()).get()));
// 将 Redis 中获取的值回填到本地缓存
localCache.putAll(redisValues);
// 合并两个 map 并返回
allValues.putAll(redisValues);
return allValues;
}
// 批量写入数据
public void putAll(Map<? extends K, ? extends V> map) {
localCache.putAll(map);
map.forEach((key, value) -> redissonClient.getBucket(key.toString()).set(value, redisExpiration, TimeUnit.SECONDS));
}
// 批量移除数据
public void invalidateAll(Iterable<? extends K> keys) {
localCache.invalidateAll(keys);
keys.forEach(key -> redissonClient.getBucket(key.toString()).delete());
}
// 获取缓存统计信息
//这里的统计信息只来自Caffeine本地缓存,因为Redisson不提供原生的缓存统计信息
public CacheStats stats() {
return localCache.stats();
}
// 关闭方法需要关闭写回策略的线程池
public void shutdown() {
writeBehindExecutor.shutdown();
try {
if (!writeBehindExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
writeBehindExecutor.shutdownNow();
}
} catch (InterruptedException e) {
writeBehindExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
重要功能:
getAll
:批量从缓存获取数据。首先从本地缓存获取,如果本地缓存中缺失,则从Redis中获取,并回填到本地缓存。putAll
:批量写入数据到本地和Redis缓存。invalidateAll
:批量从本地和Redis缓存中移除数据。stats
:获取Caffeine缓存的统计信息。
Note:
添加了一个ConcurrentHashMap
来存储锁对象,并在获取数据时使用了一个双重检查锁定模式。当本地缓存中没有数据时,首先获取一个锁,然后再次检查本地缓存以确保数据在获取锁的过程中没有被其他线程填充。如果本地缓存仍然没有数据,会从Redis获取数据,如果Redis也没有数据,则从数据源加载数据并更新Redis和本地缓存。这样的策略可以减少缓存击穿的风险。
请注意,这种锁的使用会增加系统的复杂性,并可能导致性能开销,特别是在高并发场景下。因此,在实现这种机制时,需要仔细衡量其潜在的性能影响。
加了一个单线程的 ExecutorService
用于处理写回策略。当本地缓存中的条目因为驱逐策略被移除时,会将这个条目异步地写入 Redis。还添加了一个 shutdown
方法来关闭线程池。
对于缓存穿透保护,可以在 get
方法中加入逻辑来返回空对象或者使用布隆过滤器来预先检查键是否可能存在,一个特殊的空对象 NULL_PLACEHOLDER
存储到本地缓存和 Redis 中,这样下次查询相同的键时就能直接从缓存中获取到空对象,从而防止缓存穿透。
测试类:
使用示例:
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* @Author derek_smart
* @Date 202/4/24 15:25
* @Description caffeine和redis 缓存组合测试类
*/
public class HybridCacheExample {
public static void main(String[] args) {
// 配置 Redisson
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
// 创建混合缓存实例
HybridCache<String, String> hybridCache = new HybridCache<>(
redissonClient,
1000, // 本地缓存最大条目数
10, // 本地缓存过期时间
TimeUnit.MINUTES, // 本地缓存时间单位
60 * 60 // Redis 缓存过期时间(秒)
);
// 模拟数据加载函数
Function<String, String> dataLoader = key -> {
// 模拟数据库或其他数据源的加载过程
System.out.println("Loading data for key: " + key);
return "Value for " + key;
};
// 尝试获取缓存数据,如果没有,则使用提供的函数从数据源加载
String key = "key1";
String value = hybridCache.get(key, dataLoader);
System.out.println("Value: " + value);
// 更新缓存数据
String newValue = "Updated value";
hybridCache.put(key, newValue);
// 从缓存获取更新后的数据
String updatedValue = hybridCache.get(key);
System.out.println("Updated Value: " + updatedValue);
// 刷新缓存数据
hybridCache.refresh(key, dataLoader);
// 使缓存的数据失效
hybridCache.invalidate(key);
// 关闭 Redisson 客户端和混合缓存
hybridCache.shutdown();
redissonClient.shutdown();
}
}
测试说明:
在这个示例中,首先配置了 Redisson 客户端并连接到本地运行的 Redis 服务器。然后,创建了一个 HybridCache
实例,设置了本地缓存的大小和过期时间,以及 Redis 缓存的过期时间。
定义了一个 dataLoader
函数,它模拟了从数据库或其他数据源加载数据的过程。接着,使用 hybridCache.get
方法尝试从缓存中获取数据。如果本地缓存和 Redis 缓存中都没有找到数据,将调用 dataLoader
函数加载数据并存入缓存。
然后,更新了缓存中的数据,并再次从缓存中获取更新后的数据。之后,使用 refresh
方法来手动刷新缓存中的数据。最后,使用 invalidate
方法使缓存中的数据失效,并关闭 Redisson 客户端和混合缓存。
Note:
请注意,在实际应用中,可能需要根据实际业务逻辑和数据源来实现数据加载函数。此外,还需要确保 Redis 服务器正在运行且可访问。
转载自:https://juejin.cn/post/7361255930648526848