This page looks best with JavaScript enabled

Kubernetes Item Exponential Failure RateLimiter work queue 設計真d不錯

 ·  ☕ 4 min read

先本文所以 source code 基於 kubernetes 1.19 版本,所有 source code 的為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

kubernetes work queue

Kubernetes controller/operator 是一個非常精彩的設計模式,在了解Kubernetes controller/operator 怎麼撰寫之前,了解kubernetes work queue的實作模式是非常重要的,下面引用了How to Create a Kubernetes Custom Controller Using client-go的 controller 架構圖可以看到在 sharedindexinformer 內有引用到這個元件,這個元件實際被定義在 kubernetes 的 client-go library 中。

圖片來源:How to Create a Kubernetes Custom Controller Using client-go

Kubernetes 為什麼要實踐一個 work queue 呢?就我們所知 kubernetes 是用 go 撰寫應該可以使用 channel 的機制直接將物件送給要用的元件(thread)啊,原因其實非常簡單,go channel 的設計功能非常單一無法滿足 kubernetes 所要的場景,例如帶有延遲時間物件需要根據延遲時間排序的queue,例如限制物件取出速度的queue。

之先前的章節有提到 common work queue 、 delaying work queue 、 ratelimiting queue,但 ratelimiting queue 有組合 ratelimiter ,本章節將有實作 ratelimiter 同時也滿常被用到的 ItemExponentialFailureRateLimiter 展開解說。

ItemExponentialFailureRateLimiter

interface

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
    //當一個物件放入的時候,需要回傳延遲多久(可自定義規則,等等會看到)
	When(item interface{}) time.Duration
    
    
	// Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing	    
    // or for success, we'll stop tracking it
    //當一個物件完成的時候可以,要忘記曾經延遲過(重新計算)
    Forget(item interface{})
    
    
	// NumRequeues returns back how many failures the item has had
    // 回傳物件已經放入幾次(重試了幾次,白話一點呼叫NumRequeues幾次)
	NumRequeues(item interface{}) int
}

看完了抽象的定義之後,必須要回過來看 Item Exponential Failure RateLimiter queue 實際物件定義了哪些屬性

struct

source code

1
2
3
4
5
6
7
8
9
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex            //鎖,防止資源競爭
	failures     map[interface{}]int   //計算某個物件呼叫了延遲的次數

	baseDelay time.Duration            //基礎延遲的時間
	maxDelay  time.Duration            //最多要延遲多久
}

看完了資料結構我們接著來看 ItemExponentialFailureRateLimiter 實作的方法,與初始化方法。

new function

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
//不知道為什麼要設定一個空的變數
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

//傳入基礎要等待的時間,最大等待時間,以及初始化failures map資料集
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
	return &ItemExponentialFailureRateLimiter{
		failures:  map[interface{}]int{},
		baseDelay: baseDelay,
		maxDelay:  maxDelay,
	}
}

//預設等待時間為 1 Millisecond,最大等待時間為1000秒
func DefaultItemBasedRateLimiter() RateLimiter {
	return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}

implement function

看完了初始化 ItemExponentialFailureRateLimiter 後接下來看看核心的功能。

When

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()                    //鎖不做解釋
	defer r.failuresLock.Unlock()            //解鎖不做解釋

	exp := r.failures[item]                  //查看map裡面物件放入的次數
	r.failures[item] = r.failures[item] + 1    //放入次數+1

	// The backoff is capped such that 'calculated' value never overflows.
        // backoff=base * 2^(物件放入的次數),代表延遲時間為指數型成長。
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))  
        //如果backoff 大於int64(溢位)就回傳最大延遲時間
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}
        //封裝為延遲時間
	calculated := time.Duration(backoff)
        //延遲時間超過最大延遲時間就回傳最大延遲時間
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

NumRequeues

當我們需要知道物件已經重試了幾次可以透過NumRequeues function 得知物件重是的次數。
source code

1
2
3
4
5
6
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()                    //鎖不做解釋
	defer r.failuresLock.Unlock()            //解鎖不做解釋

	return r.failures[item]                  //回傳重試次數
}

Forget

當物件做完時需要重新計算放延遲時間與放入次數,需要透過Forget function完成。
source code

1
2
3
4
5
6
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()                    //鎖不做解釋
	defer r.failuresLock.Unlock()            //解鎖不做解釋

	delete(r.failures, item)                 //刪除map裡的物件(重新計算)
}

怎麼使用

對於 ItemExponentialFailureRateLimiter 物件而言,他只是實作了 RateLimiter interface,使用者要怎麼用這個 Rate Limiter queue 呢?

上一篇有提到 RateLimiter 的初始化方法

1
2
3
4
5
6
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewDelayingQueue(),        //前一小節有提到過delating work queue的newfunction
		rateLimiter:       rateLimiter,               //自行實作的rateLimiter
	}
}

使用者可以在傳入參數帶入實作 RateLimiter interface的 ItemExponentialFailureRateLimiter 物件

1
NewRateLimitingQueue(NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)

表示使用者要求的 Rate Limiter queue 用了 DelayingQueueItemExponentialFailureRateLimiter

  1. 物件延遲時間由ItemExponentialFailureRateLimiter決定
  2. 物件延遲的排序方式由 DelayingQueue 決定(之前有提過用heap加上clock來觸發)
  3. 存放物件的 queue 由 common queue 決定(之前有提過用 processing set 加上 dirty set 合力完成)

大致上流程是這樣,不清楚的地方可以回去複習之前提到過的元件

小結

下一章節將介紹另外一個實作 rateLimiter interface的物件ItemFastSlowRateLimiter,文章有錯的部分希望大大們指出,謝謝!


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page