Go 语言的并发模型是其最强大的特性之一,基于 CSP(Communicating Sequential Processes)理论,通过 goroutine 和 channel 实现轻量级并发.
一、并发核心概念
1. Goroutine
在 Go 语言中,Goroutine 是实现并发编程的核心特性之一,它本质上是一种轻量级线程。与传统的操作系统线程相比,Goroutine 有着显著的优势。
轻量级线程,由 Go 运行时管理(非操作系统线程)。
启动成本低(KB 级栈内存,动态扩缩容),可轻松创建数千个。
在 Go 语言中,通过 go 关键字可以非常方便地启动一个 Goroutine。以下是一个简单的示例:
func main(){ go func() { fmt.Println("Hello from goroutine!") }() // 这里需要注意,如果主函数直接结束,goroutine 可能来不及执行 // 因为主函数结束会导致整个程序终止 fmt.Println("Main function continues...") }
2. Channel
Channel 是 Go 语言中用于在不同 Goroutine 之间进行通信和同步的重要工具,它遵循 “通过通信共享内存” 的设计理念,避免了传统并发编程中使用共享内存带来的竞态问题。
是一种类型安全的管道,它只能传输特定类型的数据。例如,一个 chan int 类型的 Channel 只能用于传输整数类型的数据
避免共享内存的竞态问题,提倡“通过通信共享内存”。
分两种类型:
无缓冲通道(同步):发送和接收操作会阻塞,直到另一端准备好。
有缓冲通道(异步):缓冲区未满或非空时不会阻塞。
二、原理:GMP 调度模型
1. GMP 调度模型
基本概念
在 Go 语言的并发体系中,GMP 调度模型是其核心机制,它主要由三个关键组件构成:
G(Goroutine):Goroutine 是 Go 语言中轻量级的执行单元,类似于传统操作系统中的线程,但它的开销要小得多。
M(Machine):Machine 代表操作系统线程,它是真正在操作系统内核层面上执行的线程。每个 M 都与一个底层的操作系统线程绑定,负责执行 Goroutine。M 是与操作系统交互的桥梁,它负责处理系统调用、线程上下文切换等底层操作。
P(Processor):Processor 是调度上下文,P 可以看作是 M 执行 Goroutine 的“许可证”,每个 M 要执行 Goroutine 必须先绑定一个 P。P 维护着一个本地的 Goroutine 队列,用于存储待执行的 Goroutine。同时,P 还负责调度和管理这些 Goroutine 的执行顺序。
工作流程
P 维护本地 Goroutine 队列:每个 P 都有一个自己的本地 Goroutine 队列,当创建一个新的 Goroutine 时,它会被放入某个 P 的本地队列中。这个队列是一个先进先出(FIFO)的队列,P 会按照队列中的顺序依次调度 Goroutine 执行。
M 绑定 P 后获取 G 执行:一个 M 要执行 Goroutine,必须先绑定一个空闲的 P。绑定成功后,M 会从 P 的本地队列中取出一个 Goroutine 并执行。当 M 执行完一个 Goroutine 后,会继续从 P 的队列中获取下一个 Goroutine,直到队列为空。
G 阻塞时 M 释放 P:当一个 Goroutine 在执行过程中发生阻塞(例如进行 IO 操作)时,M 会释放当前绑定的 P,以便其他 M 可以使用这个 P 继续执行其他 Goroutine。被释放的 P 会被放入全局 P 列表中,等待其他 M 来绑定。而阻塞的 Goroutine 会被挂起,当阻塞操作完成后,它会被重新放入某个 P 的本地队列或全局队列中,等待再次被调度执行。
2. 抢占式调度
Go 1.14+ 支持基于信号的抢占,避免长时间占用 CPU 的 Goroutine 导致调度延迟。
三、Goroutine 基础
1. 简单示例
下面是一个简单的 Goroutine 示例,展示了如何启动一个 Goroutine 并执行一个简单的任务:
package main import ( "fmt" "time" ) func main() { go sayHello() // 启动 goroutine // 这里使用 time.Sleep 是为了让主函数等待一段时间,以便 goroutine 有机会执行 // 但这种方式并不是一个好的做法,实际开发中应该使用 sync.WaitGroup 进行同步 time.Sleep(100 * time.Millisecond) } func sayHello() { fmt.Println("Hello!") }
2. 使用 sync.WaitGroup 并发组
在实际开发中,为了确保所有的 Goroutine 都执行完毕后再继续执行后续的代码,我们可以使用 sync.WaitGroup 来进行同步。sync.WaitGroup 提供了三个主要的方法:Add、Done 和 Wait。
Add 方法用于设置需要等待的 Goroutine 的数量。
Done 方法用于标记一个 Goroutine 已经执行完毕,相当于将等待的数量减 1。
Wait 方法用于阻塞当前的 Goroutine,直到所有标记的 Goroutine 都执行完毕。
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(2) // 表示有两个任务需要等待 go func() { defer wg.Done() // 在函数结束时调用 Done 方法,表示该任务完成 fmt.Println("Goroutine 1") }() go func() { defer wg.Done() fmt.Println("Goroutine 2") }() wg.Wait() // 阻塞直到所有任务完成 fmt.Println("All done!") }
四、Channel 操作
1.无缓冲通道
创建和使用无缓冲通道的基本步骤如下:
package main import "fmt" func main() { ch := make(chan int) // 创建一个无缓冲的整数类型通道 go func() { num := 42 ch <- num // 向通道发送数据 fmt.Println("Data sent to channel") }() value := <-ch // 从通道接收数据 fmt.Println("Received value:", value) }
2. 有缓冲通道
有缓冲通道的使用可以提高程序的并发性能,避免不必要的阻塞。以下是一个有缓冲通道的示例:
package main import "fmt" func main() { ch := make(chan string, 2) // 创建一个缓冲区容量为 2 的字符串类型通道 ch <- "A" ch <- "B" fmt.Println("Data sent to channel") fmt.Println(<-ch) // 从通道接收第一个数据 fmt.Println(<-ch) // 从通道接收第二个数据 }
3. 关闭通道
在使用通道时,有时需要通知接收方不再有数据发送,这时可以使用 close 函数关闭通道。关闭通道后,接收方仍然可以从通道中接收数据,直到通道中的数据被全部接收完,之后再接收数据会得到该类型的零值。以下是一个关闭通道的示例:
package main import "fmt" func main() { ch := make(chan int) go func() { for i := 0; i < 3; i++ { ch <- i } close(ch) // 关闭通道 fmt.Println("Channel closed") }() for num := range ch { // 使用 for...range 循环从通道接收数据,直到通道关闭 fmt.Println(num) } fmt.Println("All data received") }
4. Select 多路复
select 语句用于在多个通道操作中进行选择,它类似于 switch 语句,但 select 语句专门用于处理通道操作。select 语句会随机选择一个可以执行的通道操作并执行,如果所有通道操作都无法执行,则会阻塞,直到有一个通道操作可以执行。如果指定了 default 分支,则在所有通道操作都无法执行时会执行 default 分支,不会阻塞。以下是一个使用 select 语句进行多路复用的示例:
package main import ( "fmt" "time" ) func main() { ch1 := make(chan string) ch2 := make(chan string) go func() { time.Sleep(200 * time.Millisecond) ch1 <- "one" }() go func() { time.Sleep(100 * time.Millisecond) ch2 <- "two" }() select { case msg1 := <-ch1: fmt.Println(msg1) case msg2 := <-ch2: fmt.Println(msg2) case <-time.After(1 * time.Second): // 超时控制 fmt.Println("timeout") } }
五、同步原语
1. 互斥锁 sync.Mutex
在并发编程中,多个 Goroutine 同时访问和修改共享资源可能会导致数据不一致的问题,这时可以使用互斥锁 sync.Mutex 来保证同一时间只有一个 Goroutine 可以访问共享资源。互斥锁提供了两个主要的方法:Lock 和 Unlock。
Lock 方法用于获取锁,如果锁已经被其他 Goroutine 持有,则当前 Goroutine 会阻塞,直到锁被释放。
Unlock 方法用于释放锁,允许其他 Goroutine 获取锁。
以下是一个使用互斥锁的示例:
package main import ( "fmt" "sync" ) var counter int var mu sync.Mutex func increment() { mu.Lock() // 获取锁 defer mu.Unlock() // 确保在函数结束时释放锁 counter++ } func main() { var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() increment() }() } wg.Wait() // 等待所有 Goroutine 执行完毕 fmt.Println(counter) // 输出 1000 }
2. 读写锁 sync.RWMutex
读写锁 sync.RWMutex 是一种特殊的锁,它允许多个 Goroutine 同时进行读操作,但在进行写操作时会独占锁,不允许其他 Goroutine 进行读或写操作。读写锁适用于读多写少的场景,可以提高程序的并发性能。读写锁提供了四个主要的方法:RLock、RUnlock、Lock 和 Unlock。
RLock 方法用于获取读锁,允许多个 Goroutine 同时持有读锁。
RUnlock 方法用于释放读锁。
Lock 方法用于获取写锁,在持有写锁期间,不允许其他 Goroutine 进行读或写操作。
Unlock 方法用于释放写锁。
以下是一个使用读写锁的示例:
package main import ( "fmt" "sync" ) var cache = make(map[string]string) var rwMu sync.RWMutex func read(key string) string { rwMu.RLock() // 获取读锁 defer rwMu.RUnlock() // 确保在函数结束时释放读锁 return cache[key] } func write(key, value string) { rwMu.Lock() // 获取写锁 defer rwMu.Unlock() // 确保在函数结束时释放写锁 cache[key] = value } func main() { var wg sync.WaitGroup // 启动多个读操作 for i := 0; i < 5; i++ { wg.Add(1) go func(index int) { defer wg.Done() key := fmt.Sprintf("key%d", index) value := read(key) fmt.Printf("Read key %s: %s\n", key, value) }(i) } // 启动一个写操作 wg.Add(1) go func() { defer wg.Done() key := "key1" value := "value1" write(key, value) fmt.Printf("Written key %s: %s\n", key, value) }() wg.Wait() // 等待所有 Goroutine 执行完毕 }
六、并发模式
1. Worker Pool(工作池)
工作池模式是一种常见的并发模式,它可以帮助我们管理一组工作线程(在 Go 中就是 Goroutine),并将任务分配给这些工作线程进行处理。这种模式在处理大量独立任务时非常有用,可以避免创建过多的 Goroutine 导致系统资源耗尽。
package main import ( "fmt" ) // worker 函数表示一个工作线程,它从 jobs 通道接收任务,处理后将结果发送到 results 通道 func worker(id int, jobs <-chan int, results chan<- int) { for job := range jobs { fmt.Printf("Worker %d processing job %d\n", id, job) // 模拟任务处理,这里简单地将任务值乘以 2 results <- job * 2 } } func main() { // 创建两个有缓冲的通道,分别用于存储任务和结果 jobs := make(chan int, 100) results := make(chan int, 100) // 启动 3 个工作线程 for w := 1; w <= 3; w++ { go worker(w, jobs, results) } // 发送 9 个任务到 jobs 通道 for j := 1; j <= 9; j++ { jobs <- j } // 关闭 jobs 通道,表示不再有新的任务发送 close(jobs) // 收集结果 for a := 1; a <= 9; a++ { <-results } // 关闭 results 通道 close(results) }
2. Fan-out/Fan-in(扇出/扇入)
扇出 / 扇入模式是一种用于并发处理数据的模式。扇出指的是将一个输入源的数据分发到多个 Goroutine 中进行处理,扇入则是将多个 Goroutine 的处理结果合并到一个通道中。
package main import ( "fmt" "sync" ) // merge 函数用于将多个输入通道的数据合并到一个输出通道中 func merge(chs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 从每个输入通道读取数据 for _, ch := range chs { wg.Add(1) go func(c <-chan int) { defer wg.Done() for n := range c { out <- n } }(ch) } // 关闭输出通道 go func() { wg.Wait() close(out) }() return out } func main() { // 创建 3 个输入通道 ch1 := make(chan int) ch2 := make(chan int) ch3 := make(chan int) // 启动 3 个 Goroutine 向输入通道发送数据 go func() { ch1 <- 1 ch1 <- 2 close(ch1) }() go func() { ch2 <- 3 ch2 <- 4 close(ch2) }() go func() { ch3 <- 5 ch3 <- 6 close(ch3) }() // 合并 3 个输入通道的数据到一个输出通道 resultCh := merge(ch1, ch2, ch3) // 从输出通道读取数据 for num := range resultCh { fmt.Println(num) } }
3. 管道模式(Pipeline)
管道模式是 Go 语言并发编程中一种非常强大且实用的模式,它通过串联多个 Goroutine 来处理数据流,形成一个处理链。这种模式特别适合那些需要分阶段处理任务的场景,例如数据清洗、数据转换、数据筛选等。在管道模式中,每个阶段的 Goroutine 负责完成一个特定的任务,将处理后的数据传递给下一个阶段,就像工厂里的流水线一样,每个工人负责一道工序,最终完成整个产品的生产。
package main import "fmt" // generate 函数用于生成一个整数数据流 // 它接收多个整数作为输入,将这些整数依次发送到一个通道中 // 发送完成后关闭通道 func generate(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } // square 函数用于对输入通道中的每个整数进行平方操作 // 它从输入通道中接收整数,计算其平方值,并将结果发送到一个新的通道中 // 当输入通道关闭且所有数据都处理完后,关闭输出通道 func square(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } // filterEven 函数用于过滤掉输入通道中的奇数,只保留偶数 // 它从输入通道中接收整数,判断是否为偶数,如果是则发送到输出通道 // 当输入通道关闭且所有数据都处理完后,关闭输出通道 func filterEven(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { if n%2 == 0 { out <- n } } close(out) }() return out } func main() { // 定义一个整数切片作为初始数据 nums := []int{1, 2, 3, 4} // 管道串联:生成 → 平方 → 过滤 → 输出 // 首先调用 generate 函数生成整数数据流 // 然后将生成的数据流传递给 square 函数进行平方操作 // 接着将平方后的数据流传递给 filterEven 函数进行偶数过滤 // 最后使用 for...range 循环从最终的通道中接收数据并输出 for n := range filterEven(square(generate(nums...))) { fmt.Println(n) // 输出结果:4, 16 } }
管道模式的优点 :
可维护性高:每个阶段的任务都封装在独立的函数中,代码结构清晰,易于理解和维护。
可扩展性强:可以方便地添加、删除或修改处理阶段,以适应不同的业务需求。
并发性能好:每个阶段的处理可以并行进行,充分利用多核 CPU 的资源,提高程序的处理效率。
通过管道模式,我们可以将复杂的任务分解为多个简单的子任务,并通过通道将这些子任务连接起来,实现高效、灵活的数据流处理。
4. 使用 Context 控制生命周期
在 Go 中,context 包提供了一种机制来控制 Goroutine 的生命周期,例如取消任务、设置超时等。
package main import ( "context" "fmt" "time" ) // longRunningTask 函数表示一个长时间运行的任务,它会监听 context 的取消信号 func longRunningTask(ctx context.Context) { for { select { case <-ctx.Done(): // 监听取消信号 fmt.Println("Task canceled:", ctx.Err()) return default: // 模拟工作 time.Sleep(1 * time.Second) } } } func main() { // 创建一个带有超时的 context,超时时间为 3 秒 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() // 启动一个 Goroutine 执行长时间运行的任务 go longRunningTask(ctx) // 等待 4 秒,确保任务会被取消 time.Sleep(4 * time.Second) }
七、常见问题与调试
1. 竞态条件(Race Condition)
竞态条件是指多个 Goroutine 同时访问和修改共享资源,导致程序的行为变得不可预测。可以使用 Go 语言提供的 -race 标志来检测竞态条件。
package main import ( "fmt" "sync" ) var counter int var wg sync.WaitGroup func increment() { counter++ //不用互斥锁 } func main() { for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() increment() }() } wg.Wait() fmt.Println(counter) }
要检测上述代码中的竞态条件,可以使用以下命令:
go run -race main.go
如果存在竞态条件,Go 编译器会输出详细的错误信息,帮助我们定位问题。
2. 死锁
死锁是指程序中的 Goroutine 相互等待对方释放资源,导致所有 Goroutine 都无法继续执行的情况。要避免死锁,需要确保通道操作成对出现,避免永久阻塞。
func main() { ch := make(chan int) ch <- 1 // 这里会导致死锁,因为没有接收方 <-ch } //fatal error: all goroutines are asleep - deadlock!
在上面代码中,由于没有接收方,ch <- 1 会导致永久阻塞,从而引发死锁。要避免这种情况,需要确保在发送数据之前有接收方准备好接收数据。3.在 Goroutine 中传递错误
func doSomething() error { return errors.New("something went wrong") } func main() { errCh := make(chan error) go func() { errCh <- doSomething() }() if err := <-errCh; err != nil { fmt.Println("Error:", err) } }
4. Goroutine 泄漏
Goroutine 泄漏是指 Goroutine 由于某种原因无法正常退出,导致系统资源不断被占用。要避免 Goroutine 泄漏,始终确保通道被关闭,或使用 context 控制退出。
检测方法:
使用 runtime.NumGoroutine() 监控 goroutine 数量
runtime.NumGoroutine() 函数可以返回当前程序中正在运行的 Goroutine 的数量。通过定期检查这个数量,我们可以判断是否存在 Goroutine 泄漏。如果 Goroutine 的数量持续增加,而没有相应的减少,那么很可能存在泄漏。
以下是一个示例代码,展示了如何使用 runtime.NumGoroutine() 监控 Goroutine 数量:
package main import ( "fmt" "runtime" "time" ) func worker(ch chan int) { for { select { case num := <-ch: fmt.Println("Received:", num) default: time.Sleep(100 * time.Millisecond) } } } func main() { ch := make(chan int) go worker(ch) // 定期检查 Goroutine 数量 ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { numGoroutines := runtime.NumGoroutine() fmt.Printf("Number of goroutines: %d\n", numGoroutines) } }
使用Go 的 pprof 分析 goroutine 堆栈。
pprof 是 Go 语言自带的一个性能分析工具,它可以帮助我们分析程序的 CPU 使用情况、内存分配情况以及 Goroutine 堆栈信息。通过 pprof,我们可以查看每个 Goroutine 的状态和调用栈,从而找出可能存在泄漏的 Goroutine。
以下是一个使用 pprof 分析 Goroutine 堆栈的示例代码:
package main import ( "net/http" _ "net/http/pprof" "time" ) func worker() { for { time.Sleep(1 * time.Second) } } func main() { go worker() // 启动 pprof 服务 go func() { http.ListenAndServe("localhost:6060", nil) }() // 让程序一直运行 select {} }
代码执行后,使用以下命令:
go tool pprof http://localhost:6060/debug/pprof/goroutine
上述命令后,会进入 pprof 的交互式界面。在这个界面中,可以使用各种命令来查看 Goroutine 的堆栈信息,例如 top 命令可以查看占用资源最多的 Goroutine,list 命令可以查看指定函数的调用栈等。通过分析这些信息,我们可以找出可能存在泄漏的 Goroutine,并对代码进行修复。
八、最佳实践
1.优先使用 channel 而非共享内存
在 Go 语言中,推荐使用 channel 来进行 Goroutine 之间的通信和同步,而不是直接使用共享内存。channel 可以避免竞态条件和死锁等问题,使代码更加简洁和安全。
2.避免在 goroutine 中使用全局变量
全局变量在多个 Goroutine 中共享时容易引发竞态条件。尽量将变量的作用域限制在需要使用的 Goroutine 内部,或者使用 channel 来传递数据。
3.使用 select 实现超时和取消
select 语句可以用于同时监听多个通道的操作,结合 time.After 和 context 可以实现超时和取消功能,提高程序的健壮性。
4.小任务用 goroutine,大任务考虑线程池
对于小的、独立的任务,可以直接使用 Goroutine 来执行,因为 Goroutine 的创建和销毁成本较低。对于大的、复杂的任务,可以考虑使用工作池模式来管理一组 Goroutine,避免创建过多的 Goroutine 导致系统资源耗尽。
通过掌握这些概念和模式,你可以高效地编写并发 Go 程序,充分利用多核 CPU 资源。同时,遵循最佳实践和注意常见问题的处理,可以使你的代码更加健壮和可靠。