虚拟线程 - VirtualThread源码透视( 七 )

这里唯一的构造函数是比较复杂的,抛开一些钩子接口,最终想达到的效果就是:
Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]用户任务实际被包裹了很多层,在最里面一层才会回调 。VirtualThread中提供了两个静态全局的线程池实例,一个用于调度,一个用于唤醒,这里看看两个线程池是如何构造的:
// java.lang.VirtualThreadprivate static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();// 创建默认的调度器private static ForkJoinPool createDefaultScheduler() {    // 线程工厂,默认创建CarrierThread实例,CarrierThread是ForkJoinWorkerThread的一个子类    ForkJoinWorkerThreadFactory factory = pool -> {        PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);        return AccessController.doPrivileged(pa);    };    PrivilegedAction<ForkJoinPool> pa = () -> {        int parallelism, maxPoolSize, minRunnable;        String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");        String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");        String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");        if (parallelismValue != null) {            parallelism = Integer.parseInt(parallelismValue);        } else {            parallelism = Runtime.getRuntime().availableProcessors();        }        if (maxPoolSizeValue != null) {            maxPoolSize = Integer.parseInt(maxPoolSizeValue);            parallelism = Integer.min(parallelism, maxPoolSize);        } else {            maxPoolSize = Integer.max(parallelism, 256);        }        if (minRunnableValue != null) {            minRunnable = Integer.parseInt(minRunnableValue);        } else {            minRunnable = Integer.max(parallelism / 2, 1);        }        Thread.UncaughtExceptionHandler handler = (t, e) -> { };        boolean asyncMode = true; // FIFO        return new ForkJoinPool(parallelism, factory, handler, asyncMode,                        0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);    };    return AccessController.doPrivileged(pa);}// 创建调度线程池,用于虚拟线程带超时时间的unpark操作private static ScheduledExecutorService createDelayedTaskScheduler() {    String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");    int poolSize;    if (propValue != null) {        poolSize = Integer.parseInt(propValue);    } else {        // 确保至少有一个工作线程        poolSize = 1;    }    ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)        Executors.newScheduledThreadPool(poolSize, task -> {            return InnocuousThread.newThread("VirtualThread-unparker", task);        });    // 任务取消后马上从工作队列移除    stpe.setRemoveOnCancelPolicy(true);    return stpe;}

经验总结扩展阅读