查看原文
其他

一起读源码C#并发队列内部世界 .NET Core篇

DotNet 2021-09-23

 (给DotNet加星标,提升.Net技能

转自:balahoho
cnblogs.com/hohoa/p/12685237.html

在上一篇《走进C#并发队列ConcurrentQueue的内部世界》中解析了Framework下的ConcurrentQueue实现原理,经过抛砖引玉,得到了一众大佬的指点,找到了.NET Core版本下的ConcurrentQueue源码,位于以下地址:


  • https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs


  • https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs


我大致看了一下,虽然两者的实现有不少相似的地方,不过在细节上新增了许多有意思的东西,还是觉得要单独拉出来说一下。画外音:谁叫我上篇立了flag,现在跪着也要写完。


必须要吐糟的是,代码中ConcurrentQueue类明明是包含在System.Collections.Concurrent命名空间下,但是源码结构中的文件却放在System.Private.CoreLib目录中,这是闹哪出~


存储结构



一张图看清它的真实面目,这里继续沿用上一篇的结构图稍作修改:

 


从图中可以看到,整体结构上基本一致,核心改动就是Segment中增加了Slot(槽)的概念,这是真正存储数据的地方,同时有一个序列号与之对应。


从代码来看一下Segment的核心定义:


internal sealed class ConcurrentQueueSegment<T>
{
//存放数据的容器
internal readonly Slot[] _slots;

//这个mask用来计算槽点,可以防止查找越界
internal readonly int _slotsMask;

//首尾位置指针
internal PaddedHeadAndTail _headAndTail;

//观察保留标记,表示当前段在出队时能否删除数据
internal bool _preservedForObservation;

//标记当前段是否被锁住
internal bool _frozenForEnqueues;

//下一段的指针
internal ConcurrentQueueSegment<T>? _nextSegment;
}


其中_preservedForObservation和_frozenForEnqueues会比较难理解,后面再详细介绍。


再看一下队列的核心定义:


public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
//每一段的初始化长度,也是最小长度
private const int InitialSegmentLength = 32;
//每一段的最大长度
private const int MaxSegmentLength = 1024 * 1024;
//操作多个段时的锁对象
private readonly object _crossSegmentLock;
//尾段指针
private volatile ConcurrentQueueSegment<T> _tail;
//首段指针
private volatile ConcurrentQueueSegment<T> _head;
}


常规操作


还是按上一篇的套路为主线循序渐进。


创建实例


ConcurrentQueue依然提供了2个构造函数,分别可以创建一个空队列和指定数据集的队列。


/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
_crossSegmentLock = new object();
_tail = _head = new ConcurrentQueueSegment<T>(InitialSegmentLength);
}


还是熟悉的操作,创建了一个长度是32的Segment并把队列的首尾指针都指向它,同时创建了锁对象实例,仅此而已。


进一步看看Segment是怎么创建的:


internal ConcurrentQueueSegment(int boundedLength)
{

//这里验证了长度不能小于2并且必须是2的N次幂
Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");
_slots = new Slot[boundedLength];
//这个mask的作用就是用来计算数组索引的防止越界,可以用`& _slotsMask`取代`% _slots.Length`
_slotsMask = boundedLength - 1;
//设置初始序列号
for (int i = 0; i < _slots.Length; i++)
{
_slots[i].SequenceNumber = i;
}
}
internal struct Slot
{
[AllowNull, MaybeNull] public T Item;
public int SequenceNumber;
}


再看看怎么用集合初始化队列,这个过程稍微麻烦点,但是很有意思:


public ConcurrentQueue(IEnumerable<T> collection)
{
if (collection == null)
{ ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection);
}
_crossSegmentLock = new object();
//计算得到第一段的长度
int length = InitialSegmentLength;
if (collection is ICollection<T> c)
{
int count = c.Count;
if (count > length)
{
length = Math.Min(ConcurrentQueueSegment<T>.RoundUpToPowerOf2(count), MaxSegmentLength);
}
}

//根据前面计算出来的长度创建一个Segment,再把数据依次入队
_tail = _head = new ConcurrentQueueSegment<T>(length);
foreach (T item in collection)
{
Enqueue(item);
}
}



元素进队


/// <summary>在队尾追加一个元素</summary>
public void Enqueue(T item)
{
// 先尝试在尾段插入一个元素
if (!_tail.TryEnqueue(item))
{
// 如果插入失败,就意味着尾段已经填满,需要往后扩容
EnqueueSlow(item);
}
}
private void EnqueueSlow(T item)
{
while (true)
{
ConcurrentQueueSegment<T> tail = _tail;
// 先尝试再队尾插入元素,如果扩容完成了就会成功
if (tail.TryEnqueue(item))
{
return;
}
// 获得一把锁,避免多个线程同时进行扩容
lock (_crossSegmentLock)
{
//检查是否扩容过了
if (tail == _tail)
{
// 尾段冻结
tail.EnsureFrozenForEnqueues();
// 计算下一段的长度
int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);
var newTail = new ConcurrentQueueSegment<T>(nextSize);
// 改变队尾指向
tail._nextSegment = newTail;
// 指针交换
_tail = newTail;
}
}
}
}



// must only be called while queue's segment lock is held
internal void EnsureFrozenForEnqueues()
{
// flag used to ensure we don't increase the Tail more than once if frozen more than once
if (!_frozenForEnqueues)
{
_frozenForEnqueues = true;
Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);
}
}



接着是重头戏,看一下如何给段追加元素:


public bool TryEnqueue(T item)
{
Slot[] slots = _slots;
// 如果发生竞争就自旋等待
SpinWait spinner = default;
while (true)
{
// 获取当前段的尾指针
int currentTail = Volatile.Read(ref _headAndTail.Tail);
// 计算槽点
int slotsIndex = currentTail & _slotsMask;
// 读取对应槽的序列号
int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

// 判断槽点序列号和指针是否匹配
int diff = sequenceNumber - currentTail;
if (diff == 0)
{
// 通过原子操作比较交换,保证了只有一个入队者获得可用空间
if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
{
// 把数据存入对应的槽点,以及更新序列号
slots[slotsIndex].Item = item;
Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);
return true;
}
}
else if (diff < 0)
{
// 序列号小于指针就说明该段已经装满了,直接返回false
return false;
}
// 这次竞争失败了,只好等下去
spinner.SpinOnce(sleep1Threshold: -1);
}
}



元素出队


可以猜测到,入队的时候要根据容量大小进行扩容,那么与之对应的,出队的时候就需要对它进行压缩,也就是丢弃没有数据的段。


/// <summary>从队首移除一个元素</summary>
public bool TryDequeue([MaybeNullWhen(false)] out T result) =>
_head.TryDequeue(out result) ||
TryDequeueSlow(out result);
private bool TryDequeueSlow([MaybeNullWhen(false)] out T item)
{
// 不断循环尝试出队,直到成功或失败为止
while (true)
{
ConcurrentQueueSegment<T> head = _head;
// 尝试从队首移除,如果成功就直接返回了
if (head.TryDequeue(out item))
{
return true;
}
// 如果首段为空并且没有下一段了,则说明整个队列都没有数据了,返回失败
if (head._nextSegment == null)
{
item = default!;
return false;
}
// 既然下一段不为空,那就再次确认本段是否还能出队成功,否则就要把它给移除了,等待下次循环从下一段出队
if (head.TryDequeue(out item))
{
return true;
}
// 首段指针要往后移动,表示当前首段已丢弃,跨段操作要先加锁
lock (_crossSegmentLock)
{
if (head == _head)
{
_head = head._nextSegment;
}
}
}
}



public bool TryDequeue([MaybeNullWhen(false)] out T item)
{
Slot[] slots = _slots;
// 遇到竞争时自旋等待
SpinWait spinner = default;
while (true)
{
// 获取头指针地址
int currentHead = Volatile.Read(ref _headAndTail.Head);
// 计算槽点
int slotsIndex = currentHead & _slotsMask;

// 获取槽点对应的序列号
int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);
// 比较序列号是否和期望值一样,为什么要加1的原因前面入队时说过
int diff = sequenceNumber - (currentHead + 1);
if (diff == 0)
{
// 通过原子操作比较交换得到可以出队的槽点,并把头指针往后移动一位
if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
{
// 取出数据
item = slots[slotsIndex].Item!;
// 此时如果该段没有被标记观察保护,要把这个槽点的数据清空
if (!Volatile.Read(ref _preservedForObservation))
{
slots[slotsIndex].Item = default;
Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);
}
return true;
}
}
else if (diff < 0)
{
// 这种情况说明该段已经没有有效数据了,直接返回失败。
bool frozen = _frozenForEnqueues;
int currentTail = Volatile.Read(ref _headAndTail.Tail);
if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
{
item = default!;
return false;
}
}
// 竞争失败进入下一轮等待
spinner.SpinOnce(sleep1Threshold: -1);
}
}




/// <summary>
/// 判断队列是否为空,千万不要使用Count==0来判断,也不要直接TryPeek
/// </summary>
public bool IsEmpty => !TryPeek(out _, resultUsed: false);
private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
ConcurrentQueueSegment<T> s = _head;
while (true)
{
ConcurrentQueueSegment<T>? next = Volatile.Read(ref s._nextSegment);
// 从首段中获取头部元素,成功的话直接返回true,获取失败就意味着首段为空了
if (s.TryPeek(out result, resultUsed))
{
return true;
}
// 如果下一段不为空那就再尝试从下一段重新获取
if (next != null)
{
s = next;
}
//如果下一段为空就说明整个队列为空,跳出循环直接返回false了
else if (Volatile.Read(ref s._nextSegment) == null)
{
break;
}
}
result = default!;
return false;
}


上面的代码可以看到有一个特殊的参数resultUsed,它具体会有什么影响呢,那就得看看Segment是如何peek的:


public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
// 实际上队列的TryPeek是一个观察保护操作,这时resultUsed会标记成true,如果是IsEmpty操作的话就为false,因为并不关心这个元素是否被释放了
if (resultUsed)
{
_preservedForObservation = true;
Interlocked.MemoryBarrier();
}
Slot[] slots = _slots;
SpinWait spinner = default;
while (true)
{
int currentHead = Volatile.Read(ref _headAndTail.Head);
int slotsIndex = currentHead & _slotsMask;
int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);
int diff = sequenceNumber - (currentHead + 1);
if (diff == 0)
{
result = resultUsed ? slots[slotsIndex].Item! : default!;
return true;
}
else if (diff < 0)
{
bool frozen = _frozenForEnqueues;
int currentTail = Volatile.Read(ref _headAndTail.Tail);
if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
{
result = default!;
return false;
}
}
spinner.SpinOnce(sleep1Threshold: -1);
}
}


除了最开始的resultUsed判断,其他的基本和出队的逻辑一致,前面说的很详细,这里不多介绍了。



public T[] ToArray()
{
// 这一步可以理解为保护现场
SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail);
// 计算队列长度,这也是要返回的数组大小
long count = GetCount(head, headHead, tail, tailTail);
T[] arr = new T[count];
// 开始迭代数据塞到目标数组中
using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
{
int i = 0;
while (e.MoveNext())
{
arr[i++] = e.Current;
}
Debug.Assert(count == i);
}
return arr;
}



private void SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail)
{
// 要保护现场肯定要先来一把锁
lock (_crossSegmentLock)
{
head = _head;
tail = _tail;
// 一段一段进行遍历
for (ConcurrentQueueSegment<T> s = head; ; s = s._nextSegment!)
{
// 把每一段的观察保护标记设置成true
s._preservedForObservation = true;
// 遍历到最后一段了就结束
if (s == tail) break;
}
// 尾段冻结,这样就不能新增元素
tail.EnsureFrozenForEnqueues();
// 返回两个指针地址用来对每一个元素进行遍历
headHead = Volatile.Read(ref head._headAndTail.Head);
tailTail = Volatile.Read(ref tail._headAndTail.Tail);
}
}



带着疑问提了个Issue问一下:

https://github.com/dotnet/runtime/issues/35094


到这里就基本把.NET Core ConcurrentQueue说完了。


总结


至于性能对比,我找到一个官方给出的测试结果,有兴趣的可以看看:


https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046


最后基于.NET Core平台的开源分布式任务调度系统ScheduleMaster有兴趣的star支持一下,2.0版本即将上线:


  • https://github.com/hey-hoho/ScheduleMasterCore


  • https://gitee.com/hey-hoho/ScheduleMasterCore(只从github同步)


推荐阅读  点击标题可跳转
Nginx知多少系列之介绍.NET微服务实战之技术架构分层篇Docker 常用命令(.NET Core示例)


看完本文有收获?请转发分享给更多人

关注「DotNet」加星标,提升.Net技能 

好文章,我在看❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存