go 往一个通道中不断发送数据,同时通过多个线程读取和处理通道数据

下面是一个典型的 Go 示例,展示了如何往一个通道中不断发送数据,同时通过多个 goroutine 并发读取和处理通道数据。这个例子使用了 生产者-消费者模式,其中生产者 goroutine 不断生成数据并发送到通道,而多个消费者 goroutine 从通道中读取数据并处理。


示例:多生产者-多消费者模型

package main

import (
	"fmt"
	"sync"
	"time"
)

// 生产者:不断生成数据并发送到通道
func producer(ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 1; i <= 10; i++ { // 生产 10 个数据
		ch <- i
		fmt.Printf("Produced: %d\n", i)
		time.Sleep(100 * time.Millisecond) // 模拟生产耗时
	}
}

// 消费者:从通道中读取数据并处理
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch {
		fmt.Printf("Consumer %d processed: %d\n", id, value)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

func main() {
	// 创建一个带缓冲的通道
	ch := make(chan int, 5)

	// 使用 WaitGroup 等待所有 goroutine 完成
	var wg sync.WaitGroup

	// 启动生产者 goroutine
	wg.Add(1)
	go producer(ch, &wg)

	// 启动多个消费者 goroutine
	numConsumers := 3
	wg.Add(numConsumers)
	for i := 1; i <= numConsumers; i++ {
		go consumer(i, ch, &wg)
	}

	// 等待生产者完成
	wg.Wait()

	// 关闭通道,通知消费者停止读取
	close(ch)

	// 等待消费者完成
	wg.Wait()

	fmt.Println("All tasks completed.")
}

代码说明

  1. 生产者 (producer)
    • 不断生成数据并发送到通道 ch
    • 使用 defer wg.Done() 确保在函数结束时通知 WaitGroup
  2. 消费者 (consumer)
    • 从通道 ch 中读取数据并处理。
    • 使用 range 循环读取通道数据,直到通道关闭。
    • 每个消费者都有一个唯一的 id,用于标识不同的消费者。
  3. 通道 (ch)
    • 使用带缓冲的通道(make(chan int, 5)),允许生产者在不阻塞的情况下发送数据,直到缓冲区满。
  4. sync.WaitGroup
    • 用于等待所有 goroutine 完成。
    • 生产者完成后关闭通道,通知消费者停止读取。
  5. 关闭通道
    • 生产者完成后关闭通道,消费者通过 range 检测到通道关闭后退出循环。

输出示例

运行程序后,输出可能类似于以下内容(由于并发执行,顺序可能不同):

复制
Produced: 1
Consumer 1 processed: 1
Produced: 2
Consumer 2 processed: 2
Produced: 3
Consumer 3 processed: 3
Produced: 4
Consumer 1 processed: 4
Produced: 5
Consumer 2 processed: 5
Produced: 6
Consumer 3 processed: 6
Produced: 7
Consumer 1 processed: 7
Produced: 8
Consumer 2 processed: 8
Produced: 9
Consumer 3 processed: 9
Produced: 10
Consumer 1 processed: 10
All tasks completed.

关键点

  1. 并发安全
    • 通道是并发安全的,多个 goroutine 可以同时读取和写入通道,而不需要额外的同步机制。
  2. 通道关闭
    • 关闭通道是一种优雅的方式,用于通知消费者停止读取数据。
  3. 负载均衡
    • 多个消费者可以并发处理数据,实现负载均衡。
  4. 缓冲通道
    • 使用带缓冲的通道可以提高性能,避免生产者和消费者之间的强耦合。

扩展

  • 如果需要更复杂的控制(例如动态调整生产者或消费者的数量),可以使用 context 或额外的控制通道。
  • 如果消费者处理数据的耗时较长,可以增加消费者的数量以提高吞吐量。

希望这个示例对你有帮助!

以上代码为什么要使用两个wg.Wait()

在以上代码中,使用两个 wg.Wait() 是为了确保程序的正确性和完整性。它们分别用于等待生产者完成消费者完成,具体原因如下:


1. 第一个 wg.Wait():等待生产者完成

// 等待生产者完成
wg.Wait()
  • 作用:等待生产者 goroutine 完成任务。
  • 原因
    • 生产者负责生成数据并发送到通道。
    • 在关闭通道之前,必须确保生产者已经完成了所有数据的发送。
    • 如果提前关闭通道,消费者可能会读取到不完整的数据,或者生产者可能会尝试向已关闭的通道发送数据,导致 panic。
  • 实现
    • 在生产者 goroutine 中,使用 defer wg.Done() 确保在生产者完成后通知 WaitGroup
    • 主 goroutine 调用 wg.Wait() 等待生产者完成。

2. 第二个 wg.Wait():等待消费者完成

// 关闭通道,通知消费者停止读取
close(ch)

// 等待消费者完成
wg.Wait()
  • 作用:等待消费者 goroutine 完成任务。
  • 原因
    • 消费者从通道中读取数据并处理。
    • 在关闭通道后,消费者会继续处理通道中剩余的数据,直到通道为空。
    • 必须等待所有消费者处理完数据后,程序才能退出,否则可能会导致数据丢失或未完成处理。
  • 实现
    • 在消费者 goroutine 中,使用 defer wg.Done() 确保在消费者完成后通知 WaitGroup
    • 主 goroutine 在关闭通道后调用 wg.Wait() 等待消费者完成。

为什么需要两个 wg.Wait()

  1. 确保生产者完成
    • 生产者完成后才能安全地关闭通道。
    • 如果提前关闭通道,生产者可能会尝试向已关闭的通道发送数据,导致 panic。
  2. 确保消费者完成
    • 关闭通道后,消费者会继续处理通道中剩余的数据。
    • 必须等待所有消费者完成处理,否则程序可能会提前退出,导致数据丢失或未完成处理。
  3. 分离关注点
    • 第一个 wg.Wait() 关注的是生产者的完成。
    • 第二个 wg.Wait() 关注的是消费者的完成。
    • 通过分离关注点,代码逻辑更清晰,易于维护。

代码流程总结

  1. 启动生产者和消费者 goroutine。
  2. 主 goroutine 调用第一个 wg.Wait(),等待生产者完成。
  3. 生产者完成后,关闭通道。
  4. 主 goroutine 调用第二个 wg.Wait(),等待消费者完成。
  5. 所有消费者完成后,程序退出。

示例代码回顾

func main() {
	ch := make(chan int, 5)
	var wg sync.WaitGroup

	// 启动生产者
	wg.Add(1)
	go producer(ch, &wg)

	// 启动消费者
	numConsumers := 3
	wg.Add(numConsumers)
	for i := 1; i <= numConsumers; i++ {
		go consumer(i, ch, &wg)
	}

	// 等待生产者完成
	wg.Wait()

	// 关闭通道,通知消费者停止读取
	close(ch)

	// 等待消费者完成
	wg.Wait()

	fmt.Println("All tasks completed.")
}

如果不使用两个 wg.Wait() 会怎样?

  1. 只使用一个 wg.Wait()
    • 如果只等待生产者完成,消费者可能还没有处理完数据,程序就退出了。
    • 如果只等待消费者完成,生产者可能还没有发送完数据,通道就被关闭了。
  2. 不关闭通道
    • 如果通道未关闭,消费者会一直阻塞在 range 循环中,导致 goroutine 泄漏。
  3. 提前关闭通道
    • 如果提前关闭通道,生产者可能会尝试向已关闭的通道发送数据,导致 panic。

总结

使用两个 wg.Wait() 是为了确保:

  1. 生产者完成数据发送后,再关闭通道。
  2. 消费者处理完所有数据后,程序再退出。

这是一种常见的并发编程模式,确保程序的正确性和完整性。

以上代码,系统怎么知道 第一个 wg.Wait() 关注的是生产者的完成。 第二个 wg.Wait() 关注的是消费者的完成。

在代码中,系统并不知道 wg.Wait() 具体是在等待生产者还是消费者。sync.WaitGroup 本身只是一个计数器,它并不知道你在等待什么。它的行为完全取决于你在代码中如何设计 wg.Add() 和 wg.Done() 的调用。

在之前的代码中,两个 wg.Wait() 实际上是在等待同一个 WaitGroup,这会导致逻辑错误,因为 WaitGroup 的计数器会被生产者和消费者共同使用,无法区分它们的完成状态。


问题分析

在之前的代码中:

// 启动生产者
wg.Add(1)
go producer(ch, &wg)

// 启动消费者
numConsumers := 3
wg.Add(numConsumers)
for i := 1; i <= numConsumers; i++ {
    go consumer(i, ch, &wg)
}

// 等待生产者完成
wg.Wait() // 这里会等待生产者和消费者都完成

// 关闭通道
close(ch)

// 等待消费者完成
wg.Wait() // 这里会直接返回,因为计数器已经为 0
  • 问题
    • 第一个 wg.Wait() 会等待生产者和所有消费者都完成,因为 WaitGroup 的计数器是生产者和消费者共享的。
    • 第二个 wg.Wait() 会直接返回,因为计数器已经为 0。

正确实现:使用两个 WaitGroup

为了明确区分生产者和消费者的完成状态,应该使用两个独立的 WaitGroup

  • 一个用于等待生产者完成。
  • 另一个用于等待消费者完成。

以下是修正后的代码:


修正后的代码

package main

import (
	"fmt"
	"sync"
	"time"
)

// 生产者:不断生成数据并发送到通道
func producer(ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done() // 生产者完成后通知 WaitGroup
	for i := 1; i <= 10; i++ {
		ch <- i
		fmt.Printf("Produced: %d\n", i)
		time.Sleep(100 * time.Millisecond) // 模拟生产耗时
	}
}

// 消费者:从通道中读取数据并处理
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 消费者完成后通知 WaitGroup
	for value := range ch {
		fmt.Printf("Consumer %d processed: %d\n", id, value)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

func main() {
	// 创建一个带缓冲的通道
	ch := make(chan int, 5)

	// 使用两个 WaitGroup
	var producerWg sync.WaitGroup // 用于等待生产者完成
	var consumerWg sync.WaitGroup // 用于等待消费者完成

	// 启动生产者 goroutine
	producerWg.Add(1)
	go producer(ch, &producerWg)

	// 启动多个消费者 goroutine
	numConsumers := 3
	consumerWg.Add(numConsumers)
	for i := 1; i <= numConsumers; i++ {
		go consumer(i, ch, &consumerWg)
	}

	// 等待生产者完成
	producerWg.Wait()

	// 关闭通道,通知消费者停止读取
	close(ch)

	// 等待消费者完成
	consumerWg.Wait()

	fmt.Println("All tasks completed.")
}

代码说明

  1. 两个 WaitGroup
    • producerWg:用于等待生产者完成。
    • consumerWg:用于等待消费者完成。
  2. 生产者
    • 使用 producerWg.Add(1) 增加生产者计数器。
    • 在 producer 函数中,使用 defer producerWg.Done() 减少生产者计数器。
  3. 消费者
    • 使用 consumerWg.Add(numConsumers) 增加消费者计数器。
    • 在 consumer 函数中,使用 defer consumerWg.Done() 减少消费者计数器。
  4. 主 goroutine
    • 调用 producerWg.Wait() 等待生产者完成。
    • 关闭通道 close(ch),通知消费者停止读取。
    • 调用 consumerWg.Wait() 等待消费者完成。

输出示例

Produced: 1
Consumer 1 processed: 1
Produced: 2
Consumer 2 processed: 2
Produced: 3
Consumer 3 processed: 3
Produced: 4
Consumer 1 processed: 4
Produced: 5
Consumer 2 processed: 5
Produced: 6
Consumer 3 processed: 6
Produced: 7
Consumer 1 processed: 7
Produced: 8
Consumer 2 processed: 8
Produced: 9
Consumer 3 processed: 9
Produced: 10
Consumer 1 processed: 10
All tasks completed.

关键点

  1. 分离 WaitGroup
    • 使用两个独立的 WaitGroup 分别管理生产者和消费者的完成状态。
  2. 明确职责
    • producerWg 只关注生产者的完成。
    • consumerWg 只关注消费者的完成。
  3. 避免逻辑错误
    • 确保在关闭通道之前,生产者已经完成。
    • 确保在程序退出之前,消费者已经完成。

总结

通过使用两个独立的 WaitGroup,可以明确区分生产者和消费者的完成状态,避免逻辑错误。这种方式是 Go 并发编程中的常见模式,适用于需要精细控制多个 goroutine 生命周期的场景。

如果有多个生产者,以上代码怎么修改?

如果有多个生产者,代码需要稍作调整,以确保所有生产者完成任务后再关闭通道,同时消费者能够正确处理所有数据。以下是修改后的代码:


修改后的代码:支持多个生产者

go
复制
package main

import (
	"fmt"
	"sync"
	"time"
)

// 生产者:不断生成数据并发送到通道
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done() // 生产者完成后通知 WaitGroup
	for i := 1; i <= 5; i++ { // 每个生产者生产 5 个数据
		value := id*10 + i // 生成唯一的值
		ch <- value
		fmt.Printf("Producer %d produced: %d\n", id, value)
		time.Sleep(100 * time.Millisecond) // 模拟生产耗时
	}
}

// 消费者:从通道中读取数据并处理
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 消费者完成后通知 WaitGroup
	for value := range ch {
		fmt.Printf("Consumer %d processed: %d\n", id, value)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

func main() {
	// 创建一个带缓冲的通道
	ch := make(chan int, 10)

	// 使用两个 WaitGroup
	var producerWg sync.WaitGroup // 用于等待所有生产者完成
	var consumerWg sync.WaitGroup // 用于等待所有消费者完成

	// 启动多个生产者 goroutine
	numProducers := 3
	producerWg.Add(numProducers)
	for i := 1; i <= numProducers; i++ {
		go producer(i, ch, &producerWg)
	}

	// 启动多个消费者 goroutine
	numConsumers := 2
	consumerWg.Add(numConsumers)
	for i := 1; i <= numConsumers; i++ {
		go consumer(i, ch, &consumerWg)
	}

	// 等待所有生产者完成
	producerWg.Wait()

	// 关闭通道,通知消费者停止读取
	close(ch)

	// 等待所有消费者完成
	consumerWg.Wait()

	fmt.Println("All tasks completed.")
}

代码说明

  1. 多个生产者
    • 每个生产者都有一个唯一的 id,用于标识不同的生产者。
    • 每个生产者生成 5 个数据,并将其发送到通道。
    • 使用 producerWg.Add(numProducers) 增加生产者计数器。
    • 在 producer 函数中,使用 defer producerWg.Done() 减少生产者计数器。
  2. 多个消费者
    • 每个消费者都有一个唯一的 id,用于标识不同的消费者。
    • 消费者从通道中读取数据并处理。
    • 使用 consumerWg.Add(numConsumers) 增加消费者计数器。
    • 在 consumer 函数中,使用 defer consumerWg.Done() 减少消费者计数器。
  3. 主 goroutine
    • 调用 producerWg.Wait() 等待所有生产者完成。
    • 关闭通道 close(ch),通知消费者停止读取。
    • 调用 consumerWg.Wait() 等待所有消费者完成。

输出示例

由于并发执行,输出顺序可能不同,但大致如下:

复制
Producer 1 produced: 11
Producer 2 produced: 21
Producer 3 produced: 31
Consumer 1 processed: 11
Consumer 2 processed: 21
Producer 1 produced: 12
Producer 2 produced: 22
Producer 3 produced: 32
Consumer 1 processed: 31
Consumer 2 processed: 12
Producer 1 produced: 13
Producer 2 produced: 23
Producer 3 produced: 33
Consumer 1 processed: 22
Consumer 2 processed: 32
Producer 1 produced: 14
Producer 2 produced: 24
Producer 3 produced: 34
Consumer 1 processed: 13
Consumer 2 processed: 23
Producer 1 produced: 15
Producer 2 produced: 25
Producer 3 produced: 35
Consumer 1 processed: 33
Consumer 2 processed: 14
Consumer 1 processed: 24
Consumer 2 processed: 34
Consumer 1 processed: 15
Consumer 2 processed: 25
Consumer 1 processed: 35
All tasks completed.

关键点

  1. 生产者唯一标识
    • 每个生产者生成的数据通过 id*10 + i 确保唯一性。
  2. 通道容量
    • 通道的容量(make(chan int, 10))可以根据生产者和消费者的数量调整,以避免阻塞。
  3. 关闭通道
    • 在所有生产者完成后关闭通道,确保消费者能够正确处理所有数据。
  4. 负载均衡
    • 多个消费者并发处理数据,实现负载均衡。

总结

通过使用多个生产者和消费者,可以充分利用 Go 的并发特性。关键点在于:

  • 使用独立的 WaitGroup 分别管理生产者和消费者的完成状态。
  • 确保在所有生产者完成后关闭通道。
  • 确保在所有消费者完成后程序退出。

这种模式非常适合需要高并发处理的场景,例如任务分发、数据处理等。

 

go 往一个通道中不断发送数据,同时通过多个线程读取和处理通道数据,处理的通道数据又往另外一个通道中不断发送数据,同时通过多个线程读取和处理通道数据,怎么设计程序?

这种场景是一个典型的 多级流水线(Pipeline)模型,其中数据从一个通道流向另一个通道,每一级都有多个 goroutine 并发处理数据。以下是一个完整的设计和实现示例:


设计思路

  1. 第一级
    • 生产者 goroutine 不断生成数据并发送到第一个通道(ch1)。
    • 多个消费者 goroutine 从 ch1 中读取数据,处理后将结果发送到第二个通道(ch2)。
  2. 第二级
    • 多个消费者 goroutine 从 ch2 中读取数据,处理后将结果发送到第三个通道(ch3)。
    • 依此类推,可以设计更多级。
  3. 关闭通道
    • 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
  4. 同步
    • 使用 sync.WaitGroup 确保每一级的 goroutine 都完成任务。

示例代码

以下是一个两级流水线的示例:

go
复制
package main

import (
	"fmt"
	"sync"
	"time"
)

// 第一级生产者:生成数据并发送到 ch1
func producer(ch1 chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 1; i <= 10; i++ {
		ch1 <- i
		fmt.Printf("Producer sent: %d\n", i)
		time.Sleep(100 * time.Millisecond) // 模拟生产耗时
	}
}

// 第一级消费者:从 ch1 读取数据,处理后将结果发送到 ch2
func stage1Consumer(id int, ch1 <-chan int, ch2 chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch1 {
		result := value * 2 // 模拟处理逻辑
		ch2 <- result
		fmt.Printf("Stage 1 Consumer %d processed: %d -> %d\n", id, value, result)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

// 第二级消费者:从 ch2 读取数据,处理后将结果发送到 ch3
func stage2Consumer(id int, ch2 <-chan int, ch3 chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch2 {
		result := value + 1 // 模拟处理逻辑
		ch3 <- result
		fmt.Printf("Stage 2 Consumer %d processed: %d -> %d\n", id, value, result)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

// 最终消费者:从 ch3 读取数据并处理
func finalConsumer(id int, ch3 <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch3 {
		fmt.Printf("Final Consumer %d received: %d\n", id, value)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

func main() {
	// 创建通道
	ch1 := make(chan int, 10)
	ch2 := make(chan int, 10)
	ch3 := make(chan int, 10)

	// 使用 WaitGroup 同步
	var wg sync.WaitGroup

	// 启动第一级生产者
	wg.Add(1)
	go producer(ch1, &wg)

	// 启动第一级消费者
	numStage1Consumers := 2
	wg.Add(numStage1Consumers)
	for i := 1; i <= numStage1Consumers; i++ {
		go stage1Consumer(i, ch1, ch2, &wg)
	}

	// 启动第二级消费者
	numStage2Consumers := 2
	wg.Add(numStage2Consumers)
	for i := 1; i <= numStage2Consumers; i++ {
		go stage2Consumer(i, ch2, ch3, &wg)
	}

	// 启动最终消费者
	numFinalConsumers := 2
	wg.Add(numFinalConsumers)
	for i := 1; i <= numFinalConsumers; i++ {
		go finalConsumer(i, ch3, &wg)
	}

	// 等待第一级生产者完成
	wg.Wait()

	// 关闭第一级通道
	close(ch1)

	// 等待第一级消费者完成
	wg.Wait()

	// 关闭第二级通道
	close(ch2)

	// 等待第二级消费者完成
	wg.Wait()

	// 关闭最终通道
	close(ch3)

	// 等待最终消费者完成
	wg.Wait()

	fmt.Println("All tasks completed.")
}

代码说明

  1. 通道
    • ch1:第一级通道,生产者发送数据到 ch1
    • ch2:第二级通道,第一级消费者将处理后的数据发送到 ch2
    • ch3:最终通道,第二级消费者将处理后的数据发送到 ch3
  2. 生产者
    • 生产者生成数据并发送到 ch1
  3. 第一级消费者
    • 从 ch1 读取数据,处理后将结果发送到 ch2
  4. 第二级消费者
    • 从 ch2 读取数据,处理后将结果发送到 ch3
  5. 最终消费者
    • 从 ch3 读取数据并处理。
  6. 关闭通道
    • 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
  7. 同步
    • 使用 sync.WaitGroup 确保每一级的 goroutine 都完成任务。

输出示例

由于并发执行,输出顺序可能不同,但大致如下:

复制
Producer sent: 1
Stage 1 Consumer 1 processed: 1 -> 2
Producer sent: 2
Stage 1 Consumer 2 processed: 2 -> 4
Stage 2 Consumer 1 processed: 2 -> 3
Final Consumer 1 received: 3
Producer sent: 3
Stage 1 Consumer 1 processed: 3 -> 6
Stage 2 Consumer 2 processed: 4 -> 5
Final Consumer 2 received: 5
Producer sent: 4
Stage 1 Consumer 2 processed: 4 -> 8
Stage 2 Consumer 1 processed: 6 -> 7
Final Consumer 1 received: 7
Producer sent: 5
Stage 1 Consumer 1 processed: 5 -> 10
Stage 2 Consumer 2 processed: 8 -> 9
Final Consumer 2 received: 9
...
All tasks completed.

关键点

  1. 流水线设计
    • 每一级都有独立的 goroutine 处理数据,并通过通道传递结果。
  2. 通道关闭
    • 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
  3. 负载均衡
    • 每一级都可以启动多个消费者 goroutine,实现并发处理。
  4. 同步
    • 使用 sync.WaitGroup 确保所有 goroutine 都完成任务。

总结

通过多级流水线设计,可以实现复杂的数据处理流程。每一级都可以独立扩展和优化,适用于需要高并发处理的场景,例如数据清洗、任务分发、实时计算等。

以上代码,为区别程序正确,要区分wg.wait()吗

是的,为了确保程序的正确性,需要区分不同的 wg.Wait(),以便明确等待的是哪一级的生产者或消费者。在之前的代码中,所有 goroutine 共享同一个 WaitGroup,这会导致逻辑错误,因为无法区分不同阶段的完成状态。

为了正确区分,应该为每一级的生产者和消费者使用独立的 WaitGroup。以下是改进后的代码:


改进后的代码:区分各级的 WaitGroup

package main

import (
	"fmt"
	"sync"
	"time"
)

// 第一级生产者:生成数据并发送到 ch1
func producer(ch1 chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 1; i <= 10; i++ {
		ch1 <- i
		fmt.Printf("Producer sent: %d\n", i)
		time.Sleep(100 * time.Millisecond) // 模拟生产耗时
	}
}

// 第一级消费者:从 ch1 读取数据,处理后将结果发送到 ch2
func stage1Consumer(id int, ch1 <-chan int, ch2 chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch1 {
		result := value * 2 // 模拟处理逻辑
		ch2 <- result
		fmt.Printf("Stage 1 Consumer %d processed: %d -> %d\n", id, value, result)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

// 第二级消费者:从 ch2 读取数据,处理后将结果发送到 ch3
func stage2Consumer(id int, ch2 <-chan int, ch3 chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch2 {
		result := value + 1 // 模拟处理逻辑
		ch3 <- result
		fmt.Printf("Stage 2 Consumer %d processed: %d -> %d\n", id, value, result)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

// 最终消费者:从 ch3 读取数据并处理
func finalConsumer(id int, ch3 <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for value := range ch3 {
		fmt.Printf("Final Consumer %d received: %d\n", id, value)
		time.Sleep(200 * time.Millisecond) // 模拟处理耗时
	}
}

func main() {
	// 创建通道
	ch1 := make(chan int, 10)
	ch2 := make(chan int, 10)
	ch3 := make(chan int, 10)

	// 使用独立的 WaitGroup 同步每一级
	var producerWg sync.WaitGroup // 生产者 WaitGroup
	var stage1Wg sync.WaitGroup   // 第一级消费者 WaitGroup
	var stage2Wg sync.WaitGroup   // 第二级消费者 WaitGroup
	var finalWg sync.WaitGroup    // 最终消费者 WaitGroup

	// 启动第一级生产者
	producerWg.Add(1)
	go producer(ch1, &producerWg)

	// 启动第一级消费者
	numStage1Consumers := 2
	stage1Wg.Add(numStage1Consumers)
	for i := 1; i <= numStage1Consumers; i++ {
		go stage1Consumer(i, ch1, ch2, &stage1Wg)
	}

	// 启动第二级消费者
	numStage2Consumers := 2
	stage2Wg.Add(numStage2Consumers)
	for i := 1; i <= numStage2Consumers; i++ {
		go stage2Consumer(i, ch2, ch3, &stage2Wg)
	}

	// 启动最终消费者
	numFinalConsumers := 2
	finalWg.Add(numFinalConsumers)
	for i := 1; i <= numFinalConsumers; i++ {
		go finalConsumer(i, ch3, &finalWg)
	}

	// 等待第一级生产者完成
	producerWg.Wait()

	// 关闭第一级通道
	close(ch1)

	// 等待第一级消费者完成
	stage1Wg.Wait()

	// 关闭第二级通道
	close(ch2)

	// 等待第二级消费者完成
	stage2Wg.Wait()

	// 关闭最终通道
	close(ch3)

	// 等待最终消费者完成
	finalWg.Wait()

	fmt.Println("All tasks completed.")
}

代码说明

  1. 独立的 WaitGroup
    • producerWg:用于等待生产者完成。
    • stage1Wg:用于等待第一级消费者完成。
    • stage2Wg:用于等待第二级消费者完成。
    • finalWg:用于等待最终消费者完成。
  2. 通道关闭
    • 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
  3. 同步
    • 使用独立的 WaitGroup 确保每一级的 goroutine 都完成任务。

输出示例

由于并发执行,输出顺序可能不同,但大致如下:

复制
Producer sent: 1
Stage 1 Consumer 1 processed: 1 -> 2
Producer sent: 2
Stage 1 Consumer 2 processed: 2 -> 4
Stage 2 Consumer 1 processed: 2 -> 3
Final Consumer 1 received: 3
Producer sent: 3
Stage 1 Consumer 1 processed: 3 -> 6
Stage 2 Consumer 2 processed: 4 -> 5
Final Consumer 2 received: 5
Producer sent: 4
Stage 1 Consumer 2 processed: 4 -> 8
Stage 2 Consumer 1 processed: 6 -> 7
Final Consumer 1 received: 7
Producer sent: 5
Stage 1 Consumer 1 processed: 5 -> 10
Stage 2 Consumer 2 processed: 8 -> 9
Final Consumer 2 received: 9
...
All tasks completed.

关键点

  1. 区分 WaitGroup
    • 每一级的生产者和消费者使用独立的 WaitGroup,确保正确同步。
  2. 通道关闭顺序
    • 每一级的生产者完成后,关闭对应的通道,通知下一级消费者停止读取。
  3. 负载均衡
    • 每一级都可以启动多个消费者 goroutine,实现并发处理。

总结

通过为每一级的生产者和消费者使用独立的 WaitGroup,可以明确区分不同阶段的完成状态,确保程序的正确性。这种设计适用于多级流水线场景,能够高效处理并发任务。