如何使用Golang实现并发任务分组_Golang worker pool与任务调度实践

直接用go启动大量goroutine易崩,因内存暴涨、调度器过载、OOM;worker pool通过固定worker数、任务队列、复用执行者实现可控并发,50行内可构建生产级池子。

为什么直接用 go 启动大量 goroutine 容易崩

不是 goroutine 本身贵,而是没节制地创建会导致内存暴涨、调度器过载、甚至触发 OOM。比如读取 10 万行日志并逐行解析,如果每行都 go parseLine(line),瞬间可能起 10 万个 goroutine,而多数任务实际是 I/O 等待或 CPU 轻量计算,根本不需要这么多并发单元。

真正需要的是可控的并发度 + 任务排队 + 复用执行者。这正是 worker pool 的核心价值:用固定数量的长期运行 goroutine 消费任务队列,避免资源抖动。

  • 默认 goroutine 栈初始仅 2KB,但频繁创建/销毁仍带来调度开销
  • 无缓冲 channel 作为任务队列时,发送方会阻塞,天然实现背压
  • worker 数量通常设为 runtime.NumCPU() 或略高(如 ×1.5),而非硬写死 100

chan + for range 实现最简 worker pool

不依赖第三方库,50 行内可搭出生产可用的池子。关键在于任务通道类型定义、worker 死循环消费、以及主协程关闭信号的传递方式。

下面是最小可行示例,支持优雅关闭:

type Task func()
type WorkerPool 

struct { tasks chan Task done chan struct{} }

func NewWorkerPool(workerCount int) *WorkerPool { return &WorkerPool{ tasks: make(chan Task, 100), // 缓冲区防主协程阻塞 done: make(chan struct{}), } }

func (wp *WorkerPool) Start() { for i := 0; i < cap(wp.tasks); i++ { go wp.worker() } }

func (wp *WorkerPool) worker() { for { select { case task := <-wp.tasks: task() case <-wp.done: return } } }

func (wp *WorkerPool) Submit(task Task) { wp.tasks <- task }

func (wp *WorkerPool) Shutdown() { close(wp.done) }

注意:cap(wp.tasks) 是通道容量,不是 worker 数量 —— 这里故意写错来提醒你:worker 数量应独立传入,别和通道容量混用。

sync.WaitGroupcontext.Context 哪个更适合控制生命周期

WaitGroup 只解决“等所有任务结束”,不解决“中途取消”;Context 支持取消、超时、值传递,但需每个 worker 主动检查 ctx.Done()。二者常组合使用。

  • 纯批处理(如导入 CSV):用 WaitGroup + 关闭 channel 就够了
  • 带用户中断或服务级超时(如 HTTP handler 中调用):必须用 context.WithTimeout,并在每个 task 内部检查 ctx.Err()
  • 不要在 worker 内部用 select 同时监听 tasksctx.Done() —— 这会导致任务丢失;应在 Submit 时就拒绝已取消 context 的任务

任务分组的实际约束:如何让同组任务串行执行

常见需求:用户 A 的 10 个订单更新必须按顺序处理,但用户 B 的订单可与 A 并发。这不是靠加锁能解的 —— 锁会全局串行化,违背并发初衷。

正确做法是哈希分组 + 每组独占一个 channel:

  • 对 group key(如 userID)做 hash % poolSize,映射到固定 worker 子集
  • 更稳妥是为每组建独立 channel + 单 worker,用 map[string]chan Task 管理,配合 sync.RWMutex 读写
  • 注意 map 并发写 panic:初始化分组 channel 必须在首次提交时原子完成,或预热所有可能的 key

分组逻辑一旦写死,就很难动态扩缩容。线上遇到热点 key(如大 V 用户请求暴增),单 worker 会成为瓶颈,此时需要二级分片或降级为异步重试。