原创

Java-异步多线程-处理任务List A-最后结果放在List B中并保持相对的顺序

在 Java 中使用多线程处理任务列表(List A),并将结果按原顺序存入另一个列表(List B),可以通过多种方式实现。为了确保结果的顺序与原始任务列表一致,可以使用以下几种方法:

  1. 使用 ExecutorServiceFuture
  2. 使用 CompletableFuture
  3. 使用 ForkJoinPool

下面我们将详细介绍这三种方法,并提供相应的代码示例。

方法一:使用 ExecutorServiceFuture

这种方法利用 ExecutorService 来管理线程池,并使用 Future 对象来获取异步任务的结果。

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class MultiThreadedListProcessing {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 假设这是你的任务列表
        List<Integer> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(i);
        }

        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 存储 Future 对象的列表
        List<Future<Integer>> futures = new ArrayList<>();

        // 提交任务并存储 Future 对象
        for (Integer task : tasks) {
            Future<Integer> future = executor.submit(() -> processTask(task));
            futures.add(future);
        }

        // 按顺序收集结果
        List<Integer> results = new ArrayList<>(Collections.nCopies(tasks.size(), null));
        int index = 0;
        for (Future<Integer> future : futures) {
            results.set(index++, future.get());
        }

        // 关闭线程池
        executor.shutdown();

        // 输出结果
        System.out.println(results);
    }

    // 示例任务处理函数
    private static Integer processTask(Integer task) {
        try {
            Thread.sleep(100); // 模拟任务处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return task * 2; // 简单的任务处理逻辑
    }
}

方法二:使用 CompletableFuture

CompletableFuture 提供了更灵活的异步编程模型,支持链式调用和组合多个异步操作。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompletableFutureExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 假设这是你的任务列表
        List<Integer> tasks = IntStream.range(0, 10).boxed().collect(Collectors.toList());

        // 使用 CompletableFuture 处理任务
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (Integer task : tasks) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> processTask(task));
            futures.add(future);
        }

        // 按顺序收集结果
        List<Integer> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        // 输出结果
        System.out.println(results);
    }

    // 示例任务处理函数
    private static Integer processTask(Integer task) {
        try {
            Thread.sleep(100); // 模拟任务处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return task * 2; // 简单的任务处理逻辑
    }
}

方法三:使用 ForkJoinPool

ForkJoinPool 是 Java 提供的一个专门用于分治算法的线程池,也可以用来处理并行任务。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

public class ForkJoinPoolExample {

    public static void main(String[] args) {
        // 假设这是你的任务列表
        List<Integer> tasks = IntStream.range(0, 10).boxed().collect(Collectors.toList());

        // 创建 ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 提交任务
        List<Integer> results = forkJoinPool.invoke(new TaskProcessor(tasks));

        // 输出结果
        System.out.println(results);
    }

    // 自定义 RecursiveTask 类
    static class TaskProcessor extends RecursiveTask<List<Integer>> {
        private final List<Integer> tasks;

        TaskProcessor(List<Integer> tasks) {
            this.tasks = tasks;
        }

        @Override
        protected List<Integer> compute() {
            if (tasks.size() <= 1) {
                List<Integer> result = new ArrayList<>(1);
                result.add(processTask(tasks.get(0)));
                return result;
            }

            int mid = tasks.size() / 2;
            TaskProcessor left = new TaskProcessor(tasks.subList(0, mid));
            TaskProcessor right = new TaskProcessor(tasks.subList(mid, tasks.size()));

            left.fork();
            List<Integer> rightResult = right.compute();
            List<Integer> leftResult = left.join();

            List<Integer> result = new ArrayList<>(leftResult.size() + rightResult.size());
            result.addAll(leftResult);
            result.addAll(rightResult);

            return result;
        }
    }

    // 示例任务处理函数
    private static Integer processTask(Integer task) {
        try {
            Thread.sleep(100); // 模拟任务处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return task * 2; // 简单的任务处理逻辑
    }
}

总结

  • ExecutorServiceFuture:适合需要显式控制线程池大小和任务提交的情况。
  • CompletableFuture:提供了更灵活的异步编程模型,适合需要组合多个异步操作的场景。
  • ForkJoinPool:适用于分治算法或需要并行处理大量任务的场景。

根据具体需求选择合适的方法。如果你只是需要简单的并行处理并保持结果顺序,CompletableFutureExecutorService 可能是更好的选择。如果你有复杂的任务拆分和合并需求,ForkJoinPool 可能更适合。

正文到此结束
本文目录