原创

Java-异步多线程专题-ConcurrentTaskBySingleKeyExecutor-同一次并发中,只接受1个key的任务


ConcurrentTaskBySingleKeyExecutor会废弃并发中的重复key请求 ,实际上要不要加上一定时间的参数控制,不然完全取决于key任务的耗时时间,如果本来耗时就少,那实际上可能还是类似于排队等待效果了。
简单的实现,应该可以参考 《Java-并发重复请求拒绝-判断参数缓存10秒-ngxin返回499》
ConcurrentTaskBySingleKeyExecutor.java

package com.safety51.edu.service.train;

import com.google.common.collect.Lists;

import java.util.List;

public interface ConcurrentTaskBySingleKeyExecutor {
//补加一个参数,防止不同的业务调用进来,被混淆了后丢弃数据
void doTask(String bizzFrom,String bizzKey,Runnable run);

/**
* 测试
* @param args
*/
public static void main(String[] args) {
List<String> userIdList = Lists.newArrayList("user1", "user1", "user2", "user2", "user3", "user3");
ConcurrentTaskBySingleKeyExecutor asyncTaskByParam = new ConcurrentTaskBySingleKeyExecutorImpl();//正式环境应该注入
userIdList.forEach(userId->{
asyncTaskByParam.doTask("testBizzFrom", userId, new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("userId:"+userId+"执行完了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
});
}
}

ConcurrentTaskBySingleKeyExecutorImpl.java

package com.safety51.edu.core.executor;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.safety51.bootstrap.commons.exception.BizzException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* 同一次并发中,只接受1个key的任务,重复key的任务废弃
* main方法测试时发现任务执行完了,程序不会终止,是因为有线程池ExecutorService,
* 一般这个线程池是不需要关闭的,就算要关闭也要特别小心,不能随意关闭,确保任务执行完毕或达到超时时间,具体百度。
* ConcurrentTaskBySingleKeyExecutor会废弃并发中的重复key请求 ,实际上要不要加上一定时间的参数控制,
* 不然完全取决于key任务的耗时时间,如果本来耗时就少,那实际上可能还是类似于排队等待效果了。
* create by jiangjiesheng,20211222
*/
@Service
@Slf4j
public class ConcurrentTaskBySingleKeyExecutorImpl implements ConcurrentTaskBySingleKeyExecutor {
//如果是分布式场景,lockByKeySet应该使用redisson实现分布式锁
private final static Set<String> lockByKeySet = Sets.newConcurrentHashSet();
//Set<Future> 可以改成 单个对象【实测没用到】
private final static HashMap<String, Set<Future>> hashMapFutureList1 = Maps.newHashMap();
private final ExecutorService executorService = Executors.newFixedThreadPool(5);

//补加一个参数,防止不同的业务调用进来,被混淆了后丢弃数据
@Override
public void doTask(String bizzFrom, String bizzKey, Runnable runnable) {
if (StringUtils.isBlank(bizzKey) || runnable == null) {
throw new BizzException("key或runnable不能为空");
}
bizzKey = bizzFrom + "_" + bizzKey;
if (checkRepeatByKey(bizzKey)) {
return;
}
String finalKey = bizzKey;
Future future = executorService.submit(new Runnable() {
@Override
public void run() {
doRunnableTask(finalKey, runnable);
checkAndResetKeyLock(finalKey);
}
});
// Set<Future> use4FutureSet = hashMapFutureList.get(key);
// if (use4FutureSet == null) {
// use4FutureSet = Sets.newConcurrentHashSet();
// }
// use4FutureSet.add(future);
// hashMapFutureList.put(key, use4FutureSet);
}

/**
* 因为这里的doTask是异步耗时任务,所以这里直接判断肯定是没有执行完
*/
private void checkAndResetKeyLock(String key) {
boolean isAllDone = true;
// if (!hashMapFutureList.isEmpty()) {
// for (Map.Entry<String, Set<Future>> entry : hashMapFutureList.entrySet()) {
// String futureKey = entry.getKey();
// if (key.equals(futureKey)) {
// Set<Future> futureSet = entry.getValue();
// for (Future future : futureSet) {
// boolean done = future.isDone();
// if (!done) {
// isAllDone = false;
// break;
// }
// }
// }
// }
// }
// if (isAllDone) {
// //居然没走到
// log.info("key:{},isAllDone true", key);
// lockByKeySet.remove(key);
// } else {
// log.info("key:{},isAllDone false",key);
//耗时的情况下要去等待异步线程执行完毕
// 似乎 lockByKeySet.remove 在 doRunnableTask中完成 就行
// while (true) {
// try {
// if (lockByKeySet.contains(key)) {
// Thread.sleep(1000);
// checkAndResetKeyLock(key);
// } else {
// log.info("key:{},任务已处理完毕 ", key);
// lockByKeySet.remove(key);
// break;
// }
// } catch (InterruptedException e) {
// lockByKeySet.remove(key);
// log.error("key:{},检测任务处理状态出现异常 ", key, e);
// break;
// }
// }
// }
}

private static boolean checkRepeatByKey(String key) {
//锁住的是地址。
synchronized (lockByKeySet) {
if (lockByKeySet.contains(key)) {
log.info("key=" + key + ",已在执行");
return true;
}
lockByKeySet.add(key);
}
return false;
}

private static void doRunnableTask(String key, Runnable runnable) {
try {
log.info("key:{},开始任务处理", key);
if (runnable == null) {
return;
}
runnable.run();
} catch (Exception e) {
log.error("key:{},任务处理出现异常", key, e);
} finally {
//耗时也是在这里
lockByKeySet.remove(key);
log.info("key:{},任务已处理完毕 ", key);
}
}
}
正文到此结束
本文目录