Go併發模型
隔離(Confinedent)
在 Go 程式中透過程式碼慣例(而非語言機制)來保證某個資源(變數或物件)只會在某個 goroutine 中被存取
假設你有一個data []byte
,你需要將它傳給某個函數去讀裡面的資料(read only),但是它又沒有像io.Reader或<-chan這樣的只讀寫法。這時候,只需將它隔離起來即可:
func ReadOnly[T any](data []T) <-chan T {
dataCh := make(chan T)
go func() {
defer close(dataCh)
for _, d := range data {
dataCh <- d
}
}()
return dataCh
}
func main() {
sourceData := make([]int, 100) // [0,0,0...0]
for i, d := range sourceData {
fmt.Println(d)
sourceData[i] = 1 // 在讀取 sourceData 時發生了寫入,危險!
}
for d := range ReadOnly(sourceData) {
fmt.Println(d)
}
}
但是當在for range readOnly(sourceData)
的時候如果中途break或return,在另一個協程中還在嘗試向dataCh中不斷寫入資料,此時會發生goroutine洩漏。為了防止以上情況,我們可以引入context
讓我們在reader協程中途退出時出發cancel context,讓writer協程也能順利退出。
func ReadOnlyWithContext[T any](ctx context.Context, data []T) <-chan T {
dataCh := make(chan T)
go func() {
defer close(dataCh)
for _, d := range data {
select {
case <-ctx.Done():
return
case dataCh <- d:
}
}
}()
return dataCh
}
func main() {
sourceData := make([]int, 100) // [0,0,0...0]
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
count := 0
for d := range ReadOnlyWithContext(ctx, sourceData) {
if count > 50 {
cancel() //在退出時發出 cancel 信號
break
}
fmt.Println(count, ":", d)
count++
}
}
取消(Cancellation)
有時,我們可以不通過context來提前終止程式,而是通過chan。
func WorkWithCancel[T any](cancel <-chan struct{}, dataCh <-chan T) <-chan struct{} {
wait := make(chan struct{})
go func() {
defer close(wait)
for {
select {
case d, ok := <-dataCh:
if !ok {
return // dataCh 從外面被關閉
}
fmt.Println(d) // Do some real work here
case <-cancel:
return
}
}
}()
return wait
}
func main() {
cancel := make(chan struct{})
wait := WorkWithCancel[any](cancel, nil) // 此處dataCh傳入的是nil既代表該case會一直被阻塞
go func() {
time.Sleep(1 * time.Second)
close(cancel)
}()
<-wait
}
OR Channel 模式
監聽多個並行事件,當任意一個事件完成,即發送完成信號。
假設搶票情境:你開了10個線程去搶票,當10個線程中的線程任意線程搶到票後,你需要關閉其他的9個線程。這時OR Channel模式就起到作用了:監聽10個線程,當任意線程完成任務,即可通知其他的線程停止活動,之後再去做其他事情。
func OrDone[T any](chans ...<-chan T) <-chan T {
switch len(chans) {
case 0:
return nil
case 1:
return chans[0]
}
orDone := make(chan T)
go func() {
defer close(orDone)
switch len(chans) {
case 2:
select {
case <-chans[0]:
case <-chans[1]:
}
default:
select {
case <-chans[0]:
case <-chans[1]:
case <-chans[2]:
case <-OrDone(append(chans[3:], orDone)...):
}
}
}()
return orDone
}
func main() {
work := func(after time.Duration) <-chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(after)
}()
return done
}
<-OrDone(
work(time.Second), // 在一秒後work會close(done),OrDone也隨即退出
work(time.Minute),
work(time.Hour),
)
}