This page looks best with JavaScript enabled

Kubernetes util tool 使用 wait.waitgroup 抓住 goroutine

 ·  ☕ 3 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

SafeWaitGroup

這一段在先前一版的文章中理解錯誤,在明天將補上新版,請大家見諒。

source code

1
2
3
4
5
6
7
8
9
// SafeWaitGroup must not be copied after first use.
type SafeWaitGroup struct {
	wg sync.WaitGroup
	mu sync.RWMutex
	// wait indicate whether Wait is called, if true,
	// then any Add with positive delta will return error.
	wait bool
}
...

wiat

在 kubernetes 中比起 package waitgroup 從 source code 中更長看到使用者使用 package wait ,在這一小節中會看一些範例了解 kubernetes 中哪裡有用到這一個 package。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// package group 很簡單就是嵌入了一個 go startand librart 的 sync.WaitGroup
type Group struct {
	wg sync.WaitGroup
}

// StartWithChannel 可以透過 stopCh 關閉在這個 goroutine group 中的啟動function。
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
	g.Start(func() {
		f(stopCh)
	})
}

// StartWithContext 可以透過 Context 關閉在這個 goroutine group 中的啟動 function。
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
	g.Start(func() {
		f(ctx)
	})
}

// 這裡就是對 sync.WaitGroup 進行簡單的封裝,上述兩種 StartWithContext 與 StartWithChannel
// 都會呼叫 start function 增加 WaitGroup 的 wg 與 執行 goroutine
func (g *Group) Start(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}

// 透過 Wait function 等待所有透過 goroutine group 啟動的 function 執行完畢也就是 執行 defer g.wg.Done()
func (g *Group) Wait() {
	g.wg.Wait()
}

example

分成兩個範例來看,第一個是單純跑 Group 的 Start function 。

這個 function 裡面在做什麼我們先不深入研究,把焦點放在 Group 的 Start function 與 Wait function 就好。

可以看到一開始透過匿名函數使用 for 迴圈經由 wg.Start 啟動了 n 個 goroutine ,接著卡在 stop channel 。

後續若是收到 stop channel 的話可以簡單地理解成工作要收工了,因此後面再用一個 for 迴圈把所有 listeners 的 add channel 關閉,最後透過 wg.Wait() 等待所有 channel 把關閉工作完成才退出function 。

wg.Start 與 wg.wait 整體來說使用上不複雜可以配合多種情境,例如範例所示範的開啟多個 worker 等待 worker 完成。

Tips 至於為什麼不用 StartWithChannel 來處理範例中的 listener.run listener.pop ,好問題我也不知道xDDD
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

接著來看另外一個範例 wg.StartWithChannel ,範例這個 function 裡面在做什麼我們先不深入研究,把焦點放在 Group 的 StartWithChannel function 與 Wait function 就好。

可以看到一開始透過 Run function 離開時第一個觸發的是 close processor Stop Channel。

接著觸發的 defer 是 wg.Wait() 這裡就會等待所有的透過 wait.Group 啟動的 goroutine 完成才結束工作。

可以簡單地理解成廣播說下課,要等所有同學把課本收好才可以離開,而老師就站在前面看著大家收拾。就可以把 wg 想像成老師負責監督大家只要有人沒收好東西就不准最下一步,processorStopCh 可以想成學校的廣播,當廣播發號司令同學就要做某某事情。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
    ...
    var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)
    ...
}

小結

簡單來說 package wait 只是對 go startand librart 進行簡單的封裝,帶來的效益就是讓大多數的開發者可以沿用這這個封裝好的 package ,因為要達到某一個功能實作的方式有很多種,有了適當的封裝能讓開發風格更統一。文章中若有出現錯誤的見解希望各位在觀看文章的大大們可以指出哪裡有問題,讓我學習改進,謝謝。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page