首页
技术小册
AIGC
面试刷题
技术文章
MAGENTO
云计算
视频课程
源码下载
PDF书籍
「涨薪秘籍」
登录
注册
01 | Mutex:如何解决资源并发访问问题?
02 | Mutex:庖丁解牛看实现
03|Mutex:4种易错场景大盘点
04| Mutex:骇客编程,如何拓展额外功能?
05| RWMutex:读写锁的实现原理及避坑指南
06 | WaitGroup:协同等待,任务编排利器
07 | Cond:条件变量的实现机制及避坑指南
08 | Once:一个简约而不简单的并发原语
09 | map:如何实现线程安全的map类型?
10 | Pool:性能提升大杀器
11 | Context:信息穿透上下文
12 | atomic:要保证原子操作,一定要使用这几种方法
13 | Channel:另辟蹊径,解决并发问题
14 | Channel:透过代码看典型的应用模式
15 | 内存模型:Go如何保证并发读写的顺序?
16 | Semaphore:一篇文章搞懂信号量
17 | SingleFlight 和 CyclicBarrier:请求合并和循环栅栏该怎么用?
18 | 分组操作:处理一组子任务,该用什么并发原语?
19 | 在分布式环境中,Leader选举、互斥锁和读写锁该如何实现?
20 | 在分布式环境中,队列、栅栏和STM该如何实现?
当前位置:
首页>>
技术小册>>
Golang并发编程实战
小册名称:Golang并发编程实战
### 18 | 分组操作:处理一组子任务,该用什么并发原语? 在Golang的并发编程实践中,处理一组子任务是一个常见的需求,它要求程序能够高效地并行执行多个独立或相互依赖的任务,同时管理这些任务的执行流程,确保资源的有效利用和任务的正确完成。为了实现这一目标,Golang提供了一系列强大的并发原语,包括goroutines、channels、sync包中的工具(如WaitGroup、Mutex、RWMutex、Once等),以及更高级的并发控制结构如context包和errors包中的新特性。本章将深入探讨在处理一组子任务时,如何选择和应用这些并发原语。 #### 1. 理解需求:分组操作的核心 分组操作的核心在于将一组任务分配给多个goroutines并行执行,同时提供一种机制来等待所有任务完成、收集结果或处理错误。这种模式在Web服务器处理多个请求、批量数据处理、并行测试等场景中尤为常见。 #### 2. Goroutines与Channels:基础构建块 **Goroutines** 是Golang中实现并发的核心机制,它们轻量级且易于创建。每个goroutine独立执行其函数,与主goroutine并行运行。然而,仅凭goroutines本身并不足以有效管理并发任务,特别是当需要等待多个任务完成时。 **Channels** 作为goroutines之间的通信管道,是实现任务同步和结果传递的关键。通过channels,一个goroutine可以发送数据到另一个goroutine,从而协调它们的执行。 #### 3. sync.WaitGroup:等待所有goroutines完成 `sync.WaitGroup` 是处理一组子任务时最常用的并发原语之一。它允许goroutines等待一组其他goroutines的完成。WaitGroup内部维护了一个计数器,每当一个goroutine启动时,通过调用`Add(1)`增加计数器;每当一个goroutine完成时,通过调用`Done()`(实际上是`Add(-1)`的封装)减少计数器。主goroutine通过调用`Wait()`方法阻塞,直到计数器归零,即所有子goroutines都已完成。 **示例代码**: ```go package main import ( "fmt" "sync" "time" ) func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // 确保在函数退出时调用Done() fmt.Printf("Worker %d starting\n", id) time.Sleep(time.Second) fmt.Printf("Worker %d done\n", id) } func main() { var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) go worker(i, &wg) } wg.Wait() // 等待所有worker完成 fmt.Println("All workers have finished") } ``` #### 4. Channels与Range:收集子任务结果 当需要收集每个子任务的结果时,可以使用channels。每个goroutine将结果发送到channel中,而主goroutine则从channel中读取结果。使用`range`语句可以方便地遍历channel,直到其被关闭,从而自动处理所有发送的数据。 **示例代码**: ```go package main import ( "fmt" "time" ) func worker(id int, results chan<- int) { time.Sleep(time.Second) results <- id * 2 // 假设每个worker都计算其ID的两倍 } func main() { results := make(chan int, 5) // 缓冲channel,防止阻塞 for i := 1; i <= 5; i++ { go worker(i, results) } // 等待所有worker发送结果 for i := 1; i <= 5; i++ { result := <-results fmt.Println("Result:", result) } close(results) // 关闭channel,通知range循环结束 } ``` 注意:在上面的例子中,虽然使用了固定次数的循环来读取结果,但在实际应用中,更常见的做法是使用`range`遍历channel,直到其关闭。 #### 5. 错误处理与Context 在并发程序中,错误处理尤为重要。由于多个goroutine可能同时运行,一个goroutine中的错误可能会影响到其他goroutine或整个程序的执行。`context`包提供了一种机制,允许goroutines之间传递取消信号、超时时间以及其他请求相关的值,从而优雅地处理错误和中断。 **使用Context**: ```go package main import ( "context" "fmt" "time" ) func worker(ctx context.Context, id int) { select { case <-time.After(time.Second): fmt.Printf("Worker %d done\n", id) case <-ctx.Done(): fmt.Printf("Worker %d cancelled\n", id) } } func main() { ctx, cancel := context.WithCancel(context.Background()) time.AfterFunc(2*time.Second, cancel) // 2秒后取消 for i := 1; i <= 5; i++ { go worker(ctx, i) } // 等待足够的时间以观察结果 time.Sleep(3 * time.Second) } ``` 在这个例子中,我们使用`context.WithCancel`创建了一个可取消的context,并通过`cancel`函数在2秒后触发取消操作。每个worker都监听context的Done channel,以便在收到取消信号时提前退出。 #### 6. 并发控制:避免资源竞争 在处理并发任务时,还需注意资源竞争和同步问题。如果多个goroutines需要访问共享资源(如全局变量、文件句柄等),必须使用适当的同步机制,如`sync.Mutex`或`sync.RWMutex`,来确保一次只有一个goroutine可以访问该资源。 #### 7. 总结 处理一组子任务的并发编程在Golang中是一个复杂但强大的领域。通过合理使用goroutines、channels、sync包中的工具(如WaitGroup)以及context包,可以构建出高效、可靠、易于维护的并发程序。在选择并发原语时,应充分考虑任务的具体需求,如是否需要等待所有任务完成、是否需要收集任务结果、是否需要处理错误和中断等。通过不断实践和探索,你将能够更加熟练地运用这些并发原语,编写出更加优秀的并发程序。
上一篇:
17 | SingleFlight 和 CyclicBarrier:请求合并和循环栅栏该怎么用?
下一篇:
19 | 在分布式环境中,Leader选举、互斥锁和读写锁该如何实现?
该分类下的相关小册推荐:
Go Web编程(下)
企业级Go应用开发从零开始
Go开发权威指南(下)
深入解析go语言
Go-Web编程实战
深入浅出Go语言核心编程(三)
深入浅出Go语言核心编程(六)
从零写一个基于go语言的Web框架
Go语言入门实战经典
Golang修炼指南
Go开发基础入门
Go Web编程(中)