原创

Java-异步多线程专题-for批量任务-多线程等待-CountDownLatch.countDown-ExecutorService-切换处理任务方法或参数

另外还有《Java-异步多线程专题-for批量任务-多线程等待-CompletableFuture》

https://blog.csdn.net/weixin_44823875/article/details/123610475

案例二:线程池模拟批量导入数据

CountDownLatch & ThreadPoolExecutor 封装


package cn.jiangjiesheng.edu.utils.singlebatchtask;

import cn.jiangjiesheng.edu.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 注意实际线程池可能更慢
*
*/
@Slf4j
public class ThreadPoolSingleBatchTaskUtil {

// private static Map<Integer, String> executorMap = new HashMap<>();
//一般情况下,如果corePoolSize为10,相当于速度提升9倍。原来50秒完成的任务,现在5秒就可以结束。
private static ThreadPoolExecutor defaultExecutor = new ThreadPoolExecutor(15, 10 * 3, 100L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

//单次批量任务。
public static void doSingleBatchTask(ThreadPoolBatchTaskCallBack threadPoolBatchTaskCallBack) {
log.info("本次多线程任务开始");
long start = System.currentTimeMillis();
//不同的业务场景应该使用不同的线程池
ThreadPoolExecutor executor = threadPoolBatchTaskCallBack.getStaticThreadPool();
if (executor == null) {
log.error("getStaticThreadPool未实现,使用默认线程池");
executor = defaultExecutor;
// throw new BizzException("getStaticThreadPool未实现");
}
int hashCode = executor.hashCode();
log.info("当前executor信息:hashCode:{}", hashCode);

// String invokeFrom = threadPoolBatchTaskCallBack.getInvokeFrom();
// if (invokeFrom != null) {
// String invokeFromSave = executorMap.get(hashCode);
// if(invokeFromSave == null){
// executorMap.put(hashCode, invokeFrom);
// }else {
// if (!StringUtils.equals(invokeFromSave, invokeFrom)) {
// throw new BizzException("不同的调用位置,请不要使用同一个ThreadPoolExecutor对象,且ThreadPoolExecutor推荐使用static对象");
// }
// }
// }

int size = threadPoolBatchTaskCallBack.getSingleBatchTaskSize();
CountDownLatch countDownLatch = new CountDownLatch(size);
for (int taskIndex = 0; taskIndex < size; taskIndex++) {
int finalTaskIndex = taskIndex;
executor.submit(() -> {
try {
log.info("开始执行任务:finalTaskIndex:{}", finalTaskIndex);
threadPoolBatchTaskCallBack.addTask(finalTaskIndex);
} catch (Exception e) {
log.error("处理第{}条任务出现异常,{}", finalTaskIndex, e);
} finally {
countDownLatch.countDown();
}
});
}

try {
//等待计数器归零
countDownLatch.await();
//也可以给await()设置超时时间,如果超过300s(也可以是时,分)则不再等待,直接执行下面代码。
//countDownLatch.await(300,TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch.await出现异常,{}", e);
}
threadPoolBatchTaskCallBack.currentBatchTaskComplete();
log.info("本次多线程任务结束,耗时:{}", DateUtils.ms2Hms(start));
}

//参数固定传Thread.currentThread().getStackTrace()[1]
public static String getInvokeFrom(StackTraceElement stackTraceElement) {
if (stackTraceElement != null) {
//https://jingyan.baidu.com/article/7f766daf8d8ac54101e1d082.html
String className = stackTraceElement.getClassName();
String methodName = stackTraceElement.getMethodName();
int lineNumber = stackTraceElement.getLineNumber();
return className + "#" + methodName + "#" + lineNumber;
}
return null;
}

public interface ThreadPoolBatchTaskCallBack {

//
//private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10 * 3, 100L,
//TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolExecutor getStaticThreadPool();

//如果setStaticThreadPool()未实现,这个方法可以不重新
//返回值是固定的 ThreadPoolSingleBatchTaskUtil.getInvokeFrom(Thread.currentThread().getStackTrace()[1]);
//String getInvokeFrom();

int getSingleBatchTaskSize();

//自行在方法实现类中处理结果
void addTask(int taskIndex);

void currentBatchTaskComplete();

}

}

/**
 * Demo endpoint: builds {@code count} students, appends a suffix to each name on the default
 * batch thread pool (every task sleeps one second first), and reports how many were processed.
 *
 * @param count number of students to create and process
 * @return number of students whose name contains the suffix after the batch has ended
 * @throws InterruptedException declared by the original signature and kept for compatibility
 */
@GetMapping("/testtask")
public long testtask(int count) throws InterruptedException {

    // Do not test this with a shared List<Integer> that the tasks add() to: concurrent add() calls
    // can fail and leave null elements. Result collection should probably be extracted into a
    // single method and guarded by a lock.

    List<ThreadPoolSingleBatchTaskUtilDemo.Student> studentList = new ArrayList<>();
    // Create `count` sample records.
    for (int i = 0; i < count; i++) {
        int age = (int) Math.floor((Math.random() * 10) + 20);
        int height = (int) Math.floor((Math.random() * 160) + 180);
        studentList.add(new ThreadPoolSingleBatchTaskUtilDemo.Student("name" + i, age, height));
    }

    ThreadPoolSingleBatchTaskUtil.doSingleBatchTask(new ThreadPoolSingleBatchTaskUtil.ThreadPoolBatchTaskCallBack() {

        @Override
        public ThreadPoolExecutor getStaticThreadPool() {
            // null makes the utility fall back to its default pool.
            return null;
        }

        @Override
        public int getSingleBatchTaskSize() {
            return count;
        }

        @Override
        public void addTask(int taskIndex) {
            try {
                // Simulate one second of work per task.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the pool instead of printing and dropping it.
                Thread.currentThread().interrupt();
            }
            ThreadPoolSingleBatchTaskUtilDemo.Student student1 = studentList.get(taskIndex);
            student1.setName(student1.getName() + "这是后缀");
        }

        @Override
        public void currentBatchTaskComplete() {
            System.out.println("处理成功的任务数量:"+studentList.stream().filter(x->x.getName().contains("这是后缀")).count());
        }
    });
    return studentList.stream().filter(x->x.getName().contains("这是后缀")).count();
}

package cn.jiangjiesheng.edu.utils.singlebatchtask;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Command-line demo for {@code ThreadPoolSingleBatchTaskUtil}: runs the same suffix-appending
 * batch twice on one dedicated static pool.
 */
public class ThreadPoolSingleBatchTaskUtilDemo {
    // Dedicated pool for this demo; handed to the utility through getStaticThreadPool().
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10 * 3, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Builds 500 sample students and processes them in two consecutive batches.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<Student> studentList = new ArrayList<>();
        // Create 500 sample records.
        for (int i = 0; i < 500; i++) {
            int age = (int) Math.floor((Math.random() * 10) + 20);
            int height = (int) Math.floor((Math.random() * 160) + 180);
            studentList.add(new Student("name" + i, age, height));
        }

        try {
            ThreadPoolSingleBatchTaskUtil.doSingleBatchTask(newSuffixCallback(studentList));
            ThreadPoolSingleBatchTaskUtil.doSingleBatchTask(newSuffixCallback(studentList));
        } finally {
            // The core threads are non-daemon; without shutdown the JVM would not exit after main.
            executor.shutdown();
        }
    }

    /** Creates the callback that appends the suffix to every student's name on the demo pool. */
    private static ThreadPoolSingleBatchTaskUtil.ThreadPoolBatchTaskCallBack newSuffixCallback(
            List<Student> studentList) {
        return new ThreadPoolSingleBatchTaskUtil.ThreadPoolBatchTaskCallBack() {

            @Override
            public ThreadPoolExecutor getStaticThreadPool() {
                return executor;
            }

            @Override
            public int getSingleBatchTaskSize() {
                return studentList.size();
            }

            @Override
            public void addTask(int taskIndex) {
                Student student1 = studentList.get(taskIndex);
                student1.setName(student1.getName() + "这是后缀");
            }

            @Override
            public void currentBatchTaskComplete() {
                System.out.println("任务结束");
                System.out.println("处理成功的任务数量:"+studentList.stream().filter(x->x.getName().contains("这是后缀")).count());
            }
        };
    }

    /** Simple mutable sample record used by the demos. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        int age;
        int height;
        String name;

        /** Convenience constructor taking the name first. */
        public Student(String name, int age, int height) {
            this.name = name;
            this.age = age;
            this.height = height;
        }
    }
}

--------------------------------------------------------------------------------
来自 安元项目 fst 转码服务
可能还存在另一个for循环在低概率下的死循环问题(20230125补充)

其他转码资料
https://www.iteye.com/blog/crabdave-2338256
https://www.jb51.net/article/139273.htm
https://stackoverflow.com/questions/19517342/how-to-resolve-pdf-parsing-error
http://cn.voidcc.com/question/p-kieusgmi-uh.html

--------------------------------------------------------------------------------
/**
 * Manual check for the slow-conversion path: converts a known slow PDF in normal mode.
 *
 * @param args unused
 * @throws IOException propagated from {@code pdfToImages}
 */
public static void main(String[] args) throws IOException {
    // Delete cache files first (author's note on this call).
    new ConvertThreadTaskService().resetConvStatusForLeakedAndDelTempFile();

    final FileConvertActService converter = new FileConvertActService();
    final File slowPdf = new File("D:\\WorkSpace\\projects\\idea\\company\\anyuan\\fst\\fst-web\\src\\test\\resources\\pdf_self_test_convert-slowly.pdf");
    System.err.println("开始转码慢测试");
    converter.pdfToImages(slowPdf, FileConvertActService.INVOKE_FUNCTION_NORMAL);
}
--------------------------------------------------------------------------------

// Rendering modes accepted by pdfToImages: 0 = normal, 1 = small image.
public static final Integer INVOKE_FUNCTION_NORMAL = 0;
public static final Integer INVOKE_FUNCTION_SMALL = 1;
// Seconds to wait for the first page's render result before giving up on it.
private static final int TIME_OUT_SECOND = 20;

// Author's note: a larger pool size here caused blocking, with every image conversion stalled.
// TODO: confirm later with jstack whether a deadlock is involved.
ExecutorService pdfConvert2ImageExecutorService = Executors.newFixedThreadPool(1);
ExecutorService clearExecutorService = Executors.newFixedThreadPool(2);
/**
 * Converts every page of a PDF into an image file.
 *
 * <p>In normal mode the first page is rendered with a {@code TIME_OUT_SECOND} second timeout. If
 * that render times out or fails, the images collected so far are deleted and the whole document
 * is converted again in small-image mode. All other pages are rendered as background tasks and
 * awaited through a {@link CountDownLatch}; a failure in any of them aborts the conversion.
 *
 * @param input          the PDF file to convert
 * @param invokeFunction rendering mode; external callers pass
 *                       {@code FileConvertActService.INVOKE_FUNCTION_NORMAL} (0).
 *                       {@code null} is treated as normal mode
 * @return the generated image files, ordered by the integer found in the third
 *         {@code _}-separated segment of each file name, without the files that
 *         {@code isNotNeedFile} rejects
 * @throws IOException   declared by the signature; failures raised inside the conversion are
 *                       caught here and rethrown as {@code BizzException}
 * @throws BizzException when the conversion fails, is too slow, or is interrupted
 */
public List<File> pdfToImages(File input,Integer invokeFunction) throws IOException {
    if (invokeFunction == null) {
        invokeFunction = FileConvertActService.INVOKE_FUNCTION_NORMAL;
    }
    PDDocument document = null;
    // NOTE(review): plain ArrayList written by the calling thread and by pool tasks; this relies on
    // the conversion pool having a single worker thread.
    List<File> files = new ArrayList<>();
    Integer finalInvokeFunction = invokeFunction;
    // First exception raised by a background page task; checked once the latch has opened.
    java.util.concurrent.atomic.AtomicReference<Exception> workerFailure =
            new java.util.concurrent.atomic.AtomicReference<>();
    try {
        log.info("pdf转图片,log1:input:{}", input);
        document = PDDocument.load(input);
        log.info("pdf转图片,log2:input:{}", input);
        PDFRenderer pdfRenderer = new PDFRenderer(document);
        log.info("pdf转图片,log3:input:{}", input);
        CountDownLatch latch = new CountDownLatch(document.getNumberOfPages());

        for (int page = 0; page < document.getNumberOfPages(); ++page) {
            // Author's note: each page is converted at most 1+3 times; if one page fails, everything stops.
            log.info("pdf转图片,log4:input:{},page={}", input, page);
            int finalPage = page;
            if(page == 0 && FileConvertActService.INVOKE_FUNCTION_NORMAL.equals(invokeFunction)){
                // TODO (author): only the first page is timed; ideally the maximum renderImage time
                // would be checked for every page.
                FutureTask<File> future = new FutureTask<>(() -> {
                    File image = renderImage(pdfRenderer,input, finalPage, finalInvokeFunction);
                    if (image != null && isNotNeedFile(image, finalInvokeFunction)) {
                        image.delete();
                    }
                    return image;
                });
                pdfConvert2ImageExecutorService.submit(future);
                File image = null;
                try {
                    // get() blocks, so this timed path only suits probing a single page.
                    image = future.get(TIME_OUT_SECOND, TimeUnit.SECONDS);
                    // Guard against a null render result before touching the file.
                    if (image != null && isNotNeedFile(image, finalInvokeFunction)) {
                        image.delete();
                    } else {
                        if (image != null) {
                            log.info("pdf转图片,log5:input:{},page={},finalInvokeFunction={},image={}", input, finalPage,finalInvokeFunction,image.getAbsolutePath());
                            files.add(image);
                        } else {
                            log.error("pdf转图片,log9:page={}", finalPage);
                        }
                    }
                    latch.countDown();
                } catch (TimeoutException | ExecutionException e) {
                    // Best effort: ask the abandoned render to stop so it does not keep the
                    // conversion thread busy while the retry below is queued behind it.
                    future.cancel(true);
                    latch = null;
                    files.forEach(File::delete);
                    // This branch only runs in normal mode (see the enclosing condition), so the
                    // small-mode check below is not reachable today; kept as a safety net.
                    if (FileConvertActService.INVOKE_FUNCTION_SMALL.equals(invokeFunction)) {
                        // Already in small-image mode: give up and report the failure.
                        releaseResource(input, document);
                        log.error("pdf转图片,log10:转码太慢,已放弃,page={},input:{}", finalPage, input.getAbsolutePath());
                        throw new BizzException("该文档转码过慢,请修改文件格式");
                    }
                    log.error("pdf转图片,log10:转码太慢,直接重新转码,转成小图,page={},input:{}", finalPage, input.getAbsolutePath());
                    // Convert again from scratch, this time as small images.
                    return pdfToImages(input, FileConvertActService.INVOKE_FUNCTION_SMALL);
                }
            } else {
                CountDownLatch finalLatch = latch;
                pdfConvert2ImageExecutorService.submit(() -> {
                    try {
                        // TODO (author): the maximum time of this call should be checked as well,
                        // restarting the whole conversion on timeout.
                        File image = renderImage(pdfRenderer, input, finalPage, finalInvokeFunction);
                        if (image != null) {
                            log.info("pdf转图片,log5:input:{},page={}", input, finalPage);
                            files.add(image);
                        } else {
                            log.error("pdf转图片,log9:page={}", finalPage);
                        }
                    } catch (Exception e) {
                        log.error("pdf转图片出现异常,log10:page={}", finalPage,e);
                        // An exception thrown from here would vanish inside the discarded Future;
                        // record it so the waiting thread can fail the conversion.
                        workerFailure.compareAndSet(null, e);
                    } finally {
                        // Always count down; skipping it on failure left latch.await() blocked forever.
                        finalLatch.countDown();
                    }
                });
            }
        }

        try {
            log.info("pdf转图片,log11,等待多线程转码结果中,input:{},invokeFunction:{}",input,invokeFunction);
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            files.forEach(File::delete);
            log.error("pdf转图片中断异常:input:{},invokeFunction:{}",input,invokeFunction,e);
            releaseResource(input, document);
            throw new BizzException("转码中断异常"+e.getMessage());
        }
        Exception failure = workerFailure.get();
        if (failure != null) {
            // Surface the background failure; the outer catch performs the cleanup.
            throw new BizzException("该文档转码出现异常,请联系管理员:"+failure.getMessage());
        }
        // Restore page order: tasks may add files in any order.
        files.sort(Comparator.comparing(file -> Integer.valueOf(file.getName().split("_")[2])));
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // future.get() on the first page can be interrupted too.
            Thread.currentThread().interrupt();
        }
        files.forEach(File::delete);
        log.error("pdf转图片中断异常:input:{},invokeFunction:{}",input,invokeFunction,e);
        releaseResource(input, document);
        // The finally block below still runs after this throw.
        throw new BizzException("转码中断异常"+e.getMessage());
    } finally {
        releaseResource(input, document);
    }
    // Drop (and delete from disk) every file that isNotNeedFile rejects.
    Iterator<File> iterator = files.iterator();
    while (iterator.hasNext()) {
        File file = iterator.next();
        if (isNotNeedFile(file,finalInvokeFunction)) {
            file.delete();
            iterator.remove();
        }
    }

    clearExecutorService.submit(() -> {
        if (convertThreadTaskService == null) {
            // Happens when called from a unit test.
            convertThreadTaskService = new ConvertThreadTaskService();
        }
        // NOTE(review): the suffix is the opposite of the current mode ("small" in normal mode,
        // "normal" otherwise) - presumably this removes the other mode's temp files; confirm.
        convertThreadTaskService.delTempFile(getFilePrefix(input)+"_"+ (!INVOKE_FUNCTION_NORMAL.equals(finalInvokeFunction) ? "normal" : "small"));
    });

    log.info("pdf转图片结果:input:{},invokeFunction:{},files:{}",input.getAbsolutePath(),invokeFunction, files.size());// pass `files` itself when debugging
    return files;
}

--------------------------------------------------------------------------------
//invokeFunction 0 正常
//invokeFunction 1 转码慢的,改成小图
//@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 0L, multiplier = 1, maxDelay = 0))
//这里思路上是可以通过异常来重试,但是可能不同的页面拿到的大小不同,看起来奇怪
public File renderImage(PDFRenderer pdfRenderer, File input, int page, Integer invokeFunction){ //synchronized 这个也去掉
log.info("pdf转图片,log6:page={},input:{},invokeFunction:{}", page, input.getAbsolutePath(), invokeFunction);

try {
//BufferedImage bim = pdfRenderer.renderImage(page,2,ImageType.RGB,RenderDestination.EXPORT);
BufferedImage bim = null;
if (FileConvertActService.INVOKE_FUNCTION_NORMAL.equals(invokeFunction) ) {
//来自网络:经过测试,dpi为96,100,105,120,150,200中,105显示效果较为清晰,体积稳定,dpi越高图片体积越大,一般电脑显示分辨率为96
//早期是150
bim = pdfRenderer.renderImageWithDPI(page, 150, ImageType.RGB);
} else {
// 1f = 72dip
log.info("pdf转图片,log7-0:page={}", page);
bim = pdfRenderer.renderImage(page, 1f);//1.25f 应该比较合适
}

正文到此结束
本文目录