likes
comments
collection
share

当我用 CompletableFuture 当作 SpringMVC 返回值,会发生什么?如果我使用一个 Callabl

作者站长头像
站长
· 阅读数 40

Hello,经过上次写了一篇 CDS 类加速的文章稍作歇息之后,我又继续回来写 Java 异步文章了,因为市面上此类文章实在太少,而我又对此非常感兴趣,所以最近也更新了不少关于异步并发的东西,有兴趣的同学,可以给本篇文章点赞之后移步主页查看我的其他异步文章。

今天这篇文章是关于 Spring MVC 中使用 JDK Future 相关异步响应的源码解析文章,对异步感兴趣的同学可以反复研读。

Future 这个东西在各种语言里都有,比如在 JS 中它叫 Promise,它是并发编程中用于表示异步操作结果的抽象概念,它代表一个可能还没有完成的计算,允许你以非阻塞的方式处理异步操作。

它主要有以下几个主要特点:

  1. 异步执行:操作在后台进行,不阻塞主线程。
  2. 状态表示:可以查询操作是否完成、是否出错。
  3. 结果获取:提供方法来获取操作的最终结果。
  4. 回调机制:可以注册回调函数,在操作完成时自动调用。
  5. 链式操作:支持对结果进行进一步的处理或转换。
  6. 异常处理:可以捕获和处理异步操作中的异常。

Future/Promise 使得编写异步、并发代码变得更简单,提高了程序的响应性和效率。

它们广泛应用于网络请求、文件 I/O、复杂计算等场景,是现代编程语言中处理异步操作的核心概念。

然而由于 JDK 中 Future 接口功能性上(链式处理)的缺失,JDK8 中又引入了 CompletableFuture 作为 Future 工具链,也是我们现在最为常用的异步工具。

我相信现在已经不会有人在使用 new Thread 和 new Callable 的方式来写异步代码了吧~

你可以简单的把 CompletableFuture 理解为 Future + 线程池的组合,它不光承担了 Future 的职责,还内置了一个线程池,让你快速执行一段代码并返回一个 CompletableFuture 结果(当前也可以不使用它的线程池功能)。

那么当我们使用一个 CompletableFuture 当作 SpringMVC 的返回值的时候,它会如何处理呢?

@GetMapping("/test1")
    public CompletableFuture<String> test1() {
        return CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });
    }

如果我使用一个 Callable 当作返回值呢? 或者我使用 Servlet 3.0 中的异步特性 DeferredResult 当作返回值呢,Spring MVC 会像以前那样同步阻塞线程进行响应吗?

@GetMapping("/test2")
    public Callable<String> test2() {
        return new Callable<>() {
            @Override
            public String call() throws Exception {
                return "Hello";
            }
        };
    }

要弄懂以上问题,就需要探究源码了~

DispatcherServlet

任何一个学习过 Spring MVC 八股文的选手,都应该对 DispatcherServlet 并不陌生,它是 Spring MVC 中对于请求处理的核心类,DispatcherServlet 中的核心就是下面我精简过的 doDispatch 方法。

protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
		HttpServletRequest processedRequest = request;
		HandlerExecutionChain mappedHandler = null;
		boolean multipartRequestParsed = false;

		WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);

		try {
			ModelAndView mv = null;
			Exception dispatchException = null;

			try {
				processedRequest = checkMultipart(request);
				multipartRequestParsed = (processedRequest != request);

				// Determine handler for the current request.
				mappedHandler = getHandler(processedRequest);
				if (mappedHandler == null) {
					noHandlerFound(processedRequest, response);
					return;
				}

				// Determine handler adapter for the current request.
				HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());

				if (!mappedHandler.applyPreHandle(processedRequest, response)) {
					return;
				}
              
				// Actually invoke the handler.
				mv = ha.handle(processedRequest, response, mappedHandler.getHandler());

				if (asyncManager.isConcurrentHandlingStarted()) {
					return;
				}

				applyDefaultViewName(processedRequest, mv);
				mappedHandler.applyPostHandle(processedRequest, response, mv);
			}
	}

通过代码我们可以看到,其执行逻辑可以简要分为四步:

  1. getHandler 方法获取一个执行器链。
  2. getHandlerAdapter 方法包装出一个包装对象,它就是主要执行类。
  3. 执行 applyPreHandle 方法,即执行执行器链中拦截器的 preHandle 方法。
  4. ha.handle 真正开始执行此请求。
  5. 执行 applyPostHandle 方法,即执行执行器链中拦截器的 postHandle 方法。

由于我们的目的是查看 SpringMVC 对于异步返回值的处理,所以我们需要深入 handle 方法进行追寻。

前面我们已经知道了这里获取的是一个包装类 —— HandlerAdapter,这个包装类使用了抽象模板设计模式,所以我们执行 handle 的时候会先进入它的抽象类——AbstractHandlerMethodAdapter。

在这里,我们看一下 handle 的源码:

@Override
	@Nullable
	public final ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler)
			throws Exception {

		return handleInternal(request, response, (HandlerMethod) handler);
	}

抽象类中的 handle 方法直接调用了它本身的抽象方法——handleInternal 方法,这个抽象方法只有一个实现类 —— RequestMappingHandlerAdapter,在 RequestMappingHandlerAdapter 的实现中,它会调用内部的 invokeHandlerMethod 方法继续执行:

protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
			HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

		ServletWebRequest webRequest = new ServletWebRequest(request, response);
		try {
			WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
			ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);

			ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
			if (this.argumentResolvers != null) {
				invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
			}
			if (this.returnValueHandlers != null) {
				invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
			}
			invocableMethod.setDataBinderFactory(binderFactory);
			invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

			ModelAndViewContainer mavContainer = new ModelAndViewContainer();
			mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
			modelFactory.initModel(webRequest, mavContainer, invocableMethod);
			mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

			AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
			asyncWebRequest.setTimeout(this.asyncRequestTimeout);

			WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
			asyncManager.setTaskExecutor(this.taskExecutor);
			asyncManager.setAsyncWebRequest(asyncWebRequest);
			asyncManager.registerCallableInterceptors(this.callableInterceptors);
			asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

			invocableMethod.invokeAndHandle(webRequest, mavContainer);
			if (asyncManager.isConcurrentHandlingStarted()) {
				return null;
			}

			return getModelAndView(mavContainer, modelFactory, webRequest);
		}
		finally {
			webRequest.requestCompleted();
		}
	}

通过上面的代码我们可以看到,它先是创建了一个 ServletWebRequest,又创建了 invocableMethod 这个真正的执行对象,同时通过 setHandlerMethodArgumentResolvers 方法为它设置了返回值处理器,这个返回值处理器是一个数组:

当我用 CompletableFuture 当作 SpringMVC 返回值,会发生什么?如果我使用一个 Callabl

接着初始化了模型与视图对象和异步任务管理器——WebAsyncManager,这个异步任务管理器是直接通过请求获取的。

然后使用 invocableMethod 调用 invokeAndHandle 去真正执行方法内部逻辑,这里的两个参数 webRequest 和 mavContainer,一个代表请求数据,一个代表模型视图数据容器。

因为有异步的支持,所以在调用结束后,它马上会通过 asyncManager.isConcurrentHandlingStarted 判断这个请求是否是异步,如果是异步现在不会直接有结果,所以这里直接 return null。

接下来我们重点来看 invokeAndHandle 方法。

InvokeAndHandle

通过方法名字我们就知道,这里一个先执行我们的方法逻辑后对返回值进行处理的方法:

public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
			Object... providedArgs) throws Exception {

		Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
		setResponseStatus(webRequest);

		if (returnValue == null) {
			if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
				disableContentCachingIfNecessary(webRequest);
				mavContainer.setRequestHandled(true);
				return;
			}
		}
		else if (StringUtils.hasText(getResponseStatusReason())) {
			mavContainer.setRequestHandled(true);
			return;
		}

		mavContainer.setRequestHandled(false);
		Assert.state(this.returnValueHandlers != null, "No return value handlers");
		try {
			this.returnValueHandlers.handleReturnValue(
					returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
		}
		catch (Exception ex) {
			if (logger.isTraceEnabled()) {
				logger.trace(formatErrorForReturnValue(returnValue), ex);
			}
			throw ex;
		}
	}

这里的逻辑比较简单先通过 invokeForRequest 执行了方法,然后获取了返回值,由于我们的返回值是一个 CompletableFuture,所以这里的结果也是一个 CompletableFuture 对象。

接着进入判断,它并不等于 null,所以进入 StringUtils.hasText(getResponseStatusReason()),由于此时我们并没有真正的响应结果,所以 getResponseStatusReason() 返回值是 null,所以这个判断也不会成立。

继续往下走,我们就进入了返回值处理阶段,通过 this.returnValueHandlers.handleReturnValue 进行执行,returnValueHandlers 变量就是我们的返回值处理链,它的内部就是我们前面截图过的:

当我用 CompletableFuture 当作 SpringMVC 返回值,会发生什么?如果我使用一个 Callabl

所以下面的逻辑我们应该能猜到,它会通过循环这个数组中的所有返回值处理器,拿到一个我们能够使用的,使用这个返回值处理器进行处理:

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		HandlerMethodReturnValueHandler handler = selectHandler(returnValue, returnType);
		if (handler == null) {
			throw new IllegalArgumentException("Unknown return value type: " + returnType.getParameterType().getName());
		}
		handler.handleReturnValue(returnValue, returnType, mavContainer, webRequest);
	}

源码就是这个逻辑,selectHandler 方法会匹配这些处理器中能匹配到的类,比如我们今天的例子有 CompletableFuture 返回值的和 Callable 返回值的,他们对应数组中的 7 号和 8 号两个处理器:

CallableMethodReturnValueHandler

public boolean supportsReturnType(MethodParameter returnType) {
		return Callable.class.isAssignableFrom(returnType.getParameterType());
	}

DeferredResultMethodReturnValueHandler

public boolean supportsReturnType(MethodParameter returnType) {
		Class<?> type = returnType.getParameterType();
		return (DeferredResult.class.isAssignableFrom(type) ||
				ListenableFuture.class.isAssignableFrom(type) ||
				CompletionStage.class.isAssignableFrom(type));
	}

没错,就是简单粗暴的用类名匹配,其中 CompletableFuture 是 CompletionStage 的子类,所以会被 DeferredResultMethodReturnValueHandler 处理,它还同时会处理 Future 和 DeferredResult 作为返回值的方法。

Callable 返回值处理

当你的方法返回值是 Callable 时,SpringMVC 会使用 CallableMethodReturnValueHandler 进行处理,之所以先讲这个,是因为它的代码更直观且简单。

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue == null) {
			mavContainer.setRequestHandled(true);
			return;
		}

		Callable<?> callable = (Callable<?>) returnValue;
		WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
	}

它的核心代码是 startCallableProcessing 这个方法,这个方法会通过通过 Servlet3.0 的AsyncContext 特性,开启一个异步方法处理:

startAsyncProcessing(processingContext);
		try {
			Future<?> future = this.taskExecutor.submit(() -> {
				Object result = null;
				try {
					interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
					result = callable.call();
				}
				catch (Throwable ex) {
					result = ex;
				}
				finally {
					result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
				}
				setConcurrentResultAndDispatch(result);
			});
			interceptorChain.setTaskFuture(future);
		}

startAsyncProcessing 方法就是开启异步的方法的核心,这里面会调用 this.asyncWebRequest.startAsync();,然后可以看到 Callabe 其实会被放到一个 taskExecutor 中进行执行,它直接调用了 call 方法,等于有一个线程在线程池里一直等待结果,然后通过 setConcurrentResultAndDispatch 方法将结果返回回去,返回的步骤则是通过 this.asyncWebRequest.dispatch(); 进行处理,这也是 AsyncContext 的 API。

所以我们可以发现,如果你使用 Callable 当返回值,内部其实是一个线程池帮你执行并返回。

CompletableFuture 返回值处理

前文已经提到过,无论是使用DeferredResult 还是 Future,抑或是CompletableFuture,它的处理类都是 DeferredResultMethodReturnValueHandler

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue == null) {
			mavContainer.setRequestHandled(true);
			return;
		}

		DeferredResult<?> result;

		if (returnValue instanceof DeferredResult) {
			result = (DeferredResult<?>) returnValue;
		}
		else if (returnValue instanceof ListenableFuture) {
			result = adaptListenableFuture((ListenableFuture<?>) returnValue);
		}
		else if (returnValue instanceof CompletionStage) {
			result = adaptCompletionStage((CompletionStage<?>) returnValue);
		}
		else {
			// Should not happen...
			throw new IllegalStateException("Unexpected return value type: " + returnValue);
		}

		WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
	}

它的内部处理是一个 DeferredResult 处理,其它两种返回值都会被包装成一个 DeferredResult 进行统一处理。

在其内部方法 startDeferredResultProcessing 可以看到它的流程和 Callable 处理差不多,调用的 AsyncContext API 都是一样的:

startAsyncProcessing(processingContext);

		try {
			interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
			deferredResult.setResultHandler(result -> {
				result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
				setConcurrentResultAndDispatch(result);
			});
		}
		catch (Throwable ex) {
			setConcurrentResultAndDispatch(ex);
		}

但是在返回值处理中,略有不同,这里并没有使用一个线程池等待结果,而是注册了 ResultHandler,这个方法是 DeferredResult 的内置方法,用于设置一个结果处理对象。

这里就要提到一个 DeferredResult 的不同,它必须在使用之后通过 setResult 方法手动设置结果值,这个方法也使用注册的 ResultHandler,所以这里其实是定义了一个 DeferredResult 的 hook。

这里处理完了之后整个流程再次回归到 SpringMVC, 然后就是SpringMVC 的 ModelView 流程写入结果了。

补充说明~

通过今天的文章可以看到,在 Spring MVC 中使用异步响应,其实是通过 AsyncContext 的 API 进行处理的。

那么既然都是异步,SpringMVC 和 Spring WebFlux 其实还是大有不同的,它们最大的不同点就是阻塞和非阻塞,Spring WebFlux 是一个完全非阻塞的框架,而 MVC 不是。

虽然看起来 DeferredResult 已经通过回调的方式进行异步写入操作了,但是在写的时候依然是一个阻塞 IO 操作,而且内部还是有很多 sync 关键字加锁操作的,具体解释可以参考官方文档中的这里查看。

要想避免这个阻塞 IO,实现非阻塞 IO 返回数据,我们需要通过 Servlet3.1 的特性——setWriteListener API,这里可以给大家一个示例:

@GetMapping("/nonblocking")
public void handleNonBlocking(HttpServletRequest request, HttpServletResponse response) {
    AsyncContext asyncContext = request.startAsync();
    ServletOutputStream output = response.getOutputStream();
    output.setWriteListener(new WriteListener() {
        @Override
        public void onWritePossible() throws IOException {
            while (output.isReady()) {
                output.write("Data chunk".getBytes());
            }
            asyncContext.complete();
        }

        @Override
        public void onError(Throwable t) {
            asyncContext.complete();
        }
    });
}

好了,今天的文章就到这里,接下来可能会更新一篇关于异步、阻塞、非阻塞、响应式编程、以及事件驱动的文章,基本上是一篇讲清楚各种名词,以及如何分辨它们的区别,大家可以狠狠点赞,一个赞可以让我提前更新 1 个小时。

转载自:https://juejin.cn/post/7407004271328378932
评论
请登录