当我用 CompletableFuture 当作 SpringMVC 返回值,会发生什么?如果我使用一个 Callabl
Hello,经过上次写了一篇 CDS 类加速的文章稍作歇息之后,我又继续回来写 Java 异步文章了,因为市面上此类文章实在太少,而我又对此非常感兴趣,所以最近也更新了不少关于异步并发的东西,有兴趣的同学,可以给本篇文章点赞之后移步主页查看我的其他异步文章。
今天这篇文章是关于 Spring MVC 中使用 JDK Future 相关异步响应的源码解析文章,对异步感兴趣的同学可以反复研读。
Future 这个东西在各种语言里都有,比如在 JS 中它叫 Promise,它是并发编程中用于表示异步操作结果的抽象概念,它代表一个可能还没有完成的计算,允许你以非阻塞的方式处理异步操作。
它主要有以下几个主要特点:
- 异步执行:操作在后台进行,不阻塞主线程。
- 状态表示:可以查询操作是否完成、是否出错。
- 结果获取:提供方法来获取操作的最终结果。
- 回调机制:可以注册回调函数,在操作完成时自动调用。
- 链式操作:支持对结果进行进一步的处理或转换。
- 异常处理:可以捕获和处理异步操作中的异常。
Future/Promise 使得编写异步、并发代码变得更简单,提高了程序的响应性和效率。
它们广泛应用于网络请求、文件 I/O、复杂计算等场景,是现代编程语言中处理异步操作的核心概念。
然而由于 JDK 中 Future 接口功能性上(链式处理)的缺失,JDK8 中又引入了 CompletableFuture 作为 Future 工具链,也是我们现在最为常用的异步工具。
我相信现在已经不会有人在使用 new Thread 和 new Callable 的方式来写异步代码了吧~
你可以简单的把 CompletableFuture 理解为 Future + 线程池的组合,它不光承担了 Future 的职责,还内置了一个线程池,让你快速执行一段代码并返回一个 CompletableFuture 结果(当前也可以不使用它的线程池功能)。
那么当我们使用一个 CompletableFuture 当作 SpringMVC 的返回值的时候,它会如何处理呢?
@GetMapping("/test1")
public CompletableFuture<String> test1() {
return CompletableFuture.supplyAsync(() -> {
return "Hello";
});
}
如果我使用一个 Callable 当作返回值呢? 或者我使用 Servlet 3.0 中的异步特性 DeferredResult 当作返回值呢,Spring MVC 会像以前那样同步阻塞线程进行响应吗?
@GetMapping("/test2")
public Callable<String> test2() {
return new Callable<>() {
@Override
public String call() throws Exception {
return "Hello";
}
};
}
要弄懂以上问题,就需要探究源码了~
DispatcherServlet
任何一个学习过 Spring MVC 八股文的选手,都应该对 DispatcherServlet 并不陌生,它是 Spring MVC 中对于请求处理的核心类,DispatcherServlet 中的核心就是下面我精简过的 doDispatch 方法。
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequest processedRequest = request;
HandlerExecutionChain mappedHandler = null;
boolean multipartRequestParsed = false;
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
try {
ModelAndView mv = null;
Exception dispatchException = null;
try {
processedRequest = checkMultipart(request);
multipartRequestParsed = (processedRequest != request);
// Determine handler for the current request.
mappedHandler = getHandler(processedRequest);
if (mappedHandler == null) {
noHandlerFound(processedRequest, response);
return;
}
// Determine handler adapter for the current request.
HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());
if (!mappedHandler.applyPreHandle(processedRequest, response)) {
return;
}
// Actually invoke the handler.
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}
applyDefaultViewName(processedRequest, mv);
mappedHandler.applyPostHandle(processedRequest, response, mv);
}
}
通过代码我们可以看到,其执行逻辑可以简要分为四步:
- getHandler 方法获取一个执行器链。
- getHandlerAdapter 方法包装出一个包装对象,它就是主要执行类。
- 执行 applyPreHandle 方法,即执行执行器链中拦截器的 preHandle 方法。
- ha.handle 真正开始执行此请求。
- 执行 applyPostHandle 方法,即执行执行器链中拦截器的 postHandle 方法。
由于我们的目的是查看 SpringMVC 对于异步返回值的处理,所以我们需要深入 handle 方法进行追寻。
前面我们已经知道了这里获取的是一个包装类 —— HandlerAdapter
,这个包装类使用了抽象模板设计模式,所以我们执行 handle 的时候会先进入它的抽象类——AbstractHandlerMethodAdap
ter。
在这里,我们看一下 handle 的源码:
@Override
@Nullable
public final ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
return handleInternal(request, response, (HandlerMethod) handler);
}
抽象类中的 handle 方法直接调用了它本身的抽象方法——handleInternal 方法,这个抽象方法只有一个实现类 —— RequestMappingHandlerAdap
ter,在 RequestMappingHandlerAdapter 的实现中,它会调用内部的 invokeHandlerMethod 方法继续执行:
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
通过上面的代码我们可以看到,它先是创建了一个 ServletWebRequest,又创建了 invocableMethod 这个真正的执行对象,同时通过 setHandlerMethodArgumentResolvers 方法为它设置了返回值处理器,这个返回值处理器是一个数组:
接着初始化了模型与视图对象和异步任务管理器——WebAsyncManager
,这个异步任务管理器是直接通过请求获取的。
然后使用 invocableMethod 调用 invokeAndHandle 去真正执行方法内部逻辑,这里的两个参数 webRequest 和 mavContainer,一个代表请求数据,一个代表模型视图数据容器。
因为有异步的支持,所以在调用结束后,它马上会通过 asyncManager.isConcurrentHandlingStarted 判断这个请求是否是异步,如果是异步现在不会直接有结果,所以这里直接 return null。
接下来我们重点来看 invokeAndHandle 方法。
InvokeAndHandle
通过方法名字我们就知道,这里一个先执行我们的方法逻辑后对返回值进行处理的方法:
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);
if (returnValue == null) {
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
else if (StringUtils.hasText(getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}
mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}
这里的逻辑比较简单先通过 invokeForRequest 执行了方法,然后获取了返回值,由于我们的返回值是一个 CompletableFuture,所以这里的结果也是一个 CompletableFuture 对象。
接着进入判断,它并不等于 null,所以进入 StringUtils.hasText(getResponseStatusReason()),由于此时我们并没有真正的响应结果,所以 getResponseStatusReason() 返回值是 null,所以这个判断也不会成立。
继续往下走,我们就进入了返回值处理阶段,通过 this.returnValueHandlers.handleReturnValue 进行执行,returnValueHandlers 变量就是我们的返回值处理链,它的内部就是我们前面截图过的:
所以下面的逻辑我们应该能猜到,它会通过循环这个数组中的所有返回值处理器,拿到一个我们能够使用的,使用这个返回值处理器进行处理:
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
HandlerMethodReturnValueHandler handler = selectHandler(returnValue, returnType);
if (handler == null) {
throw new IllegalArgumentException("Unknown return value type: " + returnType.getParameterType().getName());
}
handler.handleReturnValue(returnValue, returnType, mavContainer, webRequest);
}
源码就是这个逻辑,selectHandler 方法会匹配这些处理器中能匹配到的类,比如我们今天的例子有 CompletableFuture 返回值的和 Callable 返回值的,他们对应数组中的 7 号和 8 号两个处理器:
CallableMethodReturnValueHandler
public boolean supportsReturnType(MethodParameter returnType) {
return Callable.class.isAssignableFrom(returnType.getParameterType());
}
DeferredResultMethodReturnValueHandler
public boolean supportsReturnType(MethodParameter returnType) {
Class<?> type = returnType.getParameterType();
return (DeferredResult.class.isAssignableFrom(type) ||
ListenableFuture.class.isAssignableFrom(type) ||
CompletionStage.class.isAssignableFrom(type));
}
没错,就是简单粗暴的用类名匹配,其中 CompletableFuture 是 CompletionStage 的子类,所以会被 DeferredResultMethodReturnValueHandler 处理,它还同时会处理 Future 和 DeferredResult 作为返回值的方法。
Callable 返回值处理
当你的方法返回值是 Callable 时,SpringMVC 会使用 CallableMethodReturnValueHandler 进行处理,之所以先讲这个,是因为它的代码更直观且简单。
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
Callable<?> callable = (Callable<?>) returnValue;
WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}
它的核心代码是 startCallableProcessing 这个方法,这个方法会通过通过 Servlet3.0 的AsyncContext 特性,开启一个异步方法处理:
startAsyncProcessing(processingContext);
try {
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
result = callable.call();
}
catch (Throwable ex) {
result = ex;
}
finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
}
startAsyncProcessing 方法就是开启异步的方法的核心,这里面会调用 this.asyncWebRequest.startAsync();,然后可以看到 Callabe 其实会被放到一个 taskExecutor 中进行执行,它直接调用了 call 方法,等于有一个线程在线程池里一直等待结果,然后通过 setConcurrentResultAndDispatch 方法将结果返回回去,返回的步骤则是通过 this.asyncWebRequest.dispatch(); 进行处理,这也是 AsyncContext 的 API。
所以我们可以发现,如果你使用 Callable 当返回值,内部其实是一个线程池帮你执行并返回。
CompletableFuture 返回值处理
前文已经提到过,无论是使用DeferredResult 还是 Future,抑或是CompletableFuture,它的处理类都是 DeferredResultMethodReturnValueHandler。
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
DeferredResult<?> result;
if (returnValue instanceof DeferredResult) {
result = (DeferredResult<?>) returnValue;
}
else if (returnValue instanceof ListenableFuture) {
result = adaptListenableFuture((ListenableFuture<?>) returnValue);
}
else if (returnValue instanceof CompletionStage) {
result = adaptCompletionStage((CompletionStage<?>) returnValue);
}
else {
// Should not happen...
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}
它的内部处理是一个 DeferredResult 处理,其它两种返回值都会被包装成一个 DeferredResult 进行统一处理。
在其内部方法 startDeferredResultProcessing 可以看到它的流程和 Callable 处理差不多,调用的 AsyncContext API 都是一样的:
startAsyncProcessing(processingContext);
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
但是在返回值处理中,略有不同,这里并没有使用一个线程池等待结果,而是注册了 ResultHandler,这个方法是 DeferredResult 的内置方法,用于设置一个结果处理对象。
这里就要提到一个 DeferredResult 的不同,它必须在使用之后通过 setResult 方法手动设置结果值,这个方法也使用注册的 ResultHandler,所以这里其实是定义了一个 DeferredResult 的 hook。
这里处理完了之后整个流程再次回归到 SpringMVC, 然后就是SpringMVC 的 ModelView 流程写入结果了。
补充说明~
通过今天的文章可以看到,在 Spring MVC 中使用异步响应,其实是通过 AsyncContext 的 API 进行处理的。
那么既然都是异步,SpringMVC 和 Spring WebFlux 其实还是大有不同的,它们最大的不同点就是阻塞和非阻塞,Spring WebFlux 是一个完全非阻塞的框架,而 MVC 不是。
虽然看起来 DeferredResult 已经通过回调的方式进行异步写入操作了,但是在写的时候依然是一个阻塞 IO 操作,而且内部还是有很多 sync 关键字加锁操作的,具体解释可以参考官方文档中的这里查看。
要想避免这个阻塞 IO,实现非阻塞 IO 返回数据,我们需要通过 Servlet3.1 的特性——setWriteListener API,这里可以给大家一个示例:
@GetMapping("/nonblocking")
public void handleNonBlocking(HttpServletRequest request, HttpServletResponse response) {
AsyncContext asyncContext = request.startAsync();
ServletOutputStream output = response.getOutputStream();
output.setWriteListener(new WriteListener() {
@Override
public void onWritePossible() throws IOException {
while (output.isReady()) {
output.write("Data chunk".getBytes());
}
asyncContext.complete();
}
@Override
public void onError(Throwable t) {
asyncContext.complete();
}
});
}
好了,今天的文章就到这里,接下来可能会更新一篇关于异步、阻塞、非阻塞、响应式编程、以及事件驱动的文章,基本上是一篇讲清楚各种名词,以及如何分辨它们的区别,大家可以狠狠点赞,一个赞可以让我提前更新 1 个小时。
转载自:https://juejin.cn/post/7407004271328378932