原创

Java-异步多线程专题-ExecutorService-自定义参数-拒绝策略-ThreadPoolExecutor-newFixedThreadPool的源码

《关于newFixedThreadPool的源码探究》
https://blog.csdn.net/NewBeeMu/article/details/124046019

《线程池的RejectedExecutionHandler(拒绝策略)》
https://blog.csdn.net/jgteng/article/details/54411423

自定义参数创建线程池
// threadPool = Executors.newFixedThreadPool(threadCount);
threadPool = new ThreadPoolExecutor(threadCount, threadCount * 3,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());


另一个使用示例:
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,25,100L,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());
来自 https://blog.csdn.net/weixin_44823875/article/details/123610475
关注ThreadPoolExecutor说明:
来自 https://blog.csdn.net/qq_15122663/article/details/80464825
ThreadPoolExecutor
ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, BlockingQueue)
ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, BlockingQueue, ThreadFactory)
ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, BlockingQueue, RejectedExecutionHandler)
ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, BlockingQueue , ThreadFactory, RejectedExecutionHandler)
参数
上面就是官方ThreadPoolExecutor所有构造方法

参数 解析
corePoolSize 设置同时处理最大的线程核数
maximumPoolSize 设置线程池最大的线程总数
keepAliveTime 设置允许空余线程存活的时间
unit 时间单位
BlockingQueue 设置储存队列
ThreadFactory 设置线程工厂设置 线程各种属性
RejectedExecutionHandler 这个属性当存储队列满了,最大线程数满载的时候,还有新增的线程就是报错触发这个方法进行相应的处理
这个几个参数的意思

corePoolSize = 5 同时最大处理的线程数, 如果同时进来4个线程, 那么 4 < 5这个时候就可以同时将这个4个线程同时处理,那么就是还有一个空闲线程,如果再进来一个新线程,程序还是会加进 线程池里面处理,这个时候就是线程池处理最大的 并发数了 , 要是这个时候 再进来一个新线程,这个时候就是 设置BlockingQueue储存队列了,如果有设置长度为 5 那么就是说明储存队列 只能储存5个新线程,那么就把新线程加到队列里面 ,当前面的线程处理完就会从储存队列里面提取线程进行处理,如果没有设置长度,默认是Integer.MAX_VALUE就是最大,这种情况就不说了。当储存队列已经满了,再进来一个新的线程,这个时候怎么办, 这个时候设置 maximumPoolSize = 5,设置最大处理的线程总数,他的效果就是当储存队列满了,才会去激活 去处理新进来的线程 , 如果maximumPoolSize线程也已经满载了, 就是说默认线程数 满了,队列满了,最大线程数也满了,这个时候程序会触发RejectedExecutionHandler接口rejectedExecution方法,这个进行异常处理,如果没有配置这个方法,程序就是出现错误异常
,以上的流程就是线程池 处理的流程

看不懂文字看看下面简单的流程图

那么keepAliveTime 这个参数是什么意思呢

when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating
这个是官方的解释
意思就是 当前的开启总线程数量 > 核数线程数量 那么多余的空闲线程存活时间 ,这个什么意思呢
总线程 10 > 核数线程 5 ,keepAliveTime = 5s 那么空闲的剩下5个线程就在 5s 后进行回收 ,

讲 了这么多 直接上代码
代码块
public static void main(String[] args) {
//创建线程池 核数线程5 最大总线程 10 线程存活时间 10s 缓存队列 5
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 7, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new MyThreadFactory() , new MyRejectedExecutionHandler());

for (int i = 0; i < 10; i++)
//处理线程
threadPoolExecutor.execute(new MyRunnable(i));

System.out.println("核心线程数:" + threadPoolExecutor.getCorePoolSize());
System.out.println("线程数总数:" + threadPoolExecutor.getMaximumPoolSize());
System.out.println("队列线程数:" + threadPoolExecutor.getQueue().size());
System.out.println("当前活动线程数:" + threadPoolExecutor.getActiveCount());
}


static class MyRunnable implements Runnable {
int flag = 0;
public MyRunnable(int flag) {
this.flag = flag;
}

@Override
public void run() {
System.out.println("开始运行线程:" + flag);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("线程终止:" + flag);
}
System.out.println("运行结束:" + flag);
}
}

static class MyRejectedExecutionHandler implements RejectedExecutionHandler{


@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("处理已满");
}
}

static class MyThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}
运行结果

处理已满
开始运行线程:7
开始运行线程:4
开始运行线程:1
开始运行线程:8
开始运行线程:6
核心线程数:2
开始运行线程:5
开始运行线程:0
线程数总数:7
队列线程数:2
当前活动线程数:7

运行结束:6
运行结束:7
运行结束:0
运行结束:1
运行结束:5
运行结束:4
开始运行线程:3
开始运行线程:2
运行结束:8
运行结束:3
运行结束:2
程序首先出 0和1线程 之后 将2和3加缓存队列 之后4 , 5 , 6 , 7 , 8 , 9 线程开启到之后线程数量 , 剩下10没有打印就是因为触发RejectedExecutionHandler异常。


其他自定义线程池方式
@Bean("asyncExecutor")
public Executor threadPoolExecutor(){
// 创建自定义的线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 返回可用处理器的Java虚拟机的数量
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
// 核心线程池大小
executor.setCorePoolSize(corePoolSize);
// 最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 队列程度
executor.setQueueCapacity(maxPoolSize * 1000);
// 线程空闲时间
executor.setKeepAliveSeconds(300);
// 线程名字前缀
executor.setThreadNamePrefix("asyncExecutor-");
// 线程优先级
executor.setThreadPriority(Thread.MAX_PRIORITY);
executor.setDaemon(false);
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
executor.setWaitForTasksToCompleteOnShutdown(true);
// 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}

/**
* service层调用
* 异步删除考卷题型
* @param examPaperId
*/
@Async("asyncExecutor")
public void deleteExamPaperQuestionType(Integer examPaperId, String containerId, String userId) {
epqtService.asyncDeleteExamPaperQuestionType(examPaperId, containerId, userId);
}

正文到此结束
本文目录