go语言并发的最佳实践

Go 语言的并发模型是其最强大的特性之一,基于 CSP(Communicating Sequential Processes)理论,通过 goroutine 和 channel 实现轻量级并发.

一、并发核心概念
1. Goroutine
在 Go 语言中,Goroutine 是实现并发编程的核心特性之一,它本质上是一种轻量级线程。与传统的操作系统线程相比,Goroutine 有着显著的优势。
轻量级线程,由 Go 运行时管理(非操作系统线程)。
启动成本低(KB 级栈内存,动态扩缩容),可轻松创建数千个。
在 Go 语言中,通过 go 关键字可以非常方便地启动一个 Goroutine。以下是一个简单的示例:

   func main(){
      go func() {
       fmt.Println("Hello from goroutine!")
	   }()
	   
    // 这里需要注意,如果主函数直接结束,goroutine 可能来不及执行
    // 因为主函数结束会导致整个程序终止
    fmt.Println("Main function continues...")

   }

2. Channel
Channel 是 Go 语言中用于在不同 Goroutine 之间进行通信和同步的重要工具,它遵循 “通过通信共享内存” 的设计理念,避免了传统并发编程中使用共享内存带来的竞态问题。
是一种类型安全的管道,它只能传输特定类型的数据。例如,一个 chan int 类型的 Channel 只能用于传输整数类型的数据
避免共享内存的竞态问题,提倡“通过通信共享内存”。
分两种类型:
无缓冲通道(同步):发送和接收操作会阻塞,直到另一端准备好。
有缓冲通道(异步):缓冲区未满或非空时不会阻塞。

二、原理:GMP 调度模型

1. GMP 调度模型

基本概念

在 Go 语言的并发体系中,GMP 调度模型是其核心机制,它主要由三个关键组件构成:
G(Goroutine):Goroutine 是 Go 语言中轻量级的执行单元,类似于传统操作系统中的线程,但它的开销要小得多。
M(Machine):Machine 代表操作系统线程,它是真正在操作系统内核层面上执行的线程。每个 M 都与一个底层的操作系统线程绑定,负责执行 Goroutine。M 是与操作系统交互的桥梁,它负责处理系统调用、线程上下文切换等底层操作。
P(Processor):Processor 是调度上下文,P 可以看作是 M 执行 Goroutine 的“许可证”,每个 M 要执行 Goroutine 必须先绑定一个 P。P 维护着一个本地的 Goroutine 队列,用于存储待执行的 Goroutine。同时,P 还负责调度和管理这些 Goroutine 的执行顺序。

工作流程

P 维护本地 Goroutine 队列:每个 P 都有一个自己的本地 Goroutine 队列,当创建一个新的 Goroutine 时,它会被放入某个 P 的本地队列中。这个队列是一个先进先出(FIFO)的队列,P 会按照队列中的顺序依次调度 Goroutine 执行。
M 绑定 P 后获取 G 执行:一个 M 要执行 Goroutine,必须先绑定一个空闲的 P。绑定成功后,M 会从 P 的本地队列中取出一个 Goroutine 并执行。当 M 执行完一个 Goroutine 后,会继续从 P 的队列中获取下一个 Goroutine,直到队列为空。
G 阻塞时 M 释放 P:当一个 Goroutine 在执行过程中发生阻塞(例如进行 IO 操作)时,M 会释放当前绑定的 P,以便其他 M 可以使用这个 P 继续执行其他 Goroutine。被释放的 P 会被放入全局 P 列表中,等待其他 M 来绑定。而阻塞的 Goroutine 会被挂起,当阻塞操作完成后,它会被重新放入某个 P 的本地队列或全局队列中,等待再次被调度执行。
2. 抢占式调度
Go 1.14+ 支持基于信号的抢占,避免长时间占用 CPU 的 Goroutine 导致调度延迟。

三、Goroutine 基础
1. 简单示例
下面是一个简单的 Goroutine 示例,展示了如何启动一个 Goroutine 并执行一个简单的任务:

package main

import (
    "fmt"
    "time"
)

func main() {
    go sayHello() // 启动 goroutine
	// 这里使用 time.Sleep 是为了让主函数等待一段时间,以便 goroutine 有机会执行
 	// 但这种方式并不是一个好的做法,实际开发中应该使用 sync.WaitGroup 进行同步
    time.Sleep(100 * time.Millisecond)
}

func sayHello() {
    fmt.Println("Hello!")
}

2. 使用 sync.WaitGroup 并发组
在实际开发中,为了确保所有的 Goroutine 都执行完毕后再继续执行后续的代码,我们可以使用 sync.WaitGroup 来进行同步。sync.WaitGroup 提供了三个主要的方法:Add、Done 和 Wait。
Add 方法用于设置需要等待的 Goroutine 的数量。
Done 方法用于标记一个 Goroutine 已经执行完毕,相当于将等待的数量减 1。
Wait 方法用于阻塞当前的 Goroutine,直到所有标记的 Goroutine 都执行完毕。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2) // 表示有两个任务需要等待

    go func() {
        defer wg.Done() // 在函数结束时调用 Done 方法,表示该任务完成
        fmt.Println("Goroutine 1")
    }()

    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 2")
    }()

    wg.Wait() // 阻塞直到所有任务完成
    fmt.Println("All done!")
}

四、Channel 操作
1.无缓冲通道
创建和使用无缓冲通道的基本步骤如下:

package main

import "fmt"

func main() {
    ch := make(chan int) // 创建一个无缓冲的整数类型通道
    go func() {
        num := 42
        ch <- num // 向通道发送数据
        fmt.Println("Data sent to channel")
    }()

    value := <-ch // 从通道接收数据
    fmt.Println("Received value:", value)
}

2. 有缓冲通道
有缓冲通道的使用可以提高程序的并发性能,避免不必要的阻塞。以下是一个有缓冲通道的示例:

package main

import "fmt"

func main() {
    ch := make(chan string, 2) // 创建一个缓冲区容量为 2 的字符串类型通道
    ch <- "A"
    ch <- "B"
    fmt.Println("Data sent to channel")

    fmt.Println(<-ch) // 从通道接收第一个数据
    fmt.Println(<-ch) // 从通道接收第二个数据
}

3. 关闭通道
在使用通道时,有时需要通知接收方不再有数据发送,这时可以使用 close 函数关闭通道。关闭通道后,接收方仍然可以从通道中接收数据,直到通道中的数据被全部接收完,之后再接收数据会得到该类型的零值。以下是一个关闭通道的示例:

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        for i := 0; i < 3; i++ {
            ch <- i
        }
        close(ch) // 关闭通道
        fmt.Println("Channel closed")
    }()

    for num := range ch { // 使用 for...range 循环从通道接收数据,直到通道关闭
        fmt.Println(num)
    }
    fmt.Println("All data received")
}

4. Select 多路复
select 语句用于在多个通道操作中进行选择,它类似于 switch 语句,但 select 语句专门用于处理通道操作。select 语句会随机选择一个可以执行的通道操作并执行,如果所有通道操作都无法执行,则会阻塞,直到有一个通道操作可以执行。如果指定了 default 分支,则在所有通道操作都无法执行时会执行 default 分支,不会阻塞。以下是一个使用 select 语句进行多路复用的示例:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(200 * time.Millisecond)
        ch1 <- "one"
    }()

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch2 <- "two"
    }()

    select {
    case msg1 := <-ch1:
        fmt.Println(msg1)
    case msg2 := <-ch2:
        fmt.Println(msg2)
    case <-time.After(1 * time.Second): // 超时控制
        fmt.Println("timeout")
    }
}

五、同步原语
1. 互斥锁 sync.Mutex
在并发编程中,多个 Goroutine 同时访问和修改共享资源可能会导致数据不一致的问题,这时可以使用互斥锁 sync.Mutex 来保证同一时间只有一个 Goroutine 可以访问共享资源。互斥锁提供了两个主要的方法:Lock 和 Unlock。
Lock 方法用于获取锁,如果锁已经被其他 Goroutine 持有,则当前 Goroutine 会阻塞,直到锁被释放。
Unlock 方法用于释放锁,允许其他 Goroutine 获取锁。
以下是一个使用互斥锁的示例:

package main

import (
    "fmt"
    "sync"
)

var counter int
var mu sync.Mutex

func increment() {
    mu.Lock()         // 获取锁
    defer mu.Unlock() // 确保在函数结束时释放锁
    counter++
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increment()
        }()
    }
    wg.Wait() // 等待所有 Goroutine 执行完毕
    fmt.Println(counter) // 输出 1000
}

2. 读写锁 sync.RWMutex
读写锁 sync.RWMutex 是一种特殊的锁,它允许多个 Goroutine 同时进行读操作,但在进行写操作时会独占锁,不允许其他 Goroutine 进行读或写操作。读写锁适用于读多写少的场景,可以提高程序的并发性能。读写锁提供了四个主要的方法:RLock、RUnlock、Lock 和 Unlock。
RLock 方法用于获取读锁,允许多个 Goroutine 同时持有读锁。
RUnlock 方法用于释放读锁。
Lock 方法用于获取写锁,在持有写锁期间,不允许其他 Goroutine 进行读或写操作。
Unlock 方法用于释放写锁。
以下是一个使用读写锁的示例:

package main

import (
    "fmt"
    "sync"
)

var cache = make(map[string]string)
var rwMu sync.RWMutex

func read(key string) string {
    rwMu.RLock()         // 获取读锁
    defer rwMu.RUnlock() // 确保在函数结束时释放读锁
    return cache[key]
}

func write(key, value string) {
    rwMu.Lock()          // 获取写锁
    defer rwMu.Unlock()  // 确保在函数结束时释放写锁
    cache[key] = value
}

func main() {
    var wg sync.WaitGroup

    // 启动多个读操作
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", index)
            value := read(key)
            fmt.Printf("Read key %s: %s\n", key, value)
        }(i)
    }

    // 启动一个写操作
    wg.Add(1)
    go func() {
        defer wg.Done()
        key := "key1"
        value := "value1"
        write(key, value)
        fmt.Printf("Written key %s: %s\n", key, value)
    }()

    wg.Wait() // 等待所有 Goroutine 执行完毕
}

六、并发模式
1. Worker Pool(工作池)
工作池模式是一种常见的并发模式,它可以帮助我们管理一组工作线程(在 Go 中就是 Goroutine),并将任务分配给这些工作线程进行处理。这种模式在处理大量独立任务时非常有用,可以避免创建过多的 Goroutine 导致系统资源耗尽。


package main

import (
    "fmt"
)

// worker 函数表示一个工作线程,它从 jobs 通道接收任务,处理后将结果发送到 results 通道
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        // 模拟任务处理,这里简单地将任务值乘以 2
        results <- job * 2
    }
}

func main() {
    // 创建两个有缓冲的通道,分别用于存储任务和结果
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动 3 个工作线程
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送 9 个任务到 jobs 通道
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    // 关闭 jobs 通道,表示不再有新的任务发送
    close(jobs)

    // 收集结果
    for a := 1; a <= 9; a++ {
        <-results
    }
    // 关闭 results 通道
    close(results)
}

2. Fan-out/Fan-in(扇出/扇入)
扇出 / 扇入模式是一种用于并发处理数据的模式。扇出指的是将一个输入源的数据分发到多个 Goroutine 中进行处理,扇入则是将多个 Goroutine 的处理结果合并到一个通道中。

package main

import (
	"fmt"
	"sync"
)

// merge 函数用于将多个输入通道的数据合并到一个输出通道中
func merge(chs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// 从每个输入通道读取数据
	for _, ch := range chs {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(ch)
	}

	// 关闭输出通道
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// 创建 3 个输入通道
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)

	// 启动 3 个 Goroutine 向输入通道发送数据
	go func() {
		ch1 <- 1
		ch1 <- 2
		close(ch1)
	}()
	go func() {
		ch2 <- 3
		ch2 <- 4
		close(ch2)
	}()
	go func() {
		ch3 <- 5
		ch3 <- 6
		close(ch3)
	}()

	// 合并 3 个输入通道的数据到一个输出通道
	resultCh := merge(ch1, ch2, ch3)

	// 从输出通道读取数据
	for num := range resultCh {
		fmt.Println(num)
	}
}

3. 管道模式(Pipeline)
管道模式是 Go 语言并发编程中一种非常强大且实用的模式,它通过串联多个 Goroutine 来处理数据流,形成一个处理链。这种模式特别适合那些需要分阶段处理任务的场景,例如数据清洗、数据转换、数据筛选等。在管道模式中,每个阶段的 Goroutine 负责完成一个特定的任务,将处理后的数据传递给下一个阶段,就像工厂里的流水线一样,每个工人负责一道工序,最终完成整个产品的生产。

package main

import "fmt"

// generate 函数用于生成一个整数数据流
// 它接收多个整数作为输入,将这些整数依次发送到一个通道中
// 发送完成后关闭通道
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// square 函数用于对输入通道中的每个整数进行平方操作
// 它从输入通道中接收整数,计算其平方值,并将结果发送到一个新的通道中
// 当输入通道关闭且所有数据都处理完后,关闭输出通道
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// filterEven 函数用于过滤掉输入通道中的奇数,只保留偶数
// 它从输入通道中接收整数,判断是否为偶数,如果是则发送到输出通道
// 当输入通道关闭且所有数据都处理完后,关闭输出通道
func filterEven(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
		}
		close(out)
	}()
	return out
}

func main() {
	// 定义一个整数切片作为初始数据
	nums := []int{1, 2, 3, 4}
	// 管道串联:生成 → 平方 → 过滤 → 输出
	// 首先调用 generate 函数生成整数数据流
	// 然后将生成的数据流传递给 square 函数进行平方操作
	// 接着将平方后的数据流传递给 filterEven 函数进行偶数过滤
	// 最后使用 for...range 循环从最终的通道中接收数据并输出
	for n := range filterEven(square(generate(nums...))) {
		fmt.Println(n) // 输出结果:4, 16
	}
}

管道模式的优点 :
可维护性高:每个阶段的任务都封装在独立的函数中,代码结构清晰,易于理解和维护。
可扩展性强:可以方便地添加、删除或修改处理阶段,以适应不同的业务需求。
并发性能好:每个阶段的处理可以并行进行,充分利用多核 CPU 的资源,提高程序的处理效率。
通过管道模式,我们可以将复杂的任务分解为多个简单的子任务,并通过通道将这些子任务连接起来,实现高效、灵活的数据流处理。

4. 使用 Context 控制生命周期
在 Go 中,context 包提供了一种机制来控制 Goroutine 的生命周期,例如取消任务、设置超时等。

package main

import (
	"context"
	"fmt"
	"time"
)

// longRunningTask 函数表示一个长时间运行的任务,它会监听 context 的取消信号
func longRunningTask(ctx context.Context) {
	for {
		select {
		case <-ctx.Done(): // 监听取消信号
			fmt.Println("Task canceled:", ctx.Err())
			return
		default:
			// 模拟工作
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	// 创建一个带有超时的 context,超时时间为 3 秒
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// 启动一个 Goroutine 执行长时间运行的任务
	go longRunningTask(ctx)

	// 等待 4 秒,确保任务会被取消
	time.Sleep(4 * time.Second)
}

七、常见问题与调试
1. 竞态条件(Race Condition)
竞态条件是指多个 Goroutine 同时访问和修改共享资源,导致程序的行为变得不可预测。可以使用 Go 语言提供的 -race 标志来检测竞态条件。

package main

import (
	"fmt"
	"sync"
)

var counter int
var wg sync.WaitGroup

func increment() {
	counter++  //不用互斥锁
}

func main() {
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment()
		}()
	}
	wg.Wait()
	fmt.Println(counter)
}

要检测上述代码中的竞态条件,可以使用以下命令:

go run -race main.go

如果存在竞态条件,Go 编译器会输出详细的错误信息,帮助我们定位问题。

2. 死锁
死锁是指程序中的 Goroutine 相互等待对方释放资源,导致所有 Goroutine 都无法继续执行的情况。要避免死锁,需要确保通道操作成对出现,避免永久阻塞。

func main() {
	ch := make(chan int)
	ch <- 1 // 这里会导致死锁,因为没有接收方
	<-ch
}
//fatal error: all goroutines are asleep - deadlock!

在上面代码中,由于没有接收方,ch <- 1 会导致永久阻塞,从而引发死锁。要避免这种情况,需要确保在发送数据之前有接收方准备好接收数据。3.在 Goroutine 中传递错误

func doSomething() error {
    return errors.New("something went wrong")
}

func main() {
    errCh := make(chan error)
    go func() {
        errCh <- doSomething()
    }()

    if err := <-errCh; err != nil {
        fmt.Println("Error:", err)
    }
}

4. Goroutine 泄漏
Goroutine 泄漏是指 Goroutine 由于某种原因无法正常退出,导致系统资源不断被占用。要避免 Goroutine 泄漏,始终确保通道被关闭,或使用 context 控制退出。

检测方法:

使用 runtime.NumGoroutine() 监控 goroutine 数量

runtime.NumGoroutine() 函数可以返回当前程序中正在运行的 Goroutine 的数量。通过定期检查这个数量,我们可以判断是否存在 Goroutine 泄漏。如果 Goroutine 的数量持续增加,而没有相应的减少,那么很可能存在泄漏。

以下是一个示例代码,展示了如何使用 runtime.NumGoroutine() 监控 Goroutine 数量:

package main

import (
	"fmt"
	"runtime"
	"time"
)

func worker(ch chan int) {
	for {
		select {
		case num := <-ch:
			fmt.Println("Received:", num)
		default:
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func main() {
	ch := make(chan int)
	go worker(ch)

	// 定期检查 Goroutine 数量
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		numGoroutines := runtime.NumGoroutine()
		fmt.Printf("Number of goroutines: %d\n", numGoroutines)
	}
}

使用Go 的 pprof 分析 goroutine 堆栈。

pprof 是 Go 语言自带的一个性能分析工具,它可以帮助我们分析程序的 CPU 使用情况、内存分配情况以及 Goroutine 堆栈信息。通过 pprof,我们可以查看每个 Goroutine 的状态和调用栈,从而找出可能存在泄漏的 Goroutine。

以下是一个使用 pprof 分析 Goroutine 堆栈的示例代码:

package main

import (
	"net/http"
	_ "net/http/pprof"
	"time"
)

func worker() {
	for {
		time.Sleep(1 * time.Second)
	}
}

func main() {
	go worker()

	// 启动 pprof 服务
	go func() {
		http.ListenAndServe("localhost:6060", nil)
	}()

	// 让程序一直运行
	select {}
}

代码执行后,使用以下命令:

go tool pprof http://localhost:6060/debug/pprof/goroutine

上述命令后,会进入 pprof 的交互式界面。在这个界面中,可以使用各种命令来查看 Goroutine 的堆栈信息,例如 top 命令可以查看占用资源最多的 Goroutine,list 命令可以查看指定函数的调用栈等。通过分析这些信息,我们可以找出可能存在泄漏的 Goroutine,并对代码进行修复。

八、最佳实践
1.优先使用 channel 而非共享内存
在 Go 语言中,推荐使用 channel 来进行 Goroutine 之间的通信和同步,而不是直接使用共享内存。channel 可以避免竞态条件和死锁等问题,使代码更加简洁和安全。

2.避免在 goroutine 中使用全局变量
全局变量在多个 Goroutine 中共享时容易引发竞态条件。尽量将变量的作用域限制在需要使用的 Goroutine 内部,或者使用 channel 来传递数据。

3.使用 select 实现超时和取消
select 语句可以用于同时监听多个通道的操作,结合 time.After 和 context 可以实现超时和取消功能,提高程序的健壮性。

4.小任务用 goroutine,大任务考虑线程池
对于小的、独立的任务,可以直接使用 Goroutine 来执行,因为 Goroutine 的创建和销毁成本较低。对于大的、复杂的任务,可以考虑使用工作池模式来管理一组 Goroutine,避免创建过多的 Goroutine 导致系统资源耗尽。
通过掌握这些概念和模式,你可以高效地编写并发 Go 程序,充分利用多核 CPU 资源。同时,遵循最佳实践和注意常见问题的处理,可以使你的代码更加健壮和可靠。