【黑马点评】商户查询缓存模块简单来说,缓存(cache)可以比作我们床头的**小柜子**。在家里,我们可能会有许多物品,
前言
国庆假期因为做完短信登录模块有点放松所以后面几天全去玩了(对不起),但是假期结束后我也是第一时间投入了项目的制作中,从10.9-10.14,也算是完成了商户查询模块的功能!这一模块明显可以感觉到难度上来了,很多关于线程、锁、高并发的知识,第一次遇见这些知识的我感觉有点烧脑,所以这一模块我的文章会以思路总结为主,希望让每一位读者都能学懂!
正片开始
什么是缓存
简单来说,缓存(cache)可以比作我们床头的小柜子。在家里,我们可能会有许多物品,比如零食、书籍、杂志等等。每次我们需要某样东西时,如果它都在一个比较远的地方,比如储藏室,拿起来就会很麻烦。因此,我们通常会选择将一些常用的物品放在一个更方便的地方,比如电脑桌上或者床头柜上。这样一来,当我们需要用到这些东西时,就能快速找到,并省去很多时间。
在计算机中,缓存就是这样一个“床头柜”。当我们访问一个网站、运行一个程序或者读取文件时,计算机会把一些频繁使用的数据存储在缓存中(通常是内存,这个项目中我们使用的Redis就是基于内存的)。这样,当下一次需要这些数据时,计算机可以更快地从缓存中读取,而不需要再去慢速的硬盘或网络上寻找它们。
总而言之,缓存的作用就是提高访问速度和效率,通过储存常用的信息,让需要这些信息的过程变得更加快捷便利。就像你在家中把常用物品放在易于拿取的地方一样,计算机的缓存也是为了让数据的读取更加高效。
缓存的模型与思路
如果我们要添加缓存,那我们的思路该是怎么样的呢?首先我们知道,在我们没有添加Redis作为缓存前,我们的请求是直接打到Mysql数据库的,现在我们添加缓存,自然是把请求先发送到Redis,所以我们可以得出Redis的位置是处于客户端与Mysql之间的。
首先我们的客户端会发送请求到Redis中查询缓存,这一时间有大量请求直接打到Redis上,如果请求全部命中,那么Redis将会将这些命中的缓存数据全部返回给客户端,如果没有命中或者有部分请求没命中,那么这些请求将会到达数据库,然后将数据库查询到的那些Redis里不存在的数据写入缓存并且返还给客户端。
代码如下:(这里代码并非最终版本,后续有很多问题需要改进)
缓存一致性问题与更新策略
缓存虽然便利快捷,但也带来了很多问题。我们的缓存是写在Redis里的,而我们的Mysql数据库是会进行更新的。当我们的数据库某个数据进行更新以后,Redis并不知道该数据进行了更新,那么下一次用户请求发送到Redis时,返还给用户的就会是一个旧数据,这便会产生一致性问题!
在企业中我们有如下几种方案解决该问题:
Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案 Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理 Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致
综合考虑后黑马点评中选择了第一种方案,但是有几点需要注意:
-
缓存更新策略(删除还是更新?):
- 更新缓存:每次更新数据库后同步更新缓存,但这可能导致无效的写操作。
- 删除缓存:更新数据库时删除缓存,等到再次查询时再加载数据。
-
操作一致性:
- 单体系统:可以将缓存和数据库操作放在同一事务中,确保成功或失败同步。
- 分布式系统:可以采用TCC等分布式事务方案来确保一致性。
-
操作顺序(先操作数据库还是先删除缓存?):
- 应先操作数据库,然后删除缓存。如果先删除缓存,再更新数据库,可能出现并发问题:一个线程删除缓存,另一个线程在此时查询并写入了缓存,导致新数据被旧数据覆盖。
总结起来,建议在数据库更新后再删除缓存,以确保数据一致性和准确性。
代码: 1.添加过期时间(TTL)作为保底方案,如果因为某些原因未更新,则到期自动删除缓存:
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
CACHE_SHOP_TTL, TimeUnit.MINUTES);
2.增加更新方法:
@Override
@Transactional
public Result update(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("id不能为空!");
}
//1.先操作数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}
}
缓存三大问题!穿透、雪崩、击穿!
缓存穿透
当用户访问的数据在缓存与数据库中都不存在时,请求访问缓存将永远不会命中,这些请求都会打到数据库上,由于数据库也不存在该数据,所以无法重构缓存。当有大量这样的请求到来时,数据库的压力骤增,这便是缓存穿透问题!缓存穿透一般发生于黑客的恶意攻击,故意访问大量不存在的数据来使数据库崩溃。
我们常见的解决方案有两种:1.缓存空字符串 2.加入布隆过滤器
- 缓存空对象
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗
- 可能造成短期的不一致
- 布隆过滤
- 优点:内存占用较少,没有多余key
- 缺点:
- 实现复杂
- 存在误判可能
在黑马点评中我们采用缓存空对象的方法来防止缓存穿透!简单来说就是即便访问的数据在数据库中不存在,我们也返还一个空对象给Redis让它建立缓存,后续客户端继续访问缓存时,便会取到空对象,不会继续访问数据库。
代码如下:
// 数据不存在,使用缓存空对象来防止缓存穿透,并返回失败信息
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);
return Result.fail("店铺不存在");
缓存雪崩
在讲解一致性问题时我们知道了,通常为了保证缓存最终能正常更新,我们都会设置有效期作为保底方案,这些有效期大多都是一个固定的数字,那么当同一时间大量缓存同时失效,我们请求的数据无法在缓存中找到,便会全部打到数据库上,导致数据库崩溃。还有一种更直接的原因--Redis故障,这也会导致大量请求直接打到数据库,造成数据库的崩溃。
由此可知,导致缓存雪崩发生的无非就两种原因:大量缓存同时到期、Redis发生故障。
针对有效期到期的问题,我们可以设置随机数TTL,比如原先我们设置的TTL都是30分钟那么后续我们可以使用随机数让TTL在30-40分钟之间(具体看业务需求),这样就可以避免大量缓存同时到期的问题。
对于Redis故障问题,我们要么在雪崩后对业务进行降级限流,防止数据库崩溃。要么我们通过主从节点的方式构建 Redis 缓存高可靠集群。
缓存击穿
缓存击穿问题又被称为热点key问题,我们的业务中时常会有某几个热点数据会被高并发访问(例如秒杀活动中的数据),当这些数据失效的同时又被高并发地访问,那么这些请求会在瞬间冲击数据库,造成我们业务的崩溃。
进行深入分析我们能发现问题的本质其实是这样的:假设线程1在查询缓存发现没有后,去访问数据库并重新加载数据到缓存。如果线程1尚未完成这个过程,而线程2、线程3和线程4同时来访问相同的方法,那么这些后续线程会发现缓存中没有数据。由于没有缓存可用,它们将同时尝试访问数据库,这会导致数据库承受过大的访问压力。
解决方案一:使用互斥锁
通过使用锁,可以实现互斥性,确保只有一个线程能够访问数据库,从而减少数据库的访问压力。不过,这种做法会影响查询性能,因为并行访问变成了串行。
为了解决这个问题,我们可以采用tryLock方法结合双重检查机制。
假设线程1访问时缓存未命中,但它成功获得了锁。此时,只有线程1会去执行后续逻辑。若此时线程2尝试访问,由于没有获得锁,它将进入休眠状态,等待线程1释放锁。一旦线程1完成操作并释放锁,线程2获得锁后,就能从缓存中获取数据。 这种方式既能够防止数据库过载,又能在一定程度上提升查询的性能。
代码如下:
@Override
public Result queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1.1查询店铺缓存
Result result = getShopFromCache(key);
//1.2判断缓存是否命中
if (Objects.nonNull(result)) {
// 1.3缓存命中,直接返回
return result;
}
try {
//2.1缓存未命中则先请求互斥锁再重建缓存
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//2.2判断能否能够获取互斥锁
if (!isLock) {
// 2.3获取锁失败,说明有别的线程在重建缓存,休眠以后重试
Thread.sleep(50);
return queryById(id);
}
// 2.3获取锁成功,再次判断缓存是否重建,使用双检防止堆积的线程全部请求数据库
result = getShopFromCache(key);
if (Objects.nonNull(result)) {
// 缓存命中,直接返回
return result;
}
//3.1在数据库中查询店铺信息
Shop shop = this.getById(id);
//3.2判断数据库中是否存在店铺信息
if (Objects.isNull(shop)) {
// 3.3数据不存在,使用缓存空对象来防止缓存穿透,并返回失败信息
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);
return Result.fail("店铺不存在");
}
// 4.数据存在,重建缓存
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}catch (Exception e){
throw new RuntimeException("发生异常!");
} finally {
// 5.释放锁(放在finally中以防止死锁)
unlock(key);
}
}
private Result getShopFromCache(String key) {
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断缓存是否命中
if (StrUtil.isNotBlank(shopJson)) {
// 缓存命中,返回店铺数据
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 判断缓存中查询的数据是否是空字符串(isNotBlank把null和空字符串都判为false,需要再次检查)
if (Objects.nonNull(shopJson)) {
// 不是null就是空字符串,缓存命中之前放穿透的空对象,返回失败信息
return Result.fail("店铺不存在");
}
// 缓存未命中
return null;
}
//获取互斥锁
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL,TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
//释放互斥锁
private void unlock(String key){
stringRedisTemplate.delete(key);
}
解决方案二:逻辑过期
使用逻辑过期解决缓存击穿,简单来说就是:当用户请求数据时,首先查询缓存,如果缓存命中且数据未逻辑过期,则直接返回缓存数据。如果缓存未命中,则直接返回空数据,不查询数据库。如果缓存命中但数据已逻辑过期,则直接返回缓存中的旧数据,并开启独立线程更新缓存数据。
这样一来,即使在缓存失效的瞬间,也能避免大量请求直接访问数据库,有效防止缓存击穿,保护了数据库。虽然逻辑过期可能导致返回的数据不是最新的,但也比直接导致数据库崩溃要好得多。
代码如下:
首先,我们先组合一个RedisData类,来封装shop和逻辑过期时间(不在shop上直接加上逻辑过期字段对原本代码没有入侵性,不使用继承因为组合优于继承,组合通常提供了更灵活、更易于维护的代码结构。)
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
然后我们新增一个将组合后的RedisData写入Redis的方法
public void saveShop2Redis(Long id,Long expireSeconds){
//1.获取商铺信息
Shop shop = getById(id);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
使用单元测试进行预热
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private ShopServiceImpl shopService;
@Test
void testSaveShop(){
shopService.saveShop2Redis(1L,10L);
}
}
接下来是最终代码
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {
String key = CACHE_SHOP_KEY + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return shop;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
CACHE_REBUILD_EXECUTOR.submit( ()->{
try{
//重建缓存
this.saveShop2Redis(id,20L);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return shop;
}
缓存击穿解决方案总结(一致性or可用性)
在分布式系统中,一致性和可用性之间存在着难以调和的矛盾,这被称为 CAP 定理。CAP 定理指出,一个分布式系统不可能同时满足以下三个特性:
- 一致性(Consistency) : 所有节点的数据始终保持一致。
- 可用性(Availability) : 系统始终处于可用的状态,即使部分节点出现故障。
- 分区容忍性(Partition Tolerance) : 系统在网络出现分区的情况下依然能够正常工作。
为什么不能同时保证一致性和可用性? 因为在网络出现分区的情况下,不同节点之间无法进行通信,如果想要保证数据一致性,就需要等待网络恢复,这会造成系统不可用。反之,如果想要保证系统可用性,就需要允许部分节点的数据不一致,从而导致数据不一致。
缓存击穿解决方案中,逻辑过期和互斥锁在一致性和可用性方面各有侧重:
逻辑过期:
- 一致性: 由于逻辑过期时间设置的偏差,可能会出现数据不一致的情况。如果逻辑过期时间设置过短,会导致频繁访问数据库,降低性能;如果设置过长,会导致数据滞后。
- 可用性: 逻辑过期相对简单,实现成本较低,能够有效提高缓存命中率,降低数据库压力,提高可用性。
互斥锁:
- 一致性: 互斥锁能够保证同一时间只有一个线程访问数据库,有效避免并发问题,保证数据的一致性。
- 可用性: 互斥锁的实现相对复杂,可能会引入锁竞争问题,影响性能,甚至导致死锁,降低可用性。
总的来说:
- 逻辑过期更偏向 可用性, 但 一致性 较弱。
- 互斥锁 更偏向 一致性, 但 可用性 较低。
选择方案时,需要根据应用场景进行权衡。
封装缓存工具类
这一部分非常非常难,思想难,知识点难,真的听的猪脑过载了,很多完全没有听过的知识点和思路,所以这里就只给出代码,等后续我学的更透彻了再来补这个坑! 基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
- 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓
存击穿问题
- 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
代码如下:
@Slf4j
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public void set(String key, Object value, Long time, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
// 设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值
if (json != null) {
// 返回一个错误信息
return null;
}
// 4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
return r;
}
public <R, ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
// 3.存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return r;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查询数据库
R newR = dbFallback.apply(id);
// 重建缓存
this.setWithLogicalExpire(key, newR, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return r;
}
public <R, ID> R queryWithMutex(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
}
// 判断命中的是否是空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}
// 4.实现缓存重建
// 4.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
R r = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2.判断是否获取成功
if (!isLock) {
// 4.3.获取锁失败,休眠并重试
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
}
// 4.4.获取锁成功,根据id查询数据库
r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 7.释放锁
unlock(lockKey);
}
// 8.返回
return r;
}
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}
在ShopServiceImpl 中:
@Resource
private CacheClient cacheClient;
@Override
public Result queryById(Long id) {
// 解决缓存穿透
Shop shop = cacheClient
.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 互斥锁解决缓存击穿
// Shop shop = cacheClient
// .queryWithMutex(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 逻辑过期解决缓存击穿
// Shop shop = cacheClient
// .queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, 20L, TimeUnit.SECONDS);
if (shop == null) {
return Result.fail("店铺不存在!");
}
// 7.返回
return Result.ok(shop);
}
以上便是缓存模块的相关知识与代码,如有错漏欢迎指出,共勉!
转载自:https://juejin.cn/post/7425441107100188708