Java Async Multithreading Series: ForkJoinPool, Unbatched Processing, Batched Processing, Final Sorting, Concurrency Testing, and CompletableFuture Usage Examples
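Prefer the ForkJoinPool approach. Be cautious with CompletableFuture when there are many tasks or the tasks are long-running.
Always combine parallelStream with either a CopyOnWriteArrayList or an ArrayList guarded by a lock declared inside the method. Also note that parallelStream().forEach does not preserve element order.
Always print the thread name to confirm that the work really runs on multiple threads: log.info("Current query thread: {}", Thread.currentThread().getName());
Do not set the ForkJoinPool parallelism too low; for example, use new ForkJoinPool(5).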
1. ForkJoinPool multithreading without batching
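/**
 * CopyOnWriteArrayList is best suited to read-heavy, write-light workloads.
 * Using a plain ArrayList and locking manually around list.add also works.
 *
 * On every write operation (add, remove, etc.), CopyOnWriteArrayList creates
 * a brand-new copy of its underlying array.
 */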
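public static void main(String[] args) {
    // Option A: thread-safe list (every add copies the backing array)
    List<AutoMonitorInfoVO> result = new CopyOnWriteArrayList<>();
    // Option B (recommended): plain ArrayList guarded by a lock
    List<Map<String, Object>> recordMapList = new ArrayList<>();
    // Lock object declared inside the method: it only needs to protect this call,
    // so it should not be a global static constant.
    final Object lock = new Object();
    ForkJoinPool pool = new ForkJoinPool(5);
    pool.submit(() -> {
        tempInfoList.parallelStream().forEach(m -> {
            log.info("Current thread (names should differ between calls): {}", Thread.currentThread().getName());
            // Option A
            AutoMonitorInfoVO data = BeanConverterUtil.convert(m, AutoMonitorInfoVO.class);
            result.add(data);
            // Option B (recommended): do the work outside the lock, lock only the add
            Map<String, Object> record = processDataTask(request, monitorType, monitorInfoMap, subTableFilterDataMap, factorNameMap, m);
            synchronized (lock) {
                recordMapList.add(record);
            }
        });
    }).join();
    // Do not forget to shut down the pool
    pool.shutdown();
}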
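The pattern above can be tried in isolation with the JDK alone. The following is a minimal sketch, not the original project code: `UnbatchedParallelDemo` and `process` are hypothetical names standing in for the real per-element work. It shows the lock-guarded `forEach` variant next to a `map` plus `collect` variant, which needs no shared list and keeps the encounter order. Running a parallel stream inside a custom ForkJoinPool relies on observed JDK behaviour rather than a documented guarantee, so printing thread names remains a good check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class UnbatchedParallelDemo {

    // Hypothetical stand-in for the per-element work (a conversion, a query, ...).
    static String process(int id) {
        return "record-" + id;
    }

    // Variant 1: forEach plus a lock-guarded ArrayList; result order is not guaranteed.
    static List<String> withLock(List<Integer> ids, ForkJoinPool pool) {
        List<String> result = new ArrayList<>();
        Object lock = new Object();
        pool.submit(() -> ids.parallelStream().forEach(id -> {
            String record = process(id); // do the work outside the lock
            synchronized (lock) {
                result.add(record);      // only the add is serialized
            }
        })).join();
        return result;
    }

    // Variant 2: map plus collect; no shared list, encounter order is kept.
    static List<String> withCollect(List<Integer> ids, ForkJoinPool pool) {
        return pool.submit(() -> ids.parallelStream()
                .map(UnbatchedParallelDemo::process)
                .collect(Collectors.toList())).join();
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.range(0, 20).boxed().collect(Collectors.toList());
        ForkJoinPool pool = new ForkJoinPool(5);
        try {
            System.out.println("withLock size: " + withLock(ids, pool).size());
            System.out.println("withCollect: " + withCollect(ids, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

When the order of the results matters, the `collect` variant avoids the separate re-sorting step.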
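/**
 * Because parallelStream().forEach does not preserve order, the method below can be used
 * to re-sort the results according to an ordered list passed in as a parameter.
 * Alternatively, sort the results directly according to business rules.
 * Uses Java Streams to reorder recordMapList to follow the order of mainTableMonitorIdList.
 *
 * @param recordMapList the list of records
 * @param mainTableMonitorIdList the list of monitor IDs from the main table
 * @return the records sorted in the order of mainTableMonitorIdList
 */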
public static List<Map<String, Object>> reorderRecordsUsingStreams(List<Map<String, Object>> recordMapList, List<Integer> mainTableMonitorIdList) {
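// Convert recordMapList into a map keyed by monitorId.
// If one monitorId can map to several records (one-to-many), this must change:
// Collectors.toMap throws IllegalStateException on duplicate keys.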
Map<Integer, Map<String, Object>> monitorIdToRecordMap = recordMapList.stream()
.collect(Collectors.toMap(
record -> (Integer) record.get("monitorId"),
record -> record
));
// Build the new list following the order of mainTableMonitorIdList
return mainTableMonitorIdList.stream()
.map(monitorIdToRecordMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
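For the one-to-many case mentioned in the comment, one possible adaptation is to group the records by `monitorId` instead of using `Collectors.toMap`. This is a sketch under that assumption; `ReorderOneToMany`, `reorder`, and `record` are hypothetical names, and records without a `monitorId` are assumed not to occur (a null key would make `groupingBy` throw).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ReorderOneToMany {

    static List<Map<String, Object>> reorder(List<Map<String, Object>> records, List<Integer> orderedIds) {
        // groupingBy keeps every record per monitorId instead of failing on duplicate keys
        Map<Integer, List<Map<String, Object>>> byId = records.stream()
                .collect(Collectors.groupingBy(r -> (Integer) r.get("monitorId")));
        // Walk the ordered ID list and emit all records of each ID, skipping IDs without records
        return orderedIds.stream()
                .flatMap(id -> byId.getOrDefault(id, Collections.emptyList()).stream())
                .collect(Collectors.toList());
    }

    // Small helper that builds one record for the demo
    static Map<String, Object> record(int monitorId, String value) {
        Map<String, Object> r = new HashMap<>();
        r.put("monitorId", monitorId);
        r.put("value", value);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> records = new ArrayList<>();
        records.add(record(2, "b1"));
        records.add(record(1, "a1"));
        records.add(record(2, "b2"));
        List<Map<String, Object>> sorted = reorder(records, Arrays.asList(1, 2, 3));
        sorted.forEach(r -> System.out.println(r.get("monitorId") + " -> " + r.get("value")));
    }
}
```

Records that share an ID keep their original relative order, because the grouping runs on a sequential stream.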
2. ForkJoinPool multithreading with batching
public static void main(String[] args) {
// Create the ForkJoinPool; a single pool can probably be shared within one request
ForkJoinPool forkJoinPool = new ForkJoinPool(5);
Function<List<Integer>, List<WaterRealDataDO>> mainDataQueryFunction = batch -> {
LambdaQueryWrapper<WaterRealDataDO> wrapper = new LambdaQueryWrapper<>();
wrapper.in(WaterRealDataDO::getMonitorId, batch);
// return waterRealDataMapper.selectList(wrapper);
List<WaterRealDataDO> rem = new ArrayList<>();
for (int i = 0; i < batch.size(); i++) {
WaterRealDataDO on = new WaterRealDataDO();
on.setId(batch.get(i).toString());
rem.add(on);
}
return rem;
};
List<Integer> list = CollUtil.newArrayList(6,1979,1981,1982,1983,1986,1991,1992,1994,1996,2002,2006,2010,2011,2016,2017,2019,2020,2021,2027,2033,2034,2035,2037,2038,2043,2045);
// Submit the task, wait for it to finish, and collect the results
List<WaterRealDataDO> mainDataList = forkJoinPool.invoke(new ProcessPartitionsTask<>(list, 10, mainDataQueryFunction));
// Print or process the final results
System.out.println("Final results: " + mainDataList.size());
// Do not forget to shut down the pool
forkJoinPool.shutdown();
}
package cn.jiangjiesheng.biz;
import cn.hutool.core.date.DateUtil;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;
@Slf4j
public class ProcessPartitionsTask<P, T> extends RecursiveTask<List<T>> {
private static final long serialVersionUID = 6200007441725293669L;
private final List<P> ids;
private final int size;
private final Function<List<P>, List<T>> queryFunction;
public ProcessPartitionsTask(List<P> ids, int size, Function<List<P>, List<T>> queryFunction) {
this.ids = ids;
this.size = size;
this.queryFunction = queryFunction;
}
@Override
protected List<T> compute() {
if (ids.size() <= size) {
// If the list is no larger than the partition size, process it directly and return the result
return processPartition(ids, queryFunction);
} else {
// Otherwise split it further and process the parts in parallel
List<List<P>> partitions = Lists.partition(ids, size);
List<ProcessPartitionsTask<P, T>> subTasks = new ArrayList<>();
for (List<P> partition : partitions) {
ProcessPartitionsTask<P, T> task = new ProcessPartitionsTask<>(partition, size, queryFunction);
subTasks.add(task);
// Run the subtask asynchronously
task.fork();
}
// Collect the results of all subtasks
List<T> combinedResult = new ArrayList<>();
for (ProcessPartitionsTask<P, T> task : subTasks) {
combinedResult.addAll(task.join());
}
return combinedResult;
}
}
private static <P, T> List<T> processPartition(List<P> partitionParamList, Function<List<P>, List<T>> queryFunction) {
log.info("{},currentThread:{},partitionParamList size:{}", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"), Thread.currentThread().getName(), partitionParamList.size());
// Console output for runs started from a main method
System.out.println(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss.SSS") + ",currentThread:" + Thread.currentThread().getName() + ",partitionParamList size:" + partitionParamList.size());
// Return the query result
return queryFunction.apply(partitionParamList);
}
}
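A common alternative shape for a RecursiveTask is to split the list in half until a piece is small enough, forking one half and computing the other on the current thread. The sketch below is that swapped-in variant, not the partition-based class above; `HalvingBatchTask` is a hypothetical name, it uses only the JDK, and the query function is a stand-in that returns strings. Note that batches produced this way can be smaller than `batchSize`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class HalvingBatchTask<P, T> extends RecursiveTask<List<T>> {
    private final List<P> ids;
    private final int batchSize;
    private final Function<List<P>, List<T>> queryFunction;

    HalvingBatchTask(List<P> ids, int batchSize, Function<List<P>, List<T>> queryFunction) {
        this.ids = ids;
        this.batchSize = batchSize;
        this.queryFunction = queryFunction;
    }

    @Override
    protected List<T> compute() {
        if (ids.size() <= batchSize) {
            // Small enough: run the query for this batch directly
            return queryFunction.apply(ids);
        }
        int mid = ids.size() / 2;
        HalvingBatchTask<P, T> left = new HalvingBatchTask<>(ids.subList(0, mid), batchSize, queryFunction);
        HalvingBatchTask<P, T> right = new HalvingBatchTask<>(ids.subList(mid, ids.size()), batchSize, queryFunction);
        left.fork();                           // left half runs asynchronously
        List<T> rightResult = right.compute(); // right half reuses the current thread
        List<T> combined = new ArrayList<>(left.join());
        combined.addAll(rightResult);          // left before right keeps the input order
        return combined;
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 27).boxed().collect(Collectors.toList());
        // Stand-in for a real database query
        Function<List<Integer>, List<String>> query =
                batch -> batch.stream().map(id -> "row-" + id).collect(Collectors.toList());
        ForkJoinPool pool = new ForkJoinPool(5);
        try {
            List<String> rows = pool.invoke(new HalvingBatchTask<>(ids, 10, query));
            System.out.println("Final results: " + rows.size());
        } finally {
            pool.shutdown();
        }
    }
}
```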
3. Concurrency test code
[Concurrent call test: responses do not have to be fast, but the service must not deadlock]:
public static void main(String[] args) {
String url = "http://127.0.0.1:8801/post";
Map<String, Object> params = new HashMap<>();
params.put("current", 1);
params.put("size", 20);
params.put("monitorType", 0);
params.put("monitorNameSortFlag", 0);
String authorization = "";
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
Map<String, Object> map = postMethod(finalI,url, params, authorization);
System.out.println(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss.SSS") + ",currentThread:" + Thread.currentThread().getName() + ",call index:" + finalI + ",costTime:" + map.get("costTime") + ",code:" + map.get("code") + ",error:" + map.get("error"));
}).start();
}
}
// Sends a JSON POST request and returns the status code, elapsed time, and (on failure) the error body
public static Map<String,Object> postMethod(Integer index, String url, Map<String, Object> params, String authorization) {
okhttp3.OkHttpClient client = new okhttp3.OkHttpClient.Builder()
.connectTimeout(120, TimeUnit.SECONDS)
.readTimeout(120, TimeUnit.SECONDS)
.writeTimeout(120, TimeUnit.SECONDS)
.build();
// Build the request body
okhttp3.RequestBody body = okhttp3.RequestBody.create(
MediaType.parse("application/json; charset=utf-8"),
JSONUtil.toJsonStr(params)
);
// Build the request
okhttp3.Request request = new okhttp3.Request.Builder()
.url(url)
.post(body)
.addHeader("Authorization", authorization)
.build();
// Send the request and read the response
System.out.println(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss.SSS") + ",currentThread:" + Thread.currentThread().getName() + ",call index:" + index);
long startTime = System.currentTimeMillis();
Map<String,Object> map= Maps.newLinkedHashMap();
try (Response response = client.newCall(request).execute()) {
String costTimeString = getCostTimeString(startTime);
map.put("code",response.code());
map.put("costTime",costTimeString);
map.put("i", index);
if (!response.isSuccessful()) {
//throw new IOException("Unexpected code " + response);
map.put("error",response.body() != null?response.body().string():response);
return map;
}
// Print the response body
//System.out.println(response.body().string());
return map;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static String getCostTimeString(long startTime) {
long endTime = System.currentTimeMillis();
long ms = endTime - startTime;
long hour = ms / 1000 / 60 / 60;
long min = ms / 1000 / 60 % 60;
long sec = ms / 1000 % 60;
long mi = ms % 1000;
return hour + "h " + min + "min " + sec + "s " + mi + "ms";
}
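The test above starts raw threads and never waits for them, so a hang is only visible as missing output. A possible refinement, sketched here with JDK classes only, releases all workers at the same moment with a CountDownLatch and fails loudly if they do not finish within a timeout. `ConcurrentCallTest` and `runConcurrently` are hypothetical names, and the task in `main` is a placeholder for the real HTTP call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentCallTest {

    // Runs `task` on `threads` threads at once; returns how many calls threw an exception.
    static int runConcurrently(int threads, Runnable task, long timeoutSeconds) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        AtomicInteger failures = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            executor.submit(() -> {
                try {
                    startGate.await(); // every worker waits here until the gate opens
                    task.run();
                } catch (Exception e) {
                    failures.incrementAndGet();
                } finally {
                    done.countDown();
                }
            });
        }
        startGate.countDown(); // release all workers together
        boolean finished = done.await(timeoutSeconds, TimeUnit.SECONDS);
        executor.shutdownNow();
        if (!finished) {
            throw new IllegalStateException("Calls did not finish in time; possible deadlock");
        }
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Placeholder task; in a real test this would issue the HTTP request
        int failed = runConcurrently(100, calls::incrementAndGet, 30);
        System.out.println("calls=" + calls.get() + ", failed=" + failed);
    }
}
```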
4. Notes on other approaches
4.1 Blocking on future.get()
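[Use with caution: the code can hang at future.get() and affect subsequent requests. Cancelling the Future did not seem to help, and CompletableFuture showed problems too. This approach seems unsuitable for large lists with long-running work.]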
// Can be shared
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(5);
// The futures are tracked per call, so EXECUTOR stays reusable
List<Future<?>> futures = new ArrayList<>();
// Thread-safe list that stores the results
List<Map<String, Object>> recordMapList = Collections.synchronizedList(new ArrayList<>());
CollUtil.split(mainPageRecordList, MagicNumberConstants.NUM_200).forEach(mainTaskIdListListSub -> {
futures.add(EXECUTOR.submit(() -> {
System.out.println("Processing task in thread: " + Thread.currentThread().getName());
for (RealTimeForPCMainDataVO m : mainTaskIdListListSub) {
Map<String, Object> recordMap = new HashMap<>();
recordMap.put("mainOrgName", monitorInfoMap.get(m.getMonitorId()).getPolluterName());
recordMap.put("monitorName", monitorInfoMap.get(m.getMonitorId()).getMonitorName());
recordMap.put("monitorId", m.getMonitorId());
recordMapList.add(recordMap);
}
}));
});
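// Wait for all tasks to finish.
// CompletableFuture should probably be used here instead of Future.
for (Future<?> future : futures) {
    // Make sure .join() or .get() is not called right after each submit: these methods block the
    // calling thread, so the remaining tasks would not run in parallel. In practice this loop was
    // still troublesome and blocked other threads.
    try {
        future.get(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        log.info("Real-time monitoring: task timed out after {} seconds", TIMEOUT_IN_SECONDS);
        // Cancel the task, interrupting it if it is running
        future.cancel(true);
    } catch (InterruptedException e) {
        log.info("Real-time monitoring: interrupted while waiting for a task", e);
        // Restore the interrupt status
        Thread.currentThread().interrupt();
        // Cancel the task, interrupting it if it is running
        future.cancel(true);
    } catch (ExecutionException e) {
        // The task itself failed; the waiting thread was not interrupted
        log.info("Real-time monitoring: task execution failed", e);
    }
}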
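One thing to keep in mind with the loop above is that the timeout applies to each `get` call separately, so the total wait can grow with the number of futures. `ExecutorService.invokeAll` with a timeout is a standard JDK alternative: it waits at most the given time for the whole batch and cancels whatever has not completed. The following is a minimal sketch of that swapped-in technique; `InvokeAllWithTimeout` and `runAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllWithTimeout {

    // Runs all tasks and returns the results of those that finished within the overall timeout.
    static <T> List<T> runAll(ExecutorService executor, List<Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // invokeAll returns when all tasks are done or the timeout expires;
        // tasks that have not completed by then are cancelled.
        List<Future<T>> futures = executor.invokeAll(tasks, timeout, unit);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            if (future.isCancelled()) {
                continue; // timed out
            }
            try {
                results.add(future.get()); // already completed, so this does not block
            } catch (ExecutionException e) {
                System.err.println("Task failed: " + e.getCause());
            }
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> {
            Thread.sleep(5000); // simulates a slow query
            return 2;
        });
        try {
            System.out.println(runAll(executor, tasks, 500, TimeUnit.MILLISECONDS));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Cancellation only interrupts the worker thread, so a task that ignores interruption keeps occupying a pool thread after the timeout.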
4.2 CompletableFuture multithreading with batching
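/**
 * Generic query method that supports concurrent batched queries.
 * [Probably suited to turning several long-running serial tasks into parallel ones,
 * as long as the number of tasks is not too large.]
 *
 * @param monitorIds the list of monitor IDs
 * @param batchSize the batch size
 * @param queryFunction the query function that supplies the concrete query logic
 * @param <T> the type of the query results
 * @return the list of query results
 * @throws ExecutionException if an asynchronous task throws an exception
 * @throws InterruptedException if the thread is interrupted
 */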
public static <T> List<T> fetchSubDataInBatches(List<Integer> monitorIds, int batchSize, Function<List<Integer>, List<T>> queryFunction)
throws ExecutionException, InterruptedException {
// Use Lists.partition to split monitorIds into batches
List<List<Integer>> batches = Lists.partition(monitorIds, batchSize);
// Create a CompletableFuture for every batch and collect them into a list
List<CompletableFuture<List<T>>> futures = batches.stream()
.map(batch -> CompletableFuture.supplyAsync(() -> {
try {
return queryFunction.apply(batch);
} catch (Exception e) {
throw new RuntimeException(e);
}
}))
.collect(Collectors.toList());
// Use CompletableFuture.allOf to wait for all tasks to finish
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// Fetch all results and merge them into a single list
CompletableFuture<List<T>> finalResultFuture = allFutures.thenApply(v ->
futures.stream()
.flatMap(future -> {
try {
return future.get().stream();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList())
);
return finalResultFuture.get();
}
Usage:
Function<List<Integer>, List<WaterRealDataDetailDO>> queryFunction = batch -> {
LambdaQueryWrapper<WaterRealDataDetailDO> wrapper = new LambdaQueryWrapper<>();
wrapper.in(WaterRealDataDetailDO::getMonitorId, batch);
return waterRealDataDetailMapper.selectList(wrapper);
};
fetchSubDataInBatches(monitorIds, batchSize, queryFunction);
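`CompletableFuture.supplyAsync` without an executor argument runs on the shared common pool, which every other caller in the JVM also uses. A variation worth considering, sketched here with the JDK only, passes a dedicated executor and joins the futures in submission order so that the batch order is preserved. `BatchedFetch` and `fetchInBatches` are hypothetical names, and the query in `main` is a stand-in for a real database call. The method should not be called from a task that itself occupies a thread of the same bounded executor, since the waiting task could then starve its own batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchedFetch {

    static <T> List<T> fetchInBatches(List<Integer> ids, int batchSize,
                                      Function<List<Integer>, List<T>> query, Executor executor) {
        List<CompletableFuture<List<T>>> futures = new ArrayList<>();
        // Split the IDs with subList and start one asynchronous query per batch
        for (int from = 0; from < ids.size(); from += batchSize) {
            List<Integer> batch = ids.subList(from, Math.min(from + batchSize, ids.size()));
            futures.add(CompletableFuture.supplyAsync(() -> query.apply(batch), executor));
        }
        // All batches are already running; join() in submission order keeps the batch order.
        // A failed batch makes join() throw CompletionException.
        return futures.stream()
                .flatMap(f -> f.join().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 27).boxed().collect(Collectors.toList());
        // Stand-in for a real database query
        Function<List<Integer>, List<String>> query =
                batch -> batch.stream().map(id -> "row-" + id).collect(Collectors.toList());
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            System.out.println(fetchInBatches(ids, 10, query, executor).size());
        } finally {
            executor.shutdown();
        }
    }
}
```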
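This can be combined with querying two tables at the same time. It is probably suited to turning several long-running serial tasks into parallel ones, as long as the number of tasks is not too large:
// Get the list of monitorIds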
List<Integer> monitorIds = request.getMonitorIds();
int batchSize = 500;
// Define the mainData query function
Function<List<Integer>, List<WaterRealDataDO>> mainDataQueryFunction = batch -> {
LambdaQueryWrapper<WaterRealDataDO> wrapper = new LambdaQueryWrapper<>();
wrapper.in(WaterRealDataDO::getMonitorId, batch);
return waterRealDataMapper.selectList(wrapper);
};
// Define the subData query function
Function<List<Integer>, List<WaterRealDataDetailDO>> subDataQueryFunction = batch -> {
LambdaQueryWrapper<WaterRealDataDetailDO> wrapper = new LambdaQueryWrapper<>();
wrapper.in(WaterRealDataDetailDO::getMonitorId, batch);
return waterRealDataDetailMapper.selectList(wrapper);
};
// Create two CompletableFutures that run the query tasks concurrently
CompletableFuture<List<WaterRealDataDO>> mainDataTask = CompletableFuture.supplyAsync(() -> {
try {
return fetchSubDataInBatches(monitorIds, batchSize, mainDataQueryFunction);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
CompletableFuture<List<WaterRealDataDetailDO>> subDataTask = CompletableFuture.supplyAsync(() -> {
try {
return fetchSubDataInBatches(monitorIds, batchSize, subDataQueryFunction);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Use CompletableFuture.allOf to wait for both tasks
CompletableFuture<Void> allFutures = CompletableFuture.allOf(mainDataTask, subDataTask);
// Block until all tasks have finished
allFutures.join();
// Fetch the query results
List<WaterRealDataDO> mainDataList = null;
List<WaterRealDataDetailDO> subDataList= null;
try {
mainDataList = mainDataTask.get();
subDataList = subDataTask.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (CollectionUtils.isEmpty(mainDataList) || CollectionUtils.isEmpty(subDataList)) {
return result;
}
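When exactly two independent queries are combined, `thenCombine` can express the "wait for both, then merge" step without `allOf` and the nullable result variables. The sketch below assumes a dedicated executor and uses hypothetical names (`CombineTwoQueries`, `queryBoth`); the suppliers in `main` stand in for the two table queries.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Supplier;

class CombineTwoQueries {

    // Runs both queries concurrently and merges their results once both are available.
    static <A, B, R> R queryBoth(Supplier<A> mainQuery, Supplier<B> subQuery,
                                 BiFunction<A, B, R> merge, Executor executor) {
        CompletableFuture<A> mainTask = CompletableFuture.supplyAsync(mainQuery, executor);
        CompletableFuture<B> subTask = CompletableFuture.supplyAsync(subQuery, executor);
        // thenCombine calls merge after both futures complete;
        // join() rethrows a failure of either query as CompletionException.
        return mainTask.thenCombine(subTask, merge).join();
    }

    public static void main(String[] args) {
        // Stand-ins for the main-table and detail-table queries
        Supplier<List<Integer>> mainQuery = () -> Arrays.asList(1, 2, 3);
        Supplier<List<String>> subQuery = () -> Arrays.asList("1-a", "1-b", "2-a");
        BiFunction<List<Integer>, List<String>, String> merge =
                (main, sub) -> main.size() + " main rows, " + sub.size() + " detail rows";
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(queryBoth(mainQuery, subQuery, merge, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```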
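4.3 CompletableFuture multithreading without batching
Process each object in a list on multiple threads. EXECUTOR can be shared. Use with caution, and note that recordMap is written from several threads, so it must be a thread-safe map such as ConcurrentHashMap: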
List<RealTimeForPCSubDataVO> subDataVOList = subTableFilterDataMap.get(m.getMonitorId());
// Wait for all tasks to finish
CompletableFuture.allOf(subDataVOList.stream()
.map(n -> CompletableFuture.runAsync(() -> {
try {
n.setCodeName(codeName + "(" + (StrUtil.isEmpty(unit) ? "dimensionless" : unit) + ")");
recordMap.put(n.getCode(), n);
} catch (Exception e) {
throw new RuntimeException(e);
}
}, EXECUTOR)).toArray(CompletableFuture[]::new)).join();
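A self-contained version of this pattern, using only the JDK, might look as follows. It is a sketch under assumptions: `UnbatchedRunAsync` and `label` are hypothetical names, the codes are plain strings, and the shared map is a ConcurrentHashMap so that concurrent `put` calls are safe.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UnbatchedRunAsync {

    // Builds a display label for every code, one asynchronous task per element.
    static Map<String, String> label(List<String> codes, String unit, Executor executor) {
        Map<String, String> out = new ConcurrentHashMap<>(); // safe for concurrent put
        CompletableFuture<?>[] tasks = codes.stream()
                .map(code -> CompletableFuture.runAsync(
                        () -> out.put(code, code + "(" + (unit.isEmpty() ? "dimensionless" : unit) + ")"),
                        executor))
                .toArray(CompletableFuture[]::new);
        // Block until every task has completed; a failed task surfaces as CompletionException
        CompletableFuture.allOf(tasks).join();
        return out;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            System.out.println(label(Arrays.asList("pH", "COD"), "", executor));
            System.out.println(label(Arrays.asList("COD"), "mg/L", executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

For work as cheap as building a string, the scheduling overhead usually outweighs any gain, which is one reason to use this form sparingly.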
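4.4 CompletableFuture multithreaded lookup with ordered results
Process the items in mainList, find those that match, and add them to the resultDataList result set (the insertion order must match that of the original plain for loop).
List<Map<String, Object>> resultDataList = Collections.synchronizedList(new ArrayList<>());
// Keep the futures in the same order as mainList so that the order is preserved even after asynchronous processing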
List<CompletableFuture<Map<String, Object>>> futures =
mainList.stream().map(pollutantRealData -> CompletableFuture.supplyAsync(() -> {
Map<String, Object> oneDataMap =
getResultFromMain(req, monitorInfoMap, pollutantRealData, schemeGroupMap,
subDataGroupMap, dataTimeOption, monitorOfflineOverTime,
systemTime, monitorType);
return oneDataMap; // may be null
})).collect(Collectors.toList());
// Wait for all asynchronous tasks to finish and merge the results in order
List<Map<String, Object>> orderedResults = new ArrayList<>(Collections.nCopies(mainList.size(), null));
for (int i = 0; i < futures.size(); i++) {
Map<String, Object> oneDataMap = futures.get(i).join();
if (oneDataMap != null) { // decide whether the item belongs in the result list
orderedResults.set(i, oneDataMap);
}
}
// Add the valid entries of orderedResults to resultDataList
synchronized (resultDataList) {
orderedResults.stream()
.filter(Objects::nonNull)
.forEach(resultDataList::add);
}
// This is the final result
return resultDataList;
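The ordering idea can be reduced to two steps: start one future per input and keep the futures in input order, then join them in that same order and drop the nulls. The sketch below shows this with the JDK only; `OrderedAsyncResults` and `mapInOrder` are hypothetical names and the mapping function is a stand-in for the real lookup. Because the joins happen on the calling thread, no synchronized list is needed in this form.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderedAsyncResults {

    static <I, O> List<O> mapInOrder(List<I> inputs, Function<I, O> fn, Executor executor) {
        // Step 1: start every task first. Collecting into a list here matters:
        // joining inside the same lazy stream would run the tasks one by one.
        List<CompletableFuture<O>> futures = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(() -> fn.apply(in), executor))
                .collect(Collectors.toList());
        // Step 2: join in input order and skip inputs that produced no result
        return futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> inputs = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        // Stand-in lookup: only even numbers produce a result
        Function<Integer, String> lookup = n -> n % 2 == 0 ? "v" + n : null;
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            System.out.println(mapInOrder(inputs, lookup, executor));
        } finally {
            executor.shutdown();
        }
    }
}
```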