Java-异步多线程专题-TransmittableThreadLocal-ThreadLocal-异步场景值传递
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.6.1</version>
</dependency>
package cn.jiangjiesheng;
import com.alibaba.ttl.TransmittableThreadLocal;
import cn.jiangjiesheng.edu.GinEduApplication;
import cn.jiangjiesheng.edu.core.executor.AfterCommitExecutor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@SpringBootTest(classes = GinEduApplication.class)
class GinEduApplicationTests {
@Autowired
AfterCommitExecutor afterCommitExecutor;
@Test
@Transactional//触发afterCommitExecutor异步
void contextLoads() {
ThreadLocal<Integer> transmittableThreadLocal = new TransmittableThreadLocal<>();
transmittableThreadLocal.set(3);
ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
transmittableThreadLocal.set(3);
System.err.println("transmittableThreadLocal t0="+ transmittableThreadLocal.get());
System.err.println("threadLocal t0="+ transmittableThreadLocal.get());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.err.println("transmittableThreadLocal t1=" + transmittableThreadLocal.get());
System.err.println("threadLocal t1=" + threadLocal.get());
}
});
t1.start();
Runnable r2 = new Runnable() {
@Override
public void run() {
System.err.println("transmittableThreadLocal r2=" + transmittableThreadLocal.get());
System.err.println("threadLocal r2=" + threadLocal.get());
}
};
r2.run();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Runnable() {
@Override
public void run() {
System.err.println("transmittableThreadLocal e3=" + transmittableThreadLocal.get());
System.err.println("threadLocal e3=" + threadLocal.get());
}
});
afterCommitExecutor.execute(new Runnable() {
@Override
public void run() {
System.err.println("transmittableThreadLocal a4=" + transmittableThreadLocal.get());
System.err.println("threadLocal a4=" + threadLocal.get());
}
});
// transmittableThreadLocal t0=3
// threadLocal t0=3
// transmittableThreadLocal t1=3
// threadLocal t1=null
// transmittableThreadLocal r2=3
// threadLocal r2=null
// transmittableThreadLocal e3=3
// threadLocal e3=null
}
}
ThreadLocal系列(三)-TransmittableThreadLocal的使用及原理解析
https://www.cnblogs.com/hama1993/p/10409740.html
一、基本使用
首先,TTL是用来解决ITL解决不了的问题而诞生的,所以TTL一定是支持父线程的本地变量传递给子线程这种基本操作的,ITL也可以做到,但是前面有讲过,ITL在线程池的模式下,就没办法再正确传递了,所以TTL做出的改进就是即便是在线程池模式下,也可以很好的将父线程本地变量传递下去,先来看个例子:
// 需要注意的是,使用TTL的时候,要想传递的值不出问题,线程池必须得用TTL加一层代理(下面会讲这样做的目的)
private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));
private static ThreadLocal tl = new TransmittableThreadLocal<>(); //这里采用TTL的实现
public static void main(String[] args) {
new Thread(() -> {
String mainThreadName = "main_01";
tl.set(1);
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
sleep(1L); //确保上面的会在tl.set执行之前执行
tl.set(2); // 等上面的线程池第一次启用完了,父线程再给自己赋值
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
String mainThreadName = "main_02";
tl.set(3);
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
sleep(1L); //确保上面的会在tl.set执行之前执行
tl.set(4); // 等上面的线程池第一次启用完了,父线程再给自己赋值
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
executorService.execute(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
});
System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
}).start();
}
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
运行结果:
线程名称-Thread-2, 变量值=4
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-pool-1-thread-1, 变量值=3
线程名称-Thread-1, 变量值=2
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-pool-1-thread-2, 变量值=1
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-pool-1-thread-1, 变量值=1
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-pool-1-thread-2, 变量值=3
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-pool-1-thread-2, 变量值=3
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-pool-1-thread-1, 变量值=1
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-pool-1-thread-2, 变量值=2
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-pool-1-thread-1, 变量值=4
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-pool-1-thread-1, 变量值=4
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-pool-1-thread-2, 变量值=4
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-pool-1-thread-1, 变量值=2
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-pool-1-thread-2, 变量值=2
程序有些啰嗦,为了说明问题,加了很多说明,但至少通过上面的例子,不难发现,两个主线程里都使用线程池异步,而且值在主线程里还发生过改变,测试结果展示一切正常,由此可以知道TTL在使用线程池的情况下,也可以很好的完成传递,而且不会发生错乱。
那么是不是对普通线程异步也有这么好的支撑呢?
改造下上面的测试代码:
private static ThreadLocal tl = new TransmittableThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
String mainThreadName = "main_01";
tl.set(1);
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
sleep(1L); //确保上面的会在tl.set执行之前执行
tl.set(2); // 等上面的线程池第一次启用完了,父线程再给自己赋值
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
String mainThreadName = "main_02";
tl.set(3);
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
sleep(1L); //确保上面的会在tl.set执行之前执行
tl.set(4); // 等上面的线程池第一次启用完了,父线程再给自己赋值
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
new Thread(() -> {
sleep(1L);
System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
}).start();
System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
}).start();
}
相比第一段测试代码,这一段的异步全都是普通异步,未采用线程池的方式进行异步,看下运行结果:
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-Thread-14, 变量值=4
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-Thread-5, 变量值=1
线程名称-Thread-1, 变量值=2
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-Thread-3, 变量值=1
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-Thread-11, 变量值=2
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-Thread-6, 变量值=3
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-Thread-12, 变量值=4
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-Thread-10, 变量值=4
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-Thread-8, 变量值=3
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-Thread-4, 变量值=3
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-Thread-7, 变量值=1
线程名称-Thread-2, 变量值=4
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-Thread-9, 变量值=2
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-Thread-13, 变量值=2
ok,可以看到,达到了跟第一个测试一致的结果。
到这里,通过上述两个例子,TTL的基本使用,以及其解决的问题,我们已经有了初步的了解,下面我们来解析一下其内部原理,看看TTL是怎么完成对ITL的优化的。
二、原理分析
先来看TTL里面的几个重要属性及方法
TTL定义:
public class TransmittableThreadLocal extends InheritableThreadLocal
可以看到,TTL继承了ITL,意味着TTL首先具备ITL的功能。
再来看看一个重要属性holder:
/**
* 这是一个ITL类型的对象,持有一个全局的WeakMap(weakMap的key是弱引用,同TL一样,也是为了解决内存泄漏的问题),里面存放了TTL对象
* 并且重写了initialValue和childValue方法,尤其是childValue,可以看到在即将异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象(TTL)的
*/
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
再来看下set和get:
//下面的方法均属于TTL类
@Override
public final void set(T value) {
super.set(value);
if (null == value) removeValue();
else addValue();
}
@Override
public final T get() {
T value = super.get();
if (null != value) addValue();
return value;
}
private void removeValue() {
holder.get().remove(this); //从holder持有的map对象中移除
}
private void addValue() {
if (!holder.get().containsKey(this)) {
holder.get().put(this, null); //从holder持有的map对象中添加
}
}
TTL里先了解上述的几个方法及对象,可以看出,单纯的使用TTL是达不到支持线程池本地变量的传递的,通过第一部分的例子,可以发现,除了要启用TTL,还需要通过TtlExecutors.getTtlExecutorService包装一下线程池才可以,那么,下面就来看看在程序即将通过线程池异步的时候,TTL帮我们做了哪些操作(这一部分是TTL支持线程池传递的核心部分):
首先打开包装类,看下execute方法在执行时做了些什么。
// 此方法属于线程池包装类ExecutorTtlWrapper
@Override
public void execute(@Nonnull Runnable command) {
executor.execute(TtlRunnable.get(command)); //这里会把Rannable包装一层,这是关键,有些逻辑处理,需要在run之前执行
}
// 对应上面的get方法,返回一个TtlRunnable对象,属于TtLRannable包装类
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
// 对应上面的get方法
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) { // 若发现已经是目标类型了(说明已经被包装过了)直接返回
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); //最终初始化
}
// 对应上面的TtlRunnable方法
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture()); //这里将捕获后的父线程本地变量存储在当前对象的capturedRef里
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
// 对应上面的capture方法,用于捕获当前线程(父线程)里的本地变量,此方法属于TTL的静态内部类Transmitter
@Nonnull
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // holder里目前存放的k-v里的key,就是需要传给子线程的TTL对象
captured.put(threadLocal, threadLocal.copyValue());
}
return captured; // 这里返回的这个对象,就是当前将要使用线程池异步出来的子线程,所继承的本地变量合集
}
// 对应上面的copyValue,简单的将TTL对象里的值返回(结合之前的源码可以知道get方法其实就是获取当前线程(父线程)里的值,调用super.get方法)
private T copyValue() {
return copy(get());
}
protected T copy(T parentValue) {
return parentValue;
}
结合上述代码,大致知道了在线程池异步之前需要做的事情,其实就是把当前父线程里的本地变量取出来,然后赋值给Rannable包装类里的capturedRef属性,到此为止,下面会发生什么,我们大致上可以猜出来了,接下来大概率会在run方法里,将这些捕获到的值赋给子线程的holder赋对应的TTL值,那么我们继续往下看Rannable包装类里的run方法是怎么实现的:
//run方法属于Rannable的包装类TtlRunnable
@Override
public void run() {
Object captured = capturedRef.get(); // 获取由之前捕获到的父线程变量集
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
/**
* 重点方法replay,此方法用来给当前子线程赋本地变量,返回的backup是此子线程原来就有的本地变量值(原生本地变量,下面会详细讲),
* backup用于恢复数据(如果任务执行完毕,意味着该子线程会归还线程池,那么需要将其原生本地变量属性恢复)
*/
Object backup = replay(captured);
try {
runnable.run(); // 执行异步逻辑
} finally {
restore(backup); // 结合上面对于replay的解释,不难理解,这个方法就是用来恢复原有值的
}
}
根据上述代码,我们看到了TTL在异步任务执行前,会先进行赋值操作(就是拿着异步发生时捕获到的父线程的本地变量,赋给自己),当任务执行完,就恢复原生的自己本身的线程变量值。
下面来具体看这俩方法:
//下面的方法均属于TTL的静态内部类Transmittable
@Nonnull
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; //使用此线程异步时捕获到的父线程里的本地变量值
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); //当前线程原生的本地变量,用于使用完线程后恢复用
//注意:这里循环的是当前子线程原生的本地变量集合,与本方法相反,restore方法里循环这个holder是指:该线程运行期间产生的变量+父线程继承来的变量
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
backup.put(threadLocal, threadLocal.get()); // 所有原生的本地变量都暂时存储在backup里,用于之后恢复用
/**
* 检查,如果捕获到的线程变量里,不包含当前原生变量值,则从当前原生变量里清除掉,对应的线程本地变量也清掉
* 这就是为什么会将原生变量保存在backup里的原因,为了恢复原生值使用
* 那么,为什么这里要清除掉呢?因为从使用这个子线程做异步那里,捕获到的本地变量并不包含原生的变量,当前线程
* 在做任务时的首要目标,是将父线程里的变量完全传递给任务,如果不清除这个子线程原生的本地变量,
* 意味着很可能会影响到任务里取值的准确性。
*
* 打个比方,有ttl对象tl,这个tl在线程池的某个子线程里存在对应的值2,当某个主线程使用该子线程做异步任务时
* tl这个对象在当前主线程里没有值,那么如果不进行下面这一步的操作,那么在使用该子线程做的任务里就可以通过
* 该tl对象取到值2,不符合预期
*/
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 这一步就是直接把父线程本地变量赋值给当前线程了(这一步起就刷新了holder里的值了,具体往下看该方法,在异步线程运行期间,还可能产生别的本地变量,比如在真正的run方法内的业务代码,再用一个tl对象设置一个值)
setTtlValuesTo(capturedMap);
// 这个方法属于扩展方法,ttl本身支持重写异步任务执行前后的操作,这里不再具体赘述
doExecuteCallback(true);
return backup;
}
// 结合之前Rannable包装类的run方法来看,这个方法就是使用上面replay记录下的原生线程变量做恢复用的
public static void restore(@Nonnull Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
// 注意,这里的holder取出来的,实际上是replay方法设置进去的关于父线程里的所有变量(结合上面来看,就是:该线程运行期间产生的变量+父线程继承来的变量)
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
/**
* 同样的,如果子线程原生变量不包含某个父线程传来的对象,那么就删除,可以思考下,这里的清除跟上面replay里的有什么不同?
* 这里会把不属于原生变量的对象给删除掉(这里被删除掉的可能是父线程继承下来的,也可能是异步任务在执行时产生的新值)
*/
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 同样调用这个方法,进行值的恢复
setTtlValuesTo(backupMap);
}
// 真正给当前子线程赋值的方法,对应上面的setTtlValuesTo方法
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue()); //赋值,注意,从这里开始,子线程的holder里的值会被重新赋值刷新,可以参照上面ttl的set方法的实现
}
}
ok,到这里基本上把TTL比较核心的代码看完了,下面整理下整个流程,这是官方给出的时序图:
图2-1
上图第一行指的是类名称,下面的流程指的是类所做的事情,根据上面罗列出来的源码,结合这个时序图,可以比较直观一些的理解整个流程。
三、TTL中线程池子线程原生变量的产生
这一节是为了验证上面replay和restore,现在通过一个例子来验证下,先把源码down下来,在源码的restore和replay上分别加上输出语句,遍历holder:
//replay前后打印holder里面的值
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
System.out.println("--------------------replay前置,当前拿到的holder里的TTL列表");
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
System.out.println(String.format("replay前置里拿到原生的ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
}
for...//代码省略,具体看上面
setTtlValuesTo(capturedMap);
doExecuteCallback(true);
System.out.println("--------------------reply后置,当前拿到的holder里的TTL列表");
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
System.out.println(String.format("replay后置里拿到原生的ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
}
return backup;
}
//restore前后打印holder里面的值
public static void restore(@Nonnull Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
System.out.println("--------------------restore前置,当前拿到的holder里的TTL列表");
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
System.out.println(String.format("restore前置里拿到当前线程内变量,ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
}
for...//省略代码,具体具体看上面
setTtlValuesTo(backupMap);
System.out.println("--------------------restore后置,当前拿到的holder里的TTL列表");
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
System.out.println(String.format("restore后置里拿到当前线程内变量,ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
}
}
代码这样做的目的,就是要说明线程池所谓的原生本地变量是怎么产生的,以及replay和restore是怎么设置和恢复的,下面来看个简单的例子:
private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));
private static ThreadLocal tl = new TransmittableThreadLocal();
private static ThreadLocal tl2 = new TransmittableThreadLocal();
public static void main(String[] args) throws InterruptedException {
tl.set(1);
tl2.set(2);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
运行结果如下:
--------------------replay前置,当前拿到的holder里的TTL列表
replay前置里拿到原生的ttl_k=1259475182, ttl_value=2
replay前置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------reply后置,当前拿到的holder里的TTL列表
replay后置里拿到原生的ttl_k=1259475182, ttl_value=2
replay后置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------restore前置,当前拿到的holder里的TTL列表
restore前置里拿到当前线程内变量,ttl_k=1259475182, ttl_value=2
restore前置里拿到当前线程内变量,ttl_k=929338653, ttl_value=1
--------------------restore后置,当前拿到的holder里的TTL列表
restore后置里拿到当前线程内变量,ttl_k=1259475182, ttl_value=2
restore后置里拿到当前线程内变量,ttl_k=929338653, ttl_value=1
我们会发现,原生值产生了,从异步开始,就确定了线程池里的线程具备了1和2的值,那么,再来改动下上面的测试代码:
public static void main(String[] args) throws InterruptedException {
tl.set(1);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000L);
tl2.set(2);//较第一次换下位置,换到第一次使用线程池后执行(这意味着下面这次异步不会再触发Thread的init方法了)
System.out.println("---------------------------------------------------------------------------------");
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
运行结果为:
--------------------replay前置,当前拿到的holder里的TTL列表
replay前置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------reply后置,当前拿到的holder里的TTL列表
replay后置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------restore前置,当前拿到的holder里的TTL列表
restore前置里拿到当前线程内变量,ttl_k=929338653, ttl_value=1
--------------------restore后置,当前拿到的holder里的TTL列表
restore后置里拿到当前线程内变量,ttl_k=929338653, ttl_value=1
---------------------------------------------------------------------------------
--------------------replay前置,当前拿到的holder里的TTL列表
replay前置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------reply后置,当前拿到的holder里的TTL列表
replay后置里拿到原生的ttl_k=1020371697, ttl_value=2
replay后置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------restore前置,当前拿到的holder里的TTL列表
restore前置里拿到当前线程内变量,ttl_k=1020371697, ttl_value=2
restore前置里拿到当前线程内变量,ttl_k=929338653, ttl_value=1
--------------------restore后置,当前拿到的holder里的TTL列表
restore后置里拿到当前线程内变量,ttl_k=929338653, ttl_value=1
可以发现,第一次异步时,只有一个值被传递了下去,然后第二次异步,新加了一个tl2的值,但是看第二次异步的打印,会发现,restore恢复后,仍然是第一次异步发生时放进去的那个tl的值。
通过上面的例子,基本可以确认,所谓线程池内线程的本地原生变量,其实是第一次使用线程时被传递进去的值,我们之前有说过TTL是继承至ITL的,之前的文章也说过,线程池第一次启用时是会触发Thread的init方法的,也就是说,在第一次异步时拿到的主线程的变量会被传递给子线程,作为子线程的原生本地变量保存起来,后续是replay操作和restore操作也是围绕着这个原生变量(即原生holder里的值)来进行设置、恢复的,设置的是当前父线程捕获到的本地变量,恢复的是子线程原生本地变量。
holder里持有的可以理解就是当前线程内的所有本地变量,当子线程将异步任务执行完毕后,会执行restore进行恢复原生本地变量,具体参照上面的代码和测试代码。
四、总结
到这里基本上确认了TTL是如何进行线程池传值的,以及被包装的run方法执行异步任务之前,会使用replay进行设置父线程里的本地变量给当前子线程,任务执行完毕,会调用restore恢复该子线程原生的本地变量(目前原生本地变量的产生,就只碰到上述测试代码中的这一种情况,即线程第一次使用时通过ITL属性以及Thread的init方法传给子线程,还不太清楚有没有其他方式设置)。
其实,正常程序里想要完成线程池上下文传递,使用TL就足够了,我们可以效仿TTL包装线程池对象的原理,进行值传递,异步任务结束后,再remove,以此类推来完成线程池值传递,不过这种方式过于单纯,且要求上下文为只读对象,否则子线程存在写操作,就会发生上下文污染。
TTL项目地址(可以详细了解下它的其他特性和用法):https://github.com/alibaba/transmittable-thread-local
正文到此结束