这里唯一的构造函数是比较复杂的,抛开一些钩子接口,最终想达到的效果就是:
Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]用户任务实际被包裹了很多层,在最里面一层才会回调 。VirtualThread中提供了两个静态全局的线程池实例,一个用于调度,一个用于唤醒,这里看看两个线程池是如何构造的:
// java.lang.VirtualThreadprivate static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();// 创建默认的调度器private static ForkJoinPool createDefaultScheduler() { // 线程工厂,默认创建CarrierThread实例,CarrierThread是ForkJoinWorkerThread的一个子类 ForkJoinWorkerThreadFactory factory = pool -> { PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); return AccessController.doPrivileged(pa); }; PrivilegedAction<ForkJoinPool> pa = () -> { int parallelism, maxPoolSize, minRunnable; String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); if (parallelismValue != null) { parallelism = Integer.parseInt(parallelismValue); } else { parallelism = Runtime.getRuntime().availableProcessors(); } if (maxPoolSizeValue != null) { maxPoolSize = Integer.parseInt(maxPoolSizeValue); parallelism = Integer.min(parallelism, maxPoolSize); } else { maxPoolSize = Integer.max(parallelism, 256); } if (minRunnableValue != null) { minRunnable = Integer.parseInt(minRunnableValue); } else { minRunnable = Integer.max(parallelism / 2, 1); } Thread.UncaughtExceptionHandler handler = (t, e) -> { }; boolean asyncMode = true; // FIFO return new ForkJoinPool(parallelism, factory, handler, asyncMode, 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); }; return AccessController.doPrivileged(pa);}// 创建调度线程池,用于虚拟线程带超时时间的unpark操作private static ScheduledExecutorService createDelayedTaskScheduler() { String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize"); int poolSize; if (propValue != null) { poolSize = Integer.parseInt(propValue); } else { // 确保至少有一个工作线程 poolSize = 1; } ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(poolSize, task -> { return InnocuousThread.newThread("VirtualThread-unparker", task); }); // 任务取消后马上从工作队列移除 stpe.setRemoveOnCancelPolicy(true); return stpe;}
经验总结扩展阅读
- 第一篇 TTD 专题 :C# 那些短命线程都在干什么?
- Java并发编程 | 从进程、线程到并发问题实例解决
- 七 Netty 学习:NioEventLoop 对应线程的创建和启动源码说明
- 云原生虚拟网络 tun/tap & veth-pair
- 补充部分---ScheduledThreadPoolExecutor类分析 线程池底层原理详解与源码分析
- 附Anaconda安装包 Anaconda安装和卸载+虚拟环境Tensorflow安装以及末尾问题大全,这一篇就够了!!!
- 虚拟发货怎么确认收货
- 建议收藏 Java线程同步的四种方式详解
- 用AR Engine手部骨骼跟踪能力实现虚拟手表试戴
- 通过Thread Pool Executor类解析线程池执行任务的核心流程