likes
comments
collection

高并发技巧-流量聚合和高并发写入处理技巧

作者站长头像
站长
· 阅读数 17

这篇文章我将介绍对于写入流量高的处理技巧,并且介绍一款快手开源的很实用的用来做流量聚合的工具BufferTrigger。通过这篇文章的介绍,相信对于大多写流量(后面就称为写qps吧)高的业务场景都能找到解决方案。

读流量(qps)高的场景其实见的更多,比如常用的淘宝和抖音,大部分都是读场景,而写的流量相对读的流量整体要低不少。但是如果遇到做活动,比如电商促销秒杀;或者一些本身确实写流量高的场景,比如直播(点赞,收礼)等有什么好的解决方案呢,下面通过个人遇到的一些业务场景来介绍实际用到的解决方案以及使用的场景和局限性。

消息队列

使用消息队列进行流量削峰来解决写qps高的问题应该是最常见的解决方案,将每次的请求扔到消息队列,后续由消费者慢慢处理即可,使用消息队列能够自己控制qps和线程数,所以就能够缓解下游的压力了。

使用场景介绍

这里就举两个常见的例子

  1. 电商秒杀:每次用户秒杀的结果虽然在页面展示了,实际上一般是把请求丢到了消息队列了,后续慢慢消费处理(db中库存的改动等);
  2. 直播间收礼:在一些大V开直播的时候,直播收礼的流量10W/s很正常,服务端需要对礼物进行归类和统计等处理,这显然也可以使用mq去异步处理;

注意事项

消息队列的处理异步处理的,所以对于需要及时响应处理结果的业务可能就不大适合了。当然,对于处理结果比较固定,比如就是返回成功(商城秒杀),然后交由消息队列处理并且保证处理成功的范畴,不在考虑范围。

设置mq的消费速度要评估好,保证不能打垮下游服务;同时对于不断产生消息的业务,也不能让消费速度赶不上生产速度,导致消息不断堆积,引爆队列,必要时要对下游进行适当的横向扩容。总之,要在性能和消费速度之间进行权衡,找到一个平衡点。

流量聚合-BufferTrigger

BufferTrigger引入

流量聚合简单来说就是把多次的请求整合为一个请求处理,显然只有在业务对单词的请求不敏感时并且能接受一定延迟时才能使用,比如直播间点赞,主播对单次点赞根本不敏感,直播间展示的赞数也不是实时的,完全可以对多次点赞进行聚合,最后再进行一些列判断再累计起来存放到db或缓存中,直播间从db或缓存中进行拉取。

那么这个聚合的工具需要什么功能呢?简单来说主要就三点

  1. 提供个能存放数据的容器
  2. 当达到指定数量时输出容器中的数据
  3. 当达到指定时候时输出容器中的数据

刚好快手提供了这么个工具-BufferTrigger,这个工具在快手内部大量使用,尤其是主站直播,下面将简单介绍下这个简单易用的工具。

使用场景

对大量的数据进行聚合,然后进行批量操作,适用于数据量大且相似或相同数据多的任务或者能接受一定时间内的延迟问题

如何使用

引入依赖

<dependency>
  <groupId>com.github.phantomthief</groupId>
  <artifactId>buffer-trigger</artifactId>
  <version>0.2.9</version>
</dependency>

使用方式1

public class BufferTriggerDemo {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long, Map<Long, AtomicInteger>> simple()
            .maxBufferCount(10)
            .interval(4, TimeUnit.SECONDS)
            .setContainer(ConcurrentHashMap::new, (map, uid) -> {
                map.computeIfAbsent(uid, key -> new AtomicInteger()).addAndGet(1);
                return true;
            })
            .consumer(this::consumer)
            .build();



    public void consumer(Map<Long, AtomicInteger> map) {
        System.out.println(map);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        // 最大容量是10,这里尝试添加11个元素0-10
        for (int i = 0; i < 5; i ++) {
            for (long j = 0; j < 11; j ++) {
                bufferTrigger.enqueue(j);
            }
        }

        Thread.sleep(7000);
    }

使用simple方法来进行构建

  1. maxBuffeCount(long count): 指定容器最大容量,比如这里指定了10,当在下次聚合前容器元素数量达到10就无法添加了,-1表示无限制;
  2. internal(long interval, TimeUnit unit) :表示多久聚合一次,如果没达到时间那么consumer是不会输出的,聚合后容器就空了。
  3. setContainer(Supplier<? extends C> factory, BiPredicate<? super C, ? super E> queueAdder): 第一个变量为factory,是个Supplier,获取容器用的,要求线程安全;第二个变量是缓存更新的方法BiPredicate<? super C, ? super E> queueAdder C为容器类型,E为元素类型
  4. consumer(ThrowableConsumer<? super C, Throwable> consumer): 表示如何消费聚合后的数据,标识我们如何去消费聚合后的数据,我这里就是简单打印。
  5. enqueue(E element): 添加元素;
  6. manuallyDoTrigger: 主动触发一次消费,通常在java进程关闭的时候调用

执行一遍,输入如下

{0=5, 1=5, 2=5, 3=5, 4=5, 5=5, 6=5, 7=5, 8=5, 9=5}

使用方式2

每次将元素原封不动保存下来,然后一次性消费一整个列表元素。而上面的方式,每次添加元素都会进行计算。

public class BufferTriggerDemo2 {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long>batchBlocking()
             .bufferSize(50)
             .batchSize(10)
             .linger(Duration.ofSeconds(1))
             .setConsumerEx(this::consume)
             .build();

    private void consume(List<Long> nums) {
        System.out.println(nums);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        for (long j = 0; j < 60; j ++) {
            bufferTrigger.enqueue(j);
        }

        Thread.sleep(7000);
    }
  1. batchBlocking():提供自带背压(back-pressure)的简单批量归并消费能力;
  2. bufferSize(int bufferSize): 缓存队列的最大容量;
  3. batchSize(int size): 批处理元素的数量阈值,达到这个数量后也会进行消费
  4. linger(Duration duration): 多久消费一次
  5. setConsumerEx(ThrowableConsumer<? super List, Exception> consumer): 消费函数,注入的对象为缓存队列中尚存的所有元素,非逐个元素消费;

执行一遍,输入如下

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]

full gc陷阱和消费能力提速

需要注意的是BufferTrigger是单线程消费的,这个是一个比较大的陷阱,做活动的时候是踩了坑的,尤其是在消费操作中涉及到大量io操作的场景,因为在流量很高的时候可能会出现消费速度跟不上生产速度,这很容易导致full gc问题。所以如果有必要的话需要使用线程池来提升消费速度。

消息队列和BufferTrigger的组合方式

有没有考虑过消息队列的消费速度过慢,如何在不影响下游性能的情况下提升消费速?比如直播点赞,在mq每次收到一条点赞消息的时候是不是就可以使用BufferTrigger来进行聚合?然后每分钟消费一次,在流量剧增的时候是不是能十倍以上的提速?

散库散表

从框架层面我介绍了上面两种解决方案,其实使用这两种方案主要是为了缓解数据库压力,尤其是单表的情况。但是高流量一般涉及到的都是用户维度的流量,所以如果必要的话可以根据userId来进行分库分表(我一般习惯就叫散表)并且优化表的设计,需要注意的是,最好不要分了表而不分库,因为多表共用一个资源性能提升还是不大。比如我们内部分表都是1000张,库都是至少10个库(集群)。这样处理一般的请求上万的tps是完全没问题的。

下面将从业务设计(技巧)的层面来介绍如何处理写qps过高的问题。

请求丢弃

在很多业务场景下,丢弃部分请求完全不影响业务,给个合理的提示的话用户根本无法感知,这其中最常见的就是秒杀、抢红包、发弹幕这类业务,请求量很大,但只有少部分请求能拿到钱,大部分请求直接丢弃都是没问题的,到时候告诉用户没抢到或者抢没了或者弹幕发生成功即可。

预处理

这类场景在抢红包的时候经常用到,可能很多人都知道很多app抢红包其实并不是在用户每次去抢的时候再去计算金额并落库的,一般是在红包发出去的时候就计算好了,比如1W块钱,100个人,那么就会随机生成100个数额推入到缓存的队列,用户抢的时候从队列直接pop即可。

当然,可以通过消息队列异步落库,也就是同步写缓存,异步写db

又或者是在用户完成活动的时候得到一个资格,然后在指定的时间点去瓜分1W块钱,其实也是在这时间点之前就把能用的金额和获得资格的人拿出来,计算出每个人的金额再存储到缓存和db,最后用户虽然说是去抢,其实就是从缓存中把已经给他存好的钱拿出来了而已。

流量打散

这种方案可能用到的机会不是太多,组里有老人做过所以拿来借鉴。其实这种手段在活动尤其春节活动的时候常见,特点是先互动后开奖(即在业务能接受一定的延迟情况)。快手的20年春节活动就是这样,先播一段春节明星拜年的视频,视频播放完之后弹出抢红包按钮。在这一瞬间流量肯定是非常高的,如果在这时候再去算每个人的钱对服务性能要求就太高了,这是没必要去做的挑战。因为在视频播放的时候我们就可以计算出来金额。

具体的做法是,服务端给客户端返回一个打散最大时间,比如30S,那么客户端就生成随机生成一个(0,30)的时间t,并在过now+t的时候去请求服务端拿到用户的金额,用户点击抢红包的时候就能直接告诉他这个金额。