程序丨任务系统的核心问题(三):如何实行无锁工作队列?
译者:王磊(未来的未来)
审校:崔国军(飞扬971)
这个星期,我们终于要解决任务系统的核心问题了:实行无锁工作队列。 请继续阅读来了解底层编程的实现方案。
这个系列教程中的其他文章(点击下方文字,即可阅读):
概括
记住任务窃取队列需要提供三个操作:
l Push():将一个任务添加到队列的私有(后进先出队列)端。
l Pop():从队列的私有(后进先出队列)端删除任务。
l 窃取(Steal):从队列中的公共(先入先出队列)端的结尾窃取任务。
进一步记住Push()操作和Pop()操作只会由拥有队列的线程调用,因此不会同时调用。 任何其他线程可以随时调用Steal()操作,所以Steal()操作可以和Push()操作和Pop()操作同时进行。
一个锁的实现
在深入无锁编程领域进行研究之前,我们先来研究一下使用传统锁的实现方案。从概念上讲,我们该如何构建一个类似双端队列(deque)的数据结构,其一端表现得像一个后进先出队列,而另一端表现得像一个先入先出队列?
幸运的是,这可以很容易地通过两个索引来解决,两个索引分别表示一端,如果Chase和Y. Lev。在“Dynamic circular work-stealing deque”所描述的那样。如果我们假设我们有无限的内存,并且因此有一个无限数量的数组,我们可以引入两个称为bottom和top的计数器,它们具有以下属性:
l bottom表示数组中的下一个可用插槽,以便下一个任务可以被推入。 Push()操作首先存储任务,然后对bottom进行递增。
l 类似地,Pop()可以对bottom进行递减,然后返回存储在数组中该位置的任务。
l top表示可以被窃取的下一个任务(即双层中最顶层的元素)。 Steal()操作从数组中的这个槽中抓取任务,然后对top进行递增。
这里可以得到的一个重要观察是,在任何给定的时间点,bottom - top都会产生当前存储在双端队列中的任务数量。 如果bottom小于或等于top,那么这个双端队列是空的,没有什么任务可以窃取。
另一个有趣的事实是,Push()操作和Pop()操作只能改变bottom,而Steal()只会改变top。 这是一个非常重要的属性,可以使任务窃取队列的所有者的同步开销最小化,并证明这一点对于无锁实现是有利的。
为了更好地说明双端队列是如何工作的,请查看以下在双端队列上执行的操作列表:
如前所述,Push()操作和Pop()操作以后进先出的方式进行工作,而Steal()操作以先进先出方式进行工作。 在上面的例子中,第一次调用Steal()操作会返回索引为0的任务,而对Pop()操作的后续调用将按照索引2和索引1的顺序返回操作。
在C ++代码中,任务窃取队列的三个操作的实现可能如下所示:
void Push(Job* job)
{
ScopedLock lock(criticalSection);
m_jobs[m_bottom] = job;
++m_bottom;
}
Job* Pop(void)
{
ScopedLock lock(criticalSection);
const int jobCount = m_bottom - m_top;
if (jobCount <= 0)
{
// no job left in the queue
return nullptr;
}
--m_bottom;
return m_jobs[m_bottom];
}
Job* Steal(void)
{
ScopedLock lock(criticalSection);
const int jobCount = m_bottom - m_top;
if (jobCount <= 0)
{
// no job there to steal
return nullptr;
}
Job* job = m_jobs[m_top];
++m_top;
return job;
}
现在唯一要做的就是把一个无限制的无限数组变成一个循环数组。这可以通过相应地对bottom和top进行封装来完成,举个简单的例子来说,使用模操作。 但是,这将使得计算当前在双端数组deque中的任务数量变得更加困难,因为我们将不得不考虑环绕的情况。 更好更简单的解决方案是在访问数组的时候将模操作应用到bottom和top上。 只要存储在双端队列中的元素的数量是二的幂,这就是一个二进制与操作:
static const unsigned int NUMBER_OF_JOBS = 4096u;
static const unsigned int MASK = NUMBER_OF_JOBS - 1u;
void Push(Job* job)
{
ScopedLock lock(criticalSection);
m_jobs[m_bottom & MASK] = job;
++m_bottom;
}
Job* Pop(void)
{
ScopedLock lock(criticalSection);
const int jobCount = m_bottom - m_top;
if (jobCount <= 0)
{
// no job left in the queue
return nullptr;
}
--m_bottom;
return m_jobs[m_bottom & MASK];
}
Job* Steal(void)
{
ScopedLock lock(criticalSection);
const int jobCount = m_bottom - m_top;
if (jobCount <= 0)
{
// no job there to steal
return nullptr;
}
Job* job = m_jobs[m_top & MASK];
++m_top;
return job;
}
现在你有了这么东西,可以使用传统的锁来实现一个任务窃取的队列了。
先决条件:无锁编程
无锁编程是一个巨大的话题,已经写了很多这个主题的文章。 我不想重复已经那些已经论述过的内容,我想推荐几篇这个方面比较好的文章,希望大家在继续这篇文章之前阅读这几篇文章:
§ 《无锁编程简介》:这里面涉及了原子操作,顺序一致性和内存排序。 这是一个梦幻般的“无锁编程的101军规”,必读!
§ 《编译时的内存排序》:通过一些比较深入的例子,讨论编译器重新排序和内存屏障。
§ 《实际捕获内存重新排序》:显示了一个在x86 / 64平台上发生内存重新排序的例子,即使Intel的x86 / 64架构提供了一个强有序的内存模型也会发生这样的情况。
§ 弱内存模型vs.强内存模型:讨论弱内存模型和强内存模型,以及顺序一致性的问题。
总的来说,Jeff Preshing的博客是一个充满了关于无锁编程的文章的金矿。 如果您有疑问,请查看他的博客。
一个无锁的任务窃取队列
你已经读完了所有的文章? 很好。
假设现在没有编译器重新排序和完全没有内存重新排序,让我们一个接一个地以无锁方式实现所有三个操作。 稍后我们将讨论所需的编译器和内存屏障。
首先考虑Push()操作的无锁实现:
void Push(Job* job)
{
long b = m_bottom;
m_jobs[b & MASK] = job;
m_bottom = b+1;
}
同时发生的其他操作会发生什么情况?Pop()操作不能同时执行,所以我们只需要考虑一下Steal()操作。 但是,Steal()操作只写入顶部,并从bottom里面进行读取。 所以可能发生的最糟糕的情况是,在第5行发送一个新项的可用性通知之前,Push()操作的对象被一个对Steal()的调用抢抢先清空。 这其实没有关系,因为这只是意味着Steal()操作不能窃取一个已经存在的物品 –这没有什么危害。
接下来,让我们看看Steal()操作:
Job* Steal(void)
{
long t = m_top;
long b = m_bottom;
if (t < b)
{
// non-empty queue
Job* job = m_jobs[t & MASK];
if (_InterlockedCompareExchange(&m_top, t+1, t) != t)
{
// a concurrent steal or pop operation removed an element from the deque in the meantime.
return nullptr;
}
return job;
}
else
{
// empty queue
return nullptr;
}
}
只要top比bottom小的话,仍然有任务留在队列中可以偷取。如果双端队列不为空,则函数首先读取存储在数组中的任务,然后尝试使用比较和交换操作来增加top的值。如比较和交换操作失败,则说明一个并行操作的Steal()操作成功从该双端队列中移除了一个任务。
请注意,在执行比较和交换操作之前读取任务非常的重要,因为在比较和交换操作完成之后,数组中的位置可能会被并行的Push()操作所覆盖。
这里需要注意的一个非常重要的事情是,top的值总是在bottom的值之前进行读取,这样来确保值代表的是一致的内存视图。尽管如此,如果在读取bottom的值之后并且在比较和交换操作被执行之前由并发的Pop()操作清空双端队列,则可能发生微妙的竞争。我们需要确保没有并发的Pop()操作和Steal()操作都返回剩余在双端队列中的最后一个任务,这也是通过使用比较和交换操作尝试在Pop()操作的实现中修改top的值来实现的。
Pop()操作是最有趣的一组:
Job* Pop(void)
{
long b = m_bottom - 1;
m_bottom = b;
long t = m_top;
if (t <= b)
{
// non-empty queue
Job* job = m_jobs[b & MASK];
if (t != b)
{
// there's still more than one item left in the queue
return job;
}
// this is the last item in the queue
if (_InterlockedCompareExchange(&m_top, t+1, t) != t)
{
// failed race against steal operation
job = nullptr;
}
m_bottom = t+1;
return job;
}
else
{
// deque was already empty
m_bottom = t;
return nullptr;
}
}
与Steal()的实现形式相反,这次我们需要确保在尝试读取top的值之前先减少bottom的值。否则,并发的Steal()操作可能会在Pop()没有注意的情况下从双端队列中删除多个任务。
此外,如果双端队列已经是空的,我们需要将其重置为bottom==top的规范空状态。
只要还有剩余的任务,我们就可以直接返回剩余的任务,而不需要进行任何额外的原子操作。但是,正如上面的实现注释中所指出的那样,我们需要保护代码免受来自并行调用Steal()的竞争,以防只剩一个任务的情况下出现错误。
如果是这样的话,代码执行一个比较和交换操作来增加top的值,并检查我们在与一个并发的Steal()的竞争过程中是否赢了或输了。这个操作的结果只有两种可能:
l 比较和交换操作成功了,我们赢得了与Steal()操作的竞争。在这种情况下,我们设置bottom = t + 1,来将双端队列设置为规范的空状态。
l 比较和交换操作失败了,我们在与Steal()操作的竞争中失败了。在这种情况下,我们返回一个空的任务,但仍然设置bottom = t + 1。这是为什么?因为在与Steal()操作的竞争中失败意味着并发的Steal()操作成功地设置了top = t + 1,所以我们仍然必须将双端队列设置为空的状态。
添加编译器和内存屏障
到现在为止,一切都还挺好。 在目前的形式下,这个实现将无法工作,因为我们完全没有考虑编译器和内存排序。 这个事情需要修复。
考虑下Push()操作:
void Push(Job* job)
{
long b = m_bottom;
m_jobs[b & MASK] = job;
m_bottom = b+1;
}
没有人保证编译器不会对上面的任何语句进行重新排序。具体而言,我们不能确定,我们是首先将任务存储在数组中,然后通过递增bottom来向其他线程发信号 - 这可能是另一种方式,这会导致其他线程窃取到不在那里的任务!
我们在这种情况下需要的是编译器的屏障:
void Push(Job* job)
{
long b = m_bottom;
m_jobs[b & MASK] = job;
// ensure the job is written before b+1 is published to other threads.
// on x86/64, a compiler barrier is enough.
COMPILER_BARRIER;
m_bottom = b+1;
}
请注意,在x86 / 64架构上,编译器屏障就足够了,因为强排序内存模型不允许内存与其他存储空间重新排序。 在其他平台上(PowerPC,ARM,。。。。。),你需要一个内存屏障。 另外请注意,在这种情况下,存储操作也不需要进行原子操作,因为写入bottom的唯一操作是Pop(),这是不能同时执行的操作。
同样,我们在实现Steal()时也需要一个编译器屏障:
Job* Steal(void)
{
long t = m_top;
// ensure that top is always read before bottom.
// loads will not be reordered with other loads on x86, so a compiler barrier is enough.
COMPILER_BARRIER;
long b = m_bottom;
if (t < b)
{
// non-empty queue
Job* job = m_jobs[t & MASK];
// the interlocked function serves as a compiler barrier, and guarantees that the read happens before the CAS.
if (_InterlockedCompareExchange(&m_top, t+1, t) != t)
{
// a concurrent steal or pop operation removed an element from the deque in the meantime.
return nullptr;
}
return job;
}
else
{
// empty queue
return nullptr;
}
}
在这里,我们需要一个编译器屏障来确保top的读取真正发生在bottom的读取之前。 另外,我们还需要另外一个保证在比较和交换操作之前执行数组读操作的屏障。 然而,在这种情况下,interlocked函数隐式地充当编译器屏障。
还剩下的其他操作实现如下:
Job* Pop(void)
{
long b = m_bottom - 1;
m_bottom = b;
long t = m_top;
if (t <= b)
{
// non-empty queue
Job* job = m_jobs[b & MASK];
if (t != b)
{
// there's still more than one item left in the queue
return job;
}
// this is the last item in the queue
if (_InterlockedCompareExchange(&m_top, t+1, t) != t)
{
// failed race against steal operation
job = nullptr;
}
m_bottom = t+1;
return job;
}
else
{
// deque was already empty
m_bottom = t;
return nullptr;
}
}
这个实现中最重要的部分是前三行。 在那里我们需要一个真正的内存屏障,即使这在英特尔的x86 / 64架构的非常罕见的情况之一。 更具体地说,在存储bottom = b和long t = top的读取之间增加一个编译器屏障是不够的,因为内存模型明确地允许“加载可能被旧存储重新排序到不同的位置”,这正好是这里的情况。
这意味着前几行代码需要修正如下:
long b = m_bottom - 1;
m_bottom = b;
MEMORY_BARRIER;
long t = m_top;
请注意,在这种情况下,SFENCE和LFENCE都不够用,而且它确实需要成为一个完整的MFENCE屏障。 或者,我们也可以使用XCHG等互锁操作,而不是使用完全的屏障。 XCHG在内部就像是一个屏障,但是在这种情况下,它变得开销低一点:
long b = m_bottom - 1;
_InterlockedExchange(&m_bottom, b);
long t = m_top;
Pop()操作的其他实现可以保持原样。 与Steal()操作中的情况类似,比较和交换操作充当了编译器的屏障,而bottom = t + 1这个存储不需要以原子方式发生,因为不会有任何同时写入底层的操作。
性能
无锁实现在性能方面能获得了什么样子的提升? 这里有一个概述:
与我们的第一个实现相比,无锁和使用线程本地分配器给了我们近7倍的性能提升。 即使在使用了线程本地分配器的基础上,无锁实现的执行速度也提高了1.8倍到3.3倍。
展望
这就是今天的全部内容了。在这个系列教程的下一篇文章中,我们将看到在这个系统之上如何实现像parallel_for这样的高级算法。
【版权声明】
原文作者未做权利声明,视为共享知识产权进入公共领域,自动获得授权。
今日推荐
一键添加
加小编微信,享双重福利
1.加入GAD程序猿交流群,获取行业干货;
2.领取60G腾讯内部分享等独家程序资料。