Introduction
Go’s concurrency model is great. It’s one of the main features that brings many developers to Go. But despite its low cost and (super) easy implementation, it has a few gotchas. In this post, we’ll see some of them and how to overcome them by building a simple async wrapper.
Recovering From Panic
Our first reason for building a simple async wrapper, instead of just typing go ..., is recovering from a panic.
Consider the following example:
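package main

import (
	"fmt"
	"time"
)

func main() {
	defer func() {
		fmt.Println("In main defer func")
	}()
	fmt.Println("main running go pleaseDontPanic()")
	go pleaseDontPanic()
	time.Sleep(time.Second)
	fmt.Println("End")
}

// pleaseDontPanic indexes strings without checking its length,
// so calling it with no arguments panics.
func pleaseDontPanic(strings ...string) {
	fmt.Println(strings[0])
}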
// https://go.dev/play/p/-9E1YXeH97T
When running it in the playground you will get:
This is because you cannot recover from a panic raised in a different goroutine. If a goroutine panics and has no deferred recover of its own, it terminates the whole application. That doesn’t sound so great.
The function pleaseDontPanic has a runtime error (it indexes an empty slice), which causes the application to terminate. Note how the deferred function in main wasn’t even called.
Needless to say, a good programmer would have checked the length of strings. In Go, you can almost always make sure your function won’t panic, but this is just an example. We are humans and our friends are humans (for now), so bugs may occur. A simple wrapper could add a recover block automatically:
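package async

import "fmt"

type async struct {
}

func New() *async {
	return &async{}
}

// RunAsync runs fn in a new goroutine and recovers from any panic it raises.
func (a *async) RunAsync(fn func()) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				// %v, since the recovered value can be of any type
				fmt.Printf("async.RunAsync recovered from panic: %v\n", r)
			}
		}()
		fn()
	}()
}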
Instead of running go pleaseDontPanic(), we’ll simply wrap it with our async package:
func main() {
	// code omitted...
	as := async.New()
	as.RunAsync(func() {
		pleaseDontPanic()
	})
	// code omitted...
}
// https://go.dev/play/p/C9LuvW722U6
Now, when running in the playground, you will get this output:
Context Propagation
Note how the function passed to async.RunAsync didn’t take a context. Yet, in some cases, we would like to pass a context to our goroutine. A common scenario is a trace ID: we can have a traceID in the context, and we would like to propagate it to the goroutine.
We shouldn’t pass the original context directly to our goroutine, because it may be canceled. Imagine the following doSomething function:
func doSomething(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("doSomething canceling: %w", ctx.Err())
	default:
		fmt.Printf("doSomething is doing something, trace id = %q\n", ctx.Value("traceID"))
	}
	return nil
}
Checking the ctx.Done() channel is very common. For example, an SQL library will prevent the query from taking place if the context is done (the ctx.Done() channel is closed).
In a web server, this is useful when the user terminates the request manually (presses Ctrl/Cmd+C, closes the browser, etc.): the SQL query will stop what it’s doing and release its resources. The context is also canceled every time the HTTP request finishes, and it is done in case of a timeout as well.
If we run doSomething in a goroutine with the same context, canceling that context will cause doSomething to cancel as well.
func main() {
	ctx := context.WithValue(context.Background(), "traceID", "someTraceID")
	// derive from ctx (not context.Background()) so the traceID value is kept
	ctx, cancelFunc := context.WithCancel(ctx)
	go func() {
		if err := doSomething(ctx); err != nil {
			fmt.Printf("main: %v\n", err)
		}
	}()
	cancelFunc() // imagine that here the http handler returns
	time.Sleep(time.Second)
}
// https://go.dev/play/p/Y6f-NUJkPkD
When running it in the playground, you will get:
We cancel the context before main ends, and here that happens before our goroutine gets to run. Thus doSomething finds the ctx.Done() channel already closed and returns an error.
The easy solution is to just pass context.Background(). However, there are cases where we would like to propagate values to the new context: for example, tracing data such as traceID or span, audit data such as userID, and so on.
We will add a config to our async type, populated via functional options:
package async

type runAsyncConfig struct {
	propagateKeys []interface{}
}

type RunAsyncOption func(*runAsyncConfig)

type async struct {
	conf runAsyncConfig
}

func New(options ...RunAsyncOption) *async {
	var conf runAsyncConfig
	for _, op := range options {
		op(&conf)
	}
	return &async{
		conf: conf,
	}
}
And we will add an option to pass the keys that need to be propagated to the goroutine:
func WithPropagateContextKeys(keys ...interface{}) RunAsyncOption {
	return func(conf *runAsyncConfig) {
		conf.propagateKeys = append(conf.propagateKeys, keys...)
	}
}
Our async will start a new context from context.Background() and propagate the configured values from the received one:
func (a *async) RunAsync(ctx context.Context, fn func(context.Context)) {
	// Start a new async context
	ctx = a.asyncContext(ctx)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				fmt.Printf("RunAsync recovered from panic: %v\n", r)
			}
		}()
		fn(ctx)
	}()
}

// asyncContext returns a context that is detached from ctx's cancellation
// but carries the values of the configured keys.
func (a *async) asyncContext(ctx context.Context) context.Context {
	asyncCtx := context.Background()
	for _, key := range a.conf.propagateKeys {
		val := ctx.Value(key)
		asyncCtx = context.WithValue(asyncCtx, key, val)
	}
	return asyncCtx
}
Notice how the signature has changed. Both RunAsync and fn now receive a context:
func (a *async) RunAsync(ctx context.Context, fn func(context.Context))
We will replace our go func() {...} with our async wrapper:
func main() {
	// code omitted...
	// start async with an option to propagate context value with key "traceID"
	as := async.New(
		async.WithPropagateContextKeys("traceID"),
	)
	as.RunAsync(ctx, func(ctx context.Context) {
		if err := doSomething(ctx); err != nil {
			fmt.Printf("main: %v\n", err)
		}
	})
	// code omitted...
}
// https://go.dev/play/p/ZUAckied3nc
Now, when we run it in the playground, we’ll get:
Package async extra features
The source code for package async can be found on GitHub. It provides more configuration and fine-tuning for starting goroutines:
Error Handling
Package async provides custom error handling in case the function run in the goroutine returns an error. Thus the HandleFunc that RunAsync receives can also return an error:
package async
type HandleFunc func(ctx context.Context) error
func (a *async) RunAsync(ctx context.Context, fn HandleFunc) {
//...
}
Currently, the default just prints the error, but this can be customized with the WithErrorHandler option:
type ErrorHandler interface {
	HandleError(ctx context.Context, err error)
}
func WithErrorHandler(errorHandler ErrorHandler) AsyncOption
Limiting
You can configure the async package with a maximum number of goroutines that may be open at the same time. If that many goroutines are already open, it waits until one of them finishes:
func WithMaxGoRoutines(n uint) AsyncOption
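One common way to get this behavior is a buffered channel used as a counting semaphore. The sketch below shows that technique under stated assumptions: limiter, newLimiter, and peakConcurrency are hypothetical names for this example, and the package itself may implement the limit differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a hypothetical type for this sketch: a buffered channel
// acts as a counting semaphore with one slot per allowed goroutine.
type limiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newLimiter(limit int) *limiter {
	return &limiter{slots: make(chan struct{}, limit)}
}

// Go blocks until a slot is free, then runs fn in a new goroutine.
func (l *limiter) Go(fn func()) {
	l.slots <- struct{}{} // acquire; blocks while limit goroutines are running
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // release the slot when fn returns
		fn()
	}()
}

// Wait blocks until every started goroutine has finished.
func (l *limiter) Wait() { l.wg.Wait() }

// peakConcurrency runs jobs through a limiter and reports the highest
// number of functions observed running at the same time.
func peakConcurrency(limit, jobs int) int32 {
	l := newLimiter(limit)
	var running, peak int32
	for i := 0; i < jobs; i++ {
		l.Go(func() {
			n := atomic.AddInt32(&running, 1)
			for { // raise peak to n if n is larger
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt32(&running, -1)
		})
	}
	l.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// With a limit of 2, the reported peak never exceeds 2.
	fmt.Println("peak concurrent goroutines:", peakConcurrency(2, 6))
}
```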
Timeout
You can set a maximum duration for which the goroutine should run:
func WithTimeoutForGoRoutine(t time.Duration) AsyncOption
Note, however, that this isn’t guaranteed. You can’t really stop a goroutine from the outside. The async just starts a context with a timeout:
ctx, cancelFunc := context.WithTimeout(ctx, a.timeoutForGoRoutine)
This means that after that duration the ctx.Done() channel will be closed, but if nothing listens on this channel, the goroutine will continue to run.
Pool
This is another type in package async. The pool is for cases where you want a fixed number of goroutines open and want to feed them with work. In some cases it may be more efficient, since you don’t open and close a goroutine for each call.
The pool has an API similar to that of the async:
func NewPool(options ...PoolOption) *Pool {...}
func (p *Pool) RunAsync(ctx context.Context, fn HandleFunc) {...}
NewPool creates a new Pool instance. It initializes n workers (the default is 10). Each worker listens on a shared channel for functions to run.
Calling Pool.RunAsync adds the function to the channel.
As in the async, you can configure a context propagator, error handler, and timeouts. In addition, you can configure the following parameters specifically for the pool:
- How many open goroutines you want to have:
// WithPoolNumberOfWorkers sets the number of workers.
// Each worker listens for received data on its own goroutine.
func WithPoolNumberOfWorkers(n int) PoolOption
- The size of the pool:
// WithPoolSize limits the number of messages the channel can receive
func WithPoolSize(n uint) PoolOption
Summary
Although Go’s concurrency model is awesome, in an ecosystem with many developers it’s not a bad idea to have a wrapper that eases the gotchas we might face and keeps the code DRY.
These are some very simple use-cases I had the chance to play with for learning purposes. There are probably many more use-cases that can be shared between many projects. In any case, I hope you found this post and package a bit useful and that it will decrease your debug time :)