This page looks best with JavaScript enabled

Kubernetes Bucket Rate Limiter 設計真d不錯

 ·  ☕ 5 min read

首先本文所以 source code 基於 kubernetes 1.19 版本,所有 source code 的為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

kubernetes work queue

Kubernetes controller/operator 是一個非常精彩的設計模式,在了解Kubernetes controller/operator 怎麼撰寫之前,了解kubernetes work queue的實作模式是非常重要的,下面引用了How to Create a Kubernetes Custom Controller Using client-go的 controller 架構圖可以看到在 sharedindexinformer 內有引用到這個元件,這個元件實際被定義在 kubernetes 的 client-go library 中。


圖片來源:How to Create a Kubernetes Custom Controller Using client-go

Kubernetes 為什麼要實踐一個 work queue 呢?就我們所知 kubernetes 是用 go 撰寫應該可以使用 channel 的機制直接將物件送給要用的元件(thread)啊,原因其實非常簡單,go channel 的設計功能非常單一無法滿足 kubernetes 所要的場景,例如帶有延遲時間物件需要根據延遲時間排序的queue,例如限制物件取出速度的queue。

之先前的章節有提到 common work queue 、 delaying work queue 、 ratelimiting queue,但 ratelimiting queue 有組合 ratelimiter ,本章節將有實作 ratelimiter 同時也滿常被用到的 BucketRateLimiter 展開解說。

BucketRateLimiter

先來看看 BucketRateLimiter 的 UML 圖,很清楚可以看得出來他實作的 RateLimiter interface 已經嵌入了一個 golang rate package 的 Limiter(也就是固定速度qps限速器,有興趣的朋友可以自行深入閱讀golang的實作方式)

interface

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
    //當一個物件放入的時候,需要回傳延遲多久(可自定義規則,等等會看到)
	When(item interface{}) time.Duration
    
    
	// Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing	    
    // or for success, we'll stop tracking it
    //當一個物件完成的時候可以,要忘記曾經延遲過(重新計算)
    Forget(item interface{})
    
    
	// NumRequeues returns back how many failures the item has had
    // 回傳物件已經放入幾次(重試了幾次,白話一點呼叫NumRequeues幾次)
	NumRequeues(item interface{}) int
}

看完了抽象的定義之後,必須要回過來看 Bucket Rate Limiter queue 實際物件定義了哪些屬性

struct

source code

1
2
3
4
5
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
	*rate.Limiter        //BucketRateLimiter嵌入了golang.org.x.time.rate.Limiter
                            //也就是固定速度qps限速器,有興趣的朋友可以自行深入閱讀golang的實作方式
}

看完了資料結構我們接著來看 BucketRateLimiter 實作的方法,與初始化方法。

new function

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
//不知道為什麼要設定一個空的變數
var _ RateLimiter = &BucketRateLimiter{}

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting.  The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
        //這裡使用到上一章節提到得MaxOfRateLimiter,這裡以兩個RateLimiter為主
        //一個是ItemExponentialFailureRateLimiter,例外一個是本篇的主角BucketRateLimiter
	return BucketRateLimiter(
                //前幾篇有轉們講解有興趣的朋友歡迎回到前幾章節複習
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
                //這裏設定了golang  golang.org.x.time.rate.Limiter 的吞吐速度
                // 設定了 10 個 qps 以及 100 個 bucket
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

implement function

看完了初始化 BucketRateLimiter 後接下來看看核心的功能。

When

source code

1
2
3
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()    // golang.org.x.time.rate.Limiter實作的這個延遲會是個固定的周期(依照qps以及bucket而定)
}

NumRequeues

當我們需要知道物件已經重試了幾次可以透過NumRequeues function 得知物件重是的次數。
source code

1
2
3
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0        //因為是固定的頻率,所以我們不需要管物件重次了幾次
}

Forget

當物件做完時需要重新計算放延遲時間與放入次數,需要透過Forget function完成。
source code

1
2
3
func (r *BucketRateLimiter) Forget(item interface{}) {
                    //因為是固定速率所以我們也不需要去管要不要重新處理物件
}

怎麼使用

對於 BucketRateLimiter 物件而言,他只是實作了 RateLimiter interface,使用者要怎麼用這個 Rate Limiter queue 呢?

上一篇有提到 RateLimiter 的初始化方法

1
2
3
4
5
6
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewDelayingQueue(),        //前一小節有提到過delating work queue的newfunction
		rateLimiter:       rateLimiter,               //自行實作的rateLimiter
	}
}

使用者可以在傳入參數帶入實作 RateLimiter interface的 ItemExponentialFailureRateLimiter 物件

1
NewRateLimitingQueue(DefaultControllerRateLimiter())

表示使用者要求的 Rate Limiter queue 用了 DelayingQueueBucketRateLimiterItemExponentialFailureRateLimiter

  1. 物件延遲時間由 ItemExponentialFailureRateLimiterBucketRateLimiter 決定
  2. 物件延遲的排序方式由 DelayingQueue 決定(之前有提過用heap加上clock來觸發)
  3. 存放物件的 queue 由 common queue 決定(之前有提過用 processing set 加上 dirty set 合力完成)

大致上流程是這樣,不清楚的地方可以回去複習之前提到過的元件

小結

終於把 kubernetes work queue 的部分梳理完,小小一個 work queue 有如此多實作細節與方式
從 common work queue 如何保證下一次 add 進來的物件就有被取走還沒做完,以及還沒被取走的事情要考慮。
以及 delaying work queue 如何排序一個 延遲物件 ,讓延遲時間最短的物件排在最前面

另外 RateLimiter work queue 展現了設計模式代理了 delaying work queue 以及組合了 rateLimiter ,讓邏輯分離 rateLimiter 產出物件需要延遲多久,交給 delaying work queue 進行排序。

kubernetes底層設計的非常精美,透過閱讀程式碼的方式提升自己對kubernetes的了解,若文章有錯的部分希望大大們指出,謝謝!


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page