
Concurrency Management Pattern in Go

In 2014 I posted an answer to the Stack Overflow question “Always have x number of goroutines running at any time.” Here I refine that solution into the real-world pattern I now rely on to coordinate and limit goroutine concurrency in Go.

Plain Channels

Here's my barebones approach from 2014, which relies on the send/receive semantics of Go channels:

package main

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	for i := 0; i < 30; i++ {
		guard <- struct{}{} // blocks if the guard channel is full
		go func(n int) {
			worker(n)
			<-guard // frees a slot in the guard channel
		}(i)
	}
}

func worker(i int) { println("doing work on", i) }

While it illustrates the general concept, it’s not a great pattern to apply to practical tasks: nothing waits for the worker goroutines to finish, so main can return while work is still in flight.

First, let’s improve this by waiting until all worker goroutines are complete. The code already uses a guard channel to regulate capacity, so at the end of the “job producing” loop we can fill the guard channel up to its capacity. Those sends only succeed once every worker has released its slot, which guarantees that all job processing has completed:

for i := 0; i < cap(guard); i++ {
	guard <- struct{}{}
}

(See https://go.dev/play/p/mx0Sdvi5C7B for the updated code listing.)
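
Putting both pieces together, the complete program now looks roughly like this (a sketch; the playground link above has the exact listing):

package main

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	for i := 0; i < 30; i++ {
		guard <- struct{}{} // blocks if the guard channel is full
		go func(n int) {
			worker(n)
			<-guard // frees a slot in the guard channel
		}(i)
	}
	// Refill the guard channel to capacity: these sends can only
	// succeed once every in-flight worker has released its slot.
	for i := 0; i < cap(guard); i++ {
		guard <- struct{}{}
	}
}

func worker(i int) { println("doing work on", i) }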

sync.WaitGroup

The Go standard library has a dedicated type, sync.WaitGroup, for waiting until a collection of goroutines finishes. Generally, you increment its internal counter by calling Add(1) before starting each goroutine in the group and call Done() from within each goroutine before it returns. To wait until all goroutines in the collection are complete, you call the blocking Wait() method.

Here's an updated example that now relies on sync.WaitGroup:

package main

import "sync"

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup

	for i := 0; i < 30; i++ {
		guard <- struct{}{} // blocks if the guard channel is full
		wg.Add(1)           // one more goroutine to wait for
		go func(n int) {
			worker(n)
			<-guard   // frees a slot in the guard channel
			wg.Done() // marks this goroutine as done
		}(i)
	}
	wg.Wait()
}

func worker(i int) { println("doing work on", i) }

errgroup.Group

Let’s get to error handling. In the real world, most functions dealing with some resource need to report an error. Imagine that our worker function now returns an error. There are multiple ways to collect the error values, from preallocating a slice of errors to sending them over a dedicated channel, but my “go-to” solution for this pattern over the last few years has been the errgroup package.
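
For comparison, here is a rough sketch of the preallocated-slice approach layered on the earlier sync.WaitGroup example (my own illustration, not from the original answer); each goroutine writes only to its own index, so no extra locking is needed:

package main

import "sync"

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup

	errs := make([]error, 30) // one slot per job
	for i := 0; i < 30; i++ {
		guard <- struct{}{}
		wg.Add(1)
		go func(n int) {
			errs[n] = worker(n) // each goroutine writes only its own index
			<-guard
			wg.Done()
		}(i)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			println("encountered error:", err.Error())
		}
	}
}

func worker(i int) error { println("doing work on", i); return nil }

It works, but the bookkeeping adds up quickly, which is exactly what errgroup takes care of.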

The errgroup package exposes the Group type, which makes coordinating goroutine groups a breeze.

Here's what our code looks like when updated to rely on errgroup.Group:

package main

import "golang.org/x/sync/errgroup"

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	var group errgroup.Group

	for i := 0; i < 30; i++ {
		guard <- struct{}{} // blocks if the guard channel is full
		n := i              // capture the loop variable for the closure
		group.Go(func() error {
			defer func() { <-guard }() // frees a slot when the worker returns
			return worker(n)
		})
	}
	if err := group.Wait(); err != nil {
		println("first encountered error:", err)
	}
}

func worker(i int) error { println("doing work on", i); return nil }

The Group type combines the functionality of sync.WaitGroup with error propagation: it captures the first non-nil error returned by any of the worker goroutines.

It is common to abandon in-flight work as soon as something errors out early. errgroup.Group supports this scenario as well by integrating with context cancelation. The documentation for errgroup.WithContext says:

The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

Here's an updated example that relies on context cancelation support:

package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	group, ctx := errgroup.WithContext(context.Background())

	for i := 0; i < 30; i++ {
		guard <- struct{}{}
		n := i
		group.Go(func() error {
			defer func() { <-guard }()
			return worker(ctx, n)
		})
	}
	if err := group.Wait(); err != nil {
		println("first encountered error:", err)
	}
}

func worker(ctx context.Context, i int) error { println("doing work on", i); return nil }
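
The worker stub above never looks at its context, so cancelation would not actually interrupt anything. For the derived context to matter, a real worker should check ctx before (and, for long-running work, during) the job. A minimal drop-in sketch of my own for the listing above:

func worker(ctx context.Context, i int) error {
	if err := ctx.Err(); err != nil {
		return err // another goroutine has already failed; skip the work
	}
	println("doing work on", i)
	return nil
}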

The Final Form: One Producer, Multiple Consumers

In real-world scenarios, I find that a producer/consumer structure dominates cases like the example above.

The code below illustrates how I structure almost all coordinated concurrent workflows in Go nowadays:

package main

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func main() {
	maxGoroutines := 10
	group, ctx := errgroup.WithContext(context.Background())

	jobs := make(chan int)
	group.Go(func() error { // single producer
		defer close(jobs) // stops consumers once producer finishes
		for i := 0; i < 30; i++ {
			select {
			case jobs <- i: // feeds new “job payload” to consumers
			case <-ctx.Done(): // early exit on error
				return ctx.Err()
			}
		}
		return nil
	})

	for i := 0; i < maxGoroutines; i++ { // multiple consumers
		group.Go(func() error {
			for job := range jobs {
				if err := worker(ctx, job); err != nil {
					return err
				}
			}
			return nil
		})
	}

	if err := group.Wait(); err != nil {
		println("first encountered error:", err)
	}
}

func worker(ctx context.Context, i int) error { println("doing work on", i); return nil }