This page looks best with JavaScript enabled

Kubernetes Max Of Rate Limiter 設計真d不錯

 ·  ☕ 4 min read

首先本文以 source code 基於 kubernetes 1.19 版本,所有 source code 的為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

kubernetes work queue

Kubernetes controller/operator 是一個非常精彩的設計模式,在了解Kubernetes controller/operator 怎麼撰寫之前,了解kubernetes work queue的實作模式是非常重要的,下面引用了How to Create a Kubernetes Custom Controller Using client-go的 controller 架構圖可以看到在 sharedindexinformer 內有引用到這個元件,這個元件實際被定義在 kubernetes 的 client-go library 中。


圖片來源:How to Create a Kubernetes Custom Controller Using client-go

Kubernetes 為什麼要實踐一個 work queue 呢?就我們所知 kubernetes 是用 go 撰寫應該可以使用 channel 的機制直接將物件送給要用的元件(thread)啊,原因其實非常簡單,go channel 的設計功能非常單一無法滿足 kubernetes 所要的場景,例如帶有延遲時間物件需要根據延遲時間排序的queue,例如限制物件取出速度的queue。

之先前的章節有提到 common work queue 、 delaying work queue 、 ratelimiting queue,但 ratelimiting queue 有組合 ratelimiter ,本章節將有實作 ratelimiter 同時也滿常被用到的 MaxOfRateLimiter 展開解說。

MaxOfRateLimiter

interface

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
    //當一個物件放入的時候,需要回傳延遲多久(可自定義規則,等等會看到)
	When(item interface{}) time.Duration
    
    
	// Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing	    
    // or for success, we'll stop tracking it
    //當一個物件完成的時候可以,要忘記曾經延遲過(重新計算)
    Forget(item interface{})
    
    
	// NumRequeues returns back how many failures the item has had
    // 回傳物件已經放入幾次(重試了幾次,白話一點呼叫NumRequeues幾次)
	NumRequeues(item interface{}) int
}

看完了抽象的定義之後,必須要回過來看 Max Of RateLimiter queue 實際物件定義了哪些屬性

struct

source code

1
2
3
4
5
6
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter     //定義一個實作集合RateLimiter比如前幾個章節有提到的 ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter都可以放到這個集合內
}

看完了資料結構我們接著來看 MaxOfRateLimiter 實作的方法,與初始化方法。

new function

source code

1
2
3
4
//傳入實作集合RateLimiter比如前幾個章節有提到的 ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter都可以放到這個集合內
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}    
}

implement function

看完了初始化 MaxOfRateLimiter 後接下來看看核心的功能。

When

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)    //把集合內所有的when都跑跑看,看看延遲時間哪個比較大,用大的延遲時間為主。
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

NumRequeues

當我們需要知道物件已經重試了幾次可以透過NumRequeues function 得知物件重是的次數。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)    //把集合內所有的NumRequeues都跑跑看,看看哪個用了最多次,以最多次那個為主。 
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

Forget

當物件做完時需要重新計算放延遲時間與放入次數,需要透過Forget function完成。
source code

1
2
3
4
5
6
func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {    //做完了,讓集合內所有的人重新處理這個物件。
		limiter.Forget(item)
	}
}

怎麼使用

對於 MaxOfRateLimiter 物件而言,他只是實作了 RateLimiter interface,使用者要怎麼用這個 Rate Limiter queue 呢?

上一篇有提到 RateLimiter 的初始化方法

1
2
3
4
5
6
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewDelayingQueue(),        //前一小節有提到過delating work queue的newfunction
		rateLimiter:       rateLimiter,               //自行實作的rateLimiter
	}
}

使用者可以在傳入參數帶入實作 RateLimiter interface的 ItemExponentialFailureRateLimiter 物件

1
2
3
4
NewRateLimitingQueue(
    NNewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3),
    NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
)

表示使用者要求的 Rate Limiter queue 用了 DelayingQueueItemExponentialFailureRateLimiter以及ItemFastSlowRateLimiter

  1. 物件延遲時間由ItemExponentialFailureRateLimiterItemFastSlowRateLimiter決定
  2. 物件延遲的排序方式由 DelayingQueue 決定(之前有提過用heap加上clock來觸發)
  3. 存放物件的 queue 由 common queue 決定(之前有提過用 processing set 加上 dirty set 合力完成)

大致上流程是這樣,不清楚的地方可以回去複習之前提到過的元件

小結

下一章節將介紹另外一個實作 rateLimiter interface的物件BucketRateLimiter,文章有錯的部分希望大大們指出,謝謝!


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page