在 Go 语言中,生产者-消费者模式是一种常见的并发模式,用于处理生产者和消费者之间的任务分配。生产者负责生成任务并将其放入队列,而消费者则从队列中取出任务并执行。Go 的并发模型通过 goroutine 和 channel 可以很好地实现这种模式。
下面是一个简单的生产者-消费者模式的实现示例:
package main import ( "fmt" "sync" "time" ) func producer(id int, tasks chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 5; i++ { task := id*10 + i fmt.Printf("Producer %d produced task %d\n", id, task) tasks <- task time.Sleep(time.Millisecond * 500) } } func consumer(id int, tasks <-chan int, wg *sync.WaitGroup) { defer wg.Done() for task := range tasks { fmt.Printf("Consumer %d consumed task %d\n", id, task) time.Sleep(time.Second) } } func main() { tasks := make(chan int, 10) var producerWG, consumerWG sync.WaitGroup // 启动生产者 producerCount := 2 producerWG.Add(producerCount) for i := 1; i <= producerCount; i++ { go producer(i, tasks, &producerWG) } // 启动消费者 consumerCount := 3 consumerWG.Add(consumerCount) for i := 1; i <= consumerCount; i++ { go consumer(i, tasks, &consumerWG) } // 等待生产者完成并关闭channel producerWG.Wait() close(tasks) // 等待消费者处理剩余任务 consumerWG.Wait() fmt.Println("All tasks completed.") }
代码说明:
关键修改说明:
分离 WaitGroup: 使用 producerWG 和 consumerWG 分别跟踪生产者和消费者的完成状态。
正确的关闭顺序: 先等待生产者完成并关闭 channel,确保消费者能正常退出循环。
消除冗余 wg.Wait(): 主函数只需等待生产者和消费者各自的 WaitGroup,避免死锁。
改进后的流程:
生产者 goroutine 完成任务后通过 producerWG.Done() 通知。
主函数等待所有生产者完成,随后关闭 channel。
消费者处理完 channel 中的所有任务后(channel 关闭后退出循环),通过 consumerWG.Done() 通知。
主函数等待所有消费者完成,程序正常退出。
此修正确保并发流程正确同步,避免了死锁并保证所有任务被处理。
生产者逻辑:
生产者生成任务并发送到 tasks channel。
生产者完成任务后调用 wg.Done()。
消费者逻辑:
消费者从 tasks channel 中接收任务并处理。
当 tasks channel 关闭且没有更多任务时,消费者退出循环并调用 wg.Done()。
主函数逻辑:
启动生产者和消费者,并分别将它们的数量添加到 WaitGroup。
等待所有生产者完成任务后,关闭 tasks channel。
等待所有消费者完成任务后,程序退出。
优点:
动态等待:不再依赖固定的 time.Sleep,而是通过 WaitGroup 动态等待任务完成。
优雅关闭:显式关闭 channel,确保消费者能够正确退出。
可扩展性:可以轻松调整生产者和消费者的数量,而无需修改核心逻辑。
输出示例:
Producer 1 produced task 10 Producer 2 produced task 20 Consumer 1 consumed task 10 Consumer 2 consumed task 20 Producer 1 produced task 11 Producer 2 produced task 21 Consumer 3 consumed task 11 Consumer 1 consumed task 21 Producer 1 produced task 12 Producer 2 produced task 22 Consumer 2 consumed task 12 Consumer 3 consumed task 22 ... All tasks completed.
总结:
通过使用 sync.WaitGroup 和显式关闭 channel,我们实现了更实用和优雅的生产者-消费者模式。这种方式可以动态感知任务完成情况,避免了不必要的等待时间,同时提高了代码的可维护性和可扩展性。
应用:
实时数据处理系统
在实时数据处理系统中,生产者生成数据(如传感器数据、用户行为数据),消费者对数据进行实时分析或存储。
场景描述:
生产者:生成实时数据并发送到队列。
消费者:从队列中读取数据并进行处理(如分析、存储)。
Go 实现:
package main import ( "fmt" "sync" "time" ) // 模拟数据生成 func dataProducer(data chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 10; i++ { data <- i fmt.Printf("Produced data: %d\n", i) time.Sleep(time.Millisecond * 200) // 模拟数据生成耗时 } } // 模拟数据处理 func dataConsumer(id int, data <-chan int, wg *sync.WaitGroup) { defer wg.Done() for d := range data { fmt.Printf("Consumer %d processed data: %d\n", id, d) time.Sleep(time.Millisecond * 500) // 模拟数据处理耗时 } } func main() { data := make(chan int, 10) // 使用独立的 WaitGroup 控制生产者和消费者 var producerWG, consumerWG sync.WaitGroup // 启动 2 个数据生产者 for i := 1; i <= 2; i++ { producerWG.Add(1) go dataProducer(data, &producerWG) } // 启动 3 个数据消费者 for i := 1; i <= 3; i++ { consumerWG.Add(1) go dataConsumer(i, data, &consumerWG) } // 等待所有生产者完成 producerWG.Wait() // 关闭数据通道(通知消费者没有新数据) close(data) // 等待所有消费者完成 consumerWG.Wait() fmt.Println("All data processed.") }
总结
生产者-消费者模式在以下场景中非常实用:
解耦生产与消费:生产者和消费者可以独立运行,互不干扰。
流量控制:通过 channel 的缓冲区大小,可以控制生产者和消费者的处理速度。
提高系统性能:通过并发处理任务,可以充分利用多核 CPU 资源。
以上例子展示了该模式在日志收集、任务调度、图片处理和实时数据处理等场景中的应用。根据具体需求,可以灵活调整生产者和消费者的数量以及任务的处理逻辑。