原创

Java-异步多线程专题-for批量任务-多线程等待-CompletableFuture

另外还有《Java-异步多线程专题-for批量任务-多线程等待-CountDownLatch.countDown-ExecutorService-切换处理任务方法或参数》

https://www.jianshu.com/p/1db996cf7574
我们在处理业务时,有时会有多任务异步处理,同步返回结果的情况,在java中,我们可以使用CompletableFuture的allOf方法来实现多实例的同时返回。
public void futureTest() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("future1 finished!"); return "future1 finished!"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("future2 finished!"); return "future2 finished!"; }); CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2); try { combindFuture.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("future1: " + future1.isDone() + " future2: " + future2.isDone()); }
在这里我们可以将对各future实例添加到allOf方法中,然后通过future的get()方法获取future的状态。如果allOf里面的所有线程为执行完毕,主线程会阻塞,直到allOf里面的所有线程都执行,线程就会被唤醒。
异步编程,不想阻塞线程的话,可以使用thenAccpt、thenApply、thenRun,future结束后,执行异步方法。

thenAccpt、thenApply、thenRun,future没明白具体怎么使用


CompletableFuture.还有很多方法


以下来自 参考安元 proj feature/cloudqpaas
没有看明白为什么要封装 FutureTaskWorker

mport lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @author liangjun
* @version 1.0
* @description: 针对平台接口性能问题的优化处理类
* @date 2022/7/11 15:16
*/
@Data
@AllArgsConstructor
public class FutureTaskWorker<T, R> {
/**
* @description 需要异步执行的任务
*/
private List<T> taskList;
/**
* @description 需要执行的方法
*/
private Function<T, CompletableFuture<R>> workFunction;

/**
* @description 搜集执行结果
* @author gang.tu
* @rieturn: java.util.List<R>
*/
public List<R> getAllResult() {
List<CompletableFuture<R>> futureList = taskList.stream().map(workFunction).collect(Collectors.toList());
CompletableFuture<Void> allCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
return allCompletableFuture.thenApply(e -> futureList.stream().map(CompletableFuture::join).collect(Collectors.toList())).join();
}
}
FutureTaskWorker<XinGongMonthTableEnum, String> tableRenderWorkTask = new FutureTaskWorker<>(

EnumSet.allOf(XinGongMonthTableEnum.class).stream()
.filter(tableEnum -> tableTlServiceMap.containsKey(tableEnum.name())).collect(Collectors.toList()),

tableEnum -> CompletableFuture.supplyAsync(() -> {
String res = "业务返回";
return res;
}, taskExecutor));

CompletableFuture.allOf(
CompletableFuture.runAsync(()->log.info("表格: {} 已完成数据抽取!", tableRenderWorkTask.getAllResult()))
).join();

正文到此结束
本文目录