likes
comments
collection
share

Spring Boot「44」服务端实时消息推送(一)

作者站长头像
站长
· 阅读数 15

开启掘金成长之旅!这是我参与「掘金日新计划 · 2 月更文挑战」的第 29 天,点击查看活动详情

在 HTTP/1.1 及以前的版本里,客户端、服务端之间的通讯模式只支持一种请求-响应模式,这是一种半双工通讯模式。 而且,在这个模型中,服务端是“被动方”,只能响应用户的请求,不能主动地推送消息给客户端。 为了解决这个问题,出现了支持实时通讯的 WebSocket 协议,以及 HTTP/2、HTTP/3 中的 Stream、Server Push 等特性。 但是,在 HTTP/1.1 版本中,要实现“实时通讯”的效果只能通过轮询(Polling)技术。而且,轮询也只能达到近似“实时通讯”效果。

今天,我将介绍如何在 Spring Boot 中通过轮询实现服务端数据变动后通知客户端。

01-短轮询和长轮询

短轮询比较好理解,指在客户端通过循环的方式每隔一段时间就请求一下服务端,是否有数据更新。 短轮询的伪代码类似于:

while(true) {
    // 请求服务端
    resposne = query(request);
    // 如果服务端有数据返回,则处理;否则,继续请求服务端   
    if (hasData(response)) {    
        process(response);
    } else {
        // 隔 100ms 再次请求
        sleep(100);
    }
}

短轮询方案中最大的问题是,频繁的请求对服务端的压力太大,而且也浪费网络带宽资源。 长轮询是对短轮询方案的一种改进,旨在减少对服务器资源的浪费。 长轮询在中间件中使用比较常见,例如 Nacos 配置中心,RocketMQ 消息队列等。

长轮询与短轮询的机制是类似的,客户端的逻辑不变,主要在服务端优化。 在长轮询中,服务端在没有数据更新时并不会立即响应客户端的请求,而是会 hold 住一段时间。 在这段时间中,如果数据有更新,则立即返回;如果没有更新,超时后客户端会再次发起请求。

01.1-阻塞长轮询实现方式

一种标准的实现方式是,在服务端直接等待数据更新,等待一段时间后再响应请求。

@RestController
@RequestMapping("/polling")
public class BlockingController {
    @GetMapping("/blocking")
    public ResponseEntity<?> processBlocking(Model model) {
        try {
            // 模拟长时间等待数据更新
            TimeUnit.SECONDS.sleep(30);
        } catch (InterruptedException ie) { }
        return ResponseEntity.ok("ok");
    }
}

这种方式有一种明显的缺点,Servlet 容器中处理请求的线程会被阻塞,导致服务端的吞吐量降低,难以应对高并发场景。

01.2-非阻塞长轮询实现方式

对上节中的一种改进是把 Servlet 中处理线程的工作交给其他的线程去做。

@RestController
@RequestMapping("/polling")
public class NonBlockingController {

    private ExecutorService pool = Executors.newFixedThreadPool(5);

    @GetMapping("/nonblocking")
    public DeferredResult<String> processNonblocking(Model model) {
        DeferredResult<String> output = new DeferredResult<>();
        pool.execute(() -> {
            try {
                // 模拟长时间等待数据更新
                TimeUnit.SECONDS.sleep(30);
                output.setResult("ok");
            } catch (InterruptedException e) {
                output.setErrorResult("Something went wrong with your order!");
            }
        });
        return output;
    }
}

可以看到,在 handler 中,创建了一个任务提交到了线程池里面去执行,Servlet 请求处理线程立即返回了。

我们来看下 Spring 的日志.

[nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : GET "/polling/nonblocking", parameters={}
[nio-8080-exec-1] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to self.samson.example.polling.controller.NonBlockingController#processNonblocking(Model)
[nio-8080-exec-1] o.s.w.c.request.async.WebAsyncManager    : Started async request
[nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Exiting but response remains open for further handling
[pool-1-thread-1] o.s.w.c.request.async.WebAsyncManager    : Async result set, dispatch to /polling/nonblocking
[nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : "ASYNC" dispatch for GET "/polling/nonblocking", parameters={}
[nio-8080-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : Resume with async result ["ok"]
[nio-8080-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Using 'text/html', given [text/html, application/xhtml+xml, image/avif, image/webp, image/apng, application/xml;q=0.9, application/signed-exchange;v=b3;q=0.7, */*;q=0.8] and supported [text/plain, */*, text/plain, */*, application/json, application/*+json, application/json, application/*+json]
[nio-8080-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Writing ["ok"]
[nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Exiting from "ASYNC" dispatch, status 200

第 4 行时,Servlet 请求处理线程执行完 NonBlockingController#processNonblocking 方法,但是此时 Servlet 容器并没有回复客户端的请求,直到线程池执行完任务,通过 output.setResult 返回结果,响应才发送给客户端。 可以看到,这里的 DeferredResult 是实现长轮询机制的关键,这个我将在下一节中介绍。

这种异步响应特性需要 Servlet 3.0 及以上容器才能支持,而且 Spring 的版本要高于 3.2。 在上面的实现中,请求的处理过程是在另外一个线程(线程池中)完成的,并且在处理完成后,调用了 DeferredResult#setResult 方法。 在这个过程中,底层 Servlet 容器会保持住与客户端之间的连接,直到响应完成,或者请求超时(默认为 60s)。

02. Spring 中的 DeferredResult

DeferredResult 是 Spring 提供的一个异步请求处理接口,在 I/O 密集的场景中非常有用。 它支持三种类型的回调:

  1. onCompletion,当异步请求处理完毕后,会执行这部分代码。
  2. onError,当异步执行遇到问题后,会执行这部分代码。
  3. onTimeout,当异步执行超时后,会执行这部分代码。

接下来,我将通过一个例子来演示下这三类回调的使用场景。

假设我经营了一家面包店,顾客可以通过 /bakery/order/{something} 来点单,点的单都会放到一个订单表中。 面包师会根据订单制作,制作好了通过 /bakery/finish/{order} 来完成订单。 如果顾客比较心急,等待一定时间后,会取消订单。 我们来看下如何实现:

@RestController
@RequestMapping("/bakery")
public class BakeryController {

    // 存储订单,假设目前因为人手原因,每种商品最多同时接一单
    private Map<String, DeferredResult<String>> orders = new HashMap<>();
    
    @GetMapping("/order/{something}")
    public DeferredResult<String> order(@PathVariable("something") String something) {

        DeferredResult<String> result = new DeferredResult<>(20 * 1000L);
        result.onCompletion(() -> {
            orders.remove(something);
            System.out.println("顾客,您的" + something + "做好了!");
        });
        
        result.onTimeout(() -> {
            orders.remove(something);
            System.out.println("做得太慢了,我不要了");
        });
        
        result.onError((e) -> {
            orders.remove(something);
            System.out.println("出错了,好吧,我换一家");
        });
        
        orders.put(something, result);
        return result;
    }

    @GetMapping("/finish/{order}")
    public String finish(@PathVariable("order") String order) {
        if (orders.containsKey(order)) {
            DeferredResult<String> result = orders.get(order);
            result.setResult("我完成了一单 " + order);
        }
        return "success";
    }
}

顾客点单后会等待,此时如果面包师能够在顾客耐心消失前做好他的订单,就会通知他取商品(onCompletion); 如果顾客等待的耐心全无,就会取消订单离开(onTimeout); 如果制作过程中,遇到问题,顾客会取消订单(onError)。

03-总结

今天介绍了通过轮询方式向服务端“实时”推送消息的实现方式。 这实际上是一种伪实时方式。如果要实现真正的实时通讯,需要用到其他的技术,例如 WebSocket、更高版本的 HTTP 协议等。

希望今天的内容能对你有所帮助。