下面是一个典型的 Go 示例,展示了如何往一个通道中不断发送数据,同时通过多个 goroutine 并发读取和处理通道数据。这个例子使用了 生产者-消费者模式,其中生产者 goroutine 不断生成数据并发送到通道,而多个消费者 goroutine 从通道中读取数据并处理。
示例:多生产者-多消费者模型
package main import ( "fmt" "sync" "time" ) // 生产者:不断生成数据并发送到通道 func producer(ch chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 1; i <= 10; i++ { // 生产 10 个数据 ch <- i fmt.Printf("Produced: %d\n", i) time.Sleep(100 * time.Millisecond) // 模拟生产耗时 } } // 消费者:从通道中读取数据并处理 func consumer(id int, ch <-chan int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch { fmt.Printf("Consumer %d processed: %d\n", id, value) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } func main() { // 创建一个带缓冲的通道 ch := make(chan int, 5) // 使用 WaitGroup 等待所有 goroutine 完成 var wg sync.WaitGroup // 启动生产者 goroutine wg.Add(1) go producer(ch, &wg) // 启动多个消费者 goroutine numConsumers := 3 wg.Add(numConsumers) for i := 1; i <= numConsumers; i++ { go consumer(i, ch, &wg) } // 等待生产者完成 wg.Wait() // 关闭通道,通知消费者停止读取 close(ch) // 等待消费者完成 wg.Wait() fmt.Println("All tasks completed.") }
代码说明
- 生产者 (
producer
):- 不断生成数据并发送到通道
ch
。 - 使用
defer wg.Done()
确保在函数结束时通知WaitGroup
。
- 不断生成数据并发送到通道
- 消费者 (
consumer
):- 从通道
ch
中读取数据并处理。 - 使用
range
循环读取通道数据,直到通道关闭。 - 每个消费者都有一个唯一的
id
,用于标识不同的消费者。
- 从通道
- 通道 (
ch
):- 使用带缓冲的通道(
make(chan int, 5)
),允许生产者在不阻塞的情况下发送数据,直到缓冲区满。
- 使用带缓冲的通道(
sync.WaitGroup
:- 用于等待所有 goroutine 完成。
- 生产者完成后关闭通道,通知消费者停止读取。
- 关闭通道:
- 生产者完成后关闭通道,消费者通过
range
检测到通道关闭后退出循环。
- 生产者完成后关闭通道,消费者通过
输出示例
运行程序后,输出可能类似于以下内容(由于并发执行,顺序可能不同):
Produced: 1 Consumer 1 processed: 1 Produced: 2 Consumer 2 processed: 2 Produced: 3 Consumer 3 processed: 3 Produced: 4 Consumer 1 processed: 4 Produced: 5 Consumer 2 processed: 5 Produced: 6 Consumer 3 processed: 6 Produced: 7 Consumer 1 processed: 7 Produced: 8 Consumer 2 processed: 8 Produced: 9 Consumer 3 processed: 9 Produced: 10 Consumer 1 processed: 10 All tasks completed.
关键点
- 并发安全:
- 通道是并发安全的,多个 goroutine 可以同时读取和写入通道,而不需要额外的同步机制。
- 通道关闭:
- 关闭通道是一种优雅的方式,用于通知消费者停止读取数据。
- 负载均衡:
- 多个消费者可以并发处理数据,实现负载均衡。
- 缓冲通道:
- 使用带缓冲的通道可以提高性能,避免生产者和消费者之间的强耦合。
扩展
- 如果需要更复杂的控制(例如动态调整生产者或消费者的数量),可以使用
context
或额外的控制通道。 - 如果消费者处理数据的耗时较长,可以增加消费者的数量以提高吞吐量。
希望这个示例对你有帮助!
以上代码为什么要使用两个wg.Wait()
在以上代码中,使用两个 wg.Wait()
是为了确保程序的正确性和完整性。它们分别用于等待生产者完成和消费者完成,具体原因如下:
1. 第一个 wg.Wait()
:等待生产者完成
// 等待生产者完成 wg.Wait()
- 作用:等待生产者 goroutine 完成任务。
- 原因:
- 生产者负责生成数据并发送到通道。
- 在关闭通道之前,必须确保生产者已经完成了所有数据的发送。
- 如果提前关闭通道,消费者可能会读取到不完整的数据,或者生产者可能会尝试向已关闭的通道发送数据,导致 panic。
- 实现:
- 在生产者 goroutine 中,使用
defer wg.Done()
确保在生产者完成后通知WaitGroup
。 - 主 goroutine 调用
wg.Wait()
等待生产者完成。
- 在生产者 goroutine 中,使用
2. 第二个 wg.Wait()
:等待消费者完成
// 关闭通道,通知消费者停止读取 close(ch) // 等待消费者完成 wg.Wait()
- 作用:等待消费者 goroutine 完成任务。
- 原因:
- 消费者从通道中读取数据并处理。
- 在关闭通道后,消费者会继续处理通道中剩余的数据,直到通道为空。
- 必须等待所有消费者处理完数据后,程序才能退出,否则可能会导致数据丢失或未完成处理。
- 实现:
- 在消费者 goroutine 中,使用
defer wg.Done()
确保在消费者完成后通知WaitGroup
。 - 主 goroutine 在关闭通道后调用
wg.Wait()
等待消费者完成。
- 在消费者 goroutine 中,使用
为什么需要两个 wg.Wait()
?
- 确保生产者完成:
- 生产者完成后才能安全地关闭通道。
- 如果提前关闭通道,生产者可能会尝试向已关闭的通道发送数据,导致 panic。
- 确保消费者完成:
- 关闭通道后,消费者会继续处理通道中剩余的数据。
- 必须等待所有消费者完成处理,否则程序可能会提前退出,导致数据丢失或未完成处理。
- 分离关注点:
- 第一个
wg.Wait()
关注的是生产者的完成。 - 第二个
wg.Wait()
关注的是消费者的完成。 - 通过分离关注点,代码逻辑更清晰,易于维护。
- 第一个
代码流程总结
- 启动生产者和消费者 goroutine。
- 主 goroutine 调用第一个
wg.Wait()
,等待生产者完成。 - 生产者完成后,关闭通道。
- 主 goroutine 调用第二个
wg.Wait()
,等待消费者完成。 - 所有消费者完成后,程序退出。
示例代码回顾
func main() { ch := make(chan int, 5) var wg sync.WaitGroup // 启动生产者 wg.Add(1) go producer(ch, &wg) // 启动消费者 numConsumers := 3 wg.Add(numConsumers) for i := 1; i <= numConsumers; i++ { go consumer(i, ch, &wg) } // 等待生产者完成 wg.Wait() // 关闭通道,通知消费者停止读取 close(ch) // 等待消费者完成 wg.Wait() fmt.Println("All tasks completed.") }
如果不使用两个 wg.Wait()
会怎样?
- 只使用一个
wg.Wait()
:- 如果只等待生产者完成,消费者可能还没有处理完数据,程序就退出了。
- 如果只等待消费者完成,生产者可能还没有发送完数据,通道就被关闭了。
- 不关闭通道:
- 如果通道未关闭,消费者会一直阻塞在
range
循环中,导致 goroutine 泄漏。
- 如果通道未关闭,消费者会一直阻塞在
- 提前关闭通道:
- 如果提前关闭通道,生产者可能会尝试向已关闭的通道发送数据,导致 panic。
总结
使用两个 wg.Wait()
是为了确保:
- 生产者完成数据发送后,再关闭通道。
- 消费者处理完所有数据后,程序再退出。
这是一种常见的并发编程模式,确保程序的正确性和完整性。
以上代码,系统怎么知道 第一个 wg.Wait() 关注的是生产者的完成。 第二个 wg.Wait() 关注的是消费者的完成。
在代码中,系统并不知道 wg.Wait()
具体是在等待生产者还是消费者。sync.WaitGroup
本身只是一个计数器,它并不知道你在等待什么。它的行为完全取决于你在代码中如何设计 wg.Add()
和 wg.Done()
的调用。
在之前的代码中,两个 wg.Wait()
实际上是在等待同一个 WaitGroup
,这会导致逻辑错误,因为 WaitGroup
的计数器会被生产者和消费者共同使用,无法区分它们的完成状态。
问题分析
在之前的代码中:
// 启动生产者 wg.Add(1) go producer(ch, &wg) // 启动消费者 numConsumers := 3 wg.Add(numConsumers) for i := 1; i <= numConsumers; i++ { go consumer(i, ch, &wg) } // 等待生产者完成 wg.Wait() // 这里会等待生产者和消费者都完成 // 关闭通道 close(ch) // 等待消费者完成 wg.Wait() // 这里会直接返回,因为计数器已经为 0
- 问题:
- 第一个
wg.Wait()
会等待生产者和所有消费者都完成,因为WaitGroup
的计数器是生产者和消费者共享的。 - 第二个
wg.Wait()
会直接返回,因为计数器已经为 0。
- 第一个
正确实现:使用两个 WaitGroup
为了明确区分生产者和消费者的完成状态,应该使用两个独立的 WaitGroup
:
- 一个用于等待生产者完成。
- 另一个用于等待消费者完成。
以下是修正后的代码:
修正后的代码
package main import ( "fmt" "sync" "time" ) // 生产者:不断生成数据并发送到通道 func producer(ch chan<- int, wg *sync.WaitGroup) { defer wg.Done() // 生产者完成后通知 WaitGroup for i := 1; i <= 10; i++ { ch <- i fmt.Printf("Produced: %d\n", i) time.Sleep(100 * time.Millisecond) // 模拟生产耗时 } } // 消费者:从通道中读取数据并处理 func consumer(id int, ch <-chan int, wg *sync.WaitGroup) { defer wg.Done() // 消费者完成后通知 WaitGroup for value := range ch { fmt.Printf("Consumer %d processed: %d\n", id, value) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } func main() { // 创建一个带缓冲的通道 ch := make(chan int, 5) // 使用两个 WaitGroup var producerWg sync.WaitGroup // 用于等待生产者完成 var consumerWg sync.WaitGroup // 用于等待消费者完成 // 启动生产者 goroutine producerWg.Add(1) go producer(ch, &producerWg) // 启动多个消费者 goroutine numConsumers := 3 consumerWg.Add(numConsumers) for i := 1; i <= numConsumers; i++ { go consumer(i, ch, &consumerWg) } // 等待生产者完成 producerWg.Wait() // 关闭通道,通知消费者停止读取 close(ch) // 等待消费者完成 consumerWg.Wait() fmt.Println("All tasks completed.") }
代码说明
- 两个
WaitGroup
:producerWg
:用于等待生产者完成。consumerWg
:用于等待消费者完成。
- 生产者:
- 使用
producerWg.Add(1)
增加生产者计数器。 - 在
producer
函数中,使用defer producerWg.Done()
减少生产者计数器。
- 使用
- 消费者:
- 使用
consumerWg.Add(numConsumers)
增加消费者计数器。 - 在
consumer
函数中,使用defer consumerWg.Done()
减少消费者计数器。
- 使用
- 主 goroutine:
- 调用
producerWg.Wait()
等待生产者完成。 - 关闭通道
close(ch)
,通知消费者停止读取。 - 调用
consumerWg.Wait()
等待消费者完成。
- 调用
输出示例
Produced: 1 Consumer 1 processed: 1 Produced: 2 Consumer 2 processed: 2 Produced: 3 Consumer 3 processed: 3 Produced: 4 Consumer 1 processed: 4 Produced: 5 Consumer 2 processed: 5 Produced: 6 Consumer 3 processed: 6 Produced: 7 Consumer 1 processed: 7 Produced: 8 Consumer 2 processed: 8 Produced: 9 Consumer 3 processed: 9 Produced: 10 Consumer 1 processed: 10 All tasks completed.
关键点
- 分离
WaitGroup
:- 使用两个独立的
WaitGroup
分别管理生产者和消费者的完成状态。
- 使用两个独立的
- 明确职责:
producerWg
只关注生产者的完成。consumerWg
只关注消费者的完成。
- 避免逻辑错误:
- 确保在关闭通道之前,生产者已经完成。
- 确保在程序退出之前,消费者已经完成。
总结
通过使用两个独立的 WaitGroup
,可以明确区分生产者和消费者的完成状态,避免逻辑错误。这种方式是 Go 并发编程中的常见模式,适用于需要精细控制多个 goroutine 生命周期的场景。
如果有多个生产者,代码需要稍作调整,以确保所有生产者完成任务后再关闭通道,同时消费者能够正确处理所有数据。以下是修改后的代码:
修改后的代码:支持多个生产者
package main import ( "fmt" "sync" "time" ) // 生产者:不断生成数据并发送到通道 func producer(id int, ch chan<- int, wg *sync.WaitGroup) { defer wg.Done() // 生产者完成后通知 WaitGroup for i := 1; i <= 5; i++ { // 每个生产者生产 5 个数据 value := id*10 + i // 生成唯一的值 ch <- value fmt.Printf("Producer %d produced: %d\n", id, value) time.Sleep(100 * time.Millisecond) // 模拟生产耗时 } } // 消费者:从通道中读取数据并处理 func consumer(id int, ch <-chan int, wg *sync.WaitGroup) { defer wg.Done() // 消费者完成后通知 WaitGroup for value := range ch { fmt.Printf("Consumer %d processed: %d\n", id, value) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } func main() { // 创建一个带缓冲的通道 ch := make(chan int, 10) // 使用两个 WaitGroup var producerWg sync.WaitGroup // 用于等待所有生产者完成 var consumerWg sync.WaitGroup // 用于等待所有消费者完成 // 启动多个生产者 goroutine numProducers := 3 producerWg.Add(numProducers) for i := 1; i <= numProducers; i++ { go producer(i, ch, &producerWg) } // 启动多个消费者 goroutine numConsumers := 2 consumerWg.Add(numConsumers) for i := 1; i <= numConsumers; i++ { go consumer(i, ch, &consumerWg) } // 等待所有生产者完成 producerWg.Wait() // 关闭通道,通知消费者停止读取 close(ch) // 等待所有消费者完成 consumerWg.Wait() fmt.Println("All tasks completed.") }
代码说明
- 多个生产者:
- 每个生产者都有一个唯一的
id
,用于标识不同的生产者。 - 每个生产者生成 5 个数据,并将其发送到通道。
- 使用
producerWg.Add(numProducers)
增加生产者计数器。 - 在
producer
函数中,使用defer producerWg.Done()
减少生产者计数器。
- 每个生产者都有一个唯一的
- 多个消费者:
- 每个消费者都有一个唯一的
id
,用于标识不同的消费者。 - 消费者从通道中读取数据并处理。
- 使用
consumerWg.Add(numConsumers)
增加消费者计数器。 - 在
consumer
函数中,使用defer consumerWg.Done()
减少消费者计数器。
- 每个消费者都有一个唯一的
- 主 goroutine:
- 调用
producerWg.Wait()
等待所有生产者完成。 - 关闭通道
close(ch)
,通知消费者停止读取。 - 调用
consumerWg.Wait()
等待所有消费者完成。
- 调用
输出示例
由于并发执行,输出顺序可能不同,但大致如下:
Producer 1 produced: 11 Producer 2 produced: 21 Producer 3 produced: 31 Consumer 1 processed: 11 Consumer 2 processed: 21 Producer 1 produced: 12 Producer 2 produced: 22 Producer 3 produced: 32 Consumer 1 processed: 31 Consumer 2 processed: 12 Producer 1 produced: 13 Producer 2 produced: 23 Producer 3 produced: 33 Consumer 1 processed: 22 Consumer 2 processed: 32 Producer 1 produced: 14 Producer 2 produced: 24 Producer 3 produced: 34 Consumer 1 processed: 13 Consumer 2 processed: 23 Producer 1 produced: 15 Producer 2 produced: 25 Producer 3 produced: 35 Consumer 1 processed: 33 Consumer 2 processed: 14 Consumer 1 processed: 24 Consumer 2 processed: 34 Consumer 1 processed: 15 Consumer 2 processed: 25 Consumer 1 processed: 35 All tasks completed.
关键点
- 生产者唯一标识:
- 每个生产者生成的数据通过
id*10 + i
确保唯一性。
- 每个生产者生成的数据通过
- 通道容量:
- 通道的容量(
make(chan int, 10)
)可以根据生产者和消费者的数量调整,以避免阻塞。
- 通道的容量(
- 关闭通道:
- 在所有生产者完成后关闭通道,确保消费者能够正确处理所有数据。
- 负载均衡:
- 多个消费者并发处理数据,实现负载均衡。
总结
通过使用多个生产者和消费者,可以充分利用 Go 的并发特性。关键点在于:
- 使用独立的
WaitGroup
分别管理生产者和消费者的完成状态。 - 确保在所有生产者完成后关闭通道。
- 确保在所有消费者完成后程序退出。
这种模式非常适合需要高并发处理的场景,例如任务分发、数据处理等。
这种场景是一个典型的 多级流水线(Pipeline)模型,其中数据从一个通道流向另一个通道,每一级都有多个 goroutine 并发处理数据。以下是一个完整的设计和实现示例:
设计思路
- 第一级:
- 生产者 goroutine 不断生成数据并发送到第一个通道(
ch1
)。 - 多个消费者 goroutine 从
ch1
中读取数据,处理后将结果发送到第二个通道(ch2
)。
- 生产者 goroutine 不断生成数据并发送到第一个通道(
- 第二级:
- 多个消费者 goroutine 从
ch2
中读取数据,处理后将结果发送到第三个通道(ch3
)。 - 依此类推,可以设计更多级。
- 多个消费者 goroutine 从
- 关闭通道:
- 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
- 同步:
- 使用
sync.WaitGroup
确保每一级的 goroutine 都完成任务。
- 使用
示例代码
以下是一个两级流水线的示例:
package main import ( "fmt" "sync" "time" ) // 第一级生产者:生成数据并发送到 ch1 func producer(ch1 chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 1; i <= 10; i++ { ch1 <- i fmt.Printf("Producer sent: %d\n", i) time.Sleep(100 * time.Millisecond) // 模拟生产耗时 } } // 第一级消费者:从 ch1 读取数据,处理后将结果发送到 ch2 func stage1Consumer(id int, ch1 <-chan int, ch2 chan<- int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch1 { result := value * 2 // 模拟处理逻辑 ch2 <- result fmt.Printf("Stage 1 Consumer %d processed: %d -> %d\n", id, value, result) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } // 第二级消费者:从 ch2 读取数据,处理后将结果发送到 ch3 func stage2Consumer(id int, ch2 <-chan int, ch3 chan<- int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch2 { result := value + 1 // 模拟处理逻辑 ch3 <- result fmt.Printf("Stage 2 Consumer %d processed: %d -> %d\n", id, value, result) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } // 最终消费者:从 ch3 读取数据并处理 func finalConsumer(id int, ch3 <-chan int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch3 { fmt.Printf("Final Consumer %d received: %d\n", id, value) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } func main() { // 创建通道 ch1 := make(chan int, 10) ch2 := make(chan int, 10) ch3 := make(chan int, 10) // 使用 WaitGroup 同步 var wg sync.WaitGroup // 启动第一级生产者 wg.Add(1) go producer(ch1, &wg) // 启动第一级消费者 numStage1Consumers := 2 wg.Add(numStage1Consumers) for i := 1; i <= numStage1Consumers; i++ { go stage1Consumer(i, ch1, ch2, &wg) } // 启动第二级消费者 numStage2Consumers := 2 wg.Add(numStage2Consumers) for i := 1; i <= numStage2Consumers; i++ { go stage2Consumer(i, ch2, ch3, &wg) } // 启动最终消费者 numFinalConsumers := 2 wg.Add(numFinalConsumers) for i := 1; i <= numFinalConsumers; i++ { go finalConsumer(i, ch3, &wg) } // 等待第一级生产者完成 wg.Wait() // 关闭第一级通道 close(ch1) // 等待第一级消费者完成 wg.Wait() // 关闭第二级通道 close(ch2) // 等待第二级消费者完成 wg.Wait() // 关闭最终通道 close(ch3) // 等待最终消费者完成 wg.Wait() fmt.Println("All tasks completed.") }
代码说明
- 通道:
ch1
:第一级通道,生产者发送数据到ch1
。ch2
:第二级通道,第一级消费者将处理后的数据发送到ch2
。ch3
:最终通道,第二级消费者将处理后的数据发送到ch3
。
- 生产者:
- 生产者生成数据并发送到
ch1
。
- 生产者生成数据并发送到
- 第一级消费者:
- 从
ch1
读取数据,处理后将结果发送到ch2
。
- 从
- 第二级消费者:
- 从
ch2
读取数据,处理后将结果发送到ch3
。
- 从
- 最终消费者:
- 从
ch3
读取数据并处理。
- 从
- 关闭通道:
- 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
- 同步:
- 使用
sync.WaitGroup
确保每一级的 goroutine 都完成任务。
- 使用
输出示例
由于并发执行,输出顺序可能不同,但大致如下:
Producer sent: 1 Stage 1 Consumer 1 processed: 1 -> 2 Producer sent: 2 Stage 1 Consumer 2 processed: 2 -> 4 Stage 2 Consumer 1 processed: 2 -> 3 Final Consumer 1 received: 3 Producer sent: 3 Stage 1 Consumer 1 processed: 3 -> 6 Stage 2 Consumer 2 processed: 4 -> 5 Final Consumer 2 received: 5 Producer sent: 4 Stage 1 Consumer 2 processed: 4 -> 8 Stage 2 Consumer 1 processed: 6 -> 7 Final Consumer 1 received: 7 Producer sent: 5 Stage 1 Consumer 1 processed: 5 -> 10 Stage 2 Consumer 2 processed: 8 -> 9 Final Consumer 2 received: 9 ... All tasks completed.
关键点
- 流水线设计:
- 每一级都有独立的 goroutine 处理数据,并通过通道传递结果。
- 通道关闭:
- 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
- 负载均衡:
- 每一级都可以启动多个消费者 goroutine,实现并发处理。
- 同步:
- 使用
sync.WaitGroup
确保所有 goroutine 都完成任务。
- 使用
总结
通过多级流水线设计,可以实现复杂的数据处理流程。每一级都可以独立扩展和优化,适用于需要高并发处理的场景,例如数据清洗、任务分发、实时计算等。
是的,为了确保程序的正确性,需要区分不同的 wg.Wait()
,以便明确等待的是哪一级的生产者或消费者。在之前的代码中,所有 goroutine 共享同一个 WaitGroup
,这会导致逻辑错误,因为无法区分不同阶段的完成状态。
为了正确区分,应该为每一级的生产者和消费者使用独立的 WaitGroup
。以下是改进后的代码:
改进后的代码:区分各级的 WaitGroup
package main import ( "fmt" "sync" "time" ) // 第一级生产者:生成数据并发送到 ch1 func producer(ch1 chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 1; i <= 10; i++ { ch1 <- i fmt.Printf("Producer sent: %d\n", i) time.Sleep(100 * time.Millisecond) // 模拟生产耗时 } } // 第一级消费者:从 ch1 读取数据,处理后将结果发送到 ch2 func stage1Consumer(id int, ch1 <-chan int, ch2 chan<- int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch1 { result := value * 2 // 模拟处理逻辑 ch2 <- result fmt.Printf("Stage 1 Consumer %d processed: %d -> %d\n", id, value, result) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } // 第二级消费者:从 ch2 读取数据,处理后将结果发送到 ch3 func stage2Consumer(id int, ch2 <-chan int, ch3 chan<- int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch2 { result := value + 1 // 模拟处理逻辑 ch3 <- result fmt.Printf("Stage 2 Consumer %d processed: %d -> %d\n", id, value, result) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } // 最终消费者:从 ch3 读取数据并处理 func finalConsumer(id int, ch3 <-chan int, wg *sync.WaitGroup) { defer wg.Done() for value := range ch3 { fmt.Printf("Final Consumer %d received: %d\n", id, value) time.Sleep(200 * time.Millisecond) // 模拟处理耗时 } } func main() { // 创建通道 ch1 := make(chan int, 10) ch2 := make(chan int, 10) ch3 := make(chan int, 10) // 使用独立的 WaitGroup 同步每一级 var producerWg sync.WaitGroup // 生产者 WaitGroup var stage1Wg sync.WaitGroup // 第一级消费者 WaitGroup var stage2Wg sync.WaitGroup // 第二级消费者 WaitGroup var finalWg sync.WaitGroup // 最终消费者 WaitGroup // 启动第一级生产者 producerWg.Add(1) go producer(ch1, &producerWg) // 启动第一级消费者 numStage1Consumers := 2 stage1Wg.Add(numStage1Consumers) for i := 1; i <= numStage1Consumers; i++ { go stage1Consumer(i, ch1, ch2, &stage1Wg) } // 启动第二级消费者 numStage2Consumers := 2 stage2Wg.Add(numStage2Consumers) for i := 1; i <= numStage2Consumers; i++ { go stage2Consumer(i, ch2, ch3, &stage2Wg) } // 启动最终消费者 numFinalConsumers := 2 finalWg.Add(numFinalConsumers) for i := 1; i <= numFinalConsumers; i++ { go finalConsumer(i, ch3, &finalWg) } // 等待第一级生产者完成 producerWg.Wait() // 关闭第一级通道 close(ch1) // 等待第一级消费者完成 stage1Wg.Wait() // 关闭第二级通道 close(ch2) // 等待第二级消费者完成 stage2Wg.Wait() // 关闭最终通道 close(ch3) // 等待最终消费者完成 finalWg.Wait() fmt.Println("All tasks completed.") }
代码说明
- 独立的
WaitGroup
:producerWg
:用于等待生产者完成。stage1Wg
:用于等待第一级消费者完成。stage2Wg
:用于等待第二级消费者完成。finalWg
:用于等待最终消费者完成。
- 通道关闭:
- 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
- 同步:
- 使用独立的
WaitGroup
确保每一级的 goroutine 都完成任务。
- 使用独立的
输出示例
由于并发执行,输出顺序可能不同,但大致如下:
Producer sent: 1 Stage 1 Consumer 1 processed: 1 -> 2 Producer sent: 2 Stage 1 Consumer 2 processed: 2 -> 4 Stage 2 Consumer 1 processed: 2 -> 3 Final Consumer 1 received: 3 Producer sent: 3 Stage 1 Consumer 1 processed: 3 -> 6 Stage 2 Consumer 2 processed: 4 -> 5 Final Consumer 2 received: 5 Producer sent: 4 Stage 1 Consumer 2 processed: 4 -> 8 Stage 2 Consumer 1 processed: 6 -> 7 Final Consumer 1 received: 7 Producer sent: 5 Stage 1 Consumer 1 processed: 5 -> 10 Stage 2 Consumer 2 processed: 8 -> 9 Final Consumer 2 received: 9 ... All tasks completed.
关键点
- 区分
WaitGroup
:- 每一级的生产者和消费者使用独立的
WaitGroup
,确保正确同步。
- 每一级的生产者和消费者使用独立的
- 通道关闭顺序:
- 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
- 负载均衡:
- 每一级都可以启动多个消费者 goroutine,实现并发处理。
总结
通过为每一级的生产者和消费者使用独立的 WaitGroup
,可以明确区分不同阶段的完成状态,确保程序的正确性。这种设计适用于多级流水线场景,能够高效处理并发任务。