生产者-消费者模式

在 Go 语言中,生产者-消费者模式是一种常见的并发模式,用于处理生产者和消费者之间的任务分配。生产者负责生成任务并将其放入队列,而消费者则从队列中取出任务并执行。Go 的并发模型通过 goroutine 和 channel 可以很好地实现这种模式。

下面是一个简单的生产者-消费者模式的实现示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(id int, tasks chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 5; i++ {
        task := id*10 + i
        fmt.Printf("Producer %d produced task %d\n", id, task)
        tasks <- task
        time.Sleep(time.Millisecond * 500)
    }
}

func consumer(id int, tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        fmt.Printf("Consumer %d consumed task %d\n", id, task)
        time.Sleep(time.Second)
    }
}

func main() {
    tasks := make(chan int, 10)

    var producerWG, consumerWG sync.WaitGroup

    // 启动生产者
    producerCount := 2
    producerWG.Add(producerCount)
    for i := 1; i <= producerCount; i++ {
        go producer(i, tasks, &producerWG)
    }

    // 启动消费者
    consumerCount := 3
    consumerWG.Add(consumerCount)
    for i := 1; i <= consumerCount; i++ {
        go consumer(i, tasks, &consumerWG)
    }

    // 等待生产者完成并关闭channel
    producerWG.Wait()
    close(tasks)

    // 等待消费者处理剩余任务
    consumerWG.Wait()

    fmt.Println("All tasks completed.")
}

代码说明:

关键修改说明:

分离 WaitGroup: 使用 producerWG 和 consumerWG 分别跟踪生产者和消费者的完成状态。

正确的关闭顺序: 先等待生产者完成并关闭 channel,确保消费者能正常退出循环。

消除冗余 wg.Wait(): 主函数只需等待生产者和消费者各自的 WaitGroup,避免死锁。

改进后的流程:

生产者 goroutine 完成任务后通过 producerWG.Done() 通知。

主函数等待所有生产者完成,随后关闭 channel。

消费者处理完 channel 中的所有任务后(channel 关闭后退出循环),通过 consumerWG.Done() 通知。

主函数等待所有消费者完成,程序正常退出。

此修正确保并发流程正确同步,避免了死锁并保证所有任务被处理。

生产者逻辑:

生产者生成任务并发送到 tasks channel。

生产者完成任务后调用 wg.Done()。

消费者逻辑:

消费者从 tasks channel 中接收任务并处理。

当 tasks channel 关闭且没有更多任务时,消费者退出循环并调用 wg.Done()。

主函数逻辑:

启动生产者和消费者,并分别将它们的数量添加到 WaitGroup。

等待所有生产者完成任务后,关闭 tasks channel。

等待所有消费者完成任务后,程序退出。

优点:

动态等待:不再依赖固定的 time.Sleep,而是通过 WaitGroup 动态等待任务完成。

优雅关闭:显式关闭 channel,确保消费者能够正确退出。

可扩展性:可以轻松调整生产者和消费者的数量,而无需修改核心逻辑。

输出示例:

Producer 1 produced task 10
Producer 2 produced task 20
Consumer 1 consumed task 10
Consumer 2 consumed task 20
Producer 1 produced task 11
Producer 2 produced task 21
Consumer 3 consumed task 11
Consumer 1 consumed task 21
Producer 1 produced task 12
Producer 2 produced task 22
Consumer 2 consumed task 12
Consumer 3 consumed task 22
...
All tasks completed.

总结:

通过使用 sync.WaitGroup 和显式关闭 channel,我们实现了更实用和优雅的生产者-消费者模式。这种方式可以动态感知任务完成情况,避免了不必要的等待时间,同时提高了代码的可维护性和可扩展性。

应用:

实时数据处理系统

在实时数据处理系统中,生产者生成数据(如传感器数据、用户行为数据),消费者对数据进行实时分析或存储。

场景描述:

生产者:生成实时数据并发送到队列。

消费者:从队列中读取数据并进行处理(如分析、存储)。

Go 实现:

package main

import (
	"fmt"
	"sync"
	"time"
)

// 模拟数据生成
func dataProducer(data chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		data <- i
		fmt.Printf("Produced data: %d\n", i)
		time.Sleep(time.Millisecond * 200) // 模拟数据生成耗时
	}
}

// 模拟数据处理
func dataConsumer(id int, data <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for d := range data {
		fmt.Printf("Consumer %d processed data: %d\n", id, d)
		time.Sleep(time.Millisecond * 500) // 模拟数据处理耗时
	}
}

func main() {
	data := make(chan int, 10)

	// 使用独立的 WaitGroup 控制生产者和消费者
	var producerWG, consumerWG sync.WaitGroup

	// 启动 2 个数据生产者
	for i := 1; i <= 2; i++ {
		producerWG.Add(1)
		go dataProducer(data, &producerWG)
	}

	// 启动 3 个数据消费者
	for i := 1; i <= 3; i++ {
		consumerWG.Add(1)
		go dataConsumer(i, data, &consumerWG)
	}

	// 等待所有生产者完成
	producerWG.Wait()

	// 关闭数据通道(通知消费者没有新数据)
	close(data)

	// 等待所有消费者完成
	consumerWG.Wait()

	fmt.Println("All data processed.")
}

总结

生产者-消费者模式在以下场景中非常实用:

解耦生产与消费:生产者和消费者可以独立运行,互不干扰。

流量控制:通过 channel 的缓冲区大小,可以控制生产者和消费者的处理速度。

提高系统性能:通过并发处理任务,可以充分利用多核 CPU 资源。

以上例子展示了该模式在日志收集、任务调度、图片处理和实时数据处理等场景中的应用。根据具体需求,可以灵活调整生产者和消费者的数量以及任务的处理逻辑。