likes
comments
collection
share

微服务设计模式:速率限制模式 实现 | Java & Vert.x & Redis

作者站长头像
站长
· 阅读数 35

速率限制模式用于限制一定时间内某个用户、服务等对资源的使用,来避免系统过载。这里不会太详细地介绍该模式,更多地是描述代码实现,不过你可以访问如下资料了解更多:

设计

这里主要用 Java 代码做出一个速率限制模式的实现——对 HTTP API 接口进行速率限制,限制一个用户一段时间对接口的有效访问次数。抽象流程如下:

  1. 用户请求某 api /api/some-resources
  2. 服务器认证鉴权用户,失败返回 401403;成功则继续
  3. 记录本次请求,并检查用户是否访问受限(达到最大请求次数),如果受限则返回 429 Too Many Requests;否则继续
  4. 访问通过,执行接下来的业务代码
  5. 如果鉴权通过,HTTP Response 会带有相关 headers 来告诉用户当前速率限制情况
Header说明
X-RateLimit-Limit最大请求次数
X-RateLimit-Remaining剩余请求次数
X-RateLimit-Reset下次重置时间,单位秒时间戳

实现用的 Web 框架是 Vert.X Web,速率限制器是自己实现写的,当然也有很多流行的第三方库,比如 guava RateLimiterfailsafe RateLimiter,但是自己为了能记录和得到剩余请求次数和下次重置时间,就自己设计实现了两个:

  • 本地速率限制 - 使用 Caffeine Cache 做存储,使用 ReentrantReadWriteLock 保证并发操作时数据一致性;仅限单实例使用
  • 分布式速率限制 - 使用 Redis 做存储,使用 Lua script 保证并发操作时数据一致性;支持多实例使用

详细代码可以在我的 github 项目 microservices-patterns-in-action找到

抽象速率限制处理器

为了符合 Vert.X 的规范,先写一个 RateLimitHandler 继承 Handler<RoutingContext>,里面保存了 headers、工具方法和工厂方法。

/**
 * RateLimitHandler
 * 
 * <p>
 * 实现 <a href=
 * "https://learn.microsoft.com/zh-cn/azure/architecture/patterns/rate-limiting-pattern">
 * 速率限制模式 </a>
 */
public interface RateLimitHandler extends Handler<RoutingContext> {
    
    public static final String HEADER_LIMIT = "X-RateLimit-Limit";
    public static final String HEADER_REMAINING = "X-RateLimit-Remaining";
    /** 下次重置时间,单位 s */
    public static final String HEADER_RESET = "X-RateLimit-Reset";  

    public static final int STATUS_CODE_LIMITED = 429;
    
    @Override
    public void handle(RoutingContext context);

    public static RateLimitHandler createLocal(int limit, Duration duration) {
        return new LocalRateLimitHandler(limit, duration);
    }

    public static RateLimitHandler createRedis(Redis redis, int limit, Duration duration) {
        return new RedisRateLimitHandler(redis, limit, duration);
    }

    public record LimitDetail(int limit, int count, long reset) {

        public boolean canAccess() {
            return count <= limit;
        }

        public int remaining() {
            return count <= limit ? limit - count : 0;
        }
    }

    /**
     * Add X-RateLimit-* headers
     * @param context
     * @param limitDetail
     */
    public static void addHeadersEndHandler(RoutingContext context, LimitDetail limitDetail) {
        context.addHeadersEndHandler( v -> {
            var detail = limitDetail;
            context.response()
                    .putHeader(HEADER_LIMIT, String.valueOf(detail.limit()))
                    .putHeader(HEADER_REMAINING, String.valueOf(detail.remaining()))
                    .putHeader(HEADER_RESET, String.valueOf(detail.reset()));
        });
    }

    /**
     * End response with status code 429
     * @param context
     */
    public static void endLimited(RoutingContext context) {
        context.response().setStatusCode(STATUS_CODE_LIMITED).end();
    }
}

认证和鉴权

认证和鉴权的实现比较简单,毕竟这不是重点:

  1. 客户端请求携带 Request header X-Auth-UserId: userId
  2. 服务端检查 X-Auth-UserId 是否存在,不存在返回 401
  3. 检查 userId 是否有效,无效则返回 403
  4. userId 存到请求上下文中,继续请求处理链

Vert.X 的代码如下:

public class SimpleAuthHandler implements AuthenticationHandler {

    private static final Set<String> ALLOW_USERIDS = Set.of("spring", "vertx", "helidon");

    @Override
    public void handle(RoutingContext context) {
        String userId = context.request().getHeader("X-Auth-UserId");
        // 认证和鉴权用户
        if (userId == null) {
            context.response().setStatusCode(401).end();
        } else if (!verifyUserId(userId)) {
            context.response().setStatusCode(403).end();
        } else {
            context.setUser(User.create(new JsonObject().put("userId", userId)));
            context.next();
        }
    }
    
    public static SimpleAuthHandler create() {
        return new SimpleAuthHandler();
    }

    private boolean verifyUserId(String userId) {
        return ALLOW_USERIDS.contains(userId) || userId.startsWith("LoadTestUser-");
    }
}

本地速率限制

使用 Cache 和 JUC 读写锁代码实现:

/**
 * 本地速率限制处理器实现
 */
@Slf4j
public class LocalRateLimitHandler implements RateLimitHandler {

    private static class RateLimiter {
        
        private final int limit;
        private final AtomicInteger count;
        private final int duration; // 单位 s
        private volatile long reset; // 单位 s

        private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
        private final Lock rl = lock.readLock();
        private final Lock wl = lock.writeLock();

        public RateLimiter(int limit, Duration duration) {
            this.limit = limit;
            this.duration = (int) duration.toSeconds();
            count = new AtomicInteger(0);
        }

        public LimitDetail tryAcquire() {
            rl.lock();
            if (needReset()) {
                rl.unlock();
                wl.lock();
                try {
                    if (needReset()) {
                        count.set(0);
                        reset = Instant.now().plusSeconds(duration).getEpochSecond();
                    }
                    rl.lock();
                } finally {
                    wl.unlock();
                }
            }
            try{
                return new LimitDetail(limit, count.incrementAndGet(), reset);
            } finally {
                rl.unlock();
            }
        }

        public int getLimit() {
            return limit;
        }

        private boolean needReset() {
            return reset <= Instant.now().getEpochSecond();
        }

    }

    private final Cache<String, RateLimiter> rateLimiters = Caffeine.newBuilder()
        .expireAfterAccess(24, TimeUnit.HOURS)
        .maximumSize(100_000)
        .build();

    private final int limit;

    private final Duration duration;

    public LocalRateLimitHandler(int limit, Duration duration) {
        this.limit = limit;
        this.duration = duration;
    }

    @Override
    public void handle(RoutingContext context) {

        String userId = context.user().get("userId");
        RateLimiter rateLimiter = rateLimiters.get(userId, k -> newRateLimiter());
        var limitDetail = rateLimiter.tryAcquire();

        RateLimitHandler.addHeadersEndHandler(context, limitDetail);

        if (limitDetail.canAccess()) {
            context.next();
        } else {
            RateLimitHandler.endLimited(context);
        }
    }

    private RateLimiter newRateLimiter() {
        return new RateLimiter(limit, duration);
    }

}

速率限制逻辑就如代码 tryAcquire方法(之所以前面加个 try 是因为它不会阻塞等待,立即返回)实现的那样,不文字介绍或画图了。

在调试的时候,发生过一些死锁情况,原因在于我在 rl.lock() 的代码块里直接进行了 wl.lock(),解决照抄 juc 代码注释,wl.lock() 之前必须先 rl.unlock(),之后再重新 rl.lock()

Redis分布式速率限制

因为网络IO阻塞和Vert.X框架的情况使用了响应式编写,不得不说响应式代码比同步代码难写多了,我优化了一会儿才能像下面这样简洁:

@Slf4j
public class RedisRateLimitHandler implements RateLimitHandler{

    private static final String SCRIPT_CLASSPATH = "/script/rateLimiter.lua";

    private final Redis redis;
    private final int limit;
    private final long duration;


    public RedisRateLimitHandler(Redis redis, int limit, Duration duration) {
        this.redis = Objects.requireNonNull(redis);
        this.limit = limit;
        this.duration = Objects.requireNonNull(duration).toSeconds();
    }

    private final AtomicReference<String> scriptSHA = new AtomicReference<>("null");

    @Override
    public void handle(RoutingContext context) {
        String userId = context.user().get("userId");
        
        tryAcquire(userId)
            .onSuccess(limitDetail -> {
                RateLimitHandler.addHeadersEndHandler(context, limitDetail);
                if (limitDetail.canAccess()) {
                    context.next();
                } else {
                    RateLimitHandler.endLimited(context);
                }
            }).onFailure(context::fail);

    }

    private Future<LimitDetail> tryAcquire(String userId) {
        String key = "rl:userid:" + userId;
        return sendEval(key, limit, duration)
            .compose(Future::succeededFuture, t -> loadScript().compose(res -> sendEval(key, limit, duration)))
            .map( res -> {
                int count = res.get(0).toInteger();
                long reset = res.get(1).toLong();
                return new LimitDetail(limit, res.get(0).toInteger(), res.get(1).toLong());
            });
    }

    private Future<Response> sendEval(String key, int limit, long duration) {
        return redis.send(Request.cmd(Command.EVALSHA, scriptSHA.toString(), 1, key, limit, duration));
    }

    private Future<Response> loadScript() throws StatusCodeResponseException {
        String script = null;
        try {
            var bytes = RedisRateLimitHandler.class.getResourceAsStream(SCRIPT_CLASSPATH).readAllBytes();
            script = new String(bytes);
        } catch (Exception e) {
            log.error("Read script failed - {} {}", SCRIPT_CLASSPATH, e.getMessage());
            return Future.failedFuture(StatusCodeResponseException.create(500));
        }
        return redis.send(Request.cmd(Command.SCRIPT, "load", script))
            .onSuccess(res -> scriptSHA.set(res.toString()));
    }

}

Lua 脚本,用于检查和更新速率限制情况:

-- rateLimiter.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local duration = tonumber(ARGV[2])

local exists = redis.call('exists', key)

local now = tonumber(redis.call('time')[1])

if (exists == 0) then
    local reset = now + duration
    redis.call('HSET', key, 'count', 1, 'reset', reset)
    redis.call('EXPIREAT', key, reset)
    return {1, reset}
else
    local result = redis.call('HMGET', key, 'count', 'reset')
    local count, reset = tonumber(result[1]), tonumber(result[2])

    if (now >= reset) then
        local newReset = now + duration
        redis.call('HSET', key, 'count', 1, 'reset', newReset)
        redis.call('EXPIREAT', key, newReset)
        return {1, newReset}
    end
    redis.call("HINCRBY", key, 'count', 1)
    return {count + 1, reset}
end

速率限制器的逻辑如 rateLimiter.lua 所描述的那样,因为 lua 脚本在redis中是串行执行的,所以能保证并发情况下数据的一致性。

因为如果每次都调用 eval big-script-text keys args 的话,请求体积太大了,所以可以使用 script load 先加载 script 到 redis 中,每次调用时使用 evalsha sha-key keys args 可大大减少请求体积。

使用和测试

在 Vert.X 配置 Router 时,添加 认证处理器速率限制处理器

        router.route("/api/*").handler(SimpleAuthHandler.create());

        // 限制每用户每分钟最多请求100次

        // 本地速率限制处理器
        router.route("/api/*").handler(RateLimitHandler.createLocal(100, Duration.ofSeconds(60)));

        // 或者
        // Redis 速率限制处理器

        Redis redis = Redis.createClient(vertx);
        RateLimitHandler rateLimitHandler = RateLimitHandler.createRedis(redis, 100, Duration.ofSeconds(60));
        router.route("/api/*").handler(rateLimitHandler);

这里配置的是:限制每用户每分钟最多请求100次

需要注意的是处理器的顺序认证处理器 应该在 速率限制处理器 前面,速率限制处理器 应该在业务处理器前面,而日志处理器超时处理器等应该在 认证处理器 之前。这里有一份官方的处理器顺序建议 whats-new-in-vert-x-4-3

冒烟测试/功能测试

先测试下基本的功能。

认证:

GET {{VERTX_SERVICE}}/api/test-data

HTTP/1.1 401 Unauthorized 
connection: close 
content-length: 0

鉴权

GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: bad-user

HTTP/1.1 403 Forbidden 
connection: close 
content-length: 0

vertx用户请求一次

GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx

HTTP/1.1 200 OK 
content-type: application/json 
connection: close 
content-length: 20 
X-RateLimit-Limit: 100 
X-RateLimit-Remaining: 99 
X-RateLimit-Reset: 1689133896 

{ "data": "test-data" }

vertx用户请求好几次后

微服务设计模式:速率限制模式 实现 | Java & Vert.x & Redis

GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx

HTTP/1.1 200 OK 
content-type: application/json 
connection: close 
content-length: 20 
X-RateLimit-Limit: 100 
X-RateLimit-Remaining: 64
X-RateLimit-Reset: 1689133896 

{ "data": "test-data" }

vertx用户请求一分钟内超过100次后

GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx

HTTP/1.1 429 Too Many Requests 
connection: close 
content-length: 0 
X-RateLimit-Limit: 100 
X-RateLimit-Remaining: 0 
X-RateLimit-Reset: 1689134120

换个用户spring

GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: spring

HTTP/1.1 200 OK 
content-type: application/json 
connection: close 
content-length: 20 
X-RateLimit-Limit: 100 
X-RateLimit-Remaining: 99 
X-RateLimit-Reset: 1689134062 

{ "data": "test-data" }

一分钟后vertx再次请求

GET {{VERTX_SERVICE}}/api/test-data
X-Auth-UserId: vertx

HTTP/1.1 200 OK 
content-type: application/json 
connection: close 
content-length: 20 
X-RateLimit-Limit: 100 
X-RateLimit-Remaining: 99 
X-RateLimit-Reset: 1689134226 

{ "data": "test-data" }

当然这个功能测试过程可以使用 K6 编写脚本来自动化。

负载测试

使用 K6 编写脚本来测试 Redis 速率限制器的性能。

k6脚本如下

import http from 'k6/http';
import execution from 'k6/execution'
import { sleep } from 'k6';
export const options = {
  stages: [
    { duration: "30s", target: 200 },
    {duration: "3m", target: 200},
    {duration: "30s", target: 0}
  ],
};
export default function () {

  const user = "LoadTestUser-" + execution.vu.idInTest

  const params = {
    "headers": {
      "X-Auth-UserId": user,
      "X-LoadTest": "true"
    }
  }

  const res = http.get('http://localhost:7703/api/test-data', params)
  console.log(user, "-", res.status_text, "-",
  res.headers['X-Ratelimit-Limit'],
  res.headers['X-Ratelimit-Remaining'],
  res.headers['X-Ratelimit-Reset'])
}

负载测试有三个阶段

  1. 前30秒,并发用户从 0 增长到 200
  2. 之后的3分钟,并发用户在 200
  3. 最后的30秒,并发用户从 200 到 0

需要注意的是

  • HTTP headers X-LoadTest: ture 的请求会被服务器日志记录过滤掉
  • 每个并发用户执行完请求后会继续执行下个请求,没有 sleep 时间
  • 操作系统和JDK使用的是Windows和JDK17
  • Redis 在 wsl 中启动,所以网络IO很短
  • Vertx Redis 连接配置使用默认的,没有优化
  • Vertx HTTP 服务器配置使用默认,没有优化
  • K6 中的日志打印会影响客户端请求速度

这是其中的一段日志,可以看出功能还都正常

微服务设计模式:速率限制模式 实现 | Java & Vert.x & Redis

负载测试结果如下:

微服务设计模式:速率限制模式 实现 | Java & Vert.x & Redis

可以看出在服务器单实例情况下,QPS每秒 2000 多还行,平均迭代用时80ms,这可能是客户端日志打印用点时间,服务端redis连接池没有优化用点时间。85% 的失败率在意料之中,毕竟每用户每分钟只能请求100个,否则就返回429算作失败。

总结

速率限制模式中最大限制次数单位时间应该可以根据不同的用户动态配置,比如一个用户A本来是普通用户,每小时限制使用2000次,但是称为VIP后,每小时限制提升至10000次,如果在同一个小时内发生,这时候 limit 从 2000 变成 10000,count 不变,那么用户将可以比较 count <= limit 来继续正常使用服务。

代码实现上,我可能会后续更新到这篇博客上,如果你感兴趣可以关注我或收藏这篇文章一段时间后再来看看。

我挺喜欢亲自写代码实现微服务中的设计模式的,这是一个不错的学习方式。后续我还会继续用这种 不过多介绍模式只写代码实现 的方式去写更多关于微服务设计模式的文章。

如果你也感兴趣,欢迎一起交流❤。

转载自:https://juejin.cn/post/7254737947084570679
评论
请登录