SpringBoot异步实现DeferredResult实战与原理分析
1.DeferredResult
示例
1.1 创建实例对象
DeferredResult<ResponseEntity<List<User>>> deferredResult =
new DeferredResult<>(20000L, new ResponseEntity<>(HttpStatus.NOT_MODIFIED));
1.2 设置回调
deferredResult.onTimeout(() -> {
log.info("调用超时");
});
deferredResult.onCompletion(() -> {
log.info("调用完成");
});
1.3 设置结果
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(10);
deferredResult.setResult(new ResponseEntity<>(userService.listUser(), HttpStatus.OK));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
1.4 完整示例
@GetMapping("/deferredResultUser")
public DeferredResult<ResponseEntity<List<User>>> deferredResultListUser() {
DeferredResult<ResponseEntity<List<User>>> deferredResult =
new DeferredResult<>(20000L, new ResponseEntity<>(HttpStatus.NOT_MODIFIED));
deferredResult.onTimeout(() -> {
log.info("调用超时");
});
deferredResult.onCompletion(() -> {
log.info("调用完成");
});
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(10);
deferredResult.setResult(new ResponseEntity<>(userService.listUser(), HttpStatus.OK));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
return deferredResult;
}
客户端请求映射到控制器方法返回值为DeferredResult
时,会立即释放Tomcat
线程并将请求挂起,直到调用setResult()
方法或者超时,才会响应客户端请求。
2.DeferredResult
原理分析
2.1 返回值处理器处理返回值
控制器方法的返回值都由对应的处理器进行处理,关于DeferredResult
,自然由DeferredResultMethodReturnValueHandler
进行处理。
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
DeferredResult<?> result;
// 返回值类型为DeferredResult
if (returnValue instanceof DeferredResult) {
result = (DeferredResult<?>) returnValue;
}
// 返回值类型为ListenableFuture进行适配转换
else if (returnValue instanceof ListenableFuture) {
result = adaptListenableFuture((ListenableFuture<?>) returnValue);
}
// 返回值类型为CompletionStage进行适配转换
else if (returnValue instanceof CompletionStage) {
result = adaptCompletionStage((CompletionStage<?>) returnValue);
} else {
// Should not happen...
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
// 处理DeferredResult
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}
2.2 设置DeferredResultHandler
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
// 1. 开启异步处理
startAsyncProcessing(processingContext);
try {
// 2. 设置DeferredResultHandler
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
2.3 调用setResult()
当我们的异步任务执行完成后,会调用DeferredResult
的setResult()
方法
public boolean setResult(T result) {
return setResultInternal(result);
}
private boolean setResultInternal(Object result) {
DeferredResultHandler resultHandlerToUse;
synchronized (this) {
// At this point, we got a new result to process
this.result = result;
resultHandlerToUse = this.resultHandler;
if (resultHandlerToUse == null) {
return true;
}
this.resultHandler = null;
}
// 调用DeferredResultHandler的handleResult()方法
resultHandlerToUse.handleResult(result);
return true;
}
在2.2
章节设置了DeferredResultHandler
,因此会调用setConcurrentResultAndDispatch()
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
// 1.设置结果
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}
if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
}
// 2.请求调度,就是模拟客户端再次向服务器端发起请求
this.asyncWebRequest.dispatch();
}
2.4 调度处理
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
// 在2.3章节设置了结果,因此该逻辑返回true
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
2.5 构建ServletInvocableHandlerMethod
ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}
public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
// 创建Callable对象并指定调用call()方法
super((Callable<Object>) () -> {
if (result instanceof Exception) {
throw (Exception) result;
}
else if (result instanceof Throwable) {
throw new NestedServletException("Async processing failed", (Throwable) result);
}
return result;
}, CALLABLE_METHOD);
if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
}
this.returnType = returnType;
}
如上逻辑创建了Callable
对象并指定调用call()
方法
2.6 调用目标方法
调用Callable
的call()方法,得到返回结果,再使用返回值处理器处理返回值
3. 总结
控制器中定义方法返回值为
DeferredResult
,会立即释放Tomcat
线程,使用业务线程处理业务由
DeferredResultMethodReturnValueHandler
处理返回结果,开启异步处理并设置DeferredResultHandler
业务执行完成后调用
setResult()
方法,紧接着回调DeferredResultHandler
的handleResult()
设置结果并调度请求
创建
Callable
对象并设置调用方法为call()
通过反射方式调用
call()
得到返回值使用返回值处理器处理返回值
转载自:https://juejin.cn/post/6934963596765954084