为什么需要协程池

尽管 Go 的 goroutine 非常轻量级,但在以下场景中直接无限制创建 goroutine 会有问题:

  • 高并发场景:当瞬间需要处理上万个任务时,无限制创建 goroutine 可能导致资源耗尽
  • 资源敏感型任务:如数据库连接、网络请求等需要控制并发数的场景
  • 避免频繁创建销毁:某些任务执行时间很短但频率很高,复用 goroutine 更高效

协程池的工作流程

  1. 初始化:创建指定数量的 worker goroutine
  2. 提交任务:将任务放入任务通道
  3. 任务执行:空闲 worker 从通道获取任务并执行
  4. 关闭:优雅关闭所有 worker

实现简易协程池

package main

import (
    "fmt"
    "sync"
    "time"
)

// Task 定义任务类型
type Task func()

// Pool 协程池结构体
type Pool struct {
    taskChan  chan Task   // 任务通道
    wg        sync.WaitGroup // 用于等待所有worker完成
    workerNum int          // 协程池大小
}

// NewPool 创建协程池
func NewPool(workerNum, taskQueueSize int) *Pool {
    p := &Pool{
        taskChan:  make(chan Task, taskQueueSize),
        workerNum: workerNum,
    }
    
    // 启动worker
    p.wg.Add(workerNum)
    for i := 0; i < workerNum; i++ {
        go p.worker()
    }
    
    return p
}

// worker 工作协程
func (p *Pool) worker() {
    defer p.wg.Done()
    
    for task := range p.taskChan {
        task()
    }
}

// Submit 提交任务
func (p *Pool) Submit(task Task) {
    p.taskChan <- task
}

// Close 关闭协程池
func (p *Pool) Close() {
    close(p.taskChan)
    p.wg.Wait()
}

func main() {
    // 创建协程池,3个worker,任务队列大小10
    pool := NewPool(3, 10)
    
    // 提交20个任务
    for i := 0; i < 20; i++ {
        id := i
        pool.Submit(func() {
            time.Sleep(time.Second)
            fmt.Printf("Task %d executed by worker\n", id)
        })
    }
    
    // 关闭协程池并等待所有任务完成
    pool.Close()
    fmt.Println("All tasks completed")
}
最后修改:2025 年 04 月 14 日
如果觉得我的文章对你有用,请随意赞赏