说实话,Continuation源码的可读性比想象中低,连代码注释也留下了"丑陋的"这句吐槽 。通过上面源码分析和上一节Continuation的一个例子,可以得知Continuation#yield()可以让程序代码中断,然后再次调用Continuation#run()可以从上一个中断位置继续执行,JVM在这个过程中为使用者屏蔽了Continuation和运行此Continuation的平台线程之间的交互细节,让使用者可以专注实际的任务开发即可 。
VirtualThread前面花了不少篇幅介绍Continuation,它是一个全新的API 。已有的JUC类库已经十分完善,如果可以把Continuation融入到已有的JUC体系,那么就可以通过线程池技术去管理运载线程,原有的大多数并发相关API也能直接在协程体系中使用 。从这个背景来看,创造一个Thread类的全新子类用于融合JUC和Continuation是十分合适的,这样通过很小的改造成本就能通过Java继承特性把这个全新子类适配JUC体系,也能扩展一些API让它适配协程新引入的特性,这个全新的子类就是java.lang.VirtualThread:
文章插图
VirtualThread类的继承体系如下:
package java.lang;final class VirtualThread extends BaseVirtualThread { // ...}package java.lang;sealed abstract class BaseVirtualThread extends Thread permits VirtualThread, ThreadBuilders.BoundVirtualThread { // ...}VirtualThread是BaseVirtualThread的子类,而BaseVirtualThread是一个"密封类",它是Thread的子类,只对VirtualThread和ThreadBuilders.BoundVirtualThread开放,并且VirtualThread是包私有访问权限的同时用final关键字修饰,无法被继承 。接着看VirtualThread的成员变量和构造函数:
// java.lang.VirtualThread// Unsafe实例private static final Unsafe U = Unsafe.getUnsafe();// 虚拟线程的ContinuationScope静态常量private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");// 调度器,或者说执行器,默认就是用此调度器运行虚拟线程private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();// 调度线程池实例,用于唤醒带超时阻塞的虚拟线程实例,主要用于sleep的唤醒private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();// pin模式,也就是pined thread的跟踪模式,决定打印堆栈的详细程度,来自于系统参数jdk.tracePinnedThreads,full表示详细,short表示简略private static final int TRACE_PINNING_MODE = tracePinningMode();// 下面几个都是成员地址,用于Unsafe直接操作成员private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");// 调度器实例private final Executor scheduler;// Continuation实例private final Continuation cont;// Continuation实例的Runnable包装实例private final Runnable runContinuation;// 虚拟线程状态,这个值由JVM访问和修改private volatile int state;// 下面的状态集合private static final int NEW = 0;private static final int STARTED = 1;private static final int RUNNABLE = 2; // runnable-unmountedprivate static final int RUNNING = 3; // runnable-mountedprivate static final int PARKING = 4;private static final int PARKED = 5; // unmountedprivate static final int PINNED = 6; // mountedprivate static final int YIELDING = 7; // Thread.yieldprivate static final int TERMINATED = 99; // final state// 虚拟线程unmount后可以从调度过程中挂起的状态private static final int SUSPENDED = 1 << 8;private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);private static final int PARKED_SUSPENDED = (PARKED | SUSPENDED);// park操作许可private volatile boolean parkPermit;// 运载线程实例private volatile Thread carrierThread;// 终结倒数栅栏实例,主要用于join操作private volatile CountDownLatch termination;// 唯一构造函数VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { // 默认标记bound为false,当bound为true的时候标记为绑定到系统线程 super(name, characteristics, /*bound*/ false); Objects.requireNonNull(task); // 如果传入的调度器实例非空则直接使用 // 否则,如果父线程是虚拟线程,则使用父虚拟线程的调度器实例 // 如果传入的调度器实例为空,父线程为平台线程,那么使用默认的调度器 // choose scheduler if not specified if (scheduler == null) { Thread parent = Thread.currentThread(); if (parent instanceof VirtualThread vparent) { scheduler = vparent.scheduler; } else { scheduler = DEFAULT_SCHEDULER; } } // 赋值调度器 this.scheduler = scheduler; // 封装和初始化Continuation this.cont = new VThreadContinuation(this, task); // 初始化Continuation的Runnable包装器,最终提交到调度器中执行 this.runContinuation = this::runContinuation;}// 虚拟线程Continuation的专有子类,默认为ContinuationScope("VirtualThreads"),从而实现Continuation.enter()执行时候实际上执行的是VirtualThread.run()方法// 也就是 Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]private static class VThreadContinuation extends Continuation { VThreadContinuation(VirtualThread vthread, Runnable task) { super(VTHREAD_SCOPE, () -> vthread.run(task)); } // pin之前回调的方法,基于TRACE_PINNING_MODE的返回值决定pinned线程栈的打印详略 @Override protected void onPinned(Continuation.Pinned reason) { if (TRACE_PINNING_MODE > 0) { boolean printAll = (TRACE_PINNING_MODE == 1); PinnedThreadPrinter.printStackTrace(System.out, printAll); } }}// 在当前线程上运行或继续Continuation的执行,必须由平台线程运行此方法,最终会封装为Runnble包装器提交到执行器中运行private void runContinuation() { // the carrier must be a platform thread if (Thread.currentThread().isVirtual()) { throw new WrongThreadException(); } // set state to RUNNING boolean firstRun; int initialState = state(); // 当前为STARTED状态并且CAS更新为RUNNING状态则标记首次运行为true if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { // first run firstRun = true; } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { // 当前为RUNNABLE状态并且CAS更新为RUNNING状态则标记首次运行为false,并且设置park许可为false // consume parking permit setParkPermit(false); firstRun = false; } else { // not runnable return; } // notify JVMTI before mount if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun); try { // 执行Continuation.run() cont.run(); } finally { // Continuation执行完成,回调钩子方法afterTerminate if (cont.isDone()) { afterTerminate(/*executed*/ true); } else { // Continuation没有执行完成,说明调用了Continuation.yield或者pin到运载线程中进行了park操作 afterYield(); } }}// Continuation执行完成回调的钩子方法private void afterTerminate(boolean executed) { assert (state() == TERMINATED) && (carrierThread == null); if (executed) { if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true); } // 如果有其他线程阻塞等待虚拟线程的返回,例如调用了join方法,那么在这里解除阻塞 CountDownLatch termination = this.termination; if (termination != null) { assert termination.getCount() == 1; termination.countDown(); } // 如果执行成功则通知线程容器当前线程实例退出,清空线程本地变量引用 if (executed) { // notify container if thread executed threadContainer().onExit(this); // clear references to thread locals clearReferences(); }}// 由于Continuation的yield操作或者调用了Thread.yield()导致Continuation挂起,需要重新把Continuation的包装器"懒提交"到调度器中private void afterYield() { int s = state(); assert (s == PARKING || s == YIELDING) && (carrierThread == null); // 如果是PARKING状态,这种对应于Continuation的yield操作调用 if (s == PARKING) { // 更变为PARKED状态 setState(PARKED); // notify JVMTI that unmount has completed, thread is parked if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false); // 得到park许可,并且CAS为RUNNABLE状态 if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) { // 进行懒提交,如果可能的话,用当前线程作为运载线程继续执行任务 lazySubmitRunContinuation(); } } else if (s == YIELDING) { // 如果是YIELDING状态,这种对应于调用了Thread.yield // 更变为RUNNABLE状态 setState(RUNNABLE); // notify JVMTI that unmount has completed, thread is runnable if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false); // 进行懒提交,如果可能的话,用当前线程作为运载线程继续执行任 lazySubmitRunContinuation(); }}
经验总结扩展阅读
- 第一篇 TTD 专题 :C# 那些短命线程都在干什么?
- Java并发编程 | 从进程、线程到并发问题实例解决
- 七 Netty 学习:NioEventLoop 对应线程的创建和启动源码说明
- 云原生虚拟网络 tun/tap & veth-pair
- 补充部分---ScheduledThreadPoolExecutor类分析 线程池底层原理详解与源码分析
- 附Anaconda安装包 Anaconda安装和卸载+虚拟环境Tensorflow安装以及末尾问题大全,这一篇就够了!!!
- 虚拟发货怎么确认收货
- 建议收藏 Java线程同步的四种方式详解
- 用AR Engine手部骨骼跟踪能力实现虚拟手表试戴
- 通过Thread Pool Executor类解析线程池执行任务的核心流程