查看原文
其他

.NET ThreadPool 实现(下)

黑洞视界 微软开发者MSDN 2023-01-21

点击上方蓝字

关注我们

(本文阅读时间:20分钟)

上文《.NET ThreadPool 实现(上)》介绍了线程池调度任务的机制,交给线程池的任务会被放到全局队列或者本地队列中,最终由线程池中的 Worker Thread 去执行任务。接下来就和大家介绍一下线程池是如何去管理这些 Worker Thread 的生命周期的。
为了更方便的解释线程管理的机制,这边使用下面使用一些代码做演示。
代码参考自
  • https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#threading




线程注入实验

Task.Run 会将 Task 调度到线程池中执行,下面的示例代码中等效于 ThreadPool.QueueUserWorkItem(WaitCallback callBack),会把 Task 放到队列系统的全局队列中(顺便一提,如果在一个线程池线程中执行 Task.Run 会将 Task 调度到此线程池线程的本地队列中)。

.NET 5 实验一 默认线程池配置
static void Main(string[] args){ var sw = Stopwatch.StartNew(); var tcs = new TaskCompletionSource(); var tasks = new List<Task>(); for (int i = 1; i <= Environment.ProcessorCount * 2; i++) { int id = i; Console.WriteLine($"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tasks.Add(Task.Run(() => { Console.WriteLine($"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.Task.Wait(); })); }
tasks.Add(Task.Run(() => { Console.WriteLine($"Task SetResult | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.SetResult(); })); Task.WaitAll(tasks.ToArray()); Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}");}
static int GetBusyThreads(){ ThreadPool.GetAvailableThreads(out var available, out _); ThreadPool.GetMaxThreads(out var max, out _); return max - available;}


首先在代码在 .NET 5 环境中运行以下代码,CPU 逻辑核心数 12。
Loop Id: 01 | 0.000 | Busy Threads: 0Loop Id: 02 | 0.112 | Busy Threads: 1…过长省略Task Id: 11 | 0.114 | Busy Threads: 12Task Id: 12 | 0.114 | Busy Threads: 12Task Id: 13 | 1.091 | Busy Threads: 13Task Id: 14 | 1.594 | Busy Threads: 14Task Id: 15 | 2.099 | Busy Threads: 15Task Id: 16 | 3.102 | Busy Threads: 16Task Id: 17 | 3.603 | Busy Threads: 17Task Id: 18 | 4.107 | Busy Threads: 18Task Id: 19 | 4.611 | Busy Threads: 19Task Id: 20 | 5.113 | Busy Threads: 20Task Id: 21 | 5.617 | Busy Threads: 21Task Id: 22 | 6.122 | Busy Threads: 22Task Id: 23 | 7.128 | Busy Threads: 23Task Id: 24 | 7.632 | Busy Threads: 24Task SetResult | 8.135 | Busy Threads: 25Done: | 8.136


Task.Run 会把 Task 调度到线程池上执行,前 24 个 task 都会被阻塞住,直到第 25 个被执行。每次都会打印出当前线程池中正在执行任务的线程数(也就是创建完成的线程数)。
可以观察到以下结果:
  • 前几次循环,线程随着 Task 数量递增,后面几次循环直到循环结束为止,线程数一直维持在 12 没有发生变化。
  • 线程数在达到 12 之前,零间隔时间增加。第 12 到 第 13 线程间隔 1s 不到,往后约 500ms 增加一个线程。

.NET 5 实验二 调整 ThreadPool 设置

在上面的代码最前面加入以下两行代码,继续在 .NET 5 环境运行一次。
ThreadPool.GetMinThreads(out int defaultMinThreads, out int completionPortThreads);Console.WriteLine($"DefaultMinThreads: {defaultMinThreads}");ThreadPool.SetMinThreads(14, completionPortThreads);


运行结果如下
DefaultMinThreads: 12Loop Id: 01 | 0.000 | Busy Threads: 0Loop Id: 02 | 0.003 | Busy Threads: 1…过长省略Task Id: 14 | 0.005 | Busy Threads: 14Task Id: 15 | 0.982 | Busy Threads: 15Task Id: 16 | 1.486 | Busy Threads: 16Task Id: 17 | 1.991 | Busy Threads: 17Task Id: 18 | 2.997 | Busy Threads: 18Task Id: 19 | 3.501 | Busy Threads: 19Task Id: 20 | 4.004 | Busy Threads: 20Task Id: 21 | 4.509 | Busy Threads: 21Task Id: 22 | 5.014 | Busy Threads: 22Task Id: 23 | 5.517 | Busy Threads: 23Task Id: 24 | 6.021 | Busy Threads: 24Task SetResult | 6.522 | Busy Threads: 25Done: | 6.523

在调整完线程池的最小线程数量之后,线程注入速度发生转折的时间点从第 12(默认min threads) 个线程换到了第 14(修改后的min threads)个线程。

整体时间也从 8s 缩到 6s。

.NET 5 实验三 tcs.Task.Wait() 改为 Thread.Sleep
static void Main(string[] args){ var sw = Stopwatch.StartNew(); var tasks = new List<Task>(); for (int i = 1; i <= Environment.ProcessorCount * 2; i++) { int id = i; Console.WriteLine( $"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tasks.Add(Task.Run(() => { Console.WriteLine( $"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); Thread.Sleep(Environment.ProcessorCount * 1000); })); }
Task.WhenAll(tasks.ToArray()).ContinueWith(_ => { Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}"); }); Console.ReadLine();}


Loop Id: 01 | 0.000 | Busy Threads: 0Loop Id: 02 | 0.027 | Busy Threads: 1…过长省略Task Id: 07 | 0.029 | Busy Threads: 12Task Id: 13 | 1.018 | Busy Threads: 13Task Id: 14 | 1.522 | Busy Threads: 14Task Id: 15 | 2.025 | Busy Threads: 15Task Id: 16 | 2.530 | Busy Threads: 16Task Id: 17 | 3.530 | Busy Threads: 17Task Id: 18 | 4.035 | Busy Threads: 18Task Id: 19 | 4.537 | Busy Threads: 19Task Id: 20 | 5.040 | Busy Threads: 20Task Id: 21 | 5.545 | Busy Threads: 21Task Id: 22 | 6.048 | Busy Threads: 22Task Id: 23 | 7.049 | Busy Threads: 23Task Id: 24 | 8.056 | Busy Threads: 24Done: | 20.060


达到 min threads (默认12)之后,线程注入速度明显变慢,最快间隔 500ms。

.NET 6 实验一 默认 ThreadPool 设置
将 .NET 5 实验一的代码在 .NET 6 执行一次
Loop Id: 01 | 0.001 | Busy Threads: 0Loop Id: 02 | 0.018 | Busy Threads: 1…过长省略Task Id: 21 | 0.020 | Busy Threads: 24Task Id: 23 | 0.020 | Busy Threads: 24Task Id: 22 | 0.020 | Busy Threads: 24Task Id: 24 | 0.020 | Busy Threads: 24Task SetResult | 0.045 | Busy Threads: 25Done: | 0.046

与实验一相比,虽然线程数仍然停留在 12 了一段时间,但随后线程就立即增长了,后文会介绍 .NET 6 在这方面做出的改进。

.NET 6 实验二 调整 ThreadPool 设置
将 .NET 5 实验二的代码在 .NET 6 中执行一次
DefaultMinThreads: 12Loop Id: 01 | 0.001 | Busy Threads: 0Loop Id: 02 | 0.014 | Busy Threads: 1…过长省略Task Id: 23 | 0.018 | Busy Threads: 26Task Id: 24 | 0.018 | Busy Threads: 26Task SetResult | 0.018 | Busy Threads: 25Done: | 0.019


前半部分有部分日志乱序,可以看到,与实验三一样,维持在最大线程数一小段时间之后,立即就开始了线程增长。

.NET 6 实验三 tcs.Task.Wait() 改为 Thread.Sleep
将 .NET 5 实验三的代码在 .NET 6 中执行一次
Loop Id: 01 | 0.003 | Busy Threads: 0Loop Id: 02 | 0.024 | Busy Threads: 1…过长省略Task Id: 07 | 0.026 | Busy Threads: 12Task Id: 12 | 0.026 | Busy Threads: 12Task Id: 13 | 1.026 | Busy Threads: 13Task Id: 14 | 2.027 | Busy Threads: 14Task Id: 15 | 3.028 | Busy Threads: 15Task Id: 16 | 4.030 | Busy Threads: 16Task Id: 17 | 5.031 | Busy Threads: 17Task Id: 18 | 6.032 | Busy Threads: 18Task Id: 19 | 6.533 | Busy Threads: 19Task Id: 20 | 7.035 | Busy Threads: 20Task Id: 21 | 8.036 | Busy Threads: 21Task Id: 22 | 8.537 | Busy Threads: 22Task Id: 23 | 9.538 | Busy Threads: 23Task Id: 24 | 10.039 | Busy Threads: 24Done: | 22.041

结果与 .NET 5 的实验三相差不大。



线程注入

对照上述的几组实验结果,接下来以 .NET 6 中 C# 实现的 ThreadPool 作为资料来理解一下线程注入的几个阶段(按个人理解进行的划分,仅供参考)。

第一个线程的出现
随着任务被调度到队列上,第一个线程被创建出来。
下面是线程池在执行第一个任务的时候的代码摘要,涉及到计数的并执行相关处理的地方,代码都使用了 while(xxx) + Interlocked 的方式来进行并发控制,可以理解成乐观锁。这一阶段,实际上我们只需要关注到 ThreadPoolWorkQueue.EnsureThreadRequested 方法就行了。

可利用 Rider 的反编译 Debug 功能帮助我们学习。
下面是第一个 Task.Run 的代码执行路径

注意:执行环节是 Main Thread

* 请前往阅读原文查看代码示例

达到线程数量目标(NumThreadsGoal) 之前的线程数增长

细心的朋友会发现上面代码里 EnsureThreadRequested 方法有一个终止条件,_separated.numOutstandingThreadRequests == Environment.ProcessorCount,每次新增一个 ThreadRequested,这个数就会 +1,似乎允许创建的最大 Worker Thread 是 Environment.ProcessorCount?
其实 ThreadPoolWorkQueue 维护的 NumOutstandingThreadRequests 这个值会在线程池线程真正跑起来之后,会在 ThreadPoolWorkQueue.Dispatch 方法中 -1。也就是说,只要有一个线程真正运行起来了,就能创建第 Environment.ProcessorCount + 1 个 Thread。当然,在向 ThreadPoolWorkQueue 加入第13个任务的时候,第13个 Worker Thread 就算不允许创建也没关系,因为任务已经入队了,会被运行起来的 Worker Thread 取走。
PortableThreadPool里维护了一个计数器 PortableThreadPool.ThreadPoolInstance._separated.counts,记录了 Worker Thread 相关的三个数值:
  • NumProcessingWork:当前正在执行任务的 Worker Thread。
  • NumExistingThreads:当前线程池中实际有的 Worker Thread。
  • NumThreadsGoal:当前允许创建的最大 Worker Thread,初始值为 min threads,最大值受限于 max threads。
min threads 初始值:运行环境 CPU 核心数,可通过 ThreadPool.SetMinThreads 进行设置,参数有效范围是 [1, max threads]。
max threads 初始值:32位平台 1023,64位平台 short.MaxValue,可通过 ThreadPool.SetMaxThreads 进行设置。
核心的变量就是这个 NumThreadsGoal 了,它会在下面几种情况中被更新,后文会补充说明:
  • 更新 ThreadPool 的 min threads 或 max threads 时可能会更新 NumThreadsGoal。
  • 避免饥饿机制(Starvation Avoidance)里的 GateThread 会更新 NumThreadsGoal。
  • 有 Worker Thread 被同步代码阻塞时 NumThreadsGoal 可能会被更新以避免 Worker Thread 不够用,这是.NET6开始新增的逻辑。
  • 爬山算法根据 ThreadPool 吞吐量态更新 NumThreadsGoal。
internal class PortableThreadPool {
public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();
private CacheLineSeparated _separated;
private struct CacheLineSeparated { public ThreadCounts counts; }
/// <summary> /// Tracks information on the number of threads we want/have in different states in our thread pool. /// </summary> private struct ThreadCounts { /// <summary> /// Number of threads processing work items. /// </summary> public short NumProcessingWork { get; set; }
/// <summary> /// Number of thread pool threads that currently exist. /// </summary> public short NumExistingThreads { get; set; }
// <summary> /// Max possible thread pool threads we want to have. /// </summary> public short NumThreadsGoal { get; set; } } }

避免饥饿机制(Starvation Avoidance)

上面讲到,随着任务进入队列系统,Worker Thread 将随之增长,直到达到 NumThreadsGoal。

NumThreadsGoal 是12,前 12 个线程都被堵住了,加入到队列系统的第 13 个任务没办法被这前 12 个线程领走执行。
在这种情况下,线程池的 Starvation Avoidance 机制就起到作用了。
在上述所说的第一个阶段,除了线程池中的第一个线程会被创建之外,GateThread 也会随之被初始化。在第一阶段的代码摘录中,可以看到 GateThread 的初始化。
internal sealed class PortableThreadPool{ public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();
internal void RequestWorker() { Interlocked.Increment(ref _separated.numRequestedWorkers); WorkerThread.MaybeAddWorkingWorker(this); // 初始化 GateThread GateThread.EnsureRunning(this); }}


在 GateThread 是一个独立的线程,每隔 500ms 进行检查一下,如果 NumProcessingWork >= NumThreadsGoal(WorkerThread.MaybeAddWorkingWorker 不添加 Worker Thread 的判断条件),就设置新的 NumThreadsGoal = NumProcessingWork + 1,并调用 WorkerThread.MaybeAddWorkingWorker,这样新的 Worker Thread 就可以被 WorkerThread.MaybeAddWorkingWorker 创建。
这就解释了,为什么 .NET 5 实验一、二在线程数达到min threads(NumThreadsGoal 的默认值)之后,后面 Worker Thread 的增长是每 500ms 一个。

由于在第三阶段中,线程的增长会比较缓慢,有经验的开发会在应用启动的时候设置一个较大的 min threads,使其较晚或不进入第三阶段。



线程注入在 .NET 6 中的改进
.NET 6 与 .NET 5 的实验二相比,达到 min threads 之后,线程的增长速度有明显的差异,而两者的实验三却相差不大。
.NET 6 对于 Task.Wait 导致线程池线程阻塞的场景进行了优化,但如果并非此原因导致的线程数不够用,依旧是 Starvation Avoidance 的策略。
新的 ThreadPool 提供了一个 ThreadPool.NotifyThreadBlocked 的内部接口,里面会调用 GateThread.Wake 去唤醒 GateThread 本来 500ms 执行一次的逻辑,这 500ms 的间隔时间是通过 AutoResetEvent 实现的,所以 GateThread.Wake 也很简单。
关键代码示意,非真实代码:
internal class PortableThreadPool{ public bool NotifyThreadBlocked() { // ... GateThread.Wake(this); return true; }
private static class GateThread { private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false);
// GateThread 入口方法 private static void GateThreadStart() { while(true) { DelayEvent.WaitOne(500); // ... } }
public static void Wake(PortableThreadPool threadPoolInstance) { DelayEvent.Set(); EnsureRunning(threadPoolInstance); } }





爬山算法(Hill Climbing)
除了上述介绍的线程注入机制外,从CLR 4.0开始,线程池内实现了一个根据采集到线程池吞吐率数据(每次任务完成时记录数据),推导出该算法认为最优的线程池线程数量。
算法实现位于 HillClimbing.ThreadPoolHillClimber.Update,有兴趣的朋友可以去看一下。
public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)


  • currentThreadCount:当前线程数
  • sampleDurationSeconds:采样间隔
  • numCompletions:这段采样时间间隔内完成的任务数
  • newThreadCount:新的线程数
  • newSample:新的采样间隔时间



不必要线程的销毁

如果线程需要被移除的时候,本地队列还存在待执行任务,则会将这些任务转移到全局队列中。
在以下几个场景中,线程池将会销毁掉不需要的线程,并不一定全面,只限于我当前认知。

  • 在无法从队列系统领取到任务时。

  • 通过爬山算法认定当前线程属于多余线程时。




小结
Worker Thread 的数量会随着进入 ThreadPool 的任务数量增加,直至 Worker Thread 的数量达到 NumThreadsGoal。
NumThreadsGoal 可能会在下述情况中更新:
  • 更新 ThreadPool 的 min threads 或 max threads 时。
  • 避免饥饿机制(Starvation Avoidance)。
  • 有 Worker Thread 被同步代码阻塞时。
  • 爬山算法的动态更新。

Worker Thread 无任务可执行及被爬山算法判定为多余时会被销毁。



总结
交给线程池去执行的任务会进入线程池的队列系统最终交给 Worker Thread 去执行。
线程池会根据线程池中任务的执行情况去动态的调整 Worker Thread 的创建与销毁。

参考资料

  • https://www.codeproject.com/Articles/3813/NET-s-ThreadPool-Class-Behind-The-Scenes

  • https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#threading

  • https://mattwarren.org/2017/04/13/The-CLR-Thread-Pool-Thread-Injection-Algorithm/

  • https://docs.microsoft.com/zh-CN/previous-versions/msp-n-p/ff963549(v=pandp.10)?redirectedfrom=MSDN#thread-injection

点击「阅读原文」查看原博客代码~

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存