likes
comments
collection
share

高并发下缓存失效问题-解决缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)

作者站长头像
站长
· 阅读数 9

缓存穿透

  • 产生原因

    指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是 数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

  • 风险

    利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃

  • 解决

    null结果缓存,并加入短暂过期时间

    public Map<String, List<Category2Vo>> getCatalogJson() {
      // 加入缓存逻辑
      String catalogJson = redisTemplate.opsForValue().get("catalogJson");
    
      if (StringUtils.isEmpty(catalogJson)) {
        // 查询数据库
        Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
        // 将数据转换为Json加入到缓存中
        String jsonString = JSON.toJSONString(catalogJsonFromDb);
        // 解决缓存穿透问题
        if (catalogJsonFromDb == null) {
          redisTemplate.opsForValue().set("catalogJson", "0");
        } else {
          redisTemplate.opsForValue().set("catalogJson", jsonString);
        }
    
        return catalogJsonFromDb;
      }
    
      Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
      return listMap;
    }
    

缓存雪崩

  • 产生原因

    缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

  • 解决

    原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体 失效的事件。

    public Map<String, List<Category2Vo>> getCatalogJson() {
      // 加入缓存逻辑
      String catalogJson = redisTemplate.opsForValue().get("catalogJson");
    
      if (StringUtils.isEmpty(catalogJson)) {
        // 查询数据库
        Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
        // 将数据转换为Json加入到缓存中
        String jsonString = JSON.toJSONString(catalogJsonFromDb);
        // 解决缓存穿透问题
        if (catalogJsonFromDb == null) {
          redisTemplate.opsForValue().set("catalogJson", "0");
        } else {
          // 解决缓存雪崩问题
          redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
        }
    
        return catalogJsonFromDb;
      }
    
      Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
      return listMap;
    } 
    

缓存击穿

  • 产生原因

    1. 对于一些设置了过期时间的key,如果这些key可能会在某些 时间点被超高并发地访问,是一种非常“热点”的数据。
    2. 如果这个key在大量请求同时进来前正好失效,那么所有对 这个key的数据查询都落到db,我们称为缓存击穿。
  • 解决:加锁

    大量并发只让一个去查,其他人等待,查到以后释放锁,其他 人获取到锁,先查缓存,就会有数据,不用去db

本地锁

synchronized、JUC(Lock)

@Override
public Map<String, List<Category2Vo>> getCatalogJson() {
  // 加入缓存逻辑
  String catalogJson = redisTemplate.opsForValue().get("catalogJson");

  if (StringUtils.isEmpty(catalogJson)) {
    // 查询数据库
    Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
    return catalogJsonFromDb;
  }

  Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
  return listMap;
}

/**
 * 从数据库中查询并封装分类数据
 *
 * @return
 */
public Map<String, List<Category2Vo>> getCatalogJsonFromDb() {

  // 加锁:解决缓存击穿
  // 只要是同一把锁,就能锁住需要这个锁的所有线程
  // synchronized (this)  SpringBoot所有的组件在容器中都是单例的
  synchronized (this) {

    // 得到锁之后,我们需要再去缓存中确定一次,如果没有才需要继续查询
    String catalogJson = redisTemplate.opsForValue().get("catalogJson");
    if (!StringUtils.isEmpty(catalogJson)) {
      // 缓存不为空直接返回
      Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
      return listMap;
    }

    List<CategoryEntity> selectList = baseMapper.selectList(null);

    System.out.println("查询了数据库...");

    // 获取全部一级分类
    List<CategoryEntity> category1List = getCategoryByParentCid(selectList, 0L);
    // 封装数据
    Map<String, List<Category2Vo>> listMap = category1List.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), v -> {
      // 查询二级分类
      List<CategoryEntity> category2List = getCategoryByParentCid(selectList, v.getCatId());
      List<Category2Vo> category2VoList = category2List.stream().map(item -> {
        Category2Vo category2Vo = new Category2Vo(item.getParentCid().toString(), item.getCatId().toString(), item.getName(), null);

        // 查询三级分类
        List<CategoryEntity> category3List = getCategoryByParentCid(selectList, item.getCatId());
        List<Category2Vo.Category3Vo> category3VoList = category3List.stream().map(item2 -> {
          return new Category2Vo.Category3Vo(item2.getParentCid().toString(), item2.getCatId().toString(), item2.getName());
        }).collect(Collectors.toList());

        category2Vo.setCatalog3List(category3VoList);
        return category2Vo;
      }).collect(Collectors.toList());

      return category2VoList;
    }));

    // 需要在锁中进行缓存,否则会再释放锁的同时,有其他请求进来,导致查询了多次数据库,出现没有锁住的问题
    // 将数据转换为Json加入到缓存中
    String jsonString = JSON.toJSONString(listMap);
    // 解决缓存穿透问题
    if (listMap == null) {
      redisTemplate.opsForValue().set("catalogJson", "0");
    } else {
      // 结局缓存雪崩问题
      redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
    }

    return listMap;
  }
}

/**
 * 根据父类ID查询出全部子分类
 *
 * @param selectList
 * @param parentCid
 * @return
*/
public List<CategoryEntity> getCategoryByParentCid(List<CategoryEntity> selectList, Long parentCid) {
  List<CategoryEntity> entityList = selectList.stream().
    filter(item -> item.getParentCid() == parentCid).
    collect(Collectors.toList());
  return entityList;
}

分布式锁

分布式锁演进

  • 业务代码

    private Map<String, List<Category2Vo>> getDataFromDb() {
      // 得到锁之后,我们需要再去缓存中确定一次,如果没有才需要继续查询
      String catalogJson = redisTemplate.opsForValue().get("catalogJson");
      if (!StringUtils.isEmpty(catalogJson)) {
        // 缓存不为空直接返回
        Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {
        });
        return listMap;
      }
    
      List<CategoryEntity> selectList = baseMapper.selectList(null);
    
      System.out.println("查询了数据库...");
    
      // 获取全部一级分类
      List<CategoryEntity> category1List = getCategoryByParentCid(selectList, 0L);
      // 封装数据
      Map<String, List<Category2Vo>> listMap = category1List.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), v -> {
        // 查询二级分类
        List<CategoryEntity> category2List = getCategoryByParentCid(selectList, v.getCatId());
        List<Category2Vo> category2VoList = category2List.stream().map(item -> {
          Category2Vo category2Vo = new Category2Vo(item.getParentCid().toString(), item.getCatId().toString(), item.getName(), null);
    
          // 查询三级分类
          List<CategoryEntity> category3List = getCategoryByParentCid(selectList, item.getCatId());
          List<Category2Vo.Category3Vo> category3VoList = category3List.stream().map(item2 -> {
            return new Category2Vo.Category3Vo(item2.getParentCid().toString(), item2.getCatId().toString(), item2.getName());
          }).collect(Collectors.toList());
    
          category2Vo.setCatalog3List(category3VoList);
          return category2Vo;
        }).collect(Collectors.toList());
    
        return category2VoList;
      }));
    
      // 需要在锁中进行缓存,否则会再释放锁的同时,有其他请求进来,导致查询了多次数据库,出现没有锁住的问题
      // 将数据转换为Json加入到缓存中
      String jsonString = JSON.toJSONString(listMap);
      // 解决缓存穿透问题
      if (listMap == null) {
        redisTemplate.opsForValue().set("catalogJson", "0");
      } else {
        // 结局缓存雪崩问题
        redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
      }
    
      return listMap;
    }
    
  • 阶段一

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
        // 占分布式锁,去redis占坑 命令:set lock 111 NX
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
        // 加锁成功...执行业务
        if (lock) {
          	// 执行业务代码
            Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
            redisTemplate.delete("lock");// 删除锁
            return dataFromDb;
        } else {
            // 加锁失败...重试
            // 休眠100ms重试
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
        }
    }
    
    • 问题

      setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁

    • 解决

      设置锁的自动过期,即使没有删除,会自动删除

  • 阶段二

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
      // 占分布式锁,去redis占坑
      Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
      // 加锁成功...执行业务
      if (lock) {
        // 设置过期时间
        redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
        // 执行业务代码
        Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
        redisTemplate.delete("lock");// 删除锁
        return dataFromDb;
      } else {
        // 加锁失败...重试
        // 休眠100ms重试
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
      }
    }
    
    • 问题

      setnx设置好,正要去设置过期时间,宕机。又死锁了。

    • 解决

      设置过期时间和占位必须是原子的。redis支持使用setnx ex 命令

  • 阶段三

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
      // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的
      Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 300, TimeUnit.SECONDS);
      // 加锁成功...执行业务
      if (lock) {
        // 执行业务代码
        Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
        redisTemplate.delete("lock");// 删除锁
        return dataFromDb;
      } else {
        // 加锁失败...重试
        // 休眠100ms重试
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
      }
    }
    
    • 问题

      删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。

    • 解决

      占锁的时候,值指定为uuid,每个人匹配是自己 的锁才删除。

  • 阶段四

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
      // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的
      String uuid = UUID.randomUUID().toString();
      Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
      // 加锁成功...执行业务
      if (lock) {
        // 执行业务代码
        Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
    
        // 判断是否为自己的锁 获取值对比+对比成功删除=原子操作
        String lockValue = redisTemplate.opsForValue().get("lock");
        if (uuid.equals(lockValue)) {
          redisTemplate.delete("lock");// 删除锁
        }
        return dataFromDb;
      } else {
        // 加锁失败...重试
        // 休眠100ms重试
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
      }
    }
    
    • 问题

      如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁

    • 解决

      删除锁必须保证原子性。使用redis+Lua脚本完成

  • 阶段五-最终形态

    保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 更难的事情,锁的自动续期

     public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
       // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的
       String uuid = UUID.randomUUID().toString();
       Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
       // 加锁成功...执行业务
       if (lock) {
         System.out.println("获取分布式锁成功~~~");
         Map<String, List<Category2Vo>> dataFromDb;
         try {
           // 执行业务代码
           dataFromDb = getDataFromDb();
         } finally {
           // 判断是否为自己的锁
           String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
           // 删除锁
           Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
         }
    
         return dataFromDb;
       } else {
         // 加锁失败...重试
         // 休眠100ms重试
         try {
           Thread.sleep(200);
         } catch (Exception e) { }
         return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
       }
     }
    

分布式锁框架:Redisson

整合redisson作为分布式锁等功能框架

  • 引入依赖

    <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson</artifactId>
       <version>3.12.0</version>
    </dependency>  
    
  • 配置

     /**
      * 所有对Redisson的使用都是通过RedissonClient对象
      *
      * @return
      */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() {
      // 创建配置
      Config config = new Config();
      config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    
      // 根据config创建出RedissonClient实例
      RedissonClient redisson = Redisson.create(config);
      return redisson;
    }
    
  • lock锁测试

    • 情况一:不设置自动解锁时间
    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
      // 获取一把锁。只要锁的名字一样,就是同一把锁
      RLock lock = redissonClient.getLock("my-lock");
    
      // 加锁
      lock.lock();// 阻塞式等待。默认加的锁都是30s时间。
      try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(20000);
      } catch (Exception e) {
    
      } finally {
        // 解锁
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
      }
    
      return "hello";
    }
    

    问题

    假设解锁代码没有运行,redisson会不会出现死锁

    1. 锁的自动延期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期就会被删掉

    2. 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

    • 情况二:设置了自动解锁时间
    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
      // 获取一把锁。只要锁的名字一样,就是同一把锁
      RLock lock = redissonClient.getLock("my-lock");
    
      // 加锁
      lock.lock(10, TimeUnit.SECONDS);// 10秒自动解锁,自动解锁时间一定要大于业务的执行时间。
    
      try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(20000);
      } catch (Exception e) {
    
      } finally {
        // 解锁
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
      }
    
      return "hello";
    }
    

    问题

    lock.lock(10, TimeUnit.SECONDS);在锁时间到了以后,不会自动续期

    1. 如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间

    2. 如果我们未指定锁的超时时间,就使用LockWatchdogTimeout看门狗的默认时间 30 * 1000;

      只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期成30s。

      续期时间:internalLockLeaseTime【看门狗时间】 / 3L,10s续期一次

    3. 最佳实战:

      lock.lock(30, TimeUnit.SECONDS); 省掉了整个续期操作。手动解锁

  • 读写锁测试

    // 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁)。读锁是一个共享锁
    // 写锁没释放,读就必须等待
    // 读 + 读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
    // 写 + 读:等待写锁释放
    // 写 + 写:阻塞方式
    // 读 + 写:有读锁,写也需要等待
    // 只要有写的存在,都必须等待
    @ResponseBody
    @GetMapping("/write")
    public String writeValue() {
      String s = "";
      RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
      RLock writeLock = lock.writeLock();
      try {
        // 1、改数据加写锁,读数据加读锁
        writeLock.lock();
        s = UUID.randomUUID().toString();
        Thread.sleep(20000);
        stringRedisTemplate.opsForValue().set("writeValue", s);
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        // 解锁
        writeLock.unlock();
      }
    
      return s;
    }
    
    @ResponseBody
    @GetMapping("/read")
    public String readValue() {
      String s = "";
      RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
      lock.readLock().lock();
      try {
        s = stringRedisTemplate.opsForValue().get("writeValue");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        lock.readLock().unlock();
      }
      return s;
    }
    
  • 信号量

      /**
       * 车库停车举例
       * 停车 初始化缓存中park = 3
       * 信号量也可以用作分布式限流。
       * 
       * @return
       * @throws InterruptedException
       */
    @ResponseBody
    @GetMapping("/park")
    public String park() throws InterruptedException {
      RSemaphore park = redissonClient.getSemaphore("park");
      //        park.acquire();// 获取一个信号,获取一个值,占一个车位  阻塞的
    
      boolean b = park.tryAcquire();// 非阻塞的
      if (b) {
        // 执行业务
        return "ok";
      }
    
      return "error";
    }
    
    /**
     * 从车位开走
     * @return
     */
    @GetMapping("/go")
    @ResponseBody
    public String go() {
      RSemaphore park = redissonClient.getSemaphore("park");
      park.release();// 释放一个车位
    
      return "ok";
    }
    
  • 闭锁

    /**
     * 举例:放假锁门
     * 假设有5个班,全部走完,才可以锁大门
     */
    @GetMapping("lockDoor")
    @ResponseBody
    public String lockDoor() throws InterruptedException {
      RCountDownLatch door = redissonClient.getCountDownLatch("door");
      door.trySetCount(5);
      door.await(); // 等待闭锁都完成
    
      return "放假了...";
    }
    
    @GetMapping("/go/{id}")
    @ResponseBody
    public String gogogo(@PathVariable("id") Long id) {
      RCountDownLatch door = redissonClient.getCountDownLatch("door");
      door.countDown();// 计数减1
    
      return id+"班的人都走了...";
    }
    
    
  • 使用Redisson解决分布式锁问题

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedissonLock() {
      // 1、锁的名字,锁的粒度,越细越快
      // 锁的粒度:具体缓存的是某个数据,比如:11号商品: product-11-lock product-12-lock
      RLock lock = redisson.getLock("Catalog-lock");
      lock.lock();
    
      // 加锁成功...执行业务
      Map<String, List<Category2Vo>> dataFromDb;
      try {
        // 执行业务代码
        dataFromDb = getDataFromDb();// 该业务方法在上面
      } finally {
        lock.unlock();
      }
      return dataFromDb;
    }