【深度长文】还是没忍住,聊聊神奇的无锁队列吧!
来源:公众号:【鱼鹰谈单片机】
作者:鱼鹰Osprey
ID:emOsprey
这篇长文除了由浅入深的一步步迭代出无锁队列的实现原理,也会借此说说如何在项目中注意避免写出有 BUG 的程序,与此同时也会简单聊聊如何测试一段代码,而这些能力应该是所有软件开发工作者都应该引起注意的。而在介绍的过程中也会让你明白理论和实际的差距到底在哪。
高级程序员和初级程序员之间,鱼鹰认为差距之一就在于写出来的代码 BUG 多还是少,当然解决 BUG 的能力也很重要。
既要有挖坑的能力,也要有填坑的实力!
其实在避免BUG这一块,鱼鹰不管是在前面的《延时功能进化论(合集)》,还是《引脚输出的隐藏BUG | 深入思考》,甚至于鱼鹰最先思考的关于信号量保护《信号量保护之位带操作》《信号量保护之禁止中断》那里都有介绍过,如果你能真正理解文章所介绍的那些情况,那么写一个可靠的程序应该不难。
而关于队列,鱼鹰也在很早之前就写过一篇《数据结构系列文章之队列 FIFO》,只是那个时候总觉得那种方式效率太低(后面会说为什么效率低),应用场合太少。
很早之前鱼鹰就听说过无锁队列,但由于以前的项目不是很复杂,很多时候都可以不需要无锁队列,所以就没有更深入的学习,直到这次串口通信想试试无锁队列的效果,才最终用上了这一神奇的代码。
网上有个词形容无锁队列鱼鹰觉得很贴切:巧夺天工!
现在就从队列开始说起吧。
什么是队列,顾名思义,就类似于超市面前排起的一个队伍,当最前面的顾客买完了东西,后面的顾客整体向前移动,而处于新队头的顾客继续消费。如果没有后来的顾客,那么最终这个队伍将消失。而如果有新的顾客到来,那么他将排在队伍最后等待购买。
// 鱼鹰*公***众**号:鱼鹰谈单片机,
uint16_t queue[10000]; // 队列,0~9999,共 10000
uint16_t in;
uint16_t out;
int get(void *parameter)
{
if(in == out)
{
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",queue[out]);
// 自加 1 ,但因为数组只有 10000 大小,所以需要让 out 在 0~9999 之间进行循环
out = (out + 1) % 10000;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if((in + 1) % 10000 == out)
{ // 因为 in 和 out 值相等时,可能为空和满的,无法判断哪种情况
// 所以为了避免这种情况,总是空一个位置,这样满的时候 in 和 out 的值就不一样
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",in);
queue[in] = in;
// 自加 1 ,但因为数组只有 10000 大小,所以需要让 in 在 0~9999 之间进行循环
in = (in + 1) % 10000;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
V 1.0
// 测试代码 V1.0
// 鱼鹰*公***众**号:鱼鹰谈单片机,
#define BUFF_SIZE 100
uint16_t queue[BUFF_SIZE]; // 队列,0~99,共 100
uint16_t in;
uint16_t out;
uint16_t counter; // 记录
int get(void *parameter)
{
if(in == out)
{
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",queue[out]);
// 自加 1 ,但因为数组只有 100 大小,所以需要让 out 在 0~99 之间进行循环
out = (out + 1) % BUFF_SIZE;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample);
int put(void)
{
if((in + 1) % BUFF_SIZE == out)
{ // 因为 in 和 out 值相等时,可能为空和满的,无法判断哪种情况
// 所以为了避免这种情况,总是空一个位置,这样满的时候 in 和 out 的值就不一样
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",counter);
queue[in] = counter;
counter = (counter + 1) % 10000; // 限制它的大小,不可超过四位数
// 自加 1 ,但因为数组只有 100 大小,所以需要让 in 在 0~99 之间进行循环
in = (in + 1) % BUFF_SIZE;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
V 1.5
// 测试代码 V1.5
// 鱼鹰*公***众**号:鱼鹰谈单片机
#define BUFF_SIZE 7
typedef struct
{
uint16_t queue[BUFF_SIZE]; // 队列,0~99,共 100
uint16_t in;
uint16_t out;
uint16_t used;
}fifo_def; // 把队列相关的变量整合在一块,方便使用
uint16_t counter; // 记录
fifo_def fifo;
int get(void *parameter)
{
if(fifo.used == 0)
{
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",fifo.queue[fifo.out]);
// 自加 1 ,但因为数组只有 100 大小,所以需要让 out 在 0~99 之间进行循环
fifo.out = (fifo.out + 1) % BUFF_SIZE;
fifo.used--;
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample); // 导出到命令行中使用
int put(void)
{
if(fifo.used >= BUFF_SIZE)
{ // 因为 in 和 out 值相等时,可能为空和满的,无法判断哪种情况
// 所以为了避免这种情况,总是空一个位置,这样满的时候 in 和 out 的值就不一样
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",counter);
fifo.queue[fifo.in] = counter;
counter = (counter + 1) % 10000; // 限制它的大小,不可超过四位数
// 自加 1 ,但因为数组只有 100 大小,所以需要让 in 在 0~99 之间进行循环
fifo.in = (fifo.in + 1) % BUFF_SIZE;
fifo.used++;
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
V 2.0
// 测试代码 V2.0
// 鱼鹰*公***众**号:鱼鹰谈单片机
#define BUFF_SIZE 8 // 文章说的是 7,但是这是不能用的,只能为 2 的幂次方
typedef struct
{
uint32_t in; // 注意,这里是 32 位
uint32_t out; // 注意,这里是 32 位
uint8_t *queue; // 这里不再指定大小,而是根据实际情况设置缓存大小
}fifo_def; // 把队列相关的变量整合在一块,方便使用
uint8_t counter; // 记录人数,也是发放的编号,因为队列类型变了,所以这里也改成 uint8_t
uint8_t fifo_buff[BUFF_SIZE];
fifo_def fifo = {0,0,fifo_buff}; // 为了简化初始化过程,直接定义时初始化
int get(void *parameter)
{
if(fifo.in - fifo.out == 0)
{ // 也可以写成 fifo.in == fifo.out,效率或许会高一些
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",fifo.queue[fifo.out % BUFF_SIZE]);
fifo.out++; // 直接自加
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample); // 导出到命令行中使用
int put(void)
{
if(BUFF_SIZE - fifo.in + fifo.out == 0)
{
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
// 把当前的编号存入数组中进行排队
rt_kprintf("您当前领取的编号为 %04u\n",counter);
fifo.queue[fifo.in % BUFF_SIZE] = counter; // 这里用 fifo.in % (BUFF_SIZE - 1) 效率会高一些
counter += 1; // 递增计数器
fifo.in++; // 直接自加
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
if(BUFF_SIZE - (fifo.in - fifo.out) == 0)
{
rt_kprintf("现在排队人数太多,请下次尝试\n");
return -1; // 队列满
}
if(fifo.in - fifo.out == 0)
{ // 也可以写成 fifo.in == fifo.out,效率或许会高一些
rt_kprintf("当前没有顾客等待服务\n");
return -1; // 没有顾客在排队
}
fifo.in++; // 直接自加
fifo.out++; // 直接自加
BUFF_SIZE - (fifo.in - fifo.out)
BUFF_SIZE - (fifo.in - fifo.out)
fifo.in++; // 直接自加
fifo.out++; // 直接自加
V 2.5
// 测试代码 V2.5
// 鱼鹰*公***众**号:鱼鹰谈单片机
#define BUFF_SIZE 8
typedef struct
{
uint32_t in; // 注意,这里是 32 位
uint32_t out; // 注意,这里是 32 位
uint32_t size; // 设置缓存大小
uint8_t *buffer; // 这里不再指定大小,而是根据实际情况设置缓存大小
}fifo_def; // 把队列相关的变量整合在一块,方便使用
uint8_t counter; // 记录人数,也是发放的编号,因为队列类型变了,所以这里也改成 uint8_t
uint8_t fifo_buff[BUFF_SIZE];
fifo_def fifo = {0,0,BUFF_SIZE,fifo_buff}; // 为了简化初始化过程,直接定义时初始化
void init(fifo_def *pfifo,uint8_t *buff,uint32_t size)
{
if(size == 0 || (size & (size - 1))) // 2 的 幂次方
{
retern;
}
pfifo->in = 0
pfifo->out = 0;
pfifo->size = size;
pfifo->buffer = buff;
}
int get(void *parameter)
{
fifo_def *pfifo = &fifo; // 这里直接使用全局变量,因为命令行不方便传参
uint32_t len = 2; // 这里假设一次获取 2 个字节数据
uint8_t buffer[2]; // 将队列中的数据提取到这个缓存中
uint32_t l;
len = min(len, pfifo->in - pfifo->out);
/* first get the data from fifo->out until the end of the buffer */
l = min(len, pfifo->size - (pfifo->out & (pfifo->size - 1)));
memcpy(buffer, pfifo->buffer + (pfifo->out & (pfifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, pfifo->buffer, len - l);
pfifo->out += len;
if(len) // 为了判断是否写入数据,加入调试信息
{
for(int i = 0; i < len; i++)
{
rt_kprintf("请编号:【%04u】 到柜台办理服务\n",buffer[i]);
}
}
else
{
rt_kprintf("当前没有顾客等待服务\n");
}
return 0;
}
MSH_CMD_EXPORT(get, RT-Thread first led sample); // 导出到命令行中使用
int put(void)
{
fifo_def *pfifo = &fifo; // 这里直接使用全局变量,因为命令行不方便传参
uint32_t len = 1; // 这里假设一次写入 1 个字节数据
uint32_t l;
uint8_t buffer[1] = {counter}; // 存入编号,即将写入到队列中
len = min(len, pfifo->size - pfifo->in + pfifo->out);
/* first put the data starting from pfifo->in to buffer end */
l = min(len, pfifo->size - (pfifo->in & (pfifo->size - 1)));
memcpy(pfifo->buffer + (pfifo->in & (pfifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(pfifo->buffer, buffer + l, len - l);
pfifo->in += len;
if(len) // 为了判断是否写入数据,加入调试信息
{
rt_kprintf("您当前领取的编号为 %04u\n",buffer[0]);
}
else
{
rt_kprintf("当前没有顾客等待服务\n");
}
counter++;// 写完自加,不属于无锁队列内容
return 0;
}
MSH_CMD_EXPORT(put, RT-Thread first led sample);
len = min(len, pfifo->in - pfifo->out);
/* first get the data from fifo->out until the end of the buffer */
l = min(len, pfifo->size - (pfifo->out & (pfifo->size - 1)));
memcpy(buffer, pfifo->buffer + (pfifo->out & (pfifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, pfifo->buffer, len - l);
效率讨论
BUG 讨论
#define min(x,y) ((x) < (y) ? (x) : (y))
len = len < in - out ? len : in - out
测试
推荐阅读:
-THE END-
如果对你有帮助,记得转发分享哦!
微信公众号「鱼鹰谈单片机」
每周一更单片机知识
长按后前往图中包含的公众号关注
鱼鹰,一个被嵌入式耽误的畅销书作家
个人微信「EmbeddedOsprey」
长按后打开对方的名片关注