适用场景

CompletableFuture 在业务代码里最常见的用法,主要是下面这几类:

1、一批相互独立的任务并发执行,最后统一收口

2、先按任务类型分类,再分别并发处理

3、批量过滤、批量校验、批量补充信息

4、异步线程里补 TraceId、兜底超时和异常

只收常用实践,不展开 API 说明。

常见实践一:一批任务并发执行,最后统一收口

核心写法就是:先把每个请求包装成 Future,再统一 allOf 等待。

1
2
3
4
5
6
7
8
9
10
11
List<CompletableFuture<TaskResult<R>>> futures = new ArrayList<>();

for (Q request : requestList) {
CompletableFuture<TaskResult<R>> future = new CompletableFuture<>();
CompletableFuture.supplyAsync(() -> query(request), executor)
.whenComplete((resp, throwable) -> future.complete(new TaskResult<>(resp, throwable)));
futures.add(future);
}

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.get(3, TimeUnit.SECONDS);

这种写法适合:

1、一批请求相互独立

2、最终结果还要保持和输入顺序对应

3、允许把单个任务异常包装后继续往后处理

批量受理、批量校验、批量补数,都适合这个模式。

常见实践二:先分类,再并发

更接近真实业务的写法是:先分类,再并发。

不是所有任务都走同一条路径,有些支持批处理,有些只能单条处理,应该先分开再起 Future。

1
2
3
4
5
6
7
8
9
10
11
12
if (CollectionUtils.isNotEmpty(batchTaskIds)) {
batchFuture = processBatchAsync(request, batchTaskIds, taskMap);
allFutures.add(batchFuture);
}

if (CollectionUtils.isNotEmpty(singleTaskIds)) {
singleFutures = processSingleAsync(request, singleTaskIds, taskMap);
allFutures.addAll(singleFutures);
}

CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]))
.get(30L, TimeUnit.SECONDS);

这类写法适合:

1、同一批请求里,不同对象处理策略不一样

2、部分任务支持批处理,部分任务只能单发

3、最后要统一收集成功和失败结果

重点不是 Future 起了多少个,而是任务边界先分清。

常见实践三:异步线程里要补 Trace 和上下文

异步线程里如果不补上下文,日志基本没法查。

1
2
3
4
5
6
7
8
9
10
11
12
String traceId = MDC.get("traceId");

return taskIdList.stream()
.map(taskId -> CompletableFuture.supplyAsync(() -> {
MDC.put("traceId", traceId + ":" + taskId);
try {
return execute(taskId);
} finally {
MDC.remove("traceId");
}
}, executor))
.collect(Collectors.toList());

异步场景里最容易丢的上下文包括:

1、TraceId

2、租户信息

3、用户身份

4、MDC 日志字段

常见实践四:不要直接上 parallelStream

批量过滤时,优先显式使用自定义线程池,不要直接上 parallelStream

1
2
3
4
5
6
7
8
9
10
11
12
13
List<CompletableFuture<Pair<Integer, Boolean>>> futures = IntStream.range(0, items.size())
.mapToObj(i -> CompletableFuture.supplyAsync(() ->
Pair.of(i, filter(items.get(i), now)),
filterExecutor
))
.collect(Collectors.toList());

for (CompletableFuture<Pair<Integer, Boolean>> future : futures) {
Pair<Integer, Boolean> result = future.join();
if (result.getValue()) {
filteredItems.add(items.get(result.getKey()));
}
}

这里有两个要点:

1、保留原始下标,最后能把结果稳定收回来

2、线程池是业务自己控的,不会把公共线程池打满

批量过滤、批量资格判断、批量补充字段,这种写法通常比 parallelStream 更稳。

常见实践五:线程池必须按业务隔离

线程池必须按职责拆开,不能所有异步任务共用一个池子。

常见拆法:

1、规则计算线程池

2、动作执行线程池

3、批量查询线程池

4、补偿处理线程池

线程池隔离的价值很直接:

1、规则计算慢,不会拖垮动作执行

2、抽奖记录积压,不会影响活动主链路

3、不同业务峰值可以单独调参数

混用一个线程池的后果通常是:高峰期互相抢资源,排查时也分不清到底是哪类任务堆积。

常见实践六:超时必须显式兜住

超时必须显式控制,而且要按场景设。

比较短的任务,比如批量校验、批量受理,可以尽早失败;

比较重的任务,比如外部调用、补偿处理,需要给更宽一点的超时窗口。

不要只写:

1
CompletableFuture.allOf(...).join();

join() 简洁,但不等于有超时治理。

常见实践七:异常不要只打日志,要能回收到业务结果

更稳的写法不是“异常了打印一下”,而是把异常封装回结果里。

这类方式的好处是:

1、异步任务失败不会直接让整个批次失控

2、调用方可以按任务粒度判断成功失败

3、补偿逻辑有据可依

对于最终一致性要求高的链路,这一点比单纯抛异常更重要。

常见坑

1、在方法里临时 new 线程池

线程池必须交给 Spring 管,不然线上很容易出现线程数只涨不降。

2、异步线程不带 TraceId

跨模块链路里,没有日志上下文基本就查不动。

3、所有任务都硬并发

有些任务其实是前后依赖的,硬拆只会让代码更乱。

4、主线程过早 join

Future 刚起完就一个个 join,最后写回串行,还把代码复杂度抬上去了。

5、线程池参数照抄

不同任务的并发模型不一样,参数不能共用一套模板。

小结

CompletableFuture 最常见的价值不是“写出异步代码”,而是把批量任务拆开、把结果稳妥收回来、把超时和异常兜住。

真正值得保留的常用实践,核心就这几类:

1、批量任务统一收口

2、按任务类型拆分 Future

3、线程池按职责隔离

4、异步线程透传 TraceId

5、异常和超时都回收到业务结果里