对于默认调度器(DEFAULT_SCHEDULER)的创建,它是一个ForkJoinPool实例,构造参数的选取如下:
- parallelism参数由系统变量jdk.virtualThreadScheduler.parallelism决定,默认值为Runtime.getRuntime().availableProcessors(),如果配置了系统参数jdk.virtualThreadScheduler.maxPoolSize则取min(parallelism,maxPoolSize)
- maxPoolSize参数由系统变量jdk.virtualThreadScheduler.maxPoolSize决定,默认值为min(parallelism, maxPoolSize)
- minRunnable参数由系统变量jdk.virtualThreadScheduler.minRunnable决定,默认值为max(parallelism / 2, 1)
- asyncMode参数固定值true,也就是选用FIFO模式
- keepAliveTime参数为固定值30秒
- saturate参数在JDK17引入,是一个Predicate函数,在此固定返回true,用于忽略minRunnable值允许线程池饱和
- 线程工厂用于创建CarrierThread实例,CarrierThread是ForkJoinWorkerThread的子类
对于调度线程池(UNPARKER)的创建,它是一个ScheduledThreadPoolExecutor实例,构造参数的选取如下:
- corePoolSize参数由系统变量jdk.unparker.maxPoolSize决定,并且确保最小值为1
- 线程工厂用于创建InnocuousThread实例,线程名称为VirtualThread-unparker
// java.lang.VirtualThread@Overridepublic void start() { start(ThreadContainers.root());}// 调度虚拟线程让之运行@Overridevoid start(ThreadContainer container) { // CAS由NEW转换为STARTED状态 if (!compareAndSetState(NEW, STARTED)) { throw new IllegalThreadStateException("Already started"); } // 绑定当前虚拟线程到线程容器 setThreadContainer(container); // 标记为未启动 boolean started = false; // 回调start钩子方法 container.onStart(this); // may throw try { // 从给定容器继承extent-local绑定参数 inheritExtentLocalBindings(container); // 提交'runContinuation'任务到调度器 submitRunContinuation(); // 标记为启动完成 started = true; } finally { // 如果启动失败,则标记最终状态和回调终结钩子方法 if (!started) { setState(TERMINATED); container.onExit(this); afterTerminate(/*executed*/ false); } }}// 提交'runContinuation'任务到调度器private void submitRunContinuation() { submitRunContinuation(false);}// 提交'runContinuation'任务到调度器,lazySubmit参数决定是否"懒提交"private void submitRunContinuation(boolean lazySubmit) { try { if (lazySubmit && scheduler instanceof ForkJoinPool pool) { // ForkJoinPool类型调度器并且lazySubmit为true,对runContinuation这个Runnable实例适配为ForkJoinTask类型,进行"懒提交"到ForkJoinPool pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); } else { // 非ForkJoinPool类型调度器或者lazySubmit为false,直接使用Executor.execute()提交任务 scheduler.execute(runContinuation); } } catch (RejectedExecutionException ree) { // 线程池拒绝接收任务,发布提交失败事件到JVM var event = new VirtualThreadSubmitFailedEvent(); if (event.isEnabled()) { event.javaThreadId = threadId(); event.exceptionMessage = ree.getMessage(); event.commit(); } throw ree; }}
经验总结扩展阅读
- 第一篇 TTD 专题 :C# 那些短命线程都在干什么?
- Java并发编程 | 从进程、线程到并发问题实例解决
- 七 Netty 学习:NioEventLoop 对应线程的创建和启动源码说明
- 云原生虚拟网络 tun/tap & veth-pair
- 补充部分---ScheduledThreadPoolExecutor类分析 线程池底层原理详解与源码分析
- 附Anaconda安装包 Anaconda安装和卸载+虚拟环境Tensorflow安装以及末尾问题大全,这一篇就够了!!!
- 虚拟发货怎么确认收货
- 建议收藏 Java线程同步的四种方式详解
- 用AR Engine手部骨骼跟踪能力实现虚拟手表试戴
- 通过Thread Pool Executor类解析线程池执行任务的核心流程