springboot中的async注解表示异步执行,将其标注的方法交给线程池进行处理。 如果要在项目里使用async注解,需要在首先加上EnableAsync注解。 当然我们也可以使用“javax.ejb.Asynchronous”注解或者自定义注解实现异步,如果是自定义注解,需要在EnableAsync的annotation属性处指定。一旦指定了EnableAsync的annotation属性,则async和“javax.ejb.Asynchronous”注解将无效。 对于EnableAsync注解,通过@import引入了AsyncConfigurationSelector类。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * {@inheritDoc} * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively */ @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] { ProxyAsyncConfiguration.class.getName() }; case ASPECTJ: return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME }; default: return null; } } }springboot默认的adviceMode是proxy,将配置类ProxyAsyncConfiguration注入IOC容器。 在ProxyAsyncConfiguration分类AbstractAsyncConfiguration我们通过了与其它springboot中注解类似的方式实现类可配置化。
@Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer.getAsyncExecutor(); this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler(); }我们可以自定义一个实现AsyncConfigurer接口的类,在其中指定executor和exceptionHandler,并将其注入IOC容器来实现默认线程池和异常处理的定制化。 回到ProxyAsyncConfiguration类,这是一个configuration类,在其中通过@bean注入了AsyncAnnotationBeanPostProcessor 类。
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } if (this.executor != null) { bpp.setExecutor(this.executor); } if (this.exceptionHandler != null) { bpp.setExceptionHandler(this.exceptionHandler); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; }AsyncAnnotationBeanPostProcessor的asyncAnnotationType属性是为了用户除了Async和javax.ejb.Asynchronous可以自定义其它的一部注解,proxyTargetClass属性是确认是否使用cglib代理来替代jdk动态代理,executor和setExceptionHandler是为了生成AsyncAnnotationAdvisor类作为advisor,asyncAnnotationType如果不为null也会影响advisor的属性,这部分操作在setBeanFactory方法实现。
@Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }进入AsyncAnnotationAdvisor的构造器和相关方法
public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } if (exceptionHandler != null) { this.exceptionHandler = exceptionHandler; } else { this.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler(); } this.advice = buildAdvice(executor, this.exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); } protected Advice buildAdvice(@Nullable Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) { return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler); } protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); } public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) { Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null"); Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>(); asyncAnnotationTypes.add(asyncAnnotationType); this.pointcut = buildPointcut(asyncAnnotationTypes); }可以看出创建AsyncAnnotationAdvisor实例可以分为3步:
参数校验和默认值设置 设置默认的异步注解集asyncAnnotationTypes为async和javax.ejb.Asynchronous,可以通过setAsyncAnnotationType进行修改 异常处理类为null,则设置为默认值SimpleAsyncUncaughtExceptionHandler类。生成通知advice AnnotationAsyncExecutionInterceptor作为异步注解的advice生成切点pointcut 遍历异步注解集asyncAnnotationTypes,某个方法被注解修饰或者所在的类被注解修饰均为pointcut,最后封装到ComposablePointcut 类中。最后看AnnotationAsyncExecutionInterceptor的增强处理,在其分类AcyncExecutionInterceptor的invoke方法中。
@Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); //1.找到对应的处理异步的线程池 AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } 2.将异步方法封装成callback Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; //3.执行异步处理逻辑 return doSubmit(task, executor, invocation.getMethod().getReturnType()); }1.找到对应的处理异步的线程池
@Nullable protected AsyncTaskExecutor determineAsyncExecutor(Method method) { 1.1从缓存executors中查找 AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; 1.2根据async的value属性查询 String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { 1.3选择defaultExecutor作为execuotr targetExecutor = this.defaultExecutor; if (targetExecutor == null) { synchronized (this.executors) { if (this.defaultExecutor == null) { this.defaultExecutor = getDefaultExecutor(this.beanFactory); } targetExecutor = this.defaultExecutor; } } } if (targetExecutor == null) { return null; } 1.4将execuotr进行封装并加入executors缓存 executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; } @Override @Nullable protected String getExecutorQualifier(Method method) { // Maintainer's note: changes made here should also be made in // AnnotationAsyncExecutionAspect#getExecutorQualifier Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class); if (async == null) { async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class); } return (async != null ? async.value() : null); }1.1从缓存executors中查找 AsyncExecutionInterceptor实例中建立了以method为key值,AsyncTaskExecutor为value的map作为缓存,因此首先从缓存中获取。 1.2根据async的value属性查询 从缓存executors没有找到,说明第一次对方法进行异步处理,还没有加入到缓存中。 此时,获取方法上的async注解,获取其value属性,作为beanName从beanFacotry中查找。 这一步只针对具有value值的async类型的异步注解,其他类型的异步注解则跳过。 1.3选择defaultExecutor作为execuotr 选择默认值defaultExecutor作为execuotr属性值作为execuotr,如果execuotr为null说明还没有设置默认值,因此需要继续向下走进行默认值的设置。 由于默认值defaultExecutor的设置只需要执行一次,因此采用了同步方法+双重校验的方式。
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); } @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; }默认值defaultExecutor的优先级为:
beanFacotry中的TaskExecutor实例beanName为taskExecutor的Executor实例SimpleAsyncTaskExecutor实例 从beanFactory中查询TaskExecutor实例,如果TaskExecutor类型的bean不存在或者不唯一,则使用beanName为taskExecutor的Executor实例,如果找不到则使用SimpleAsyncTaskExecutor实例 此时的SimpleAsyncTaskExecutor虽然被称作线程池,但并没有针对多线程做优化处理,只是对于每一个一步方法新建一个线程去处理。 1.4将execuotr进行封装并加入executors缓存 封装成AsyncListenableTaskExecutor是为了后续的异步处理,同时加入executors缓存方便下次直接获取。2.将异步方法封装成callback 封装成callback用于多线程的异步处理,可以看到只有当方法返回值是Future类型时才能获取到返回值。 3.执行异步处理逻辑 根据返回值的类型不同执行不同的方法并返回,最后一步再次说明了只有当方法返回值是Future类型时才能获取到返回值,否则返回null。 @Nullable protected Object doSubmit(Callable task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }
最后,我们在整理一下可以用来指定异步处理的线程池的方法,按照优先级从高到低依次是:
将一个Executor类注入IOC容器,并将其bean赋给async的value属性将一个实现AsyncConfiguer接口的类注入IOC容器,并在getAsyncExecutor方法中返回对应的Executor实例。AsyncConfiguer类型的bean最多只能存在一个,否则将抛出异常。将一个TaskExecutor类注入容器,如果TaskExecutor类型的bean超过一个,该设置将失效。将一个Executor类注入IOC并制定beanName为taskExecutor。采用SimpleAsyncTaskExecutor类型的实例作为处理异步的线程池。