jdk:1.8 spring:5.2.1
AbstractApplicationContext
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
// Author's note: this is where the values are added to the environment, and where the
// PropertySourcePlaceholderConfigurer is constructed.
// NOTE(review): the class is presumably PropertySourcesPlaceholderConfigurer - confirm the spelling.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// Check for listener beans and register them.
// Author's note: populateBean is called here to process property values; when there are several
// PropertySourcePlaceholderConfigurer instances they are sorted by priority and applied in a loop.
// NOTE(review): this note may belong to finishBeanFactoryInitialization below rather than to
// registerListeners - confirm against the Spring source.
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
<!-- Declares maven-checkstyle-plugin 3.1.0; configLocation points at the checkstyle.xml rules file. -->
<!-- NOTE: ${你项目的路径} is a placeholder meaning "your project path"; replace it with a real path. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<configLocation>${你项目的路径}/checkstyle.xml</configLocation>
</configuration>
</plugin>
<!-- Declares xml-maven-plugin with an execution of its "transform" goal. -->
<!-- NOTE(review): no <version> is pinned for this plugin in this snippet - consider adding one. -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>xml-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>transform</goal>
</goals>
</execution>
</executions>
<configuration>
<transformationSets>
<transformationSet>
<!-- Input directory for the transformation: the project's target directory. -->
<dir>${project.basedir}/target</dir>
<!-- XSL stylesheet to apply; ${你项目的路径} is a placeholder meaning "your project path". -->
<stylesheet>${你项目的路径}/checkstyle.xsl</stylesheet>
</transformationSet>
</transformationSets>
</configuration>
</plugin>
jdk: 1.8
springboot: 2.0.5.RELEASE
@Slf4j
@Configuration
public class AsyncExecutorConfig implements AsyncConfigurer {
@Value("${worker.core}")
private Integer workerCore;
@Value("${worker.max}")
private Integer workerMax;
@Value("${worker.queueCapacity}")
private Integer workerQueueCapacity;
// @Value("${executor.cost.enable}")
private boolean costEnable;
@Nullable
@Override
public Executor getAsyncExecutor() {
return null;
}
@Bean(value = "asyncThreadPool", destroyMethod = "shutdown")
public ThreadPoolTaskExecutor getTaskExecutor() {
return create(workerCore,workerMax, workerQueueCapacity, 60, "async-sms-thread-");
}
private ThreadPoolTaskExecutor create(int core, int max, int queueCapacity, int waitSec, String threadPrefix) {
ThreadPoolTaskExecutor delayThreadPool = new ThreadPoolTaskExecutor();
delayThreadPool.setCorePoolSize(core);//当前线程数
delayThreadPool.setMaxPoolSize(max);// 最大线程数
delayThreadPool.setQueueCapacity(queueCapacity);//线程池所使用的缓冲队列
delayThreadPool.setWaitForTasksToCompleteOnShutdown(true);//等待任务在关机时完成--表明等待所有线程执行完
delayThreadPool.setAwaitTerminationSeconds(waitSec);// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
delayThreadPool.setThreadNamePrefix(threadPrefix);// 线程名称前缀
delayThreadPool.setTaskDecorator(new MDCTaskDecorator());
if (costEnable) {
// delayThreadPool.setTaskDecorator(new ExecCostTaskDecorator());
}
// threadPool.setRejectedExecutionHandler(new BlockingRejectHandler());
// delayThreadPool.setThreadFactory(new GuardThreadFactory(threadPrefix));
delayThreadPool.initialize(); // 初始化
return delayThreadPool;
}
@Nullable
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExecutorConfig.SpringAsyncExceptionHandler();
}
class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
log.error("Exception occurs in async method:{}", method.getName(),throwable);
}
}
}
jdk: 1.8
网上帖子千千万,咱也一起炒冷饭.
闲言少叙 书接上文
/**
 * Demonstrates {@code acceptEither}: two product lookups run asynchronously and the consumer is
 * invoked with the result of whichever one completes first.
 */
@Test
public void thenAcceptEitherTest() {
    CompletableFuture<Product> lookup12 =
            CompletableFuture.supplyAsync(() -> ProductService.Instance.getProduct(12));
    CompletableFuture<Product> lookup15 =
            CompletableFuture.supplyAsync(() -> ProductService.Instance.getProduct(15));
    lookup12.acceptEither(
            lookup15,
            winner -> MixAll.printWithThread(" product:" + winner.getName() + " returned..."));
    // Wait for the asynchronous work to finish.
    MixAll.simulateComputeCost(5);
}
该方法同时包括2个带Async后缀的重载方法
2.applyToEither方法,
1.假如要先获取产品信息,拿到数据后再获取对应的商户信息,这种是存在依赖关系的情况
/**
 * Demonstrates {@code thenCompose}: the merchant lookup depends on the product returned by the
 * first asynchronous step.
 */
@Test
public void combineFutureTest() {
    // step1: load the product asynchronously
    CompletableFuture<Product> productFuture =
            CompletableFuture.supplyAsync(() -> ProductService.Instance.getProduct(12));
    // step2: once the product is available, load its merchant asynchronously
    CompletableFuture<Merchant> cf = productFuture.thenCompose(
            loaded -> CompletableFuture.supplyAsync(() -> MerchantService.Instance.getMerchant(loaded)));
}
在step1处返回的是CompletableFuture<Product>
2.有2个互相独立的计算, 等它们都完成后再合并做一些计算的情况
/**
 * Demonstrates {@code thenCombine}: two independent risk-score computations run asynchronously
 * and their results are merged into an average once both have completed.
 *
 * @throws ExecutionException   if either computation fails
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Test
public void combineFutureTest2() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> scoreFromA =
            CompletableFuture.supplyAsync(() -> RiskService.Instance.getRiskScoreFromA4Product());
    CompletableFuture<Integer> scoreFromB =
            CompletableFuture.supplyAsync(() -> RiskService.Instance.getRiskScoreFromB4Product());
    // Merge the two independent results into their average.
    CompletableFuture<Integer> cf = scoreFromA.thenCombine(scoreFromB, (a, b) -> (a + b) / 2);
    MixAll.printWithThread(" get score=" + cf.get());
}
注意看下thenCombine的入参即可
1.合并全部Future,等待全部完成, 示例代码和注释如下
/**
 * Demonstrates {@code allOf}: starts ten asynchronous product lookups, waits for all of them and
 * collects their results.
 *
 * <p>The test waits on the future returned by {@code thenRun}, not on the {@code allOf} future.
 * The {@code allOf} future is complete before its dependent action has necessarily run, so
 * waiting on it alone could print {@code data2} while it is still empty or while another thread
 * is filling it.
 *
 * @throws ExecutionException   if any lookup fails
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Test
public void combineAllFutureTest() throws ExecutionException, InterruptedException {
    List<Integer> list = new ArrayList<>(10);
    Random r = new Random();
    for (int i = 0; i < 10; i++) {
        list.add(i + r.nextInt(10));
    }
    // Create 10 asynchronous product-lookup tasks.
    List<CompletableFuture<Product>> proFutures = list.stream()
            .map(in -> CompletableFuture.supplyAsync(() -> ProductService.Instance.getProduct(in)))
            .collect(Collectors.toList());
    // Combine them into a single future that completes once every lookup has completed.
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(proFutures.toArray(new CompletableFuture[proFutures.size()]));

    // // Collect the final data, approach 1
    // CompletableFuture<List<Product>> productList = allFutures.thenApply(v -> {
    //     MixAll.printWithThread(" thenApply collect all product info");
    //     return proFutures.stream().map(cfp -> cfp.join()).collect(Collectors.toList());
    // });
    //
    // List<Product> data = productList.get();
    // MixAll.printWithThread(" productInfo:" + data);

    // Collect the final data, approach 2
    final List<Product> data2 = new ArrayList<>(proFutures.size());
    CompletableFuture<Void> collected = allFutures.thenRun(() -> {
        MixAll.printWithThread(" thenRun collect all product info");
        data2.addAll(proFutures.stream().map(cfp -> cfp.join()).collect(Collectors.toList()));
    });
    // Wait for the collecting action itself so data2 is fully populated and safely visible here.
    collected.get();
    MixAll.printWithThread(" productInfo:" + data2);

    // Collect the final data, approach 3: similar to the above, omitted
    //allFutures.thenAccept()
}
2.合并所有Future, 等待任何一个结束即可,示例代码如下
/**
 * Demonstrates {@code anyOf}: starts ten asynchronous risk-score computations and prints the
 * result of whichever one completes first.
 *
 * @throws ExecutionException   if the first future to complete failed
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Test
public void combineAnyFutureTest() throws ExecutionException, InterruptedException {
    Random random = new Random();
    List<Integer> seeds = new ArrayList<>(10);
    for (int i = 0; i < 10; i++) {
        seeds.add(i + random.nextInt(10));
    }
    // Start one asynchronous risk-score task per list entry (the entry's value is not used).
    List<CompletableFuture<Integer>> scoreFutures = new ArrayList<>();
    for (int i = 0; i < seeds.size(); i++) {
        scoreFutures.add(
                CompletableFuture.supplyAsync(() -> RiskService.Instance.getRiskScoreFromB4Product()));
    }
    CompletableFuture<?>[] pending = scoreFutures.toArray(new CompletableFuture[scoreFutures.size()]);
    CompletableFuture<Object> cfAny = CompletableFuture.anyOf(pending);
    // anyOf yields Object, so the result has to be cast back to the score type.
    Object firstResult = cfAny.get();
    Integer score = (Integer) firstResult;
    MixAll.printWithThread(" get score=" + score);
}
Note:
anyOf()返回的是CompletableFuture<Object>
1.使用exceptionally的方式
/**
 * Demonstrates {@code exceptionally}: with {@code idx = 2} the supplier throws, neither
 * {@code thenApply} function runs, and the fallback value from {@code exceptionally} becomes the
 * result.
 *
 * @throws ExecutionException   if the final future completes exceptionally
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Test
public void exceptionallyTest() throws ExecutionException, InterruptedException {
    final int idx = 2;

    // Source stage: fails when idx is even.
    CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> {
        MixAll.printWithThread(" in supplyAsync....");
        if (idx % 2 == 0) {
            throw new IllegalArgumentException("test illegal argument exception");
        }
        return UUID.randomUUID().toString();
    });

    // First transformation: fails when idx is odd.
    CompletableFuture<String> firstStage = source.thenApply(uuid -> {
        if (idx % 2 == 1) {
            throw new IllegalArgumentException("test illegal argument exception");
        }
        MixAll.printWithThread(" in first thenApply....");
        return "thenApply1-" + uuid;
    });

    // Second transformation: never throws on its own.
    CompletableFuture<String> secondStage = firstStage.thenApply(prefixed -> {
        MixAll.printWithThread(" in second thenApply....");
        return "thenApply2-" + prefixed;
    });

    // Fallback: replaces any failure from the stages above with a fixed value.
    CompletableFuture<String> cf = secondStage.exceptionally(failure -> {
        MixAll.printWithThread(" in exceptionally....");
        return "exceptionally";
    });

    MixAll.printWithThread(" get data:" + cf.get());
}
调用链的任何一个地方发生异常,后续的调用都不会继续进行
2.使用handle()方法。handle方法不管是否有异常,都会执行
/**
 * Demonstrates {@code handle}: the handler runs whether or not an earlier stage failed. With
 * {@code idx = 1} the supplier succeeds, the first {@code thenApply} throws, the second is
 * skipped, and the handler turns the failure into a fixed value.
 *
 * @throws ExecutionException   if the final future completes exceptionally
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Test
public void exceptionHandleTest() throws ExecutionException, InterruptedException {
    final int idx = 1;

    // Source stage: fails when idx is even.
    CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> {
        MixAll.printWithThread(" in supplyAsync....");
        if (idx % 2 == 0) {
            throw new IllegalArgumentException("test illegal argument exception");
        }
        return UUID.randomUUID().toString();
    });

    // First transformation: fails when idx is odd.
    CompletableFuture<String> firstStage = source.thenApply(uuid -> {
        if (idx % 2 == 1) {
            throw new IllegalArgumentException("test illegal argument exception");
        }
        MixAll.printWithThread(" in first thenApply....");
        return "thenApply1-" + uuid;
    });

    // Second transformation: never throws on its own.
    CompletableFuture<String> secondStage = firstStage.thenApply(prefixed -> {
        MixAll.printWithThread(" in second thenApply....");
        return "thenApply2-" + prefixed;
    });

    // Handler: receives either the value or the exception and always produces a result.
    CompletableFuture<String> cf = secondStage.handle((info, ex) -> {
        MixAll.printWithThread(" in exception handle,info=" + info + ",ex=" + ex);
        return ex != null ? "exception handle" : info;
    });

    MixAll.printWithThread(" get data:" + cf.get());
}