This page looks best with JavaScript enabled

Kubernetes util tool 常用的 backoff 組合技

 ·  ☕ 8 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

本篇文章仍然跟前幾章節 使用 Backoff 指數大漲 以及 使用 Backoff 抖了一下 有著密切的關係,繼續把這條任務解完,以下文章需要前兩章的知識作為鋪墊若有不了解的地方可以透過連結回到前兩章複習相關概念,廢話不多說就直接開始吧!

BackoffUntil

這個 BackoffUntil function 做的事情很簡單,就是透過相隔 backoff manager 所給定的時間觸發使用者所指定的 function ,若是 stop channel 被關閉了就結束整個 BackoffUntil function 的生命週期,來看在 kubernetes是如何實作的吧。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
	var t clock.Timer                            //先定義一個空的 timer 等等用來接 backoffmanger 算出的 timer
	for {                                        //無限迴圈開始
		select {
		case <-stopCh:                           //當收到 stop channel 時關閉整個 BackoffUntil function 的生命週期
			return
		default:                                 //防止 channel卡住
		}
		//如果 sliding 不開啟的話,會先從 BackoffManger 算出 timer
		//仔細看下面的話會發現 timer 的時間會包含執行使用者所指定的 function 後才等待 timer 的 ticker 
		if !sliding {                            
        
			t = backoff.Backoff()
		}
        
		//這裡的做法就是卻跑每次執行使用者所指定的 Function 後都會跑入 defer確保panic 處理
		func() {
			defer runtime.HandleCrash()
			f()
		}()

		//那一個if !sliding 有點類似 ,差別在於這裡是執行完使用者所指定的 function 後才算出 timer , timer 等待時間不包含執行使用者的 function 
		if sliding {
			t = backoff.Backoff()
		}
		//在 golang select channel 有個小問題
		//當 select A channel , B channel , C channel 時
		//A B C channel 都剛剛好都有訊號同時到達那 select channel 會選哪一個呢?
		//在 golang 的世界中答案是隨機的, A B C channel 哪一個都有可能被選到xD
		
		//當然 kubernetes 的開發者們一定也知道這個問題,在這裡就有了相應的註解
		//我這裡就保留原始的註解,整段註解的大意大概是如果 stop channel 與 ticker channel 同時到達
		//因為golnag select chennel 機制剛好選中 ticker channel 那會造成使用者指定的 function 多跑一次,這樣是不符合預期的行為。
		//因此在for loop 的一開始會判斷 stop channel 是否有訊號
		//用來防止 stop channel 與 ticker channel 同時到達並且golang select channel 剛好選中 ticker 的問題
		
        
        
		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			return
		case <-t.C():
		}
	}
}

example

範例簡單的帶一下怎麼使用這個 function ,我們把重點看在 wait.BackoffUntil 就好其他就先不要管,用法十分簡單直接來看 code。

下面的 source code 可能會有些小夥伴覺得很熟悉,對這裡就是之前花了滿大一個篇幅在介紹的 kubenretes controler ,我們可以再複習一下相關的用法!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	// wait.BackoffUntil 要求輸入一個 function 後 wait.BackoffUntil 會間隔 backoffManager 所算出的時間
	// 週期性的執行輸入的 function ,如果接收到 stopCh 的訊號就退出
	// 在這裡輸入的 function 就是 Reflector 的 listAndWatch 了
    // 複習一下 listwatch 負責觀測 kubernetes 資源例如 pod , configmap ,secret .e.t.c
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

JitterUntil

剛剛看了 BackoffUntil function 主要是透過 backoff manger 計算出 ticker 時間,並且依據是否收到 stop channel 作為是否要結束 BackoffUntil function 的生命週期。

在 kubernetes 中開發者通常不會直接使用 BackoffUntil function 而是使用 jitter 讓所要執行的 function 在 backoff manger 計算時多考慮 jitter 的抖動,實作上也相當簡單,一來Backoff Manger 的實作抽換在這裡抽換成 NewJitteredBackoffManager
source code

1
2
3
4
5
6
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	//BackoffUntil 使用方法在上一小節有說明過,還不了解的朋友請向上滑動複習
    //NewJitteredBackoffManager 在前兩個章節有詳細說明
    //所以這裡的 timer 計算實作是由 JitteredBackoffManager 實作的,其餘的用法都是一樣。
	BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}

exmaple

範例簡單的帶一下怎麼使用這個 function ,我們把重點看在 wait.JitterUntil 就好其他就先不要管,用法十分簡單直接來看 code。

範例是一個 LeaderElector 的 acquire 去定時的取得 kubernetes 上的資源鎖,細節在本篇不去探討有興趣的小夥伴我在早期的文章 kubernetes 分散式資源鎖 有分享過怎麼在 kubernetes 內實作一個分散是資源鎖,有興趣的可以去看看。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	// 這裡直接指定一個 function ,在這裡是一個匿名函數做的事情大概是嘗試取得/更新物件
	// 從輸入的幾個參數來看就是每 le.config.RetryPeriod 執行一次 嘗試取得/更新物件的匿名函數
	// 間隔時間的抖動因子由 JitterFactor 決定 ,這裡就決定了最大的間隔時間了
	// 這個 function Sliding (Sliding = true)表示 backofftime 包含了執行使用者自訂的 function 時間。
	// 以及最後 context.done 的部份決定了,這個 wait.JitterUntil function 的生命週期跟著整個 process 而不是某一個 channel 訊號。
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew(ctx)
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

NonSlidingUntil

NoSlidingUntil function 是對 JitterUntil function 進行簡單的封裝,我們很簡單的帶過去,主要就是把 Sliding 的功能強制關掉,忘記 Sliding是什麼的我們簡單複習一下。

當禁用 Sliding (Sliding = false)表示 backofftime 不包含了執行使用者自訂的 function 時間。
source code

1
2
3
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, false, stopCh)
}

exmaple

範例簡單的帶一下怎麼使用這個 function ,我們把重點看在 wait.NonSlidingUntil 就好其他就先不要管,用法十分簡單直接來看 code。

範例是一個cloud provider 的 RouteController 去定時的執行 reconcileNodeRoutes 也就是同步節點上的路由資訊,細節我們不去深入研究把焦點擺在怎麼使用這個工具就好。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
	defer utilruntime.HandleCrash()

	klog.Info("Starting route controller")
	defer klog.Info("Shutting down route controller")
	...
    // 這裡透過 goroutine 直接指定一個 function ,在這裡就是 rc.reconcileNodeRoutes() 並且判斷執行的結果
    // 若是有錯誤就 log 出相關的錯誤訊息
	// 從輸入的幾個參數來看就是每 syncPeriod 執行一次 rc.reconcileNodeRoutes() ,如果 stop channel 收到訊息那 就會結束 wait.Until 的生命週期
    //使用 wait.NonSlidingUntil 這個 function 表示表示 backofftime 不包含了執行使用者自訂的 function 時間。
	go wait.NonSlidingUntil(func() {
		if err := rc.reconcileNodeRoutes(); err != nil {
			klog.Errorf("Couldn't reconcile node routes: %v", err)
		}
	}, syncPeriod, stopCh)

	<-stopCh
}

Until

Unti function 也是對 JitterUntil function 進行簡單的封裝,我們很簡單的帶過去,主要就是把 Sliding 的功能強制開啟,忘記 Sliding 是什麼的我們簡單複習一下。

當啟用 Sliding (Sliding = true)表示 backofftime 包含了執行使用者自訂的 function 時間。
source code

1
2
3
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}

example

範例簡單的帶一下怎麼使用這個 function ,我們把重點看在 wait.Until 就好其他就先不要管,用法十分簡單直接來看 code。

範例是一個 CRDFinalizer 去啟動指定的 work 讓他工作,
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (c *CRDFinalizer) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	klog.Infof("Starting CRDFinalizer")
	defer klog.Infof("Shutting down CRDFinalizer")

	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
		return
	}
	// 這裡透過 goroutine 直接指定一個 function ,在這裡就是 c.runWorker
	// 從輸入的幾個參數來看就是每秒執行一次 c.runWorker ,如果 stop channel 收到訊息那 就會結束 wait.Until 的生命週期
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
}

Forever

Forever function 是對 Until 進行簡單的封裝,簡單來看就是執行一個永遠不會中止且定時觸發的 function。
source code

1
2
3
4
var NeverStop <-chan struct{} = make(chan struct{})
func Forever(f func(), period time.Duration) {
	Until(f, period, NeverStop)
}

example

source code

範例簡單的帶一下怎麼使用這個 function ,我們把重點看在 wait.Forever 就好其他就先不要管,用法十分簡單直接來看 code。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (l *log) Check(_ *http.Request) error {
	l.startOnce.Do(func() {
		l.lastVerified.Store(time.Now())
		// 這裡透過 goroutine 直接指定一個 function 
		// function 的內容看說來就是把 log flush以及 做時間的儲存。
		// 並且這個 function 每一分鐘執行一次。        
		go wait.Forever(func() {
			klog.Flush()
			l.lastVerified.Store(time.Now())
		}, time.Minute)
	})

	lastVerified := l.lastVerified.Load().(time.Time)
	if time.Since(lastVerified) < (2 * time.Minute) {
		return nil
	}
	return fmt.Errorf("logging blocked")
}

JitterUntilWithContext

這就表示定時會呼叫使用者指定的 function ,若是外面有人將 context cancel 掉,那就會結束 wait.JitterUntilWithContext 的生命週期。基本上複用了 JitterUntil 讓使用者可以自行決定 Sliding 與 jitterFactor 。

1
2
3
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
	JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
}

example

範例簡單的帶一下怎麼使用這個 function ,我們把重點看在 wait.JitterUntilWithContext 就好其他就先不要管,用法十分簡單直接來看 code。

在這格 function 透過 context.WithCancel(context.Background()) 啟動了一個 context,接著拿到 endpoint slice 透過 for 迴圈遞迴每個 endpoint ,透過 goroutine 呼叫 wait.JitterUntilWithContext 並且帶入檢查 endpoint status 的 function ,當啟用 Sliding (Sliding = true)表示 backofftime 包含了執行使用者自訂的 function 時間。

這就表示定時會呼叫指定的 function ,這個例子就是檢查 endpoint status 。若是外面有人將 context cancel 掉,那就會結束 wait.JitterUntilWithContext 的生命週期。

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func startDBSizeMonitorPerEndpoint(client *clientv3.Client, interval time.Duration) (func(), error) {
	...

	ctx, cancel := context.WithCancel(context.Background())
	for _, ep := range client.Endpoints() {
		if _, found := dbMetricsMonitors[ep]; found {
			continue
		}
		dbMetricsMonitors[ep] = struct{}{}
		endpoint := ep
		klog.V(4).Infof("Start monitoring storage db size metric for endpoint %s with polling interval %v", endpoint, interval)
		go wait.JitterUntilWithContext(ctx, func(context.Context) {
			epStatus, err := client.Maintenance.Status(ctx, endpoint)
			if err != nil {
				klog.V(4).Infof("Failed to get storage db size for ep %s: %v", endpoint, err)
				metrics.UpdateEtcdDbSize(endpoint, -1)
			} else {
				metrics.UpdateEtcdDbSize(endpoint, epStatus.DbSize)
			}
		}, interval, dbMetricsMonitorJitter, true)
	}

	return func() {
		cancel()
	}, nil
}

UntilWithContext

這就表示定時會呼叫使用者指定的 function ,若是外面有人將 context cancel 掉,那就會結束 wait.UntilWithContext 的生命週期。基本上複用了 JitterUntilWithContext 差別在每此重新呼叫 function 的間隔時間有沒有抖動而已。

1
2
3
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
	JitterUntilWithContext(ctx, f, period, 0.0, true)
}

exmaple

很可惜 kubernetes 內沒有使用這個 function 我們可以透過 test code 來觀摩怎麼使用這個 function 。

題外話我覺得這個測試寫得真不錯,沒想過可以這樣測試 !!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func TestUntilWithContext(t *testing.T) {
	//建立 with cancel 的 context
	ctx, cancel := context.WithCancel(context.TODO())
	//直接種 context 的生命週期
	cancel()
	//這時候在透過 UntilWithContext 呼叫我們自定義的 function 
	//記得....因為 context 生命週期已經死掉了照理來說不會觸發我們自訂的 function 。    
	UntilWithContext(ctx, func(context.Context) {
		t.Fatal("should not have been invoked")
	}, 0)
	
    
	//建立 with cancel 的 context
	ctx, cancel = context.WithCancel(context.TODO())
	//建立取消的  channel 
	called := make(chan struct{})
	//啟動一個 go routine ,主要透過異步的經由 UntilWithContext 定時呼叫自定義的 function 
	//因為 main thread 會卡在 <-called,當執行到 goroutine 時會送訊號到  called  channel,讓外部收到
	go func() {
		UntilWithContext(ctx, func(context.Context) {
			called <- struct{}{}
		}, 0)
		//發送完訊號後就關閉 channel 
		close(called)
	}()
	//收到 goroutine才會往下繼續執行
	<-called
    //關閉 context 的生命週期讓 UntilWithContext  不再執行我們自定義的 function 
	cancel()
	//收到關閉  channel 的訊號
	<-called
}

小結

Kubernetes 內好用工具非常非常多讓我們不需要造輪子,但是開發者要記住的一個鐵則是盡信書不如無書,前人為我們鋪好的路我們需要了解是怎麼鋪路的,有沒有更好的工法。承襲著前人的智慧將問題以更快速且更好的方式解決。

上述分享的內容中間可能會有錯誤的見解希望各位在觀看文章的大大們可以指出哪裡有問題,讓我學習改進,謝謝。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page