Redis高并发问题这么解决,泰裤啦
前言
当今互联网世界中,高并发一直是各大网站、应用面临的一大挑战。为了应对高并发的流量压力,各种技术手段被不断研究和应用。其中,Redis 作为一款高性能的内存数据库,被广泛应用于解决高并发问题。
与传统的关系型数据库不同,Redis 采用了内存存储的方式,可以实现快速的读写操作。同时,Redis 还具有丰富的数据结构和强大的缓存功能,可以大大提升系统的性能和可靠性。在处理高并发问题方面,Redis 也提供了诸如分布式锁、限流、队列等常用的解决方案,可以帮助开发者轻松应对高并发场景。
本篇文章将介绍 Redis 在处理高并发问题方面的应用,包括缓存穿透、缓存击穿和缓存雪崩问题等,并针对这些问题给出实际的解决方案(附代码),持续更新。希望本文能够对大家在解决高并发问题时提供帮助和启示。
缓存穿透
缓存穿透是指用户查询数据时,数据库和缓存中都没有数据。导致了查询请求直接绕过缓存,直接穿透到数据库。
解决方法:
缓存空值
查询Redis为null,查询数据库也为null,此时设置该key在缓存中,且值为null,过期时间为随机时间。random(10)。这样子能保证数据在这段时间暴力请求,也只会在这短暂的时间内获取null,而有另外的线程在读取数据库表,并缓存在Redis中
/**
* 解决缓存穿透
* @return
*/
public User getUser(String userId) {
//从缓存中获取user信息
User user = (User) redisTemplate.opsForValue().get(userId);
if(user == null) {
//如果缓存数据为空,从数据库中获取user信息
user = userService.getUserByUserId(userId);
if(user == null) {
//如果数据库中数据为空,则存入一个空值,设置短时间内过期,防止缓存穿透
redisTemplate.opsForValue().set(userId,null,5, TimeUnit.MINUTES);
}else {
//将数据写入缓存
redisTemplate.opsForValue().set(userId,user);
}
}
return user;
}
布隆过滤器
布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
优点:
- 空间效率高,不用像Set集合一样保存元素的值,极大地节省了内存空间。只需将插入的key通过Hash计算放到bitMap中的一个位置,在判断是否存在该key的时候,只需判断bitMap中的位置是0还是1即可,达到了Set集合判断是否存在某值的效果。
- 查询效率高:布隆过滤器可以在非常快的时间内判断一个元素是否存在于集合中,而不需要像传统数据结构那样进行线性扫描。这对于大规模数据集和高并发查询场景尤其有用。
缺点:
- 布隆过滤器中的存储的key越多,误判率越高。将不存在的元素误判为存在。
- 不能删除布隆过滤器中已存在的key
具体使用:使用Guava中的API
导入依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
构造BloomFilter的最多参数的静态工厂方法是BloomFilter create(Funnel funnel, long expectedInsertions, double fpp, BloomFilter.Strategy strategy),参数如下:
- funnel:主要是把任意类型的数据转化成HashCode,是一个顶层接口,有大量内置实现,见Funnels
- expectedInsertions:期望插入的元素个数
- fpp:猜测是False Positive Percent,误判率,小数而非百分数,默认值0.03
- strategy:映射策略,目前只有MURMUR128_MITZ_32和MURMUR128_MITZ_64(默认策略)
@RestController
@RequestMapping("user")
public class UserController{
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final int expectedInsertions = 10000;
private static final double fpp = 0.0444D;
private static BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);
@GetMapping("/user/{id}")
public User getUserById(@PathVariable Long id){
// 先从布隆过滤器中判断此id是否存在,初始化的时候需要将数据库的所有id都存到布隆过滤器,缺点不能删除已经不存在的id
if(!bloomFilter.mightContain(String.valueof(id))){
return null;
}
// 查询缓存数据
String userKey = "user_"+id;
User user = (User) redisTemplate.opsForValue().get(userKey);
if(user == null){
// 查询数据库
user = userRepository.findById(id).orElse(null);
if(user != null){
// 将查询到的数据加入缓存
redisTemplate.opsForValue().set(userKey, user, 300, TimeUnit.SECONDS);
}
}
return user;
}
}
缓存击穿
缓存击穿是指一个非常热门的、但是不存在的数据被大量请求,导致请求直接落到数据库上,从而使得数据库瞬间承受巨大的压力,从而导致数据库响应变慢,甚至宕机的现象。和缓存雪崩的区别在于热点数据的量多不多。
解决方法:
缓存数据永不过期
将热门的、但是不经常更新的数据设置为永不过期,可以避免缓存击穿的风险。但是这种方法可能会导致缓存数据的时效性降低,需要根据实际情况进行权衡。
分布式锁
在加载缓存数据时,添加互斥锁可以保证只有一个请求去加载数据并更新缓存,其他请求等待缓存更新完成后再获取数据,从而避免了大量请求直接落到数据库上的情况。
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedissonClient redissonClient;
public Object getData(String key) {
// 尝试从缓存中获取数据
Object value = redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}
// 如果缓存中不存在数据,获取分布式锁
RLock lock = redissonClient.getLock(key);
try {
lock.lock();
// 再次尝试从缓存中获取数据,避免其他线程在获取锁之前已经写入了缓存
value = redisTemplate.opsForValue().get(key);
if (value != null) {
return value;
}
// 如果缓存中不存在数据,从数据库中查询
value = getDataFromDatabase(key);
if (value == null) {
// 如果数据库中也不存在数据,将空对象写入缓存,并设置较短的过期时间
redisTemplate.opsForValue().set(key, NullValue.INSTANCE, 1, TimeUnit.MINUTES);
} else {
// 如果数据库中存在数据,将数据写入缓存,并设置较长的过期时间
redisTemplate.opsForValue().set(key, value, 5, TimeUnit.MINUTES);
}
return value;
} finally {
// 释放分布式锁
lock.unlock();
}
}
缓存雪崩
缓存雪崩是指在缓存中大量的缓存数据同时过期或者失效,导致大量的请求直接落到了数据库上,使得数据库瞬间承受巨大的压力,从而导致数据库响应变慢,甚至宕机的现象。
解决方法:
过期时间随机化
将缓存数据的过期时间设置为随机值,可以避免大量缓存数据同时过期的情况发生,从而减少缓存雪崩的风险
/**
* 解决缓存雪崩
* @return
*/
public User getUser2(String userId) {
//从缓存中获取user信息
User user = (User) redisTemplate.opsForValue().get(userId);
if(user == null) {
//如果缓存数据为空,从数据库中获取user信息
user = lUserMapper.getUserByUserId(userId);
if(user == null) {
redisTemplate.opsForValue().set(userId,null,3, TimeUnit.MINUTES);
}else {
//设置随机过期时间,将数据写入缓存,防止缓存雪崩
long mins = random.nextInt(60) + 60;
redisTemplate.opsForValue().set(userId, user, mins, TimeUnit.MINUTES);
}
}
return user;
}
分布式锁
使用分布式锁可以保证在缓存失效时,只有一个请求去加载数据并更新缓存,其他请求等待缓存更新完成后再获取数据,从而避免了大量请求同时落到数据库上的情况(同缓存穿透)
引出问题
通过上面的例子我们已经了解到了Redis在高并发状态下可能出现的问题以及解决方法,但是如果应用到实际场景中,针对每个接口都需要考虑这么处理,那代码中会充斥着大量的重复代码,那肯定是不能接受的。那有没有一种好的通用的解决方案呢?
这里就不得不提起Spring Cache,Spring Cache 是Spring 提供的一整套的缓存解决方案。 虽然它本身并没有提供缓存的实现,但是它提供了一整套的接口和代码规范、配置、注解等,这样它就可以整合各种缓存方案了,比如Redis、Ehcache,我们也就不用关心操作缓存的细节。Spring Cache怎么整合Redis,本篇文章不做介绍,大家自行上网搜索。
简单介绍一下Spring Cache的几个注解使用:
@Cacheable(key="#id") 根据id查询或者查询会启动缓存
@CachePut(key="#post.postId") 插入或者更新会启动缓存
@CacheEvict(key="#id") 删除时启动缓存
Spring Cache解决方案
Spring Cache解决缓存穿透
有一个很简单的解决方案,就是缓存null值,从缓存取不到的数据,在数据库中也没有取到,直接返回空值。本身是不支持缓存null值的,需要在配置文件开启支持
spring.cache.redis.cache-null-values=true
Spring Cache解决缓存击穿
@Cacheable(cacheNames="user", sync="true")
解释:如果设置 sync 属性为 true,表示该方法的缓存操作会使用同步锁来保证线程安全,防止多个线程同时访问该方法导致缓存出现问题。如果 sync 属性为 false,则不会使用同步锁,缓存操作可能存在并发问题。通过设置 sync 属性为 true,可以保证多个线程同时访问同一个缓存方法时,只有一个线程能够执行方法,并将返回值缓存到缓存中。其他线程会等待第一个线程执行完方法后,从缓存中获取返回值。这样可以避免多个线程同时执行缓存方法,导致缓存出现问题的情况。sync = true 可以有效的避免缓存击穿的问题。
Spring Cache解决缓存雪崩
最简单的方法是过期时间加上随机值,但是很麻烦的是,我们在使用@Cacheable注解的时候,原生功能没法直接设置随机过期时间的,需要继承RedisCacheManager,重写里面的getCache方法。
可参考:Spring Boot缓存实战 Redis 设置有效时间和自动刷新缓存,时间支持在配置文件中配置
从上面可以看出Spring Cache解决Redis缓存问题还是比较麻烦的,特别是在解决缓存雪崩问题上。既然如此,我们为什么不自己实现一个属于我们自己的缓存机制,开干!!!
设计一套缓存机制
综上Redis出现的三个问题,给出综合的解决方案:
- 缓存空值(布隆过滤器不建议) + 分布式锁更新缓存解决Redis问题
- AOP + 自定义注解减少重复代码。增加复用性
读取缓存型注解@MyCacheable
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyCacheable {
/**
* 缓存的Key,默认使用方法名作为Key
*/
String value() default "";
/**
* 缓存的过期时间,单位为秒,默认值为60秒
*/
int expireInSeconds() default 60;
}
读取缓存型切面MyCacheableAspect
@Component
@Aspect
public class MyCacheableAspect {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* 定义缓存的切点,拦截所有标记了@Cached注解的方法
*/
@Pointcut("@annotation(com.plus.annotation.MyCacheable)")
public void cachedPointcut() {
}
private static final int expectedInsertions = 10000;
private static final double fpp = 0.0444D;
private static BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);
private static final String prefix = "lock-";
/**
* 在方法执行前尝试从缓存中获取数据,如果缓存中存在数据,直接返回
*/
@Around("cachedPointcut()")
public Object cachedAround(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取注解信息
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = methodSignature.getMethod();
Cached cached = method.getAnnotation(MyCacheable.class);
String key = StringUtils.isEmpty(cached.value()) ? method.getName() : cached.value();
int expireInSeconds = cached.expireInSeconds();
// 先从布隆过滤器中判断此id是否存在,初始化的时候需要将数据库的
// 所有id都存到布隆过滤器。这么多个数据表,id肯定有重复的。单独存储id是不行的。那就加上
// 前缀,例如user-id。初始化的时候就要遍历所有需要缓存数据的表,将该表的id都存到布隆过滤器,
// 数据量很大,这也会导致布隆过滤器误判率增加。且后续这些表新增数据都要用将id也要存到
// 布隆过滤器。缺点不能删除布隆过滤器已经不需要的id。
// 布隆过滤器按我的见解是真不好用,虽然在一些特殊场景好用,但不包括此场景
// 'mightContain(T)' is declared in unstable class 'com.google.common.hash.BloomFilter' marked with @Beta
// 说明这个方法是不稳定的,有可能误判
//if (key.contains("#id") && !bloomFilter.mightContain(key)) {
// return null;
//}
// 尝试从缓存中获取数据
Object value = redisTemplate.opsForValue().get(key);
if (value != null) {
if (value instanceof NullValue) {
// 如果缓存中存在空对象,返回null
return null;
}
return value;
}
// 如果缓存中不存在数据,获取分布式锁
RLock lock = redissonClient.getLock(prefix + key);
try {
lock.lock();
// 再次尝试从缓存中获取数据,避免其他线程在获取锁之前已经写入了缓存
value = redisTemplate.opsForValue().get(key);
if (value != null) {
if (value instanceof NullValue) {
// 如果缓存中存在空对象,返回null
return null;
}
return value;
}
// 如果缓存中不存在数据,执行方法获取数据
value = joinPoint.proceed();
if (value == null) {
// 如果数据源中也不存在数据,将空对象写入缓存,并设置较短的过期时间。防止缓存穿透,算是布隆过滤器的兜底
redisTemplate.opsForValue().set(key, NullValue.INSTANCE, 1, TimeUnit.MINUTES);
} else {
// 如果数据源中存在数据,将数据写入缓存,随机设置过期时间,避免缓存同时失效导致缓存雪崩
// 随机时间算法:以正常缓存时间为基准,取十分之一的范围内生成随机数
int seed = expireInSeconds / 10 == 0 ? expireInSeconds : expireInSeconds / 10;
int randomTime = new Random().nextInt(seed);
redisTemplate.opsForValue().set(key, value, expireInSeconds + randomTime, TimeUnit.SECONDS);
}
return value;
} finally {
// 释放分布式锁
lock.unlock();
}
}
/**
* 缓存空对象类,受不了代码规范插件报的null值警告才增加的。
注意RedisTemplate<String, String> redisTemplate不能缓存null值,需要RedisTemplate<String, Object> redisTemplate
*/
private static class NullValue implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 单例模式
*/
private static final NullValue INSTANCE = new NullValue();
private Object readResolve() {
return INSTANCE;
}
}
}
更新缓存型注解@MyCacheEvict
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyCacheEvict {
/**
* 缓存的Key
*/
String value();
}
更新缓存型切面MyCacheEvictAspect
@Aspect
@Component
public class MyCacheEvictAspect {
/**
* 定义缓存的切点,拦截所有标记了@MyCacheEvict注解的方法
*/
@Pointcut("@annotation(com.plus.annotation.MyCacheEvict)")
public void cachedPointcut() {
}
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Around("cachedPointcut()")
public Object cachedAround(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取注解信息
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = methodSignature.getMethod();
MyCacheEvict cacheEvict = method.getAnnotation(MyCacheEvict.class);
String key = cacheEvict.value();
// 执行方法
Object result = joinPoint.proceed();
redisTemplate.delete(key);
return result;
}
}
具体使用
@GetMapping("getAllUser")
@MyCacheable(value = "getAllUser")
public R<List<User>> getAllUser(){
return R.data(userService.list(new UserDTO()));
}
@GetMapping("getUserById")
@MyCacheable(value = "id")
public R<User> getUserById(int id){
return R.data(userService.getUserById(id));
}
@PostMapping("/save")
@MyCacheEvict("getAllUser")
public R save(@RequestBody @Validated User user) {
return R.data(userService.save(user));
}
以上方法基本能解决大部分场景下的缓存问题,大家有需求可以自行拓展,例如支持多种格式的key处理。对此有疑问的,希望大家多多指导!!!
参考文章:
转载自:https://juejin.cn/post/7231731488927465527