使用Redis实现接口限流的几种方式
在没有引入sentinel
和gateway
等框架时,在一个分布式应用中,可以考虑使用redis
实现限流。redis
的几种数据接口特别适合进行接口限流功能的实现,下面介绍几种使用redis实现限流的方式。注意,这几种方式都是简单的实现,并没有应用在生产环境试验。
俗话说:“理论基础决定上层建筑”。那就先从理论开始,后面根据理论进行实现。
理论基础
基于Redis的setnx的操作
在使用redis
的分布式锁的时候,大家都知道是依靠了setnx
的指令,在CAS
(Compare and swap)操作的时候,同时给指定的key设置了过期时间(expire)。限流的主要目的就是为了在单位时间内,有且仅有N数量的请求能够访问代码程序。所以依靠setnx
可以很轻松的做到这方面的功能。比如需要在10秒内限定20个请求,那么在setnx
的时候可以设置过期时间10,当请求的setnx
数量达到20时候即达到了限流效果。当然这种做法的弊端是很多的,比如当统计1-10秒的时候,无法统计2-11秒之内,如果需要统计N秒内的M个请求,那么redis
中需要保持N个key等等问题。
基于Redis的数据结构zset
其实限流涉及的最主要的就是滑动窗口,上面也提到1-10怎么变成2-11。其实也就是起始值和末端值都各+1即可。 如果用redis
的list
数据结构可以轻而易举的实现该功能 可以将请求打造成一个zset
数组,当每一次请求进来的时候,value保持唯一,可以用UUID
生成,而score
可以用当前时间戳表示,因为score
可以用来计算当前时间戳之内有多少的请求数量。而zset
数据结构也提供了range
方法让我们可以很轻易的获取到2个时间戳内有多少请求。
通过上述代码可以做到滑动窗口的效果,并且能保证每N秒内至多M个请求,缺点就是zset
的数据结构会越来越大。实现方式相对也是比较简单的。
基于Redis的令牌桶算法
提到限流就不得不提到令牌桶算法了。 令牌桶算法提及到输入速率和输出速率,当输出速率大于输入速率,那么就是超出流量限制了。 也就是说每访问一次请求的时候,可以从redis
中获取一个令牌,如果拿到令牌了,那就说明没超出限制,而如果拿不到,则结果相反。 依靠上述的思想,可以结合redis
的List
数据结构很轻易的做到这样的代码,只是简单实现,依靠List
的leftPop
来获取令牌。
public boolean limit() {
Object leftPop = redisTemplate.opsForList().leftPop(KEY3);
if (ObjectUtil.isEmpty(leftPop)) {
return false;
}
return true;
}
再依靠Java的定时任务,定时往List
中rightPush
令牌,当然令牌也需要唯一性,所以可以用UUID
进行了生成
@Scheduled(fixedRate = 5000)
public void setToken(){
System.out.println("往limit3中放入一个令牌。。。。。");
redisTemplate.opsForList().leftPush(KEY3, UUID.randomUUID().toString());
System.out.println("放入令牌完毕。。。");
}
基于Redis+lua的方式
Lua
脚本和 MySQL
数据库的存储过程比较相似,他们执行一组命令,所有命令的执行要么全部成功或者失败,以此达到原子性。也可以把Lua
脚本理解为,一段具有业务逻辑的代码块。
相比redis
事务,Lua脚本
的优点:
- 减少网络开销:使用
Lua
脚本,无需向redis
发送多次请求,执行一次即可,减少网络传输 - 原子操作:
redis
将整个Lua
脚本作为一个命令执行,原子,无需担心并发 - 复用:
Lua
脚本一旦执行,会永久保存Redis
中,,其他客户端可复用
Lua
脚本大致逻辑如下:
-- 获取调用脚本时传入的第一个key值(用作限流的 key)
local key = KEYS[1]
-- 获取调用脚本时传入的第一个参数值(限流大小)
local limit = tonumber(ARGV[1])
-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
-- 是否超出限流
if curentLimit + 1 > limit then
-- 返回(拒绝)
return 0
else
-- 没有超出 value + 1
redis.call("INCRBY", key, 1)
-- 设置过期时间
redis.call("EXPIRE", key, 2)
-- 返回(放行)
return 1
end
解释:
- 通过
KEYS[1]
获取传入的key参数 - 通过
ARGV[1]
获取传入的limit
参数 redis.call
方法,从缓存中get
和key
相关的值,如果为null
那么就返回0- 接着判断缓存中记录的数值是否会大于限制大小,如果超出表示该被限流,返回0
- 如果未超过,那么该key的缓存值+1,并设置过期时间为1秒钟以后,并返回缓存值+1
代码实现
前置准备
pom文件引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件
server:
port: 9899
spring:
redis:
host: localhost
port: 6379
database: 2
password: 654321
Redis配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @author Ash
* @date 2024/7/16 10:47
* @description: redis配置类
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setConnectionFactory(factory);
return redisTemplate;
}
}
配置拦截器
import com.zhurx.springcloud.service.LimitPolicy;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author Ash
* @date 2024/7/16 13:04
* @description: 设置拦截器
*/
@Configuration
public class LimitInterceptor implements HandlerInterceptor {
private final LimitPolicy limitPolicy;
public LimitInterceptor(LimitPolicy limitPolicy) {
this.limitPolicy = limitPolicy;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (limitPolicy.limit()) {
return true;
}
throw new RuntimeException("接口信息被拦截");
}
}
设置资源限流方式
import com.zhurx.springcloud.service.LimitMethodOne;
import com.zhurx.springcloud.service.LimitMethodThree;
import com.zhurx.springcloud.service.LimitMethodTwo;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.annotation.Resource;
/**
* @author Ash
* @date 2024/7/17 10:18
* @description: 针对资源设置响应的限流方法
*/
@Configuration
public class UrlLimitConfig implements WebMvcConfigurer {
@Resource
RedisTemplate<String, Object> redisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LimitInterceptor(new LimitMethodOne(redisTemplate))).addPathPatterns("/resource/setnx");
registry.addInterceptor(new LimitInterceptor(new LimitMethodTwo(redisTemplate))).addPathPatterns("/resource/zset");
registry.addInterceptor(new LimitInterceptor(new LimitMethodThree(redisTemplate))).addPathPatterns("/resource/zlist");
}
}
配置限流类型
/**
* @author Ash
* @date 2024/7/17 14:37
* @description: 限流类型
*/
public enum CustomType {
CUSTOM,
IP
}
限流抽象类
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.Resource;
/**
* @author Ash
* @date 2024/7/16 12:44
* @description: 限流抽象类
*/
public abstract class LimitPolicy {
// 设置访问次数
public static final int COUNT = 3;
// 设置时间范围 10s
public static final int TIME = 10;
public abstract boolean limit();
public RedisTemplate<String, Object> redisTemplate;
public LimitPolicy(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
}
方式1:setnx
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
/**
* @author Ash
* @date 2024/7/16 12:48
* @description: setnx 限流方法
*/
@Component
public class LimitMethodOne extends LimitPolicy {
private static final String KEY1 = "limit1";
public LimitMethodOne(RedisTemplate<String, Object> redisTemplate) {
super(redisTemplate);
}
@PostConstruct
public void init() {
setNxExpire();
}
private boolean setNxExpire() {
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(KEY1, COUNT, TIME, TimeUnit.SECONDS);
return Boolean.TRUE.equals(ifAbsent);
}
@Override
public boolean limit() {
if (setNxExpire()) {
// 设置成功
return true;
} else {
return redisTemplate.opsForValue().decrement(KEY1) > 0;
}
}
}
方式2:zset
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Date;
import java.util.UUID;
/**
* @author Ash
* @date 2024/7/17 12:27
* @description: zset方法限流
*/
public class LimitMethodTwo extends LimitPolicy{
private static final String KEY2 = "limit2";
public LimitMethodTwo(RedisTemplate<String, Object> redisTemplate) {
super(redisTemplate);
}
@Override
public boolean limit() {
long currentTime = new Date().getTime();
// zcard 计算集合中元素的数量
if (redisTemplate.opsForZSet().zCard(KEY2) > 0) {
long duation = TIME * 1000;
// 返回有序集合中指定分数区间的成员列表
int count = redisTemplate.opsForZSet().rangeByScore(KEY2, currentTime - duation, currentTime).size();
if (count > COUNT) {
return false;
}
}
redisTemplate.opsForZSet().add(KEY2, UUID.randomUUID(), currentTime);
return true;
}
}
方式3:list(令牌桶)
import cn.hutool.core.util.ObjectUtil;
import org.springframework.data.redis.core.RedisTemplate;
/**
* @author Ash
* @date 2024/7/17 12:44
* @description: 令牌同方法
*/
public class LimitMethodThree extends LimitPolicy {
private static final String KEY3 = "limit3";
public LimitMethodThree(RedisTemplate<String, Object> redisTemplate) {
super(redisTemplate);
}
@Override
public boolean limit() {
Object leftPop = redisTemplate.opsForList().leftPop(KEY3);
if (ObjectUtil.isEmpty(leftPop)) {
return false;
}
return true;
}
}
设置一个定时任务往redis中存入令牌
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author Ash
* @date 2024/7/17 12:47
* @description: 往redis中放入令牌
*/
@Configuration
@EnableScheduling
public class LimitMethodThreeBatch {
@Resource
private RedisTemplate<String, Object> redisTemplate;
private static final String KEY3 = "limit3";
@Scheduled(fixedRate = 5000)
public void setToken(){
System.out.println("往limit3中放入一个令牌。。。。。");
redisTemplate.opsForList().leftPush(KEY3, UUID.randomUUID().toString());
System.out.println("放入令牌完毕。。。");
}
}
方式4:lua
这种方式实现的思路和setnx方式相同,不同的是使用lua和aop实现自定义注解在接口上直接使用。
自定义限流注解
/**
* @author Ash
* @date 2024/7/17 14:20
* @description: 限流注解
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Limit {
String key() default "";
String keyPrefix() default "";
int time() default 10;
int count() default 3;
CustomType limitType() ;
}
aop拦截注解
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.ImmutableList;
import com.zhurx.springcloud.annotation.Limit;
import com.zhurx.springcloud.entities.CustomType;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
/**
* @author Ash
* @date 2024/7/17 14:26
* @description: redis + lua
*/
@Component
@Aspect
public class LimitMethodFour {
@Resource
RedisTemplate<String, Object> redisTemplate;
@Pointcut("execution(public * *(..)) && @annotation(com.zhurx.springcloud.annotation.Limit)")
public void test() {
}
@Around("test()")
public Object around(ProceedingJoinPoint point) {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
Limit limit = method.getAnnotation(Limit.class);
try {
if (ObjectUtil.isEmpty(limit)) {
// 没有@Limit 直接放行
return point.proceed();
}
// 如果存在@Limit
CustomType limitType = limit.limitType();
String key;
// 如果不配置limitType 默认使用方法名
switch (limitType) {
case CUSTOM:
key = limit.key();
break;
case IP:
key = getIpAddress();
break;
default:
key = method.getName();
}
String keyPrefix = limit.keyPrefix();
ImmutableList<String> keyList = ImmutableList.of(StringUtils.join(keyPrefix, key));
// 使用lua
String script = buildLuaScript();
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long number = redisTemplate.execute(redisScript, keyList, limit.count(), limit.time());
if (number != null && number.intValue() <= limit.count()) {
return point.proceed();
} else {
throw new RuntimeException("接口被限制");
}
} catch (Throwable e) {
if (e instanceof RuntimeException) {
throw new RuntimeException(e.getMessage());
}
throw new RuntimeException("服务错误");
}
}
private String getIpAddress() {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
String unknown = "unknown";
String ip = request.getHeader("X-Forwarded-For");
if (ip == null || ip.length() == 0 || unknown.equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || unknown.equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || unknown.equalsIgnoreCase(ip)) {
ip = request.getHeader("HTTP_CLIENT_IP");
}
if (ip == null || ip.length() == 0 || unknown.equalsIgnoreCase(ip)) {
ip = request.getHeader("HTTP_X_FORWARDED_FOR");
}
if (ip == null || ip.length() == 0 || unknown.equalsIgnoreCase(ip)) {
ip = request.getHeader("X-Real-IP");
}
if (ip == null || ip.length() == 0 || unknown.equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip;
}
public String buildLuaScript() {
StringBuilder lua = new StringBuilder();
lua.append("local c");
lua.append("\nc = redis.call('get',KEYS[1])");
//调用不超过最大值,则直接返回
lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then");
lua.append("\nreturn c;");
lua.append("\nend");
//执行计算器自加
lua.append("\nc = redis.call('incr',KEYS[1])");
lua.append("\nif tonumber(c) == 1 then");
//从第一次调用开始限流,设置对应键值的过期
lua.append("\nredis.call('expire',KEYS[1],ARGV[2])");
lua.append("\nend");
lua.append("\nreturn c;");
return lua.toString();
}
}
参考文章
转载自:https://juejin.cn/post/7395863831300096050