原创

Java-异步多线程专题-ForkJoinPool-不分批次处理-分批次处理-最后排序-并发测试-CompletableFuture使用实例


优先使用ForkJoinPool方案,如果使用CompletableFuture处理很多任务或耗时长还是要慎用

收集结果时一定要用 parallelStream + CopyOnWriteArrayList,或者 ArrayList 加方法内的锁;另外 parallelStream 的 forEach 不保证处理顺序,结果需要有序时要在最后重新排序

一定要打印线程名称,确认实际上是否在多线程执行:log.info("当前查询线程:{}", Thread.currentThread().getName());

ForkJoinPool 的并行度(parallelism)也不要设得太低,例如 new ForkJoinPool(5)


1. ForkJoinPool不分批次多线程

/**
* CopyOnWriteArrayList 应该适合读多写少,
。如果用new ArrayList,在list add 手动加锁不知道行不行?行
。
。CopyOnWriteArrayList 的特性是在每次进行写操作(如 add、remove 等)时,都会创建底层数组的一个全新副本
*
**/
public static void main(String[] args) {
    List<Map<String, Object>> result = new CopyOnWriteArrayList<>();

    List<Map<String, Object>> result = new ArrayList<>();
    final Object lock = new Object(); // 锁对象 放在方法内,保护当前的方法就行。这里应该不能要全局静态常量

    ForkJoinPool pool = new ForkJoinPool(5);
    pool.submit(() -> {
        tempInfoList.**parallelStream**().forEach(m -> {

            log.info("当前打印的线程,一定要是不同的:{}",Thread.currentThread().getName());

            AutoMonitorInfoVO data = BeanConverterUtil.convert(m, AutoMonitorInfoVO.class);

            result.add(data);
            //或者 推荐
            synchronized (lock){
                recordMapList.add(processDataTask(request, monitorType, monitorInfoMap, subTableFilterDataMap,factorNameMap,m));
            }
        });
    }).join();
    //不要忘记
    pool.shutdown(); 
}
/**
 * Re-orders {@code recordMapList} so that it follows the order of
 * {@code mainTableMonitorIdList}.
 *
 * <p>A parallel stream's {@code forEach} does not preserve order, so the collected
 * records can be passed through this method afterwards (or simply be sorted by a
 * business key instead).
 *
 * <p>Records are matched through their {@code "monitorId"} entry. Several records may
 * share one monitorId (one-to-many); they are all returned, next to each other, in
 * the order in which they appear in {@code recordMapList}. Records whose monitorId is
 * not contained in {@code mainTableMonitorIdList} are dropped, and IDs without any
 * record are skipped.
 *
 * @param recordMapList          records to re-order; every record must carry an
 *                               {@code Integer} under the key {@code "monitorId"}
 * @param mainTableMonitorIdList monitor IDs of the main table, in the desired order
 * @return a new list holding the matching records in {@code mainTableMonitorIdList} order
 * @throws NullPointerException if a record is {@code null} or has no {@code "monitorId"}
 * @throws ClassCastException   if a {@code "monitorId"} value is not an {@code Integer}
 */
public static List<Map<String, Object>> reorderRecordsUsingStreams(List<Map<String, Object>> recordMapList, List<Integer> mainTableMonitorIdList) {
    // Group instead of Collectors.toMap: toMap throws IllegalStateException as soon as
    // two records share a monitorId, whereas grouping keeps every one of them.
    Map<Integer, List<Map<String, Object>>> recordsByMonitorId = recordMapList.stream()
            .collect(Collectors.groupingBy(record -> (Integer) record.get("monitorId")));

    // Walk the IDs in the requested order and emit each ID's records.
    return mainTableMonitorIdList.stream()
            .map(recordsByMonitorId::get)
            .filter(Objects::nonNull)
            .flatMap(List::stream)
            .collect(Collectors.toList());
}

2. ForkJoinPool分批次多线程

/**
 * Demonstrates batched multi-threading: a list of monitor IDs is split into batches of
 * 10 by {@code ProcessPartitionsTask} and the batches are processed by a dedicated
 * {@link ForkJoinPool}.
 */
public static void main(String[] args) {
    // Dedicated pool; presumably one pool can be shared by all tasks of the same request.
    ForkJoinPool forkJoinPool = new ForkJoinPool(5);
    try {
        Function<List<Integer>, List<WaterRealDataDO>> mainDataQueryFunction = batch -> {
            LambdaQueryWrapper<WaterRealDataDO> wrapper = new LambdaQueryWrapper<>();
            wrapper.in(WaterRealDataDO::getMonitorId, batch);
            // Real code would run the query: waterRealDataMapper.selectList(wrapper).
            // The demo fabricates one row per monitor ID instead.
            List<WaterRealDataDO> rem = new ArrayList<>(batch.size());
            for (Integer monitorId : batch) {
                WaterRealDataDO on = new WaterRealDataDO();
                on.setId(monitorId.toString());
                rem.add(on);
            }
            return rem;
        };
        List<Integer> list = CollUtil.newArrayList(6,1979,1981,1982,1983,1986,1991,1992,1994,1996,2002,2006,2010,2011,2016,2017,2019,2020,2021,2027,2033,2034,2035,2037,2038,2043,2045);
        // Submit the task, wait for it to finish and collect the combined result.
        List<WaterRealDataDO> mainDataList = forkJoinPool.invoke(new ProcessPartitionsTask<>(list, 10, mainDataQueryFunction));
        // Print (or further process) the final result.
        System.out.println("Final results: " + mainDataList.size());
    } finally {
        // Do not forget: release the pool's threads even when the task failed.
        forkJoinPool.shutdown();
    }
}
package cn.jiangjiesheng.biz;

import cn.hutool.core.date.DateUtil;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

/**
 * Fork/join task that splits a parameter list into partitions of at most {@code size}
 * elements, applies a query function to every partition and concatenates the results.
 *
 * @param <P> type of the parameters (for example IDs) handed to the query function
 * @param <T> type of the rows returned by the query function
 */
@Slf4j
public class ProcessPartitionsTask<P, T> extends RecursiveTask<List<T>> {
    private static final long serialVersionUID = 6200007441725293669L;

    /** Parameters this task is responsible for. */
    private final List<P> ids;
    /** Maximum number of parameters handed to one query call. */
    private final int size;
    /** Query executed once per partition. */
    private final Function<List<P>, List<T>> queryFunction;

    /**
     * @param ids           parameters to process
     * @param size          maximum partition size
     * @param queryFunction query applied to each partition
     */
    public ProcessPartitionsTask(List<P> ids, int size, Function<List<P>, List<T>> queryFunction) {
        this.ids = ids;
        this.size = size;
        this.queryFunction = queryFunction;
    }

    /**
     * Runs the query directly when the list fits into one partition; otherwise creates one
     * sub-task per partition, runs them through the pool and merges their results in
     * partition order.
     *
     * @return the concatenated query results
     */
    @Override
    protected List<T> compute() {
        if (ids.size() <= size) {
            // Small enough: process the list as a single partition.
            return processPartition(ids, queryFunction);
        }

        // Otherwise split the list and create one sub-task per partition.
        List<ProcessPartitionsTask<P, T>> subTasks = new ArrayList<>();
        for (List<P> partition : Lists.partition(ids, size)) {
            subTasks.add(new ProcessPartitionsTask<>(partition, size, queryFunction));
        }

        // invokeAll forks the sub-tasks and returns once all of them have completed.
        invokeAll(subTasks);

        // Collect the results of all sub-tasks, in partition order.
        List<T> combinedResult = new ArrayList<>();
        for (ProcessPartitionsTask<P, T> task : subTasks) {
            combinedResult.addAll(task.join());
        }
        return combinedResult;
    }

    /**
     * Logs which thread handles the partition and then runs the query for it.
     *
     * @param partitionParamList parameters of one partition
     * @param queryFunction      query to apply
     * @return whatever the query function returns
     */
    private static <P, T> List<T> processPartition(List<P> partitionParamList, Function<List<P>, List<T>> queryFunction) {
        String timestamp = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
        String threadName = Thread.currentThread().getName();
        // Log the partition SIZE; the message is labelled "size", so do not pass the list itself.
        log.info("{},currentThread:{},调用partitionParamList size:{}", timestamp, threadName, partitionParamList.size());
        // Also print to stdout so the output is visible when started from a plain main method.
        System.out.println(timestamp + ",currentThread:" + threadName + ",调用partitionParamList size:" + partitionParamList.size());
        // Return the query result.
        return queryFunction.apply(partitionParamList);
    }
}

3. 代码并发测试


【并发调用测试,响应不一定快,但是别死锁】:

/**
 * Concurrency smoke test: starts 100 threads that each POST the same JSON body to the
 * local endpoint and print the outcome of their call.
 */
public static void main(String[] args) {
    final String url = "http://127.0.0.1:8801/post";
    final String authorization = "";

    // Request body shared by every call.
    Map<String, Object> params = new HashMap<>();
    params.put("current", 1);
    params.put("size", 20);
    params.put("monitorType", 0);
    params.put("monitorNameSortFlag", 0);

    int next = 0;
    while (next < 100) {
        final int callNo = next++;
        Runnable call = () -> {
            Map<String, Object> outcome = postMethod(callNo, url, params, authorization);
            String timestamp = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
            System.out.println(timestamp + ",currentThread:" + Thread.currentThread().getName()
                    + ",调用序号:" + callNo
                    + ",costTime:" + outcome.get("costTime")
                    + ",code:" + outcome.get("code")
                    + ",error:" + outcome.get("error"));
        };
        // One plain thread per call, started immediately.
        new Thread(call).start();
    }
}

    /**
     * Sends {@code params} as a JSON POST request to {@code url} and reports the outcome.
     *
     * <p>The returned map contains {@code "code"} (HTTP status), {@code "costTime"}
     * (elapsed time text) and {@code "i"} (the call index). For an unsuccessful response it
     * additionally contains {@code "error"}: the response body text, or the response object
     * itself when there is no body.
     *
     * @param index         sequence number of the call, echoed in the output
     * @param url           target URL
     * @param params        values serialised to JSON as the request body
     * @param authorization value of the {@code Authorization} header
     * @return the outcome map described above
     * @throws RuntimeException wrapping the {@link IOException} when the call fails
     */
    public static Map<String,Object> postMethod(Integer index, String url, Map<String, Object> params, String authorization)  {
        // NOTE(review): a new client is built for every call - confirm that this is intended.
        okhttp3.OkHttpClient httpClient = new okhttp3.OkHttpClient.Builder()
                .connectTimeout(120, TimeUnit.SECONDS)
                .readTimeout(120, TimeUnit.SECONDS)
                .writeTimeout(120, TimeUnit.SECONDS)
                .build();

        // JSON request body.
        String json = JSONUtil.toJsonStr(params);
        okhttp3.RequestBody jsonBody = okhttp3.RequestBody.create(
                MediaType.parse("application/json; charset=utf-8"), json);

        // POST request carrying the authorization header.
        okhttp3.Request httpRequest = new okhttp3.Request.Builder()
                .url(url)
                .post(jsonBody)
                .addHeader("Authorization", authorization)
                .build();

        System.out.println(DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss.SSS") + ",currentThread:" + Thread.currentThread().getName() + ",调用序号:" + index);

        final long startTime = System.currentTimeMillis();
        Map<String,Object> outcome = Maps.newLinkedHashMap();

        // Send the request; the response is closed automatically.
        try (Response response = httpClient.newCall(httpRequest).execute()) {
            outcome.put("code", response.code());
            outcome.put("costTime", getCostTimeString(startTime));
            outcome.put("i", index);

            if (!response.isSuccessful()) {
                Object errorDetail;
                if (response.body() != null) {
                    errorDetail = response.body().string();
                } else {
                    errorDetail = response;
                }
                outcome.put("error", errorDetail);
            }
            return outcome;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Formats the time elapsed since {@code startTime} as hours, minutes, seconds and
     * milliseconds.
     *
     * @param startTime start instant in epoch milliseconds, as returned by
     *                  {@link System#currentTimeMillis()}
     * @return the elapsed time, for example {@code "0时 1分 5秒 250毫秒"}
     */
    public static String getCostTimeString(long startTime) {
        final long elapsedMs = System.currentTimeMillis() - startTime;
        final long totalSeconds = elapsedMs / 1000;
        final long totalMinutes = totalSeconds / 60;

        StringBuilder text = new StringBuilder();
        text.append(totalMinutes / 60).append("时 ");
        text.append(totalMinutes % 60).append("分 ");
        text.append(totalSeconds % 60).append("秒 ");
        text.append(elapsedMs % 1000).append("毫秒");
        return text.toString();
    }

4. 其他代码使用说明

4.1 future.get()阻塞


【慎用:可能卡死在 future.get(),并影响后续的请求;对 Future 调用 cancel 好像也不行,CompletableFuture 也有同样的问题,感觉不适合 list 很大又耗时长的场景】
【慎用:可能卡死在 future.get(),并影响后续的请求;对 Future 调用 cancel 好像也不行,CompletableFuture 也有同样的问题,感觉不适合 list 很大又耗时长的场景】
【慎用:可能卡死在 future.get(),并影响后续的请求;对 Future 调用 cancel 好像也不行,CompletableFuture 也有同样的问题,感觉不适合 list 很大又耗时长的场景】


// Shared thread pool; it can be reused by several callers.
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(5); 
// Handles of the submitted batch tasks; waiting on these leaves EXECUTOR itself reusable.
List<Future<?>> futures = new ArrayList<>();
// Thread-safe list receiving the rows built by all batch tasks.
List<Map<String, Object>> recordMapList = Collections.synchronizedList(new ArrayList<>());

// Split the main records into batches and build the rows of each batch on the pool.
CollUtil.split(mainPageRecordList, MagicNumberConstants.NUM_200).forEach(batchRecords -> {
    Future<?> batchFuture = EXECUTOR.submit(() -> {
        System.out.println("Processing task in thread: " + Thread.currentThread().getName());
        batchRecords.forEach(m -> {
            Map<String, Object> row = new HashMap<>();
            row.put("mainOrgName", monitorInfoMap.get(m.getMonitorId()).getPolluterName());
            row.put("monitorName", monitorInfoMap.get(m.getMonitorId()).getMonitorName());
            row.put("monitorId", m.getMonitorId());
            recordMapList.add(row);
        });
    });
    futures.add(batchFuture);
});
// 等待所有任务完成

// 注:这里应该用 CompletableFuture 来替代 Future
// Wait for every submitted task. get() blocks the calling thread, and the timeout is
// applied to each future in turn, so the total wait can be a multiple of it.
for (Future<?> future : futures) {
    try {
        future.get(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        log.info("实时监控-任务超时,超时时间:{}秒",TIMEOUT_IN_SECONDS);
        // Interrupt the task that is still running.
        future.cancel(true);
    } catch (ExecutionException e) {
        // The task itself failed. The calling thread was NOT interrupted, so its interrupt
        // flag must stay untouched: setting it here would make every following get() fail
        // immediately and would leave a pooled calling thread marked as interrupted.
        log.info("实时监控-任务执行异常",e);
    } catch (InterruptedException e) {
        log.info("实时监控-任务执行异常",e);
        // Restore the interrupt status for the caller, give up on all tasks and stop waiting.
        Thread.currentThread().interrupt();
        for (Future<?> pending : futures) {
            pending.cancel(true);
        }
        break;
    }
}

4.2 CompletableFuture分批次多线程

/**
 * Generic query helper that splits the monitor IDs into batches and queries the batches
 * concurrently.
 *
 * <p>Meant for turning a handful of long-running serial queries into parallel ones; the
 * number of tasks should not become too large.
 *
 * @param monitorIds    monitor IDs to query
 * @param batchSize     maximum number of IDs per batch
 * @param queryFunction query executed once per batch
 * @param <T>           type of the query result rows
 * @return the rows of all batches, concatenated in batch order
 * @throws ExecutionException   if one of the asynchronous tasks threw an exception
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public static <T> List<T> fetchSubDataInBatches(List<Integer> monitorIds, int batchSize, Function<List<Integer>, List<T>> queryFunction) 
        throws ExecutionException, InterruptedException {
    // One asynchronous query per batch of IDs.
    List<CompletableFuture<List<T>>> batchFutures = Lists.partition(monitorIds, batchSize).stream()
        .map(idBatch -> CompletableFuture.supplyAsync(() -> {
            List<T> rows;
            try {
                rows = queryFunction.apply(idBatch);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return rows;
        }))
        .collect(Collectors.toList());

    // Once every batch has finished, concatenate the batch results in batch order.
    // join() cannot block here because allOf has already completed all futures.
    CompletableFuture<List<T>> mergedFuture = CompletableFuture
        .allOf(batchFutures.toArray(new CompletableFuture[0]))
        .thenApply(ignored -> batchFutures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(Collectors.toList()));

    return mergedFuture.get();
}

调用:
 // Query applied to each batch: selects the detail rows whose monitor ID is in the batch.
 Function<List<Integer>, List<WaterRealDataDetailDO>> queryFunction = idBatch -> {
    LambdaQueryWrapper<WaterRealDataDetailDO> detailQuery = new LambdaQueryWrapper<>();
    detailQuery.in(WaterRealDataDetailDO::getMonitorId, idBatch);
    return waterRealDataDetailMapper.selectList(detailQuery);
};

// Run the query batch by batch.
fetchSubDataInBatches(monitorIds, batchSize, queryFunction);

再结合同时查询两个表,应该适合处理时间较长的多个任务,串行改并行,但是任务数量不能太多:
    // Monitor IDs to query and the number of IDs per batch.
    List<Integer> monitorIds = request.getMonitorIds();
    int batchSize = 500;

    // Query for the main table.
    Function<List<Integer>, List<WaterRealDataDO>> mainDataQueryFunction = idBatch -> {
        LambdaQueryWrapper<WaterRealDataDO> mainQuery = new LambdaQueryWrapper<>();
        mainQuery.in(WaterRealDataDO::getMonitorId, idBatch);
        return waterRealDataMapper.selectList(mainQuery);
    };

    // Query for the detail (sub) table.
    Function<List<Integer>, List<WaterRealDataDetailDO>> subDataQueryFunction = idBatch -> {
        LambdaQueryWrapper<WaterRealDataDetailDO> detailQuery = new LambdaQueryWrapper<>();
        detailQuery.in(WaterRealDataDetailDO::getMonitorId, idBatch);
        return waterRealDataDetailMapper.selectList(detailQuery);
    };

    // Start both batched queries concurrently.
    CompletableFuture<List<WaterRealDataDO>> mainDataTask = CompletableFuture.supplyAsync(() -> {
        try {
            return fetchSubDataInBatches(monitorIds, batchSize, mainDataQueryFunction);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

    CompletableFuture<List<WaterRealDataDetailDO>> subDataTask = CompletableFuture.supplyAsync(() -> {
        try {
            return fetchSubDataInBatches(monitorIds, batchSize, subDataQueryFunction);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

    // Block until both tasks are done; a failed task makes join() throw here.
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(mainDataTask, subDataTask);
    allFutures.join();

    // Both tasks have completed successfully at this point, so join() returns immediately.
    List<WaterRealDataDO> mainDataList = mainDataTask.join();
    List<WaterRealDataDetailDO> subDataList = subDataTask.join();

    // Nothing to combine when either table returned no rows.
    if (CollectionUtils.isEmpty(mainDataList) || CollectionUtils.isEmpty(subDataList)) {
        return result;
    }

4.3 CompletableFuture不分批次多线程

利用多线程处理list中每个对象,可以共用EXECUTOR,慎用:

List<RealTimeForPCSubDataVO> subDataVOList = subTableFilterDataMap.get(m.getMonitorId());

// Map.get returns null when the map holds no entry for this monitor ID.
// NOTE(review): skipping such monitors is assumed to be the desired behaviour - confirm.
if (subDataVOList != null) {
    // Update every element on the pool and wait until all tasks have finished.
    CompletableFuture.allOf(subDataVOList.stream()
            .map(n -> CompletableFuture.runAsync(() -> {
                try {
                    n.setCodeName(codeName + "(" + (StrUtil.isEmpty(unit) ? "无量纲" : unit) + ")");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, EXECUTOR)).toArray(CompletableFuture[]::new)).join();

    // Fill the shared map on the calling thread, after join(): the pool threads then never
    // write to recordMap concurrently, which would be unsafe for a plain HashMap.
    for (RealTimeForPCSubDataVO n : subDataVOList) {
        recordMap.put(n.getCode(), n);
    }
}

4.4 CompletableFuture多线程找数据并排序

处理mainList中的数据,找到符合的数据添加到resultDataList结果集中(要保持原来普通for循环的添加到结果集的顺序)

// Thread-safe list that receives the final result.
List<Map<String, Object>> resultDataList = Collections.synchronizedList(new ArrayList<>());

// One asynchronous task per element of mainList. The futures are kept in mainList order,
// which is what allows the results to be put back in order afterwards.
List<CompletableFuture<Map<String, Object>>> futures = mainList.stream()
        .map(pollutantRealData -> CompletableFuture.<Map<String, Object>>supplyAsync(() ->
                // The result may be null.
                getResultFromMain(req, monitorInfoMap, pollutantRealData, schemeGroupMap,
                        subDataGroupMap, dataTimeOption, monitorOfflineOverTime,
                        systemTime, monitorType)))
        .collect(Collectors.toList());

// Wait for the tasks one by one in their original order and keep the non-null results.
List<Map<String, Object>> orderedResults = futures.stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

// Append the valid results to the result list in a single synchronized step.
resultDataList.addAll(orderedResults);

// This is the final result.
return resultDataList;
正文到此结束
本文目录