查看原文
其他

Kubernetes 架构之 workqueue 原理解析

Cylon 云原生实验室 2022-11-11


本文转自 Cylon 的笔记收藏,原文:https://www.cnblogs.com/Cylon/p/16386575.html,版权归原作者所有。欢迎投稿,投稿请添加微信好友:cloud-native-yang

通用队列

在 kubernetes 中,使用 go 的 channel 无法满足 kubernetes 的应用场景,如延迟、限速等;在 kubernetes 中存在三种队列通用队列 common queue ,延迟队列 delaying queue,和限速队列 rate limiters queue

Inferface

Interface 作为所有队列的一个抽象定义。

type Interface interface {
 Add(item interface{})
 Len() int
 Get() (item interface{}, shutdown bool)
 Done(item interface{})
 ShutDown()
 ShuttingDown() bool
}

Implementation

type Type struct { // 一个work queue
 queue []t // queue用slice做存储
 dirty set // 脏位,定义了需要处理的元素,类似于操作系统,表示已修改但为写入
 processing set // 当前正在处理的元素集合
 cond *sync.Cond
 shuttingDown bool
 metrics queueMetrics
 unfinishedWorkUpdatePeriod time.Duration
 clock                      clock.Clock
}
type empty struct{}
type t interface{} // t queue中的元素
type set map[t]empty // dirty 和 processing中的元素

可以看到其中核心属性就是 queue , dirty , processing

延迟队列

在研究优先级队列前,需要对 Heap 有一定的了解,因为 delay queue 使用了 heap 做延迟队列。

Heap

Heap 是基于树属性的特殊数据结构;heap 是一种完全二叉树类型,具有两种类型:

  • 如:B 是 A 的子节点,则  𝑘𝑒𝑦(𝐴)≥𝑘𝑒𝑦(𝐵)

  • 。这就意味着具有最大 Key 的元素始终位于根节点,这类 Heap 称为最大堆 MaxHeap

  • 父节点的值小于或等于其左右子节点的值叫做 MinHeap

二叉堆的存储规则:

  • 每个节点包含的元素大于或等于该节点子节点的元素。
  • 树是完全二叉树。

那么下列图片中,那个是堆

heap 的实现

实例:向左边添加一个值为 42 的元素的过程

步骤一:将新元素放入堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素可能具有比其父元素更大的值。

步骤二:如果新元素的值大于父元素,将新元素与父元素交换,直到达到新元素到根,或者新元素大于等于其父元素的值时将停止

这种过程被称为 向上调整reheapification upward

实例:移除根

步骤一:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后将最后一个节点从树中取出。该元素称为 out-of-place

步骤二:而将异位元素与其最大值的子元素交换,并返回在步骤 1 中保存的值。

这个过程被称为向下调整reheapification downward

优先级队列

优先级队列的行为:

  • 元素被放置在队列中,然后被取出。
  • 优先级队列中的每个元素都有一个关联的数字,称为优先级。
  • 当元素离开优先级队列时,最高优先级的元素最先离开。

如何实现的:

  • 在优先级队列中,heap 的每个节点都包含一个元素以及元素的优先级,并且维护树以便它遵循使用元素的优先级来比较节点的堆存储规则:
    • 每个节点包含的元素的优先级大于或等于该节点子元素的优先级。
    • 树是完全二叉树。
  • 实现的代码:golang priorityQueue[1]

Reference

heap[2]

Client-go 的延迟队列

在 Kubernetes 中对 delaying queue 的设计非常精美,通过使用 heap 实现的延迟队列,加上 kubernetes 中的通过队列,完成了延迟队列的功能。

// 注释中给了一个hot-loop热循环,通过这个loop实现了delaying
type DelayingInterface interface {
 Interface // 继承了workqueue的功能
 AddAfter(item interface{}, duration time.Duration) // 在time后将内容添加到工作队列中
}

具体实现了 DelayingInterface 的实例

type delayingType struct {
 Interface // 通用的queue 
 clock clock.Clock // 对比的时间 ,包含一些定时器的功能
     type Clock interface {
            PassiveClock
              type PassiveClock interface {
                        Now() time.Time
                        Since(time.Time) time.Duration
                    }
            After(time.Duration) <-chan time.Time
            NewTimer(time.Duration) Timer
            Sleep(time.Duration)
            NewTicker(time.Duration) Ticker
        }
 stopCh chan struct{} // 停止loop
 stopOnce sync.Once // 保证退出只会触发一次
 heartbeat clock.Ticker // 一个定时器,保证了loop的最大空事件等待时间
 waitingForAddCh chan *waitFor // 普通的chan,用来接收数据插入到延迟队列中
 metrics retryMetrics // 重试的指数
}

那么延迟队列的整个数据结构如下图所示

而上面部分也说到了,这个延迟队列的核心就是一个优先级队列,而优先级队列又需要满足:

  • 优先级队列中的每个元素都有一个关联的数字,称为优先级。
  • 当元素离开优先级队列时,最高优先级的元素最先离开。

waitFor 就是这个优先级队列的数据结构

type waitFor struct {
 data    t // 数据
 readyAt time.Time // 加入工作队列的时间
 index int // 优先级队列中的索引
}

waitForPriorityQueue 是对 container/heap/heap.go.Inferface 的实现,其数据结构就是使最小 readyAt 位于 Root 的一个 MinHeap

type Interface interface {
 sort.Interface
 Push(x interface{}) // add x as element Len()
 Pop() interface{}   // remove and return element Len() - 1.
}

而这个的实现是 waitForPriorityQueue

type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
 return len(pq)
}
// 这个也是最重要的一个,就是哪个属性是排序的关键,也是heap.down和heap.up中使用的
func (pq waitForPriorityQueue) Less(i, j int) bool {
 return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
 pq[i], pq[j] = pq[j], pq[i]
 pq[i].index = i
 pq[j].index = j
}
// push 和pop 必须使用heap.push 和heap.pop
func (pq *waitForPriorityQueue) Push(x interface{}) {
 n := len(*pq)
 item := x.(*waitFor)
 item.index = n
 *pq = append(*pq, item)
}


func (pq *waitForPriorityQueue) Pop() interface{} {
 n := len(*pq)
 item := (*pq)[n-1]
 item.index = -1
 *pq = (*pq)[0:(n - 1)]
 return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
 return pq[0]
}

而整个延迟队列的核心就是 waitingLoop,作为了延迟队列的主要逻辑,检查 waitingForAddCh 有没有要延迟的内容,取出延迟的内容放置到 Heap 中;以及保证最大的阻塞周期

func (q *delayingType) waitingLoop() {
 defer utilruntime.HandleCrash()
 never := make(<-chan time.Time) // 作为占位符
 var nextReadyAtTimer clock.Timer // 最近一个任务要执行的定时器
 waitingForQueue := &waitForPriorityQueue{} // 优先级队列,heap
 heap.Init(waitingForQueue)
 waitingEntryByData := map[t]*waitFor{} // 检查是否反复添加

 for {
  if q.Interface.ShuttingDown() {
   return
  }

  now := q.clock.Now()
  for waitingForQueue.Len() > 0 {
   entry := waitingForQueue.Peek().(*waitFor)
   if entry.readyAt.After(now) {
    break // 时间没到则不处理
   }

   entry = heap.Pop(waitingForQueue).(*waitFor) // 从优先级队列中取出一个
   q.Add(entry.data) // 添加到延迟队列中
   delete(waitingEntryByData, entry.data) // 删除map表中的数据
  }

  // 如果存在数据则设置最近一个内容要执行的定时器
  nextReadyAt := never
  if waitingForQueue.Len() > 0 {
   if nextReadyAtTimer != nil {
    nextReadyAtTimer.Stop()
   }
   entry := waitingForQueue.Peek().(*waitFor) // 窥视[0]和值
   nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 创建一个定时器
   nextReadyAt = nextReadyAtTimer.C()
  }

  select {
  case <-q.stopCh: // 退出
   return
  case <-q.heartbeat.C(): // 多久没有任何动作时重新一次循环
  case <-nextReadyAt: // 如果有元素时间到了,则继续执行循环,处理上面添加的操作
  case waitEntry := <-q.waitingForAddCh:
   if waitEntry.readyAt.After(q.clock.Now()) { // 时间没到,是用readyAt和now对比time.Now
    // 添加到延迟队列中,有两个 waitingEntryByData waitingForQueue
    insert(waitingForQueue, waitingEntryByData, waitEntry)
   } else {
    q.Add(waitEntry.data)
   }

   drained := false // 保证可以取完q.waitingForAddCh // addafter
   for !drained {
    select {
                // 这里是一个有buffer的队列,需要保障这个队列读完
    case waitEntry := <-q.waitingForAddCh: 
     if waitEntry.readyAt.After(q.clock.Now()) {
      insert(waitingForQueue, waitingEntryByData, waitEntry)
     } else {
      q.Add(waitEntry.data)
     }
    default// 保证可以退出,但限制于上一个分支的0~n的读取
    // 如果上一个分支阻塞,则为没有数据就是取尽了,走到这个分支
    // 如果上个分支不阻塞则读取到上个分支阻塞为止,代表阻塞,则走default退出
     drained = true
    }
   }
  }
 }
}

限速队列

限速队列 RateLimiting 是在优先级队列是在延迟队列的基础上进行扩展的一个队列

type RateLimitingInterface interface {
 DelayingInterface // 继承延迟队列
 // 在限速器准备完成后(即合规后)添加条目到队列中
 AddRateLimited(item interface{})
 // drop掉条目,无论成功或失败
 Forget(item interface{})
 // 被重新放入队列中的次数
 NumRequeues(item interface{}) int
}

可以看到一个限速队列的抽象对应只要满足了 AddRateLimited() , Forget() , NumRequeues() 的延迟队列都是限速队列。看了解规则之后,需要对具体的实现进行分析。

type rateLimitingType struct {
 DelayingInterface
 rateLimiter RateLimiter
}

func (q *rateLimitingType) AddRateLimited(item interface{}) {
 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
 return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
 q.rateLimiter.Forget(item)
}
rateLimitingType` 则是对抽象规范 `RateLimitingInterface` 的实现,可以看出是在延迟队列的基础上增加了一个限速器 `RateLimiter

type RateLimiter interface {
 // when决定等待多长时间
 When(item interface{}) time.Duration
 // drop掉item
 // or for success, we'll stop tracking it
 Forget(item interface{})
 // 重新加入队列中的次数
 NumRequeues(item interface{}) int
}

抽象限速器的实现,有 BucketRateLimiter , ItemBucketRateLimiter , ItemExponentialFailureRateLimiter , ItemFastSlowRateLimiter ,  MaxOfRateLimiter ,下面对这些限速器进行分析

BucketRateLimiter

BucketRateLimiter 是实现 rate.Limiter 与 抽象 RateLimiter 的一个令牌桶,初始化时通过 workqueue.DefaultControllerRateLimiter() 进行初始化。

func DefaultControllerRateLimiter() RateLimiter {
 return NewMaxOfRateLimiter(
  NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
  // 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
  &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
 )
}

更多关于令牌桶算法可以参考这里[3]

ItemBucketRateLimiter

ItemBucketRateLimiter 是作为列表存储每个令牌桶的实现,每个 key 都是单独的限速器

type ItemBucketRateLimiter struct {
 r     rate.Limit
 burst int

 limitersLock sync.Mutex
 limiters     map[interface{}]*rate.Limiter
}

func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
 return &ItemBucketRateLimiter{
  r:        r,
  burst:    burst,
  limiters: make(map[interface{}]*rate.Limiter),
 }
}

ItemExponentialFailureRateLimiter

如名所知 ItemExponentialFailureRateLimiter 限速器是一个错误指数限速器,根据错误的次数,将指数用于 delay 的时长,指数的计算公式为:𝑏𝑎𝑠𝑒𝐷𝑒𝑙𝑎𝑦×2<𝑛𝑢𝑚𝑓𝑎𝑖𝑙𝑢𝑟𝑒**𝑠>

。可以看出 When 绝定了流量整形的 delay 时间,根据错误次数为指数进行延长重试时间

type ItemExponentialFailureRateLimiter struct {
 failuresLock sync.Mutex
 failures     map[interface{}]int // 失败的次数

 baseDelay time.Duration // 延迟基数
 maxDelay  time.Duration // 最大延迟
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
 r.failuresLock.Lock()
 defer r.failuresLock.Unlock()

 exp := r.failures[item]
 r.failures[item] = r.failures[item] + 1

 // The backoff is capped such that 'calculated' value never overflows.
 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2float64(exp))
 if backoff > math.MaxInt64 {
  return r.maxDelay
 }

 calculated := time.Duration(backoff)
 if calculated > r.maxDelay {
  return r.maxDelay
 }

 return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
 r.failuresLock.Lock()
 defer r.failuresLock.Unlock()

 return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
 r.failuresLock.Lock()
 defer r.failuresLock.Unlock()

 delete(r.failures, item)
}

ItemFastSlowRateLimiter

ItemFastSlowRateLimiter ,限速器先快速重试一定次数,然后慢速重试

type ItemFastSlowRateLimiter struct {
 failuresLock sync.Mutex
 failures     map[interface{}]int

 maxFastAttempts int // 最大尝试次数
 fastDelay       time.Duration // 快的速度
 slowDelay       time.Duration // 慢的速度
}


func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
 return &ItemFastSlowRateLimiter{
  failures:        map[interface{}]int{},
  fastDelay:       fastDelay,
  slowDelay:       slowDelay,
  maxFastAttempts: maxFastAttempts,
 }
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
 r.failuresLock.Lock()
 defer r.failuresLock.Unlock()

 r.failures[item] = r.failures[item] + 1
 // 当错误次数没超过快速的阈值使用快速,否则使用慢速
 if r.failures[item] <= r.maxFastAttempts {
  return r.fastDelay
 }

 return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
 r.failuresLock.Lock()
 defer r.failuresLock.Unlock()

 return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
 r.failuresLock.Lock()
 defer r.failuresLock.Unlock()

 delete(r.failures, item)
}

MaxOfRateLimiter

MaxOfRateLimiter 是返回限速器列表中,延迟最大的那个限速器

type MaxOfRateLimiter struct {
 limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
 ret := time.Duration(0)
 for _, limiter := range r.limiters {
  curr := limiter.When(item)
  if curr > ret {
   ret = curr
  }
 }

 return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
 return &MaxOfRateLimiter{limiters: limiters}
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
 ret := 0
    // 找到列表内所有的NumRequeues(失败的次数),以最多次的为主。 
 for _, limiter := range r.limiters {
  curr := limiter.NumRequeues(item)
  if curr > ret {
   ret = curr
  }
 }

 return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
 for _, limiter := range r.limiters {
  limiter.Forget(item)
 }
}

如何使用 Kubernetes 的限速器

基于流量管制的限速队列实例,可以大量突发,但是需要进行整形,添加操作会根据 When() 中设计的需要等待的时间进行添加。根据不同的队列实现不同方式的延迟

package main

import (
 "fmt"
 "log"
 "strconv"
 "time"

 "k8s.io/client-go/util/workqueue"
)

func main() {
 stopCh := make(chan string)
 timeLayout := "2006-01-02:15:04:05.0000"
 limiter := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 length := 20 // 一共请求20次
 chs := make([]chan string, length)
 for i := 0; i < length; i++ {
  chs[i] = make(chan string1)
  go func(taskId string, ch chan string) {
   item := "Task-" + taskId + time.Now().Format(timeLayout)
   log.Println(item + " Added.")
            limiter.AddRateLimited(item) // 添加会根据When() 延迟添加到工作队列中

  }(strconv.FormatInt(int64(i), 10), chs[i])

  go func() {
   for {
    key, quit := limiter.Get()
    if quit {
     return
    }
    log.Println(fmt.Sprintf("%s process done", key))
    defer limiter.Done(key)

   }
  }()
 }
 <-stopCh
}

因为默认的限速器不支持初始化 QPS,修改源码内的为 𝐵𝑇(1,5),执行结果可以看出,大突发流量时,超过桶内 token 数时,会根据 token 生成的速度进行放行。

图中,任务的添加是突发性的,日志打印的是同时添加,但是在添加前输出的日志,消费端可以看到实际是被延迟了。配置的是每秒一个 token,实际上放行流量也是每秒一个 token。

引用链接

[1]

golang priorityQueue: https://pkg.go.dev/container/heap#example__priorityQueue

[2]

heap: https://www.cpp.edu/~ftang/courses/CS241/notes/heap.htm

[3]

更多关于令牌桶算法可以参考这里: https://www.cnblogs.com/Cylon/p/16379709.html




你可能还喜欢

点击下方图片即可阅读

会玩,macOS 使用指纹解锁 sudo 密码

2022-06-19

Kubernetes 中跨 StorageClass 迁移存储完全指南

2022-06-18

系统的过载(Overload)以及处理思路

2022-06-14

Kubernetes 容器和镜像 GC 原理解析

2022-06-13


云原生是一种信仰 🤘

关注公众号

后台回复◉k8s◉获取史上最方便快捷的 Kubernetes 高可用部署工具,只需一条命令,连 ssh 都不需要!



点击 "阅读原文" 获取更好的阅读体验!


发现朋友圈变“安静”了吗?

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存