Concurrency Management Pattern in Go
In 2014 I posted an answer to a StackOverflow question “Always have x number of goroutines running at any time.” Here I refine my solution with a real-world pattern I rely on to control coordinated goroutine concurrency in Go.
Plain Channels
Here's my barebones approach from 2014, which relies on the send/receive semantics of Go channels:
package main

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	for i := 0; i < 30; i++ {
		guard <- struct{}{} // blocks if the guard channel is full
		go func(n int) {
			worker(n)
			<-guard // frees a slot in the guard channel
		}(i)
	}
}

func worker(i int) { println("doing work on", i) }
While it illustrates the general concept, it’s not a great pattern to apply to practical tasks:
- there’s no error handling here;
- this code does not wait until all worker goroutines complete: main can return while up to maxGoroutines workers are still running.
First, let’s improve this by waiting until all worker goroutines complete. The code already uses the guard channel to regulate capacity, so at the end of the “job producing” loop we can reuse it as a barrier by filling it up to its capacity. Each send succeeds only once a worker frees a slot, so when all cap(guard) sends have succeeded, every worker is guaranteed to have finished:
for i := 0; i < cap(guard); i++ {
	guard <- struct{}{}
}
(See https://go.dev/play/p/mx0Sdvi5C7B for the updated code listing.)
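To make the waiting guarantee concrete, here’s a complete, self-contained version of the guarded loop with the drain phase. The processAll helper and the atomic counter are names of my choosing, standing in for real work:

```go
package main

import "sync/atomic"

// processAll launches at most maxGoroutines workers at a time and
// returns only after every worker has finished.
func processAll(jobs, maxGoroutines int) int64 {
	guard := make(chan struct{}, maxGoroutines)
	var done int64
	for i := 0; i < jobs; i++ {
		guard <- struct{}{} // blocks while maxGoroutines workers are in flight
		go func(n int) {
			atomic.AddInt64(&done, 1) // stands in for real work
			<-guard                   // frees a slot
		}(i)
	}
	// Drain phase: filling the channel to capacity succeeds only
	// after every outstanding worker has released its slot.
	for i := 0; i < cap(guard); i++ {
		guard <- struct{}{}
	}
	return atomic.LoadInt64(&done)
}

func main() {
	println("processed", processAll(30, 10), "jobs")
}
```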
sync.WaitGroup
The Go standard library has a dedicated type — sync.WaitGroup — for waiting until a collection of goroutines finishes. You increment its internal counter by calling Add(1) before starting each goroutine in a group and call Done() from within the goroutine before it returns. To wait until all goroutines in the collection complete, you call the blocking Wait() method.
Here's an updated example that now relies on sync.WaitGroup:
package main

import "sync"

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup
	for i := 0; i < 30; i++ {
		guard <- struct{}{} // blocks if the guard channel is full
		wg.Add(1)
		go func(n int) {
			worker(n)
			<-guard
			wg.Done()
		}(i)
	}
	wg.Wait()
}

func worker(i int) { println("doing work on", i) }
errgroup.Group
Let’s get to error handling. In the real world, most functions dealing with some resource need to report an error.
Imagine that our worker function now returns an error.
There are multiple ways to collect error values: from a preallocated slice of errors to a dedicated error channel.
However, my “go-to” solution for this pattern has been the errgroup package for the last few years. It exposes the Group type, which makes coordinating goroutine groups a breeze. Here's what our code looks like when updated to rely on errgroup.Group:
package main

import "golang.org/x/sync/errgroup"

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	var group errgroup.Group
	for i := 0; i < 30; i++ {
		guard <- struct{}{}
		n := i // capture the loop variable for the closure
		group.Go(func() error {
			defer func() { <-guard }()
			return worker(n)
		})
	}
	if err := group.Wait(); err != nil {
		println("first encountered error:", err.Error())
	}
}

func worker(i int) error { println("doing work on", i); return nil }
The Group type here combines the functionality of sync.WaitGroup and captures the first non-nil error returned by any of the worker goroutines. It is also common to abandon in-flight work when something errors out early; errgroup.Group supports this scenario as well, through integrated context cancelation.
Documentation for errgroup.WithContext says:
The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.
Here's an updated example that relies on context cancelation support:
package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	group, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 30; i++ {
		guard <- struct{}{}
		n := i // capture the loop variable for the closure
		group.Go(func() error {
			defer func() { <-guard }()
			return worker(ctx, n)
		})
	}
	if err := group.Wait(); err != nil {
		println("first encountered error:", err.Error())
	}
}

func worker(ctx context.Context, i int) error { println("doing work on", i); return nil }
In real-world scenarios, I find that cases like the example above tend to take a producer/consumer shape. The code below illustrates how I structure almost all coordinated concurrent workflows in Go nowadays:
package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func main() {
	maxGoroutines := 10
	group, ctx := errgroup.WithContext(context.Background())
	jobs := make(chan int)
	group.Go(func() error { // single producer
		defer close(jobs) // stops consumers once the producer finishes
		for i := 0; i < 30; i++ {
			select {
			case jobs <- i: // feeds a new “job payload” to consumers
			case <-ctx.Done(): // early exit on error
				return ctx.Err()
			}
		}
		return nil
	})
	for i := 0; i < maxGoroutines; i++ { // multiple consumers
		group.Go(func() error {
			for job := range jobs {
				if err := worker(ctx, job); err != nil {
					return err
				}
			}
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		println("first encountered error:", err.Error())
	}
}

func worker(ctx context.Context, i int) error { println("doing work on", i); return nil }