虚拟线程 - VirtualThread源码透视( 八 )

对于默认调度器(DEFAULT_SCHEDULER)的创建,它是一个ForkJoinPool实例,构造参数的选取如下:

  • parallelism参数由系统变量jdk.virtualThreadScheduler.parallelism决定,默认值为Runtime.getRuntime().availableProcessors(),如果配置了系统参数jdk.virtualThreadScheduler.maxPoolSize则取min(parallelism,maxPoolSize)
  • maxPoolSize参数由系统变量jdk.virtualThreadScheduler.maxPoolSize决定,默认值为min(parallelism, maxPoolSize)
  • minRunnable参数由系统变量jdk.virtualThreadScheduler.minRunnable决定,默认值为max(parallelism / 2, 1)
  • asyncMode参数固定值true,也就是选用FIFO模式
  • keepAliveTime参数为固定值30秒
  • saturate参数在JDK17引入,是一个Predicate函数,在此固定返回true,用于忽略minRunnable值允许线程池饱和
  • 线程工厂用于创建CarrierThread实例,CarrierThread是ForkJoinWorkerThread的子类
在Intel 4C8T开发机器环境中,该ForkJoinPool实例创建时候的几个参数分别为:parallelism = 8, maxPoolSize = 256, minRunnable = 4 。
对于调度线程池(UNPARKER)的创建,它是一个ScheduledThreadPoolExecutor实例,构造参数的选取如下:
  • corePoolSize参数由系统变量jdk.unparker.maxPoolSize决定,并且确保最小值为1
  • 线程工厂用于创建InnocuousThread实例,线程名称为VirtualThread-unparker
接着看虚拟线程的启动方法start():
// java.lang.VirtualThread@Overridepublic void start() {    start(ThreadContainers.root());}// 调度虚拟线程让之运行@Overridevoid start(ThreadContainer container) {    // CAS由NEW转换为STARTED状态    if (!compareAndSetState(NEW, STARTED)) {        throw new IllegalThreadStateException("Already started");    }    // 绑定当前虚拟线程到线程容器    setThreadContainer(container);    // 标记为未启动    boolean started = false;    // 回调start钩子方法    container.onStart(this); // may throw    try {        // 从给定容器继承extent-local绑定参数        inheritExtentLocalBindings(container);        // 提交'runContinuation'任务到调度器        submitRunContinuation();        // 标记为启动完成        started = true;    } finally {        // 如果启动失败,则标记最终状态和回调终结钩子方法        if (!started) {            setState(TERMINATED);            container.onExit(this);            afterTerminate(/*executed*/ false);        }    }}// 提交'runContinuation'任务到调度器private void submitRunContinuation() {    submitRunContinuation(false);}// 提交'runContinuation'任务到调度器,lazySubmit参数决定是否"懒提交"private void submitRunContinuation(boolean lazySubmit) {    try {        if (lazySubmit && scheduler instanceof ForkJoinPool pool) {            // ForkJoinPool类型调度器并且lazySubmit为true,对runContinuation这个Runnable实例适配为ForkJoinTask类型,进行"懒提交"到ForkJoinPool            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));        } else {            // 非ForkJoinPool类型调度器或者lazySubmit为false,直接使用Executor.execute()提交任务            scheduler.execute(runContinuation);        }    } catch (RejectedExecutionException ree) {        // 线程池拒绝接收任务,发布提交失败事件到JVM        var event = new VirtualThreadSubmitFailedEvent();        if (event.isEnabled()) {            event.javaThreadId = threadId();            event.exceptionMessage = ree.getMessage();            event.commit();        }        throw ree;    }}

经验总结扩展阅读