goroutine Wrapper For Recover and Context Propagation

Oren Rose
7 min read · Aug 18, 2023

Photo by Reagan Ross: https://www.pexels.com/photo/gopher-on-gray-soil-13631585/

Introduction

Go’s concurrency model is great. It’s one of the main features that brings many developers to Go. But despite its low cost and (super) easy implementation, it has a few gotchas. In this post, we’ll see some of them and how to overcome them by building a simple async wrapper.

Recovering From Panic

Our first reason for building a simple async wrapper, instead of just typing go ..., is recovering from a panic.

Consider the following example:

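package main

import (
	"fmt"
	"time"
)

func main() {
	defer func() {
		fmt.Println("In main defer func")
	}()

	fmt.Println("main running go pleaseDontPanic()")
	go pleaseDontPanic()
	time.Sleep(time.Second)

	fmt.Println("End")
}

// pleaseDontPanic indexes strings without checking its length,
// so calling it with no arguments panics.
func pleaseDontPanic(strings ...string) {
	fmt.Println(strings[0])
}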

// https://go.dev/play/p/-9E1YXeH97T

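When you run it in the playground, the program prints the first line from main and then crashes with a runtime panic (index out of range). "End" is never printed.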

This is because you cannot recover from a panic in a different goroutine. If a goroutine panics and has no recover defer block, it will terminate the application. That doesn’t sound so great.

The function pleaseDontPanic hits a runtime error, which causes the whole application to terminate. Note how the deferred function in main wasn’t even called.

Needless to say that a good programmer would have checked the length of strings. In Go, you can almost always make sure your function won’t panic, but this is just an example. We are humans and our friends are humans (for now), so bugs may occur. A simple wrapper could add a recover block automatically:

package async

import "fmt"

type async struct {
}

func New() *async {
	return &async{}
}

// RunAsync runs fn in a new goroutine and recovers from any panic in it.
func (a *async) RunAsync(fn func()) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				fmt.Printf("async.RunAsync recovered from panic: %v\n", r)
			}
		}()

		fn()
	}()
}
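
The recover block in the wrapper logs only the panic value. A minimal sketch of one possible extension, assuming you also want the stack trace: debug.Stack from the standard runtime/debug package returns the stack of the calling goroutine. The helper name recoverMessage and the use of a sync.WaitGroup (instead of time.Sleep) are illustrative choices, not part of the async package.

```go
package main

import (
	"fmt"
	"runtime/debug"
	"sync"
)

// recoverMessage is a hypothetical helper: it runs fn in a new goroutine and
// returns the text a wrapper could log if fn panics, or "" if fn returns normally.
func recoverMessage(fn func()) string {
	var (
		wg  sync.WaitGroup
		msg string
	)
	wg.Add(1)
	go func() {
		defer wg.Done() // registered first, so it runs after the recover block
		defer func() {
			if r := recover(); r != nil {
				// debug.Stack is called while the panicking frames are
				// still on the stack, so they appear in the trace.
				msg = fmt.Sprintf("recovered from panic: %v\n%s", r, debug.Stack())
			}
		}()
		fn()
	}()
	wg.Wait() // msg is written before wg.Done, so reading it here is safe
	return msg
}

func main() {
	fmt.Println(recoverMessage(func() {
		var s []string
		_ = s[0] // index out of range: panics
	}))
}
```

Waiting on the WaitGroup here only serves the demonstration; a fire-and-forget wrapper would log the message instead of returning it.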

Instead of running go pleaseDontPanic(), we simply wrap the call with our async package:

func main() {
	// code omitted...

	as := async.New()
	as.RunAsync(func() {
		pleaseDontPanic()
	})

	// code omitted...
}

// https://go.dev/play/p/C9LuvW722U6

Now, when running it in the playground, the panic is recovered and printed by RunAsync, and main continues to the end, including its deferred function.

Context Propagation

Note that the function we pass to async.RunAsync doesn’t take a context. Yet in some cases we would like to pass a context to our goroutine. A common scenario is a trace ID: it is stored in the context, and we would like to propagate it to the goroutine.

We shouldn’t pass the context directly to our goroutine, because it may be canceled while the goroutine is still running. Imagine the following doSomething function:

func doSomething(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("doSomething canceling: %w", ctx.Err())
	default:
		fmt.Printf("doSomething is doing something, trace id = %q\n", ctx.Value("traceID"))
	}

	return nil
}

Checking the ctx.Done() channel is very common. For example, a SQL library will typically refuse to run a query if the context is already done (that is, ctx.Done() is closed).

In a web server this is useful when the user aborts the request (presses ctrl/cmd+c, closes the browser, and so on): the SQL call stops what it is doing and releases its resources. The context is also canceled every time the HTTP request finishes, and it is done in the case of a timeout as well.

If we run doSomething in a goroutine with the same context, doSomething will be canceled along with the request.

func main() {
	ctx := context.WithValue(context.Background(), "traceID", "someTraceID")
	// Derive from ctx (not from a fresh Background) so the traceID value is kept.
	ctx, cancelFunc := context.WithCancel(ctx)

	go func() {
		if err := doSomething(ctx); err != nil {
			fmt.Printf("main: %v\n", err)
		}
	}()

	cancelFunc() // imagine that here the http handler returns
	time.Sleep(time.Second)
}

// https://go.dev/play/p/Y6f-NUJkPkD

When running it in the playground, the goroutine prints the cancellation error (doSomething canceling: context canceled) rather than the trace ID.

We cancel the context right after starting the goroutine, which in practice happens before the goroutine gets to run. Thus doSomething finds the ctx.Done() channel already closed.
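
The same effect can be reproduced without depending on goroutine timing. In this small sketch (the function names are illustrative), a context derived from a canceled parent is itself done, which is why anything that is handed the request context stops together with the request:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// childErrAfterParentCancel derives a child context from a cancelable parent,
// cancels the parent, and returns the error the child then reports.
func childErrAfterParentCancel() error {
	parent, cancel := context.WithCancel(context.Background())
	// String key used only to match the article's examples.
	child := context.WithValue(parent, "traceID", "someTraceID")

	cancel() // e.g. the HTTP handler returned

	<-child.Done() // already closed: cancellation flows from parent to child
	return child.Err()
}

// isCanceled reports whether err is (or wraps) context.Canceled.
func isCanceled(err error) bool {
	return errors.Is(err, context.Canceled)
}

func main() {
	err := childErrAfterParentCancel()
	fmt.Println(err, isCanceled(err)) // prints "context canceled true"
}
```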

The easy solution is to pass context.Background() instead. However, there are cases where we would like to propagate values to the new context: for example, tracing data such as a traceID or span, audit data such as a userID, and so on.

We will add a config to our async, populated via functional options:

package async

type runAsyncConfig struct {
	propagateKeys []interface{}
}

type RunAsyncOption func(*runAsyncConfig)

type async struct {
	conf runAsyncConfig
}

func New(options ...RunAsyncOption) *async {
	var conf runAsyncConfig

	for _, op := range options {
		op(&conf)
	}

	return &async{
		conf: conf,
	}
}

And we will add an option to pass the keys that need to be propagated to the goroutine:

func WithPropagateContextKeys(keys ...interface{}) RunAsyncOption {
	return func(conf *runAsyncConfig) {
		conf.propagateKeys = append(conf.propagateKeys, keys...)
	}
}
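
Because the option accepts interface{} keys, the keys do not have to be plain strings. The context package documentation recommends that packages define their own key type to avoid collisions between packages. A minimal sketch, assuming a hypothetical ctxKey type (not taken from the async package), shows that a typed key and a string with the same text are different keys:

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a hypothetical key type. Values of this type cannot collide
// with keys defined elsewhere, even if the underlying text is identical.
type ctxKey string

const traceIDKey ctxKey = "traceID"

// lookups stores a value under the typed key and then reads it back twice:
// once with the typed key and once with a plain string of the same text.
func lookups() (typed, plain interface{}) {
	ctx := context.WithValue(context.Background(), traceIDKey, "someTraceID")
	return ctx.Value(traceIDKey), ctx.Value("traceID")
}

func main() {
	typed, plain := lookups()
	fmt.Println(typed, plain) // prints "someTraceID <nil>"
}
```

With typed keys, the same key value has to be used both when storing the value and when listing the keys to propagate.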

Our async starts a new context from context.Background() and propagates the selected values from the received one:

func (a *async) RunAsync(ctx context.Context, fn func(context.Context)) {
	// Start a new async context
	ctx = a.asyncContext(ctx)

	go func() {
		defer func() {
			if r := recover(); r != nil {
				fmt.Printf("RunAsync recovered from panic: %v\n", r)
			}
		}()

		fn(ctx)
	}()
}

// asyncContext builds a fresh context from context.Background() and copies
// the configured keys over, so the parent's cancellation is not inherited.
func (a *async) asyncContext(ctx context.Context) context.Context {
	asyncCtx := context.Background()
	for _, key := range a.conf.propagateKeys {
		val := ctx.Value(key)
		asyncCtx = context.WithValue(asyncCtx, key, val)
	}

	return asyncCtx
}
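
As an alternative to copying selected keys by hand, Go 1.21 added context.WithoutCancel to the standard library. It returns a context that keeps all of the parent's values but is not canceled when the parent is. A minimal sketch, assuming Go 1.21 or later (the ctxKey type and the detach function are hypothetical names used only for illustration):

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a hypothetical key type used only in this sketch.
type ctxKey string

const traceIDKey ctxKey = "traceID"

// detach returns what a goroutine would observe when it is handed
// context.WithoutCancel(parent) after the parent has been canceled.
func detach() (traceID interface{}, err error) {
	parent, cancel := context.WithCancel(
		context.WithValue(context.Background(), traceIDKey, "someTraceID"),
	)
	detached := context.WithoutCancel(parent) // requires Go 1.21+

	cancel() // the parent is now done

	// The detached context still carries the value and reports no error.
	return detached.Value(traceIDKey), detached.Err()
}

func main() {
	id, err := detach()
	fmt.Println(id, err) // prints "someTraceID <nil>"
}
```

Unlike an explicit key list, this carries over every value and drops the parent's deadline as well, so whether it fits depends on how selective the propagation needs to be.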

Notice how the signature has changed: both RunAsync and fn now receive a context:

func (a *async) RunAsync(ctx context.Context, fn func(context.Context))

We will replace our go func() {...} with our async wrapper:

func main() {
	// code omitted...

	// start async with an option to propagate context value with key "traceID"
	as := async.New(
		async.WithPropagateContextKeys("traceID"),
	)

	as.RunAsync(ctx, func(ctx context.Context) {
		if err := doSomething(ctx); err != nil {
			fmt.Printf("main: %v\n", err)
		}
	})

	// code omitted...
}

// https://go.dev/play/p/ZUAckied3nc

Now when we run it in the playground, doSomething is no longer canceled and prints the propagated trace ID, someTraceID.

Package async extra features

The source code for package async can be found on GitHub. It provides more configuration and fine-tuning for starting goroutines:

Error Handling

Package async provides custom error handling in case the function run in the goroutine returns an error. Thus the HandleFunc that RunAsync receives can also return an error:

package async

type HandleFunc func(ctx context.Context) error

func (a *async) RunAsync(ctx context.Context, fn HandleFunc) {
//...
}

Currently, the default handler just prints the error, but it can be customized with the option WithErrorHandler:

type ErrorHandler interface {
	HandleError(ctx context.Context, err error)
}

func WithErrorHandler(errorHandler ErrorHandler) AsyncOption
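
To illustrate what a custom handler might look like, here is a minimal sketch. Only the ErrorHandler interface comes from the article; collectingHandler, runAndHandle, and handledCount are hypothetical names, and runAndHandle is a stand-in for the wrapper rather than the package's own code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// ErrorHandler is the interface shown in the article.
type ErrorHandler interface {
	HandleError(ctx context.Context, err error)
}

// collectingHandler is a hypothetical ErrorHandler that stores every error
// it receives so the caller can inspect them later.
type collectingHandler struct {
	mu   sync.Mutex
	errs []error
}

func (h *collectingHandler) HandleError(_ context.Context, err error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.errs = append(h.errs, err)
}

// Count reports how many errors have been handled so far.
func (h *collectingHandler) Count() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return len(h.errs)
}

// runAndHandle stands in for the wrapper: it runs fn in a goroutine and
// forwards a non-nil error to h. It waits only to keep the sketch deterministic.
func runAndHandle(ctx context.Context, h ErrorHandler, fn func(context.Context) error) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := fn(ctx); err != nil {
			h.HandleError(ctx, err)
		}
	}()
	<-done
}

// handledCount runs one failing and one succeeding function and returns
// the number of errors the handler saw.
func handledCount() int {
	h := &collectingHandler{}
	ctx := context.Background()
	runAndHandle(ctx, h, func(context.Context) error { return errors.New("boom") })
	runAndHandle(ctx, h, func(context.Context) error { return nil })
	return h.Count()
}

func main() {
	fmt.Println(handledCount()) // prints 1
}
```

The mutex matters because several goroutines may report errors to the same handler at once.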

Limiting

You can configure the async package with a maximum number of goroutines that may be open at the same time. If that maximum has already been reached, it waits until one of the goroutines finishes:

func WithMaxGoRoutines(n uint) AsyncOption
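
One common way to implement such a limit is a buffered channel used as a counting semaphore. The sketch below is an assumption about how this could be done, not the package's actual implementation; limiter, newLimiter, and peakConcurrency are hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a hypothetical type: a buffered channel acts as a counting
// semaphore with one slot per allowed goroutine.
type limiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// Go blocks until a slot is free, then runs fn in a new goroutine.
func (l *limiter) Go(fn func()) {
	l.slots <- struct{}{} // acquire; blocks while max goroutines are running
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // release the slot when fn returns
		fn()
	}()
}

// Wait blocks until every started goroutine has finished.
func (l *limiter) Wait() { l.wg.Wait() }

// peakConcurrency starts n short tasks through a limiter of size max and
// returns the highest number of tasks observed running at the same time.
func peakConcurrency(max, n int) int64 {
	var running, peak int64
	l := newLimiter(max)
	for i := 0; i < n; i++ {
		l.Go(func() {
			cur := atomic.AddInt64(&running, 1)
			for { // record cur as the new peak if it is higher
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated work
			atomic.AddInt64(&running, -1)
		})
	}
	l.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(2, 6))
}
```

The caller of Go is the one that waits, which matches the blocking behavior described above.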

Timeout

You can set a timeout to cap how long the goroutine should run:

func WithTimeoutForGoRoutine(t time.Duration) AsyncOption

Note, however, that the timeout is not guaranteed to stop the goroutine: one goroutine cannot forcibly stop another. The async just starts a context with a timeout:

ctx, cancelFunc := context.WithTimeout(ctx, a.timeoutForGoRoutine)

This means that after that duration, the ctx.Done() channel will be closed, but if nothing listens on this channel, the goroutine will continue to run.
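
The difference is easy to see in a small sketch. Here cooperative and stubborn are hypothetical functions: the first selects on ctx.Done(), the second ignores the context entirely, and both run under the same 10 ms timeout:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// cooperative stops as soon as ctx is done.
func cooperative(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Second): // simulated slow work
		return nil
	}
}

// stubborn never looks at ctx, so the timeout has no effect on it.
func stubborn(_ context.Context) error {
	time.Sleep(50 * time.Millisecond)
	return nil
}

// runWithTimeout calls fn with a context that expires after timeout.
func runWithTimeout(timeout time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return fn(ctx)
}

// timedOut reports whether err is (or wraps) context.DeadlineExceeded.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// results runs both functions under the same 10 ms timeout.
func results() (coop, stub error) {
	coop = runWithTimeout(10*time.Millisecond, cooperative)
	stub = runWithTimeout(10*time.Millisecond, stubborn)
	return coop, stub
}

func main() {
	coop, stub := results()
	fmt.Println("cooperative:", coop) // context deadline exceeded
	fmt.Println("stubborn:", stub)    // <nil>: it ran to completion anyway
}
```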

Pool

This is another type in package async. The pool is for cases where you want a fixed number of goroutines open and want to feed them with work. In some cases it may be more efficient, since you don’t start and stop a goroutine for each call.

The pool has an API similar to async’s:

func NewPool(options ...PoolOption) *Pool {...}

func (p *Pool) RunAsync(ctx context.Context, fn HandleFunc) {...}

NewPool creates a new Pool instance. It starts n workers (the default is 10). Each worker listens for incoming functions on a shared channel.

Calling Pool.RunAsync adds the function to that channel.

As in the async, you can configure a context propagator, an error handler, and timeouts. In addition, you can configure the following parameters specifically for the pool:

  • How many open goroutines you want to have:
// WithPoolNumberOfWorkers limits the workers number
// Each worker is listening to a received data on a different goroutine.
func WithPoolNumberOfWorkers(n int) PoolOption
  • The size of the pool:
// WithPoolSize limits the number of messages the channel can receive
func WithPoolSize(n uint) PoolOption
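
The worker and channel mechanics described in this section can be sketched in a few lines. This is a simplified illustration under assumed names (pool, newPool, Run, Close, sumWithPool), not the package's implementation; it leaves out context propagation, panic recovery, and error handling:

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, stripped-down worker pool.
type pool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

// newPool starts `workers` goroutines that all receive from one shared
// channel, which has room for `size` queued functions.
func newPool(workers, size int) *pool {
	p := &pool{jobs: make(chan func(), size)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs { // exits when the channel is closed and drained
				job()
			}
		}()
	}
	return p
}

// Run queues fn for a worker; it blocks while the channel is full.
func (p *pool) Run(fn func()) { p.jobs <- fn }

// Close stops accepting work and waits for the queued jobs to finish.
func (p *pool) Close() {
	close(p.jobs)
	p.wg.Wait()
}

// sumWithPool adds the numbers 1..n using the pool's workers.
func sumWithPool(workers, size, n int) int {
	var (
		mu  sync.Mutex
		sum int
	)
	p := newPool(workers, size)
	for i := 1; i <= n; i++ {
		i := i // capture the current value (needed before Go 1.22)
		p.Run(func() {
			mu.Lock()
			sum += i
			mu.Unlock()
		})
	}
	p.Close()
	return sum
}

func main() {
	fmt.Println(sumWithPool(3, 5, 100)) // prints 5050
}
```

In this sketch the number of workers bounds how many functions run at once, while the channel size bounds how many can be queued before Run blocks.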

Summary

Although Go’s concurrency model is awesome, in an ecosystem with many developers it’s not a bad idea to have a wrapper that smooths over the gotchas we might face and keeps their handling DRY.

These are some very simple use-cases I had the chance to play with for learning purposes. There are probably many more use-cases that can be shared between many projects. In any case, I hope you found this post and package a bit useful and that it will decrease your debug time :)
