This page looks best with JavaScript enabled

隱藏在kubelet的WorkQueue

 ·  ☕ 3 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

本篇文章將介紹隱藏在 kubelet 的 WorkQueue ,後續在拆解 kubelet 的部分功能會用到 WorkQueue ,該 Worker Queue 僅為簡單的 queue 實作,如果對 kubernetes 其他較為複雜的 Queue 實作有興趣的朋友歡迎參考筆者早期的三篇文章分別是Kubernetes RateLimite work queue 設計真d不錯Kubernetes delaying work queue 設計真d不錯以及Kubernetes common work queue 設計真d不錯

WorkQueue

WorkQueue interface 定義一系列的方法,實作的物件需要透過時間戳 timestamp 記對 type.UID 進行排隊,並且可以取的目前在 Queue 中的所有 work 。

1
2
3
4
5
6
type WorkQueue interface {
	// 回傳所有準備好的 type.UID 。
	GetWork() []types.UID
	// 插入新 types.UID 到 queue 中
	Enqueue(item types.UID, delay time.Duration)
}

struct

basicWorkQueue 實作了 WorkQueue 我們來看一下他的資料結構。

1
2
3
4
5
6
7
var _ WorkQueue = &basicWorkQueue{}

type basicWorkQueue struct {
	clock clock.Clock					//用來記錄當前時間
	lock  sync.Mutex					//避免map中的物件競爭
	queue map[types.UID]time.Time		//用來儲存所有工作以及延遲時間
}

New function 也非常簡單,簡單的建立 map 以及透過使用者傳入 clock 當作 WorkQueue 當前的時間。

1
2
3
4
5
// NewBasicWorkQueue returns a new basic WorkQueue with the provided clock
func NewBasicWorkQueue(clock clock.Clock) WorkQueue {
	queue := make(map[types.UID]time.Time)
	return &basicWorkQueue{queue: queue, clock: clock}
}

implement

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (q *basicWorkQueue) GetWork() []types.UID {
	//防止競爭加鎖
	q.lock.Lock()
	defer q.lock.Unlock()

	//取得目前時間
	now := q.clock.Now()
	var items []types.UID
	//遞迴取得每個 type UID    
	for k, v := range q.queue {
		//判斷每個 type UID 延遲時間是否已經到了,如果延遲時間到了從 map 中移除並且加入已經延遲時間已到的 slice 中。    
		if v.Before(now) {
			items = append(items, k)
			delete(q.queue, k)
		}
	}
	//回傳已延遲完的工作    
	return items
}

func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) {
    //防止競爭加鎖
	q.lock.Lock()
	defer q.lock.Unlock()
    // type UID 設定要延遲多久才開始工作
	q.queue[item] = q.clock.Now().Add(delay)
}

test case

篇幅感覺太短了xD,來介紹一下這個 function 的測試案例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func TestGetWork(t *testing.T) {
	//建立 basicWorkQueue 以及當假的 clock 
	q, clock := newTestBasicWorkQueue()
	//queue加入 foo1 延遲時間 -1 分鐘    
	q.Enqueue(types.UID("foo1"), -1*time.Minute)
	//queue加入 foo2 延遲時間 -1 分鐘
	q.Enqueue(types.UID("foo2"), -1*time.Minute)
	//queue加入 foo3 延遲時間 1 分鐘
	q.Enqueue(types.UID("foo3"), 1*time.Minute)
	//queue加入 foo4 延遲時間 - 分鐘
	q.Enqueue(types.UID("foo4"), 1*time.Minute)

	//預期取的可以從 queue 中拿到 "foo1" "foo2"
	expected := []types.UID{types.UID("foo1"), types.UID("foo2")}
    
    
    
	//比對預期得結果與實際的結果	, GetWork 從 queue 中拿資料,會取的延遲時間到的資料
	//因為 foo1 foo2 延遲時間為 -1 分鐘,所以 getwork 可以拿到 foo1 foo2 的資料
	compareResults(t, expected, q.GetWork())

	//比對預期得結果與實際的結果	,此時 foo1 foo2 已經從 queue 中 pop 出去了
	//再去拿資料的時候由於其他資料還沒到延遲時間所以 get work 會是空的
	compareResults(t, []types.UID{}, q.GetWork())
    
    
	//將時間 mock 往後調整一個小時
	clock.Step(time.Hour)
    
	//預期取的可以從 queue 中拿到 "foo3" "foo4"
	expected = []types.UID{types.UID("foo3"), types.UID("foo4")}
    
	//此時 time 往後調整一個小時了
	//比對預期得結果與實際的結果	, GetWork 從 queue 中拿資料,會取的延遲時間到的資料
	//因為 foo3 foo4 延遲時間為 1 分鐘,所以 getwork 可以拿到 foo3 foo4 的資料
	compareResults(t, expected, q.GetWork())
    
	//比對預期得結果與實際的結果	,此時 foo3 foo4 已經從 queue 中 pop 出去了
	//再去拿資料的時候queue已經空了所以 get work 時會拿到空的資料
	compareResults(t, []types.UID{}, q.GetWork())
}


//建立 queue 以及假的 clock (可以 mock 用)
func newTestBasicWorkQueue() (*basicWorkQueue, *clock.FakeClock) {
	//建立假的 clock 可以 mock 用
	fakeClock := clock.NewFakeClock(time.Now())

	//建立 basicWorkQueue 物件設定 fake clock
	wq := &basicWorkQueue{
		clock: fakeClock,
		queue: make(map[types.UID]time.Time),
	}
	return wq, fakeClock
}


//比對預期得結果與實際的結果
func compareResults(t *testing.T, expected, actual []types.UID) {
	//建立一個 set 
	expectedSet := sets.NewString()
    
	//將預期的答案放入 set
	for _, u := range expected {
		expectedSet.Insert(string(u))
	}
	
	//用來儲存實際上queue回傳資料用的set 
	actualSet := sets.NewString()
	//將實際上queue回傳資料放入 set
	for _, u := range actual {
		actualSet.Insert(string(u))
	}
    
    
	//判斷 預期的答案 與實際的答案是否一致。   
	if !expectedSet.Equal(actualSet) {
		t.Errorf("Expected %#v, got %#v", expectedSet.List(), actualSet.List())
	}
}

小結

kubelet 中所採用的 WorkQueue 實作上非常的簡單,把需要延遲行為的 type UID 物件放入 queue 中,取出時再判斷 queue 中哪些延遲時間已經到達整成 slice ,再回傳給使用者。本篇文章雖然篇幅不長知識量也不大,但作為後續分析系統每個元件都是重要的一份子呢xD,如果文中有錯希望大家不吝嗇提出,讓我們互相交流學習。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page