为什么需要协程池
尽管 Go 的 goroutine 非常轻量级,但在以下场景中直接无限制创建 goroutine 会有问题:
- 高并发场景:当瞬间需要处理上万个任务时,无限制创建 goroutine 可能导致资源耗尽
- 资源敏感型任务:如数据库连接、网络请求等需要控制并发数的场景
- 避免频繁创建销毁:某些任务执行时间很短但频率很高,复用 goroutine 更高效
协程池的工作流程
- 初始化:创建指定数量的 worker goroutine
- 提交任务:将任务放入任务通道
- 任务执行:空闲 worker 从通道获取任务并执行
- 关闭:优雅关闭所有 worker
实现简易协程池
package main
import (
"fmt"
"sync"
"time"
)
// Task 定义任务类型
type Task func()
// Pool 协程池结构体
type Pool struct {
taskChan chan Task // 任务通道
wg sync.WaitGroup // 用于等待所有worker完成
workerNum int // 协程池大小
}
// NewPool 创建协程池
func NewPool(workerNum, taskQueueSize int) *Pool {
p := &Pool{
taskChan: make(chan Task, taskQueueSize),
workerNum: workerNum,
}
// 启动worker
p.wg.Add(workerNum)
for i := 0; i < workerNum; i++ {
go p.worker()
}
return p
}
// worker 工作协程
func (p *Pool) worker() {
defer p.wg.Done()
for task := range p.taskChan {
task()
}
}
// Submit 提交任务
func (p *Pool) Submit(task Task) {
p.taskChan <- task
}
// Close 关闭协程池
func (p *Pool) Close() {
close(p.taskChan)
p.wg.Wait()
}
func main() {
// 创建协程池,3个worker,任务队列大小10
pool := NewPool(3, 10)
// 提交20个任务
for i := 0; i < 20; i++ {
id := i
pool.Submit(func() {
time.Sleep(time.Second)
fmt.Printf("Task %d executed by worker\n", id)
})
}
// 关闭协程池并等待所有任务完成
pool.Close()
fmt.Println("All tasks completed")
}