This page looks best with JavaScript enabled

Kubernetes delaying work queue 設計真d不錯

 ·  ☕ 8 min read

首先本文所以 source code 基於 kubernetes 1.19 版本,所有 source code 的為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

kubernetes work queue

在前一篇 kubernetes common work queue 設計真d不錯一文中分享 kubernetes work queue最基礎的實作方式,再複習一次!Kubernetes 為什麼要實踐一個 work queue 呢?

就我們所知 kubernetes 是用 go 撰寫應該可以使用 channel 的機制直接將物件送給要用的元件(thread)啊,原因其實非常簡單,go channel 的設計功能非常單一無法滿足 kubernetes 所要的場景,例如帶有延遲時間物件需要根據延遲時間排序的 queue ,例如限制物件取出速度的 queue 。

圖片來源:How to Create a Kubernetes Custom Controller Using client-go

上圖引用了How to Create a Kubernetes Custom Controller Using client-go的 controller 架構圖可以看到在 sharedindexinformer 內有引用到這個元件,這個元件實際被定義在 kubernetes 的 client-go library 中。

在第五步驟與第六步驟之間透過 queue 不只解偶了上下層的耦合關係同時 Queue 有達到了消峰填谷的作用,當觀察的物件一直送資料進來不會因為我們業務邏輯處理得太慢而卡住,資料會保留在 queue 中直到被取出。

之前有提到了兩種 queue ,分別是 rate limiters queue 以及 delaying queue ,上一章節介紹完 Kubernetes 通用的 common queue ,本篇文章會從 delaying queue 開始探討。

delaying queue

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。

interface

source code

1
2
3
4
5
6
7
8
9
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
// 原生的註解寫得非常棒了,大致上意思為一個物件處理失敗,如果很快地物件在被處理一次失敗的可能性還是很高會造成 
// hot-loop,所以讓物件等待一下在排隊進入 queue 就是 delaying queue 的用意
type DelayingInterface interface {
	Interface    //嵌入了common work queue的interface,delaying queue 也是common queue 的一種
	
	AddAfter(item interface{}, duration time.Duration)    //表示物件需要等待多久才能被放入 queue 中
}

看完了抽象的定義之後,必須要回過來看 delaying queue 實際物件定義了哪些屬性

struct

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
	Interface    //嵌入了一個common queue

	// clock tracks time for delayed firing
	clock clock.Clock    //用來比對物件延遲時間

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}    //異步退出用
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once    //異步退出用,保證退出只會被呼叫一次

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker    //定時器,定時喚醒thread處理物件

	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan *waitFor //用以添加延遲物件的channel

	// metrics counts the number of retries
	metrics retryMetrics    //用以紀錄重試的metric
}

剛剛上面有一個疑點那就 type waitFor 到底是什麼
我們先來看看 type waitFor 的結構
source code

1
2
3
4
5
6
7
8
// waitFor holds the data to add and the time it should be added
// 如果需要延遲的物件都會被轉換成這個類型
type waitFor struct {
	data    t            // t 在common queue介紹過,為一個泛行表示什麼都接受的物件
	readyAt time.Time    //在什麼時間加入到queue中的
	// index in the priority queue (heap)
	index int            // index會用在後面的排序,延遲時間較小的排前面(用heap排序)
}

heap 排序

由於放入 dealying queue 的物件,有的可能要延遲 1s 有的可能要延遲 2ms等等, dealying queue 如何保證延遲較小的物件先放入 queue 呢?

delaying queue 透過 heap 進行排序,底下讓展開排序的實作方式。

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
type waitForPriorityQueue []*waitFor    //waitForPriorityQueue 這個類型實作了 heap interface ,排序的物件為 waitFor

//實作heap interface 的len方法,取出heap當前的長度。
func (pq waitForPriorityQueue) Len() int {
	return len(pq)
}

//實作 heap interface 的 Less 方法,確認在 waitForPriorityQueue 的第 i 個元素是否比第 j 個元素小
// 若是第 i 個元素比第 j 個元素小就交換,因為我們希望,因為我們希望越小的排越前面。
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt) // 比的是時間
}

//實作 heap interface 的 swap ,實作 i j 交換
func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

//實作 heap interface 的 Push ,向 heap 添加物件
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n              //新加入的物件會記錄當前自己的位置
	*pq = append(*pq, item)     //新加入的物件排到heap的最後面
}

//實作 heap interface 的 Pop ,從 heap 的尾巴彈出最後一個物件。
func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1] 
	item.index = -1
	*pq = (*pq)[0:(n - 1)]    //縮小heap,移除最後一個物件
	return item
}

//回傳heap第一個物件,延遲時間最短的那一個物件
func (pq waitForPriorityQueue) Peek() interface{} {
	return pq[0]
}

看完了資料結構我們接著來看 delaying queue 實作的方法,與初始化方法!

new function

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//給心跳用最久10秒delaying queue會檢查一次
const maxWait = 10 * time.Second

//傳入clock與common queue以及紀錄這個物件metric的名字,這是一個不公開的方法,有其他封裝好的 new function 可以用
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
	ret := &delayingType{
		Interface:       q,
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name),
	}
    // 啟動一個 thread 檢測有沒有 wiatfor 物件在等待進入 queue,稍後會展開分析。
	go ret.waitingLoop()
	return ret
}

// 不同的封裝方式,不提 (設定 metric 的 name 為空)
func NewDelayingQueue() DelayingInterface {
	return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
}

//不同的封裝方式,不提 (設定 metric 的 name 為空,可傳入自己實作的 common queue )
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
	return newDelayingQueue(clock.RealClock{}, q, name)
}

//不同的封裝方式,不提 (可傳入 metric 的 name )
func NewNamedDelayingQueue(name string) DelayingInterface {
	return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}

//不同的封裝方式,不提 (可傳入 metric 的 name 以及 clock )
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
    // NewNamed 是前一章節提到建立 common queue的方法
	return newDelayingQueue(clock, NewNamed(name), name)
}

implement function

看完了初始化 delaying queue function 後接下來看看核心的功能。
source code

AddAfter

使用者要放入有延遲的物件需要呼叫這個function,帶入要延遲的物件,以及該物件要延遲多久。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {    //如果queue關閉了就不能放入
		return
	}

	q.metrics.retry()        //metric不解釋

	// immediately add things with no delay
	if duration <= 0 {        //如果延遲時間小於等於0表示不用延遲
		q.Add(item)           //直接丟入common queue中
		return
	}

	select {
	case <-q.stopCh:            //因為可能會組塞在 waitingForAddCh 透過 stop 保證退出?
		// unblock if ShutDown() is called
        // 要延遲的物件會封裝成 waitFor 型態並且方入 channel 等待處理
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:    
	}
}

waitingLoop

這邊是delaying queue的主要邏輯,執行檢查 waitingForAddCh channel 有沒有延遲物件,取出延遲物件看延遲時間是達到選擇加入 Heap 或是 queue,以及接收心跳包。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)                   //我不是很定他的用意...可以看到 nerver channel 又換一個別名表示

	// Make a timer that expires when the item at the head of the waiting queue is ready
	var nextReadyAtTimer clock.Timer                //當 heap 吐出一個延遲物件時透過這個 timer 延遲

	waitingForQueue := &waitForPriorityQueue{}     // heap 物件
	heap.Init(waitingForQueue)                    // heap初始化

	waitingEntryByData := map[t]*waitFor{}        //用來防止同一個物件重複放入,如果有重複的物件就更新延遲時間
    
    
	for {
        //如果queue關閉就離開
		if q.Interface.ShuttingDown() {
			return
		}
        //標記現在時間
		now := q.clock.Now()

		// 如果在 heap 裡面有東西
		for waitingForQueue.Len() > 0 {
                        //拿出第一個在 heap 的物件
			entry := waitingForQueue.Peek().(*waitFor)
                        //如果現在時間還沒達到物件要等待的時間就退出
			if entry.readyAt.After(now) {
				break
			}
                        //如果現在時間達到物件要等到的時間,將物件從heap彈出
			entry = heap.Pop(waitingForQueue).(*waitFor)
                        //加到queue中
			q.Add(entry.data)
                        //刪除set儲存的物件
			delete(waitingEntryByData, entry.data)
		}

		// Set up a wait for the first item's readyAt (if one exists)
		nextReadyAt := never        //在上面有提到過nerver channel 只換成這個名字,不知道用意為何
            // 如果在 heap 裡面有東西
		if waitingForQueue.Len() > 0 {
                        //  若是前一個物件的計時器有殘留物就清除前一個物件的計時器
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
                        //拿出第一個在 heap 的物件
			entry := waitingForQueue.Peek().(*waitFor)
                        //看物件延遲多久
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
                        //當物件延遲時間到了發通知
			nextReadyAt = nextReadyAtTimer.C()
		}

		select {
            // queue關閉
		case <-q.stopCh:
			return
            // 定時被心跳喚醒
		case <-q.heartbeat.C():
			// continue the loop, which will add ready items
            // 當收到物件延遲時間到了發通知
		case <-nextReadyAt:
			// continue the loop, which will add ready items
            //當有人放需要延遲的物件進queue中
		case waitEntry := <-q.waitingForAddCh:
                    //如果新放入的物件還沒超過延遲時間
			if waitEntry.readyAt.After(q.clock.Now()) {
                            //放入heap中
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
                    //已經到了延遲時間直接放入queue
				q.Add(waitEntry.data)
			}

                     //一次取光用
			drained := false
			for !drained {
				select {
                                // 一次把把延遲物件的channel取乾淨
				case waitEntry := <-q.waitingForAddCh:
                                      //如果新放入的物件還沒超過延遲時間
					if waitEntry.readyAt.After(q.clock.Now()) {
                                            //放入heap中
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
                                            //已經到了延遲時間直接放入queue
						q.Add(waitEntry.data)
					}
				default:
                                // 保證會退出這個取光的loop
					drained = true
				}
			}
		}
	}
}

insert

在 waitingLoop 有使用到 insert 這個 function ,他的實作也相當的簡單,簡單的來說就是把 waitfor 的物件放到 Heap 中,我們來看看他是如何實作的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
    //先判斷加入的物件有沒有重複的
    existing, exists := knownEntries[entry.data]
    //若是有重複的話
    if exists {
    	// 跟之前放入的物件比較哪個延遲時間比較短
    	// 若是現在要放入的物件比較短的話就更新 set 中的物件延遲時間
        if existing.readyAt.After(entry.readyAt) {
            existing.readyAt = entry.readyAt
            heap.Fix(q, existing.index)
        }
        return
    }
    //如果 set 沒有重複的話就直接加到 heap 中,以及使用 set 紀錄 heap 有這個物件。
    heap.Push(q, entry)
    knownEntries[entry.data] = entry
}

小結

本章講述了 kubernetes delaying work queue 的底層實作方法,接下來還會有幾篇介紹基於 common work queue 的 rate limiters work queue 以及 其他類型的 work queue ,從中我們可以了解 kubernetes controller 監聽到 etcd 變化的物件後如何把 變化的物件丟入 queue 中等待其他人取出並處理,相關業務邏輯,如果文中有錯希望大家不吝嗇提出,讓我們互相交流學習。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page