虚拟线程 - VirtualThread源码透视( 六 )

说实话,Continuation源码的可读性比想象中低,连代码注释也留下了"丑陋的"这句吐槽 。通过上面源码分析和上一节Continuation的一个例子,可以得知Continuation#yield()可以让程序代码中断,然后再次调用Continuation#run()可以从上一个中断位置继续执行,JVM在这个过程中为使用者屏蔽了Continuation和运行此Continuation的平台线程之间的交互细节,让使用者可以专注实际的任务开发即可 。
VirtualThread前面花了不少篇幅介绍Continuation,它是一个全新的API 。已有的JUC类库已经十分完善,如果可以把Continuation融入到已有的JUC体系,那么就可以通过线程池技术去管理运载线程,原有的大多数并发相关API也能直接在协程体系中使用 。从这个背景来看,创造一个Thread类的全新子类用于融合JUC和Continuation是十分合适的,这样通过很小的改造成本就能通过Java继承特性把这个全新子类适配JUC体系,也能扩展一些API让它适配协程新引入的特性,这个全新的子类就是java.lang.VirtualThread:

虚拟线程 - VirtualThread源码透视

文章插图
VirtualThread类的继承体系如下:
package java.lang;final class VirtualThread extends BaseVirtualThread {  // ...}package java.lang;sealed abstract class BaseVirtualThread extends Thread        permits VirtualThread, ThreadBuilders.BoundVirtualThread {  // ...}VirtualThread是BaseVirtualThread的子类,而BaseVirtualThread是一个"密封类",它是Thread的子类,只对VirtualThread和ThreadBuilders.BoundVirtualThread开放,并且VirtualThread是包私有访问权限的同时用final关键字修饰,无法被继承 。接着看VirtualThread的成员变量和构造函数:
// java.lang.VirtualThread// Unsafe实例private static final Unsafe U = Unsafe.getUnsafe();// 虚拟线程的ContinuationScope静态常量private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");// 调度器,或者说执行器,默认就是用此调度器运行虚拟线程private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();// 调度线程池实例,用于唤醒带超时阻塞的虚拟线程实例,主要用于sleep的唤醒private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();// pin模式,也就是pined thread的跟踪模式,决定打印堆栈的详细程度,来自于系统参数jdk.tracePinnedThreads,full表示详细,short表示简略private static final int TRACE_PINNING_MODE = tracePinningMode();// 下面几个都是成员地址,用于Unsafe直接操作成员private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");// 调度器实例private final Executor scheduler;// Continuation实例private final Continuation cont;// Continuation实例的Runnable包装实例private final Runnable runContinuation;// 虚拟线程状态,这个值由JVM访问和修改private volatile int state;// 下面的状态集合private static final int NEW      = 0;private static final int STARTED  = 1;private static final int RUNNABLE = 2;     // runnable-unmountedprivate static final int RUNNING  = 3;     // runnable-mountedprivate static final int PARKING  = 4;private static final int PARKED   = 5;     // unmountedprivate static final int PINNED   = 6;     // mountedprivate static final int YIELDING = 7;     // Thread.yieldprivate static final int TERMINATED = 99;  // final state// 虚拟线程unmount后可以从调度过程中挂起的状态private static final int SUSPENDED = 1 << 8;private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);// park操作许可private volatile boolean parkPermit;// 运载线程实例private volatile Thread carrierThread;// 终结倒数栅栏实例,主要用于join操作private volatile CountDownLatch termination;// 唯一构造函数VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {    // 默认标记bound为false,当bound为true的时候标记为绑定到系统线程    super(name, characteristics, /*bound*/ false);    Objects.requireNonNull(task);    // 如果传入的调度器实例非空则直接使用    // 否则,如果父线程是虚拟线程,则使用父虚拟线程的调度器实例    // 如果传入的调度器实例为空,父线程为平台线程,那么使用默认的调度器    // choose scheduler if not specified    if (scheduler == null) {        Thread parent = Thread.currentThread();        if (parent instanceof VirtualThread vparent) {            scheduler = vparent.scheduler;        } else {            scheduler = DEFAULT_SCHEDULER;        }    }    // 赋值调度器    this.scheduler = scheduler;    // 封装和初始化Continuation    this.cont = new VThreadContinuation(this, task);    // 初始化Continuation的Runnable包装器,最终提交到调度器中执行    this.runContinuation = this::runContinuation;}// 虚拟线程Continuation的专有子类,默认为ContinuationScope("VirtualThreads"),从而实现Continuation.enter()执行时候实际上执行的是VirtualThread.run()方法// 也就是 Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]private static class VThreadContinuation extends Continuation {    VThreadContinuation(VirtualThread vthread, Runnable task) {        super(VTHREAD_SCOPE, () -> vthread.run(task));    }    // pin之前回调的方法,基于TRACE_PINNING_MODE的返回值决定pinned线程栈的打印详略    @Override    protected void onPinned(Continuation.Pinned reason) {        if (TRACE_PINNING_MODE > 0) {            boolean printAll = (TRACE_PINNING_MODE == 1);            PinnedThreadPrinter.printStackTrace(System.out, printAll);        }    }}// 在当前线程上运行或继续Continuation的执行,必须由平台线程运行此方法,最终会封装为Runnble包装器提交到执行器中运行private void runContinuation() {    // the carrier must be a platform thread    if (Thread.currentThread().isVirtual()) {        throw new WrongThreadException();    }    // set state to RUNNING    boolean firstRun;    int initialState = state();    // 当前为STARTED状态并且CAS更新为RUNNING状态则标记首次运行为true    if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {        // first run        firstRun = true;    } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {        // 当前为RUNNABLE状态并且CAS更新为RUNNING状态则标记首次运行为false,并且设置park许可为false        // consume parking permit        setParkPermit(false);        firstRun = false;    } else {        // not runnable        return;    }    // notify JVMTI before mount    if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);    try {        // 执行Continuation.run()        cont.run();    } finally {        // Continuation执行完成,回调钩子方法afterTerminate        if (cont.isDone()) {            afterTerminate(/*executed*/ true);        } else {            // Continuation没有执行完成,说明调用了Continuation.yield或者pin到运载线程中进行了park操作            afterYield();        }    }}// Continuation执行完成回调的钩子方法private void afterTerminate(boolean executed) {    assert (state() == TERMINATED) && (carrierThread == null);    if (executed) {        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true);    }    // 如果有其他线程阻塞等待虚拟线程的返回,例如调用了join方法,那么在这里解除阻塞    CountDownLatch termination = this.termination;    if (termination != null) {        assert termination.getCount() == 1;        termination.countDown();    }    // 如果执行成功则通知线程容器当前线程实例退出,清空线程本地变量引用    if (executed) {        // notify container if thread executed        threadContainer().onExit(this);        // clear references to thread locals        clearReferences();    }}// 由于Continuation的yield操作或者调用了Thread.yield()导致Continuation挂起,需要重新把Continuation的包装器"懒提交"到调度器中private void afterYield() {    int s = state();    assert (s == PARKING || s == YIELDING) && (carrierThread == null);    // 如果是PARKING状态,这种对应于Continuation的yield操作调用    if (s == PARKING) {        // 更变为PARKED状态        setState(PARKED);        // notify JVMTI that unmount has completed, thread is parked        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);        // 得到park许可,并且CAS为RUNNABLE状态        if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {            // 进行懒提交,如果可能的话,用当前线程作为运载线程继续执行任务            lazySubmitRunContinuation();        }    } else if (s == YIELDING) {   // 如果是YIELDING状态,这种对应于调用了Thread.yield        // 更变为RUNNABLE状态        setState(RUNNABLE);        // notify JVMTI that unmount has completed, thread is runnable        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);        // 进行懒提交,如果可能的话,用当前线程作为运载线程继续执行任        lazySubmitRunContinuation();    }}

经验总结扩展阅读