How to properly handle concurrency and parallelism with Golang

Rene Manqueros · Published in Analytics Vidhya · 11 min read · Apr 21, 2021


Programming with threads is always awesome: you can get results faster, work on more things at the same time, and keep a server as busy as possible. Unfortunately, this creates several problems that are not so obvious when you’re not used to working with threads.

Old but gold

In this short article I’ll walk you through what I consider the most common problems and how I usually work around them.

Throughout the tutorial we’ll use the same base exercise, which runs the numbers from 0–15 and prints them on screen. After each step we’ll incrementally add new features, but the main mechanic of counting to a certain limit will persist. The code might not be the best or most idiomatic Go you’ve ever seen, but it’s written in a way that makes the problems we’re trying to solve very obvious.

We’ll have our starting point in ex_1.go:

package main

import (
    "log"
    "time"
)

func main() {
    start := time.Now()
    var counter = 0
    // Sequential work: each "task" prints the counter, then increments it.
    for i := 1; i <= 16; i++ {
        log.Println("Start", counter)
        counter++
    }
    elapsed := time.Since(start)
    log.Println(elapsed)
}

The output will be just the numbers, in order.

This represents any program you write that does its work sequentially: it does task 0, and once that finishes it proceeds to task 1, and so on.

Now we’ll introduce our first threading mechanism, the “goroutine”. This will run any code inside it on a different thread (a goroutine is not really a thread, but for now let us assume they’re the same thing). We start a goroutine by using the go keyword before a function; it can be an anonymous function or a named one, and here I’ll just use an anonymous one.

ex_2.go
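A minimal sketch of this step, assuming the same structure as ex_1.go with the loop body moved into an anonymous goroutine:

package main

import (
    "log"
    "time"
)

func main() {
    start := time.Now()
    var counter = 0
    for i := 1; i <= 16; i++ {
        // The go keyword runs this anonymous function on a new goroutine;
        // counter is shared by every goroutine (the data race discussed below).
        go func() {
            log.Println("Start", counter)
            counter++
        }()
    }
    elapsed := time.Since(start)
    log.Println(elapsed)
}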

Now that we’ve sent our counting/printing statements to a new thread the behavior is a bit different from the first run:

It did run faster… but it didn’t print any of the numbers!

When we started a new thread, the main thread kept going, ignoring what was happening on that new thread. It then reached the end of the main function and stopped execution; since there was nothing telling it that another thread was running, it just closed up.

So we’ll now do a small adjustment to naively wait for all threads to finish:

ex_3.go
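A sketch of the naive wait, assuming a flat 2-second sleep before measuring the elapsed time:

package main

import (
    "log"
    "time"
)

func main() {
    start := time.Now()
    var counter = 0
    for i := 1; i <= 16; i++ {
        go func() {
            log.Println("Start", counter)
            counter++
        }()
    }
    // Naive wait: hope every goroutine finishes within 2 seconds.
    time.Sleep(2 * time.Second)
    elapsed := time.Since(start)
    log.Println(elapsed)
}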

This time we added a sleep to the main thread and we hope that all goroutines finish in those 2 seconds:

OK, so we’re getting better now: we have the 16 rows, but they all show a 0. Also, if we look at the log times, they all ran on second :47, nothing was displayed on :48, and then execution finished on :49. So our naive approach of waiting for the threads to finish created two new problems:

  • the counter value isn’t updating properly
  • we’re idle for some time

If I run this same code multiple times, an interesting phenomenon will happen:

One of the tasks is reporting an 11 (you might get a different number). This is because we have a data race: all threads are started at pretty much the same time and they all share the counter variable, so they all start with the value of 0 and each prints and increments it. For some of these threads the value was still 0 when they printed, but for others, the remaining threads had already incremented it by the time they printed; that is what causes the 11 here.

To solve this we’ll do something very basic but not entirely accurate (we’ll solve it in a better way later on):

ex_4.go
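A sketch of this version: the increment happens in the loop, and a copy of counter is passed to the goroutine as thisCounter:

package main

import (
    "log"
    "time"
)

func main() {
    start := time.Now()
    var counter = 0
    for i := 1; i <= 16; i++ {
        // Pass a copy of counter; each goroutine prints its own snapshot.
        go func(thisCounter int) {
            log.Println("Start", thisCounter)
        }(counter)
        counter++
    }
    time.Sleep(2 * time.Second)
    elapsed := time.Since(start)
    log.Println(elapsed)
}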

First we’ll do the counting in the iteration instead of inside the thread, and we’ll just let the thread print the value, since doing the increments is faster than spinning up a new thread. We’ll also copy the value of counter and send it to the goroutine as a parameter; that copy will be read as “thisCounter”.

So the data race seems to have been fixed, at least in this simple case where the counter++ is faster than creating new threads. We’ll consider it a win for now and fix it properly later, but let’s move over to the other problem we created when threading: waiting for tasks to finish.

ex_5.go
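A sketch using sync.WaitGroup, following the Add/Done/Wait flow described below:

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    start := time.Now()
    var wg sync.WaitGroup
    var counter = 0
    for i := 1; i <= 16; i++ {
        wg.Add(1) // one more task for the group to wait on
        go func(thisCounter int) {
            defer wg.Done() // mark this task as finished
            log.Println("Start", thisCounter)
        }(counter)
        counter++
    }
    wg.Wait() // block until every Add has been matched by a Done
    elapsed := time.Since(start)
    log.Println(elapsed)
}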

This time we’re introducing another component to our program: a WaitGroup, this is a mechanism to synchronize all of our threads.

We declare our waitgroup, and then on each iteration we tell it that we’re adding a new thread to the group. The delta argument tells the waitgroup how many tasks to wait for at the end; here I am incrementing the waitgroup size by 1 per loop iteration, but we could have done it outside of the loop and just said 16. Doing it inside the loop makes our code more portable for when we do the next level of waitgroups, so we use delta=1 for now. Inside the goroutine you’ll notice a “wg.Done()” call; what this does is decrement by one the number of tasks the waitgroup needs to synchronize at the end, so wg.Add increases it and wg.Done decreases it. Finally we have a wg.Wait(), which just waits for an internal counter inside the waitgroup to reach 0. Until it does, it will block execution forever and ever, and if you never call enough wg.Done(), we’ll be idle forever or until an external factor breaks the execution.

This time the program finishes right after all tasks have finished :)

There’s another problem that wasn’t visible here since this is a tiny example: if we had thousands of tasks to run and they all did some heavy processing, we’d be starting a thousand goroutines all at the same time, and depending on the available resources, we might crash the system as everything works at once. For this we have a small mechanism to control how much work is enqueued at the same time: a SizedWaitGroup.

ex_6.go
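A sketch using github.com/remeh/sizedwaitgroup; the 2-second sleep inside each goroutine is the delay mentioned below:

package main

import (
    "log"
    "time"

    "github.com/remeh/sizedwaitgroup"
)

func main() {
    start := time.Now()
    // At most 6 goroutines will be running at the same time.
    swg := sizedwaitgroup.New(6)
    var counter = 0
    for i := 1; i <= 16; i++ {
        swg.Add() // blocks while the group is full
        go func(thisCounter int) {
            defer swg.Done() // frees a slot in the group
            log.Println("Start", thisCounter)
            time.Sleep(2 * time.Second) // simulate a slow task
        }(counter)
        counter++
    }
    swg.Wait()
    elapsed := time.Since(start)
    log.Println(elapsed)
}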

We’ll use remeh’s package for sized waitgroups, and you’ll notice the syntax is pretty similar to a regular waitgroup.

This time, when creating the sizedwaitgroup, we need to specify a limit: the maximum number of tasks to run in parallel. This gives us the benefits of parallelization without the risk of destroying a server. We then .Add() a task to the sizedwaitgroup’s counter; in this implementation we don’t get to specify how many tasks we’re adding, it is forced to be in increments of one, because the function will block the main thread if the sizedwaitgroup is full (6 in our example). So the first 6 iterations of the loop go on normally; then on the 7th iteration, since the sizedwaitgroup is full (and no task has reported a .Done() yet), it waits until there is a slot available in the group. Once there is, that thread starts and the loop goes on until it finishes all the work or gets blocked again because our wait pool is full. The .Done() function behaves the same way as in the waitgroup, as does the .Wait() function.

This time we see that tasks are worked in blocks of 6, and since our delay before finishing a thread is 2 seconds, that is how long it takes to free up a slot.

Now that we have several mechanisms to run tasks, let’s focus on the problem that we’ve been deferring for a while: data race conditions.

We’ll start by coding a small variation of our program but one that makes the problem very obvious:

ex_7.go
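A sketch of the racy variation: each goroutine reads the counter, sleeps to mock extra work happening in between, and then writes back a stale value:

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    var counter = 0
    for i := 1; i <= 16; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            value := counter                   // read the shared counter
            time.Sleep(100 * time.Millisecond) // other writes happen meanwhile
            counter = value + 1                // write back a stale value
            log.Println("Counter is now", counter)
        }()
    }
    wg.Wait()
    log.Println("Final counter:", counter)
}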

This one has most of the components we’ve been talking about for a while, but the problem lies in the fact that we’re writing to the counter variable, and by the time we write to it, more changes could have occurred (mocked via the time.Sleep), so the value is not constant/predictable:

The code might look dumb but the point is proven

For this we’ll use a tool called “channels”, which is the preferred way of sharing data between threads.
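A sketch of the channel version (the article’s ex_8.go), following the description below:

package main

import (
    "log"
    "time"
)

func main() {
    counterChan := make(chan int) // unbuffered channel of ints

    // First producer: sends the value 1 eight times.
    go func() {
        for i := 0; i < 8; i++ {
            counterChan <- 1
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Second, identical producer.
    go func() {
        for i := 0; i < 8; i++ {
            counterChan <- 1
            time.Sleep(100 * time.Millisecond)
        }
    }()

    counter := 0
    // 16 reads: 8 from each producer; each read blocks until a value arrives.
    for i := 0; i < 16; i++ {
        counter += <-counterChan
        log.Println("Counter is now", counter)
    }
}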

We start by creating a channel called “counterChan”, which will carry data of type “int” (you can also specify a buffer size, but that will be covered in another article).

Two identical threads are then started; each will run up to 8 times, send the value of 1 through the channel, and sleep for 100 ms. The main thread will then run for 16 iterations (8+8, one batch from each thread), pull a value from counterChan, and add it to the counter variable.

The channel behaves kind of like a one-item queue (an oversimplified description); anything you write to it using the format:

CHANNEL <- value

will be written in a thread-safe manner to this “queue”, and then when you try to read from it:

variable := <-CHANNEL

this will block execution if the channel is empty; once a value is placed onto it, it’s sent to the variable and execution is unblocked (at least until it reaches another channel read).

In the example, we’re writing a hard-coded value of 1, and then have a loop that will listen 16 times for values from the channel and add that value to the counter variable.

If the channel already holds a value that we haven’t read yet, writes to it will block until it is emptied, so we should always be careful to read from and write to the channel as quickly as possible and defer any other work to another thread.

Since the counter is aggregated in a single thread, the numbers are sorted

We will now approach the same issue with a different solution: Mutexes

ex_9.go
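A sketch of the racy slice example, assuming two goroutines that each append a value to a shared Tasks slice and then print it:

package main

import (
    "log"
    "sync"
)

var Tasks []string // shared between the two threads

func main() {
    var wg sync.WaitGroup
    for _, name := range []string{"thread one", "thread two"} {
        wg.Add(1)
        go func(value string) {
            defer wg.Done()
            Tasks = append(Tasks, value) // unsynchronized write: data race
            log.Println(Tasks)           // unsynchronized read
        }(name)
    }
    wg.Wait()
}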

This time we have a slice of strings and two threads that will write to it. We have the same data race, which becomes more obvious once you run the program multiple times:

In the first and last runs, it worked as we would have expected: one item is added to the slice, then the other one, and the data is printed correctly with one item from each thread. In the second run, each thread read the slice as being empty; thread one added a value and printed the slice with that value, then thread two did the same thing. In the third run, the data from the first thread was added but not printed; then the second thread added a value, the first thread printed the slice, and then the second thread printed it (if it’s confusing for you, imagine how confused the computer must be!).

To solve it this time, we’ll use the well-known mutex (MUTual EXclusion) primitive, which is a lock you can set on something that will be shared by multiple threads, so that only one thread can work on it at a time.

ex_10.go
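A sketch with the mutex added; in this version the print also happens inside the critical section, so the read is synchronized too:

package main

import (
    "log"
    "sync"
)

var Tasks []string

func main() {
    var wg sync.WaitGroup
    var mutex sync.Mutex
    for _, name := range []string{"thread one", "thread two"} {
        wg.Add(1)
        go func(value string) {
            defer wg.Done()
            mutex.Lock() // start of exclusive access to Tasks
            Tasks = append(Tasks, value)
            log.Println(Tasks)
            mutex.Unlock() // release so the other thread can proceed
        }(name)
    }
    wg.Wait()
}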

First we define our mutex; then, inside each thread, a .Lock() is used to start a block of exclusive access to the resource. Once the values are added to the Tasks slice, the lock is released immediately so that a different thread can make use of the slice as well.

And now no matter how many times we run it, the data will be what we expect it to be.

The next example will be pretty much the same, except this time we’ll run more concurrent threads:

ex_11.go
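A sketch of the same pattern scaled up to 16 goroutines, each appending a unique value:

package main

import (
    "fmt"
    "log"
    "sync"
)

var Tasks []string

func main() {
    var wg sync.WaitGroup
    var mutex sync.Mutex
    for i := 0; i < 16; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            mutex.Lock()
            Tasks = append(Tasks, fmt.Sprintf("T%d", id))
            log.Println(Tasks)
            mutex.Unlock()
        }(i)
    }
    wg.Wait()
}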

Even if the data is ordered differently after each run, all data points are unique, which is what we expect.

Our final example will be a design pattern I use to keep the resources in use as much as possible (and only when needed).

To be continued…

We do our typical loop and then call a function “Add” on each number after a random sleep. The for at the end just keeps the program running constantly; in my use cases the program always has work to do / is listening to a RabbitMQ queue. A combined sketch of both parts follows after the next paragraph.

… continued

The Add func takes the item that was sent, blocks access to the tasks slice via a mutex, and appends the item to it. If the slice reaches a length of 10 items, those are sent to a different process, “Dump”, which could be doing anything with them, and those 10 items are removed from the slice so that they don’t get reprocessed. If the slice has fewer than 10 items, we start a timer that waits for 5 seconds; if the timer expires and the slice has data in it, the Dump func is called with whatever is available. This way we keep pushing data forward either because our “tasks buffer” is full or because enough time has passed without an action on it that we should clear it to keep the other part busy.
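A sketch of the whole pattern under a few assumptions: Dump just logs the batch, time.AfterFunc drives the 5-second flush, and 32 items (T0–T31) are fed in so the buffer fills three times with two leftovers:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"
)

var (
    mutex sync.Mutex
    tasks []string
)

// Dump stands in for whatever downstream processing you need.
func Dump(batch []string) {
    log.Println("Dumping", batch)
}

// Add buffers an item; a full buffer (10 items) is flushed immediately,
// otherwise a 5-second timer flushes whatever is left over.
func Add(item string) {
    mutex.Lock()
    tasks = append(tasks, item)
    if len(tasks) >= 10 {
        batch := tasks
        tasks = nil // remove the batch so it doesn't get reprocessed
        mutex.Unlock()
        Dump(batch)
        return
    }
    mutex.Unlock()

    time.AfterFunc(5*time.Second, func() {
        mutex.Lock()
        if len(tasks) == 0 {
            mutex.Unlock() // an earlier flush already emptied the buffer
            return
        }
        batch := tasks
        tasks = nil
        mutex.Unlock()
        Dump(batch)
    })
}

func main() {
    for i := 0; i < 32; i++ {
        time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
        Add(fmt.Sprintf("T%d", i))
    }
    // Keep the program alive; in real use it would be listening to a queue.
    for {
        time.Sleep(time.Second)
    }
}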

The first 3 lines are triggered by the buffer being full and the last one by a timer expiring. If you note the times, the 5 seconds start running from the first Add, and the timer that fired was the one from the T24 data point, which was started before that item was dumped. Once that timer expired, the other timers just skip the dump, as those 2 leftover values had already left the buffer.
