分布式缓存的简单实践
技术选型
分布式缓存方案默认采用的是主流的缓存框架:Redis,即将缓存数据存储在另一台Redis服务器上。 系统在使用缓存时,依赖的是缓存的接口,而非具体的实现;
分布式缓存在更新时不允许并发更新,防止缓存击穿,因此我们在Redis的基础上,采用了基于Redisson的分布式锁,在更新分布式缓存前必须先获得锁。
缓存生命周期
更新机制
- 被动刷新:基于Redis的数据驱逐策略,包括LRU和TTL等;
- 主动刷新:业务数据驱动的数据更新。当业务侧有数据变更时,将会主动刷新分布式缓存。比如当秒杀品下线时,会发出相应的领域事件,而在领域事件的处理中就会刷新缓存。
分布式缓存在刷新的过程中,并不会主动刷新所有服务器上的本地缓存,本地缓存将遵循单机的刷新策略。这意味着,本地缓存可能会有秒级或毫秒级的滞后,对于数据一致性非绝对敏感的场景,这种短时间的延迟下的脏数据是可以接受的,它只是会对用户侧的展示有所影响,而不会影响到服务端的数据状态。
分布式锁
基于Redis实现分布式锁
- 利用set nx ex获取锁,并设置过期时间,保存线程标识
- 释放锁时先判断线程标识是否与自己一致,一致则删除锁
/**
* 基于redis的分布式锁
*/
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString() + "-";
/**
* 锁
*
* @param time 锁的过期时间
* @return
*/
@Override
public boolean tryLock(long time) {
long id = Thread.currentThread().getId();
// 值用 uuid + 线程id拼接
String value = ID_PREFIX + id;
// 自动拆箱有空指针问题
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, value, time, TimeUnit.SECONDS);
return Boolean.TRUE.equals(aBoolean);
}
/**
* 解锁
*/
@Override
public void unLock() {
String valueInRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
long id = Thread.currentThread().getId();
// 值用 uuid + 线程id拼接
String value = ID_PREFIX + id;
// 两者相同才释放锁,要先做判断再进行释放
if (value.equals(valueInRedis)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
基于Redison实现分布式锁
导包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.3</version>
</dependency>
配置文件
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379")
.setPassword("ezreal")
.setDatabase(0);
return Redisson.create(config);
}
}
代码实现
@Component
public class RedissonLockService implements DistributedLockFactoryService {
private final Logger logger = LoggerFactory.getLogger(RedissonLockService.class);
@Resource
private RedissonClient redissonClient;
@Override
public DistributedLock getDistributedLock(String key) {
RLock rLock = redissonClient.getLock(key);
return new DistributedLock() {
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
boolean isLockSuccess = rLock.tryLock(waitTime, leaseTime, unit);
logger.info("{} get lock result:{}", key, isLockSuccess);
return isLockSuccess;
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
rLock.lock(leaseTime, unit);
}
@Override
public void unlock() {
if (isLocked() && isHeldByCurrentThread()) {
rLock.unlock();
}
}
@Override
public boolean isLocked() {
return rLock.isLocked();
}
@Override
public boolean isHeldByThread(long threadId) {
return rLock.isHeldByThread(threadId);
}
@Override
public boolean isHeldByCurrentThread() {
return rLock.isHeldByCurrentThread();
}
};
}
}
实现原理
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约::利用watchDog,每隔一段时间 (releaseTime/3),重置超时时间
分布式缓存
基本逻辑
每次获取缓存的时候,先从本地缓存中获取,再从分布式缓存中获取。
若分布式缓存中不存在对应的值,则需要获取分布式锁,然后对分布式缓存数据进行更新
- 若获取锁成功,则查询数据库,获取最新的数据放入缓存;
若数据库中的数据也为空,则也要存储入数据库,防止缓存穿透
-
若获取锁失败,则直接返回,不要等待重新获取锁,客户端对这次请求进行静默处理;
实现代码
缓存的接口
实现部分Redis数据结构的存储接口
public interface DistributedCacheService {
void put(String key, String value);
void put(String key, Object value);
void put(String key, Object value, long timeout, TimeUnit unit);
void put(String key, Object value, long expireTime);
<T> T getObject(String key, Class<T> targetClass);
String getString(String key);
<T> List<T> getList(String key, Class<T> targetClass);
Boolean delete(String key);
Boolean hasKey(String key);
}
基本逻辑代码
private SeckillGoodCache updateDistributedSeckillGood(Long itemId) {
logger.info("更新远程缓存|{}", itemId);
DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(UPDATE_ITEMS_CACHE_LOCK_KEY + itemId);
try {
boolean tryLock = distributedLock.tryLock(1, 5, TimeUnit.SECONDS);
// 如果没有获得到锁,就返回重试
if (!tryLock) {
return new SeckillGoodCache().tryLater();
}
// 再次检查
SeckillGoodCache distributedSeckillCache = distributedCacheService.getObject(buildItemCacheKey(itemId), SeckillGoodCache.class);
if (distributedSeckillCache != null) {
return distributedSeckillCache;
}
// 查询数据库
SeckillGood seckillGood = seckillGoodMapper.selectById(itemId);
SeckillGoodCache seckillGoodCache = new SeckillGoodCache();
if (seckillGood == null) {
// 数据不存在 也要返回 也要存缓存 防止缓存穿透
seckillGoodCache.notExist();
} else {
seckillGoodCache.with(seckillGood).setVersion(System.currentTimeMillis());
}
logger.info("itemCache|远程缓存已更新|{}", itemId);
distributedCacheService.put(buildItemCacheKey(itemId), JSON.toJSONString(seckillGoodCache));
return seckillGoodCache;
} catch (InterruptedException e) {
logger.error("itemCache|远程缓存更新失败|{}", itemId);
return new SeckillGoodCache().tryLater();
} finally {
distributedLock.unlock();
}
}
转载自:https://juejin.cn/post/7185204730270842940