原创

Java-异步多线程专题-ParamFieldLockUtils-对指定字段加锁-排队等待

可能还有一种【类似】:用这个锁ReentrantLock,把不同的关键字放到map中,Map 这样,遇到 关键字了,取出lock,
调用 lock.lock();用完之后lock.unlock();一定要放到finally中解锁,要不出异常一致锁着
public static void main(String[] args) {
String bzz = "";
//实际是 @Autowired 注入单例模式
ConcurrentTaskBySingleKeyExecutor concurrentTaskBySingleKeyExecutor = new ConcurrentTaskBySingleKeyExecutorImpl();
for (int i = 0; i < 10; i++) {
//这个for循环模拟的多线程
//bzz模拟不同的业务场景
if(i%2==0){
bzz = "yw1_";
}else {
bzz = "yw2_";
}
//便于观察,固定为一个业务场景
bzz = "yw1_";
//bzzid模拟当前场景的业务id
String bzzid = String.valueOf(i%3);
//bzzid = "111";

String finalBzz = bzz;
String finalBzzid = bzzid;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
//doTaskInMultiThreadByWaiting(finalBzz, finalBzzid);
doTaskInMultiThreadByRemove(concurrentTaskBySingleKeyExecutor,finalBzz, finalBzzid);
}
});
t.start();
}
}


//和ConcurrentTaskBySingleKeyExecutor的区别是,ConcurrentTaskBySingleKeyExecutor会废弃并发中的重复key请求
//重复的key会ParamFieldLockUtils.getLock等待
private static void doTaskInMultiThreadByWaiting(String bzz, String bzzid){

String locker = ParamFieldLockUtils.getLock(bzz ,bzzid);
System.err.println("锁:locker="+locker+",hashCode="+locker.hashCode());
synchronized (locker){
System.err.println("拿到锁:bzz="+bzz+",bzzid="+bzzid +",time="+DateUtils.getDateStringNow());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("耗时业务完成,锁:bzz="+ bzz +",bzzid="+bzzid +",time="+DateUtils.getDateStringNow());
}
}

//重复的key会remove掉
private static void doTaskInMultiThreadByRemove(ConcurrentTaskBySingleKeyExecutor concurrentTaskBySingleKeyExecutor,
String bzz, String bzzid){
concurrentTaskBySingleKeyExecutor.doTask(bzz, bzzid, new Runnable() {
@Override
public void run() {
System.err.println("进入业务处理逻辑:bzz=" + bzz + ",bzzid=" + bzzid + ",time=" + DateUtils.getDateStringNow());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("业务处理逻辑,耗时业务完成,锁:bzz=" + bzz + ",bzzid=" + bzzid + ",time=" + DateUtils.getDateStringNow());
}
});

}
package cn.jiangjiesheng.edu.service.train;

import java.util.HashMap;
import java.util.Map;

/**
* 由于synchronized只能锁对象的地址,所有像Long为1000的用户id是锁不住的
* 此类来解决这个问题
* synchronized (ParamFieldLockUtils.getLock("test" + userId)) {}
* 因为是全局的入参当然是"接口名+userId"最好了,这个可能是为了限定业务的,不然不同的业务进来,就混乱了。所以把入参强制改成两个
*
* ParamFieldLockUtils和ConcurrentTaskBySingleKeyExecutor的区别是,ConcurrentTaskBySingleKeyExecutor会废弃并发中的重复key请求
* ParamFieldLockUtils是等待
*/
public class ParamFieldLockUtils {
private static Map<String,String> mMapId = new HashMap<>(), mMapIdCache = new HashMap<>();
/**
* 缓存切换的开始时间,等待{@link #mCacheDeleteTime}时间后将清空切换数据
*/
private static long mCacheCreatTime;
/**
* 最大缓存数(当超出这一数值时,会自动清空),缓存切换等待时间
*/
private static int mMaxCache = 1000, mCacheDeleteTime = 10000;

/**
* 返回的值地址?
* @param bizzFrom
* @param bizzOldId
* @return
*/
public static synchronized String getLock(String bizzFrom, String bizzOldId) {
String returnSt;
bizzOldId = bizzFrom + "_" + bizzOldId;
if (mMapId.size() < mMaxCache) {//数据比较少,普通的返回锁
if (!mMapId.containsKey(bizzOldId)) {
mMapId.put(bizzOldId, bizzOldId);
}
returnSt = mMapId.get(bizzOldId);
} else {//累加的残留数据太多,切换至缓存
//缓存开始时间
long nowMills = System.currentTimeMillis();
if (mMapIdCache.size() == 0) {
mCacheCreatTime = nowMills;
}

if (!mMapIdCache.containsKey(bizzOldId)) {
mMapIdCache.put(bizzOldId, mMapId.getOrDefault(bizzOldId, bizzOldId));
}
returnSt = mMapIdCache.get(bizzOldId);

//等待mCacheChangeTime时间后清除原始数据
if (nowMills - mCacheCreatTime > mCacheDeleteTime) {
mMapId.clear();
//原始和缓存对调即可实现切换
Map<String,String> change = mMapId;
mMapId = mMapIdCache;
mMapIdCache = change;
}
}
return returnSt;
}
}

正文到此结束
本文目录