.NET ThreadPool Implementation (Part 2)
Code adapted from:
https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#threading
Thread injection experiments
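Task.Run schedules a Task to run on the thread pool. In the sample code below it is equivalent to ThreadPool.QueueUserWorkItem(WaitCallback callBack): the Task is placed in the global queue of the pool's queueing system. (Incidentally, if Task.Run is called on a thread pool thread, the Task goes into that thread's local queue instead.)
▍.NET 5 Experiment 1: default ThreadPool settings
The loop starts Environment.ProcessorCount * 2 tasks. Each task blocks on tcs.Task.Wait() until a final task calls tcs.SetResult(), so every blocked task occupies its own thread pool thread.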
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var sw = Stopwatch.StartNew();
        var tcs = new TaskCompletionSource(); // non-generic TaskCompletionSource requires .NET 5+
        var tasks = new List<Task>();
        for (int i = 1; i <= Environment.ProcessorCount * 2; i++)
        {
            int id = i;
            Console.WriteLine($"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
                tcs.Task.Wait(); // block this pool thread until SetResult is called
            }));
        }
        tasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"Task SetResult | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
            tcs.SetResult(); // releases every blocked task
        }));
        Task.WaitAll(tasks.ToArray());
        Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}");
    }

    // Busy worker threads = max worker threads - currently available worker threads
    static int GetBusyThreads()
    {
        ThreadPool.GetAvailableThreads(out var available, out _);
        ThreadPool.GetMaxThreads(out var max, out _);
        return max - available;
    }
}
Loop Id: 01 | 0.000 | Busy Threads: 0
Loop Id: 02 | 0.112 | Busy Threads: 1
… (omitted for length)
Task Id: 11 | 0.114 | Busy Threads: 12
Task Id: 12 | 0.114 | Busy Threads: 12
Task Id: 13 | 1.091 | Busy Threads: 13
Task Id: 14 | 1.594 | Busy Threads: 14
Task Id: 15 | 2.099 | Busy Threads: 15
Task Id: 16 | 3.102 | Busy Threads: 16
Task Id: 17 | 3.603 | Busy Threads: 17
Task Id: 18 | 4.107 | Busy Threads: 18
Task Id: 19 | 4.611 | Busy Threads: 19
Task Id: 20 | 5.113 | Busy Threads: 20
Task Id: 21 | 5.617 | Busy Threads: 21
Task Id: 22 | 6.122 | Busy Threads: 22
Task Id: 23 | 7.128 | Busy Threads: 23
Task Id: 24 | 7.632 | Busy Threads: 24
Task SetResult | 8.135 | Busy Threads: 25
Done: | 8.136
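During the first iterations, the number of threads grows with the number of Tasks. For the rest of the loop, until it finishes, the thread count stays at 12. Up to 12 threads, new threads are added with no delay. The 13th thread arrives just under 1 s after the 12th. After that, a new thread is added roughly every 500 ms, with the occasional 1 s gap.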
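To make the timing concrete, the gaps can be computed directly from the start times of Task 12 to Task 17 in the log above. The class name InjectionGaps is made up for this illustration.

```csharp
using System;
using System.Linq;

static class InjectionGaps
{
    // Start times (seconds) of Task 12..17, copied from the .NET 5 Experiment 1 log.
    // Each task blocks, so every new start time marks a newly injected thread.
    public static readonly double[] StartTimes = { 0.114, 1.091, 1.594, 2.099, 3.102, 3.603 };

    // Time between each pair of consecutive start times, rounded to milliseconds.
    public static double[] Gaps(double[] times) =>
        times.Zip(times.Skip(1), (earlier, later) => Math.Round(later - earlier, 3)).ToArray();
}
```

InjectionGaps.Gaps(InjectionGaps.StartTimes) yields gaps of 0.977, 0.503, 0.505, 1.003 and 0.501 seconds. There is one near-1 s step right after the 12th thread, and most of the later steps are half a second.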
▍.NET 5 Experiment 2: adjusting ThreadPool settings
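// Added at the start of Main in Experiment 1: raise the worker-thread minimum from the default to 14
ThreadPool.GetMinThreads(out int defaultMinThreads, out int completionPortThreads);
Console.WriteLine($"DefaultMinThreads: {defaultMinThreads}");
ThreadPool.SetMinThreads(14, completionPortThreads);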
DefaultMinThreads: 12
Loop Id: 01 | 0.000 | Busy Threads: 0
Loop Id: 02 | 0.003 | Busy Threads: 1
… (omitted for length)
Task Id: 14 | 0.005 | Busy Threads: 14
Task Id: 15 | 0.982 | Busy Threads: 15
Task Id: 16 | 1.486 | Busy Threads: 16
Task Id: 17 | 1.991 | Busy Threads: 17
Task Id: 18 | 2.997 | Busy Threads: 18
Task Id: 19 | 3.501 | Busy Threads: 19
Task Id: 20 | 4.004 | Busy Threads: 20
Task Id: 21 | 4.509 | Busy Threads: 21
Task Id: 22 | 5.014 | Busy Threads: 22
Task Id: 23 | 5.517 | Busy Threads: 23
Task Id: 24 | 6.021 | Busy Threads: 24
Task SetResult | 6.522 | Busy Threads: 25
Done: | 6.523
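Total time drops from about 8 s to about 6.5 s. Threads are now added without delay up to 14, the new min threads value, and after that the roughly 500 ms pace resumes.
▍.NET 5 Experiment 3: replacing tcs.Task.Wait() with Thread.Sleep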
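using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var sw = Stopwatch.StartNew();
        var tasks = new List<Task>();
        for (int i = 1; i <= Environment.ProcessorCount * 2; i++)
        {
            int id = i;
            Console.WriteLine(
                $"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine(
                    $"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
                // Block the pool thread with Thread.Sleep instead of tcs.Task.Wait()
                Thread.Sleep(Environment.ProcessorCount * 1000);
            }));
        }
        Task.WhenAll(tasks.ToArray()).ContinueWith(_ =>
        {
            Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}");
        });
        Console.ReadLine(); // keep the process alive until "Done" is printed
    }

    // Same helper as in Experiment 1
    static int GetBusyThreads()
    {
        ThreadPool.GetAvailableThreads(out var available, out _);
        ThreadPool.GetMaxThreads(out var max, out _);
        return max - available;
    }
}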
Loop Id: 01 | 0.000 | Busy Threads: 0
Loop Id: 02 | 0.027 | Busy Threads: 1
… (omitted for length)
Task Id: 07 | 0.029 | Busy Threads: 12
Task Id: 13 | 1.018 | Busy Threads: 13
Task Id: 14 | 1.522 | Busy Threads: 14
Task Id: 15 | 2.025 | Busy Threads: 15
Task Id: 16 | 2.530 | Busy Threads: 16
Task Id: 17 | 3.530 | Busy Threads: 17
Task Id: 18 | 4.035 | Busy Threads: 18
Task Id: 19 | 4.537 | Busy Threads: 19
Task Id: 20 | 5.040 | Busy Threads: 20
Task Id: 21 | 5.545 | Busy Threads: 21
Task Id: 22 | 6.048 | Busy Threads: 22
Task Id: 23 | 7.049 | Busy Threads: 23
Task Id: 24 | 8.056 | Busy Threads: 24
Done: | 20.060
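Once min threads is reached, thread injection slows down noticeably, and the shortest interval between new threads is about 500 ms. Min threads is 12 by default here, because the default minimum equals the number of logical processors. Task 24 only starts at 8.056 s, and each task sleeps for 12 s (Environment.ProcessorCount * 1000 ms), so the whole run takes about 20 s.
▍.NET 6 Experiment 1: default ThreadPool settings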
Loop Id: 01 | 0.001 | Busy Threads: 0
Loop Id: 02 | 0.018 | Busy Threads: 1
… (omitted for length)
Task Id: 21 | 0.020 | Busy Threads: 24
Task Id: 23 | 0.020 | Busy Threads: 24
Task Id: 22 | 0.020 | Busy Threads: 24
Task Id: 24 | 0.020 | Busy Threads: 24
Task SetResult | 0.045 | Busy Threads: 25
Done: | 0.046
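Compared with .NET 5 Experiment 1, the thread count still stays at 12 for a while, but after that threads are added immediately. The .NET 6 improvements behind this are described later in this article.
▍.NET 6 Experiment 2: adjusting ThreadPool settings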
DefaultMinThreads: 12
Loop Id: 01 | 0.001 | Busy Threads: 0
Loop Id: 02 | 0.014 | Busy Threads: 1
… (omitted for length)
Task Id: 23 | 0.018 | Busy Threads: 26
Task Id: 24 | 0.018 | Busy Threads: 26
Task SetResult | 0.018 | Busy Threads: 25
Done: | 0.019
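Some log lines in the first half appear out of order. As in the previous experiment, the thread count holds for a short while and then starts growing immediately.
▍.NET 6 Experiment 3: replacing tcs.Task.Wait() with Thread.Sleep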
Loop Id: 01 | 0.003 | Busy Threads: 0
Loop Id: 02 | 0.024 | Busy Threads: 1
… (omitted for length)
Task Id: 07 | 0.026 | Busy Threads: 12
Task Id: 12 | 0.026 | Busy Threads: 12
Task Id: 13 | 1.026 | Busy Threads: 13
Task Id: 14 | 2.027 | Busy Threads: 14
Task Id: 15 | 3.028 | Busy Threads: 15
Task Id: 16 | 4.030 | Busy Threads: 16
Task Id: 17 | 5.031 | Busy Threads: 17
Task Id: 18 | 6.032 | Busy Threads: 18
Task Id: 19 | 6.533 | Busy Threads: 19
Task Id: 20 | 7.035 | Busy Threads: 20
Task Id: 21 | 8.036 | Busy Threads: 21
Task Id: 22 | 8.537 | Busy Threads: 22
Task Id: 23 | 9.538 | Busy Threads: 23
Task Id: 24 | 10.039 | Busy Threads: 24
Done: | 22.041
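Thread injection
With the experimental results above as a reference, the following walks through the stages of thread injection using the C# ThreadPool implementation in .NET 6 as source material. The division into stages is my own and is only meant as a guide.
Rider's decompiler and debugger are a handy aid for studying this code.
Below is the code execution path of the first Task.Run.
Note: this path executes on the Main Thread.
* The code sample is omitted here; see the original blog post.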
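▍Thread growth before reaching the thread count goal (NumThreadsGoal)
NumProcessingWork: the number of Worker Threads currently executing work items.
NumExistingThreads: the number of Worker Threads that actually exist in the thread pool.
NumThreadsGoal: the maximum number of Worker Threads currently allowed. Its initial value is min threads, and it is capped by max threads.
The initial value of max threads is 1023 on 32-bit platforms and short.MaxValue (32767) on 64-bit platforms. It can be changed with ThreadPool.SetMaxThreads.
NumThreadsGoal may be updated in the following cases:
When the ThreadPool's min threads or max threads are changed.
When the GateThread of the Starvation Avoidance mechanism adjusts it.
When a Worker Thread is blocked by synchronous code, so that the pool does not run short of Worker Threads. This logic is new starting with .NET 6.
When the hill-climbing algorithm adjusts it dynamically based on ThreadPool throughput.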
// Simplified excerpt from the .NET 6 source
internal class PortableThreadPool
{
    public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();

    private CacheLineSeparated _separated;

    private struct CacheLineSeparated
    {
        public ThreadCounts counts;
    }

    /// <summary>
    /// Tracks information on the number of threads we want/have in different states in our thread pool.
    /// </summary>
    private struct ThreadCounts
    {
        /// <summary>
        /// Number of threads processing work items.
        /// </summary>
        public short NumProcessingWork { get; set; }

        /// <summary>
        /// Number of thread pool threads that currently exist.
        /// </summary>
        public short NumExistingThreads { get; set; }

        /// <summary>
        /// Max possible thread pool threads we want to have.
        /// </summary>
        public short NumThreadsGoal { get; set; }
    }
}
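The excerpt above shows ThreadCounts as three short properties. As far as I can tell from the runtime source, the real struct stores all three in a single 64-bit field, so they can be read and updated together with one Interlocked.CompareExchange. The sketch below illustrates that idea. PackedThreadCounts, its bit layout and IncrementProcessingWork are my own simplified stand-ins, not the runtime's code.

```csharp
using System.Threading;

// Simplified sketch: three 16-bit counters packed into one 64-bit field.
// Assumed layout: bits 0-15 NumProcessingWork, bits 16-31 NumExistingThreads, bits 32-47 NumThreadsGoal.
struct PackedThreadCounts
{
    private const int ProcessingWorkShift = 0;
    private const int ExistingThreadsShift = 16;
    private const int ThreadsGoalShift = 32;

    private ulong _data;

    private PackedThreadCounts(ulong data) => _data = data;

    public short NumProcessingWork
    {
        get => GetField(ProcessingWorkShift);
        set => SetField(ProcessingWorkShift, value);
    }

    public short NumExistingThreads
    {
        get => GetField(ExistingThreadsShift);
        set => SetField(ExistingThreadsShift, value);
    }

    public short NumThreadsGoal
    {
        get => GetField(ThreadsGoalShift);
        set => SetField(ThreadsGoalShift, value);
    }

    private short GetField(int shift) => (short)(ushort)(_data >> shift);

    // Clear the 16 bits at 'shift', then write the new value into them.
    private void SetField(int shift, short value) =>
        _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);

    // Atomically increments NumProcessingWork with a compare-and-swap retry loop:
    // take a snapshot, compute the new value, and publish it only if nobody changed it meanwhile.
    public static void IncrementProcessingWork(ref PackedThreadCounts shared)
    {
        while (true)
        {
            var snapshot = new PackedThreadCounts(Volatile.Read(ref shared._data));
            var updated = snapshot;
            updated.NumProcessingWork++;
            ulong observed = Interlocked.CompareExchange(ref shared._data, updated._data, snapshot._data);
            if (observed == snapshot._data)
            {
                return; // our update won
            }
            // Another thread changed the counts first: retry with a fresh snapshot.
        }
    }
}
```

Packing the counters means a thread can never observe, for example, an incremented NumProcessingWork alongside a stale NumThreadsGoal. All three values change in one atomic step.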
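▍Starvation Avoidance
As described above, the number of Worker Threads grows as work items enter the queueing system, until it reaches NumThreadsGoal. RequestWorker also makes sure that the GateThread, which implements starvation avoidance, is running: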
internal sealed class PortableThreadPool
{
    public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();

    internal void RequestWorker()
    {
        Interlocked.Increment(ref _separated.numRequestedWorkers);
        WorkerThread.MaybeAddWorkingWorker(this);
        // Make sure the GateThread has been started
        GateThread.EnsureRunning(this);
    }
}
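Once NumThreadsGoal has been reached, further threads come mainly from the GateThread. The deterministic model below is a hypothetical simplification, and the real check involves more conditions. StarvationGateModel and OnGateTick are invented names. On each gate tick, if work is still waiting and no work item has been dequeued since the previous tick, the goal is raised by one. With the 500 ms tick seen in the GateThread excerpt in the next section, this is consistent with the pace of about one thread per 500 ms observed in the experiments.

```csharp
// Hypothetical, deterministic model of starvation avoidance (not the runtime's GateThread).
sealed class StarvationGateModel
{
    private long _lastDequeueCount;

    public StarvationGateModel(int minThreads, int maxThreads)
    {
        NumThreadsGoal = minThreads; // the goal starts at min threads
        MaxThreads = maxThreads;
    }

    public int NumThreadsGoal { get; private set; }
    public int MaxThreads { get; }

    // Called once per gate interval.
    // pendingRequests: work items still waiting for a worker thread.
    // totalDequeued: running total of work items picked up by worker threads so far.
    // Returns true when the goal was raised by one.
    public bool OnGateTick(int pendingRequests, long totalDequeued)
    {
        bool madeProgress = totalDequeued != _lastDequeueCount;
        _lastDequeueCount = totalDequeued;

        if (pendingRequests > 0 && !madeProgress && NumThreadsGoal < MaxThreads)
        {
            NumThreadsGoal++; // the pool looks starved: allow one more thread
            return true;
        }
        return false;
    }
}
```

In Experiment 1, every running task is stuck in tcs.Task.Wait(), so nothing new is dequeued between ticks. Each tick therefore adds at most one thread.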
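Because thread growth in the third stage is slow, experienced developers often set a large min threads value at application startup. The application then enters that stage late or not at all.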
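Here is a minimal sketch of that startup tweak. It assumes you want to raise only the worker-thread minimum and keep the I/O completion minimum as it is. ThreadPoolStartup and EnsureMinWorkerThreads are hypothetical helper names. ThreadPool.GetMinThreads and ThreadPool.SetMinThreads are the real APIs already used in Experiment 2.

```csharp
using System.Threading;

static class ThreadPoolStartup
{
    // Raises the worker-thread minimum to at least desiredMinWorkers and keeps the
    // I/O completion thread minimum unchanged. Returns false if SetMinThreads rejects
    // the value (for example when it is larger than the current maximum).
    public static bool EnsureMinWorkerThreads(int desiredMinWorkers)
    {
        ThreadPool.GetMinThreads(out int currentMinWorkers, out int currentMinIo);
        if (currentMinWorkers >= desiredMinWorkers)
        {
            return true; // never lower an existing minimum
        }
        return ThreadPool.SetMinThreads(desiredMinWorkers, currentMinIo);
    }
}
```

For example, you could call ThreadPoolStartup.EnsureMinWorkerThreads(Environment.ProcessorCount * 4) as the first line of Main. The factor 4 is only an example, and the right value should come from measuring your own workload. The helper surfaces the bool returned by SetMinThreads instead of silently ignoring a rejected value.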
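Improvements to thread injection in .NET 6
In .NET 6, when a Worker Thread is blocked by synchronous code, NotifyThreadBlocked wakes the GateThread immediately. The GateThread no longer has to wait out its 500 ms delay, so NumThreadsGoal can be raised sooner.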
internal class PortableThreadPool
{
    // Called when a Worker Thread is about to be blocked by synchronous code (new in .NET 6)
    public bool NotifyThreadBlocked()
    {
        // ...
        GateThread.Wake(this);
        return true;
    }

    private static class GateThread
    {
        private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false);

        // GateThread entry point
        private static void GateThreadStart()
        {
            while (true)
            {
                // Wait up to 500 ms, or less if Wake() signals the event
                DelayEvent.WaitOne(500);
                // ...
            }
        }

        public static void Wake(PortableThreadPool threadPoolInstance)
        {
            DelayEvent.Set();
            EnsureRunning(threadPoolInstance);
        }
    }
}
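The key detail in this excerpt is that the GateThread normally sleeps in DelayEvent.WaitOne(500), but Wake() can cut that sleep short. A blocked worker therefore no longer waits out the full interval before the pool reacts. The pattern can be reproduced on its own with AutoResetEvent. WakeableLoop is an illustrative name, and the body does not attempt the GateThread's actual adjustments.

```csharp
using System;
using System.Threading;

// Illustrates "wait up to N ms, but allow an early wake-up", the pattern used by
// GateThreadStart/Wake above. Not the runtime's code.
sealed class WakeableLoop : IDisposable
{
    private readonly AutoResetEvent _delayEvent = new AutoResetEvent(initialState: false);

    public int WokenCount { get; private set; }
    public int TimedOutCount { get; private set; }

    // One iteration: blocks for at most delayMs, or returns early once Wake() has been called.
    public void RunOnce(int delayMs)
    {
        bool woken = _delayEvent.WaitOne(delayMs); // true: signaled, false: timed out
        if (woken) WokenCount++;
        else TimedOutCount++;
        // A real gate thread would re-check starvation and blocking adjustments here.
    }

    // An AutoResetEvent stays signaled until one waiter consumes it, so a Wake()
    // that happens before RunOnce still shortens the next wait.
    public void Wake() => _delayEvent.Set();

    public void Dispose() => _delayEvent.Dispose();
}
```

An AutoResetEvent suits this pattern because each Wake() releases exactly one wait and then resets automatically. The next iteration goes back to the normal timed sleep.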
Hill Climbing
The hill-climbing logic is driven by the Update method of the HillClimbing class:
public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)
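currentThreadCount: the current number of threads
sampleDurationSeconds: the length of the sampling interval, in seconds
numCompletions: the number of work items completed during that sampling interval
newThreadCount: the new thread count
newSampleMs: the new sampling interval, in milliseconds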
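The runtime's HillClimbing class is considerably more elaborate than can be shown here. The shape of Update can still be illustrated with a deliberately naive hill climber. For each sample it measures throughput (completions per second). It keeps moving the thread count in the same direction while throughput improves, and reverses when throughput drops. ToyHillClimbing, the one-thread step and the fixed 500 ms sample interval are all assumptions of this sketch.

```csharp
using System;

// A deliberately naive hill climber with the same inputs/outputs as Update() above.
// Hypothetical sketch: the runtime's HillClimbing is far more sophisticated.
sealed class ToyHillClimbing
{
    private const int SampleIntervalMs = 500; // fixed here; the real algorithm can vary it
    private readonly int _minThreads;
    private readonly int _maxThreads;
    private double _lastThroughput = -1; // completions per second in the previous sample
    private int _direction = 1;           // +1: try more threads, -1: try fewer

    public ToyHillClimbing(int minThreads, int maxThreads)
    {
        _minThreads = minThreads;
        _maxThreads = maxThreads;
    }

    public (int newThreadCount, int newSampleMs) Update(
        int currentThreadCount, double sampleDurationSeconds, int numCompletions)
    {
        double throughput = numCompletions / sampleDurationSeconds;

        // If the previous step made throughput worse, climb in the other direction.
        if (_lastThroughput >= 0 && throughput < _lastThroughput)
        {
            _direction = -_direction;
        }
        _lastThroughput = throughput;

        int newThreadCount = Math.Clamp(currentThreadCount + _direction, _minThreads, _maxThreads);
        return (newThreadCount, SampleIntervalMs);
    }
}
```

Suppose three 0.5 s samples report 100, 120 and 110 completions (200/s, 240/s and 220/s). The thread count moves from 8 to 9, then to 10, then back to 9 when throughput drops.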
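Destroying unneeded threads
If a thread is to be removed while its local queue still holds pending work items, those items are moved to the global queue.
The thread pool destroys unneeded threads in the following scenarios. This list may not be exhaustive and reflects my current understanding.
When a thread cannot obtain any work item from the queueing system.
When the hill-climbing algorithm decides that the thread is surplus.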
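Both retirement paths, and the hand-off of leftover local work, can be modelled in a few lines. This is a hypothetical toy. ToyWorker, RunUntilRetired and the SemaphoreSlim-based wait are my own constructs, not the runtime's WorkerThread. The worker runs local work first and then waits up to idleTimeout for a global work signal. It exits when that wait times out, or when the isSurplus callback, which stands in for the hill-climbing decision, says the thread is no longer needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical toy model of worker retirement (not the runtime's WorkerThread).
sealed class ToyWorker
{
    private readonly ConcurrentQueue<Action> _globalQueue;
    private readonly Queue<Action> _localQueue = new Queue<Action>(); // touched only by this worker

    public ToyWorker(ConcurrentQueue<Action> globalQueue) => _globalQueue = globalQueue;

    public int LocalCount => _localQueue.Count;

    public void EnqueueLocal(Action workItem) => _localQueue.Enqueue(workItem);

    // Runs work until the worker is judged surplus or stays idle for idleTimeout.
    // globalWorkSignal is expected to be released once for every item added to the global queue.
    // Returns the number of work items executed.
    public int RunUntilRetired(SemaphoreSlim globalWorkSignal, TimeSpan idleTimeout, Func<bool> isSurplus)
    {
        int executed = 0;
        while (!isSurplus()) // stand-in for the hill-climbing decision
        {
            if (_localQueue.Count > 0)
            {
                _localQueue.Dequeue()(); // local work first
            }
            else if (globalWorkSignal.Wait(idleTimeout) && _globalQueue.TryDequeue(out var item))
            {
                item();
            }
            else
            {
                break; // no work arrived within idleTimeout: retire
            }
            executed++;
        }

        Retire();
        return executed;
    }

    // Before the thread goes away, hand any leftover local work to the global queue
    // so that other workers can still run it.
    private void Retire()
    {
        while (_localQueue.Count > 0)
        {
            _globalQueue.Enqueue(_localQueue.Dequeue());
        }
    }
}
```

The local queue is only touched by its owning worker, which is why a plain Queue<Action> is enough in this toy. The global queue is shared, so it uses ConcurrentQueue.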
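Recap
NumThreadsGoal, and with it thread injection, is updated when:
The ThreadPool's min threads or max threads are changed.
Starvation Avoidance kicks in.
A Worker Thread is blocked by synchronous code.
Hill climbing adjusts it dynamically.
Worker Threads are destroyed when they have no work to execute or when hill climbing judges them surplus.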
Conclusion
References
https://www.codeproject.com/Articles/3813/NET-s-ThreadPool-Class-Behind-The-Scenes
https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#threading
https://mattwarren.org/2017/04/13/The-CLR-Thread-Pool-Thread-Injection-Algorithm/
https://docs.microsoft.com/zh-CN/previous-versions/msp-n-p/ff963549(v=pandp.10)?redirectedfrom=MSDN#thread-injection