Tags: #advance #topic #golang #goroutine #channels #workerpool #threadpool
Often we end up with work that is so time-consuming that assigning several people (workers) to it would cut the total execution time considerably.
Today we’re going to solve this problem by creating a worker pool, a pattern similar to a thread pool, so that tasks are done by multiple workers concurrently. We’ll use Go’s lightweight threads, known as goroutines, together with channels.
Prerequisites: Goroutine, Channels
Goroutine
A goroutine is a lightweight thread managed by the Go runtime, unlike threads in languages such as Python, which are managed by the OS and are more expensive to create and switch between. In practice, a goroutine is a function or method that runs concurrently with other functions or methods.
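To make this concrete, here is a minimal sketch of starting a few goroutines and waiting for them. The helper name `runConcurrently` is hypothetical, and waiting with `sync.WaitGroup` is an assumption made for this illustration; the rest of this article waits by receiving from a channel instead.

```go
package main

import (
	"fmt"
	"sync"
)

// runConcurrently (hypothetical helper) starts n goroutines, waits for
// all of them to finish, and returns how many of them ran.
func runConcurrently(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() { // the go keyword starts the function as a goroutine
			defer wg.Done()
			mu.Lock() // protect the shared counter
			finished++
			mu.Unlock()
		}()
	}

	wg.Wait() // block until every goroutine has called Done
	return finished
}

func main() {
	fmt.Println("goroutines finished:", runConcurrently(3))
}
```

Without the `wg.Wait()` call, `main` could return before the goroutines get a chance to run, because a Go program exits as soon as `main` returns.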
Channels
Channels are ways in which different goroutines communicate with each other. We can understand them as pipes through which you can connect with different concurrent goroutines. The communication is bidirectional by default, meaning that you can send and receive values from the same channel.
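As a small illustration of that pipe idea, the sketch below sends numbers to a goroutine over one channel and receives the total back over another. The function name `sumViaChannel` and both channel names are made up for this example.

```go
package main

import "fmt"

// sumViaChannel (hypothetical helper) sends each number to a goroutine
// over one channel and receives the total back on a second channel.
func sumViaChannel(nums []int) int {
	numbers := make(chan int) // unbuffered: each send waits for a receive
	total := make(chan int)

	go func() {
		sum := 0
		for n := range numbers { // the loop ends once numbers is closed
			sum += n
		}
		total <- sum
	}()

	for _, n := range nums {
		numbers <- n // send a value into the channel
	}
	close(numbers) // no more values will be sent

	return <-total // receive the result from the goroutine
}

func main() {
	fmt.Println(sumViaChannel([]int{1, 2, 3})) // prints 6
}
```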
Let’s define some workers so that we can solve the time issue using goroutines and channels.
Task
func task() {
	time.Sleep(time.Second) // some task to be executed
}
Job
Note: Each job takes 1 second to complete
func job(workerID, jobID int) {
	fmt.Println("worker", workerID, "started job", jobID)
	task()
	fmt.Println("worker", workerID, "finished job", jobID)
}
Worker
func worker(workerID int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		job(workerID, j)
		results <- j * 2
	}
}
In Go, we declare a channel type with the keyword `chan`. The arrows that appear next to it can be confusing, so let’s clear those up first…
chan   // read-write
<-chan // read-only
chan<- // write-only
With no arrow, `chan` is a channel that can be both read from and written to, which is the default behavior. For a read-only (receive-only) channel we put the arrow before the keyword, as in `<-chan`, and for a write-only (send-only) channel we put the arrow after the keyword, as in `chan<-`.
So in our example above, the worker can only read from the `jobs` channel and can only write to the `results` channel.
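The direction rules can be tried out with a small sketch like the one below. The `produce` and `consume` functions are hypothetical names used only for this illustration; the point is that a bidirectional channel converts implicitly to either restricted form when passed as an argument.

```go
package main

import "fmt"

// produce can only send on out; trying to receive from it would not compile.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// consume can only receive from in; trying to send on it would not compile.
func consume(in <-chan int) int {
	sum := 0
	for v := range in {
		sum += v
	}
	return sum
}

func main() {
	ch := make(chan int, 3) // bidirectional channel
	go produce(ch, 3)       // converted implicitly to chan<- int
	fmt.Println(consume(ch)) // converted implicitly to <-chan int; prints 6
}
```

Restricting the direction in the function signature lets the compiler catch mistakes such as a worker accidentally sending on its input channel.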
So let’s continue on our worker pool example…
Our worker function will receive the jobs and send the results of the job in the results channel.
The `job` function calls the `task` function to simulate an actual task being run by the worker.
In the `task` function we put a sleep function which will wait for a second so that it behaves like expensive/time-consuming work.
Create an int channel with a buffer
func makeIntChannels(buffer int) chan int {
	channel := make(chan int, buffer)
	return channel
}
Worker pool
func execUsingWorkerPool(numOfJobs, numOfWorkers int) {
	defer duration(track("time using worker pool"))
	jobs := makeIntChannels(numOfJobs)
	results := makeIntChannels(numOfJobs)
	for w := 1; w <= numOfWorkers; w++ {
		go worker(w, jobs, results)
	}
	for job := 1; job <= numOfJobs; job++ {
		jobs <- job
	}
	close(jobs) // closing the job channel to indicate that's all the work we have.
	for i := 1; i <= numOfJobs; i++ {
		<-results
	}
}
Without worker pool
func execWithoutUsingWorkerPool(numOfJobs, worker int) {
	defer duration(track("time without using worker pool"))
	for j := 1; j <= numOfJobs; j++ {
		job(worker, j)
	}
}
Calculate execution time
func track(msg string) (string, time.Time) {
	return msg, time.Now()
}
func duration(msg string, start time.Time) {
	log.Printf("%v: %v\n", msg, time.Since(start))
}
Whoo!!! Lots of code, right…
Let’s go through the `main` function to understand what’s happening
Main function
func main() {
	const numOfJobs = 5
	const numOfWorkers = 3
	execUsingWorkerPool(numOfJobs, numOfWorkers)
	execWithoutUsingWorkerPool(numOfJobs, 1)
}
In the main function, we define the number of jobs and the number of workers as constants so that we can reuse them in both the worker-pool function and the single-worker function.
Let’s check out the `execUsingWorkerPool` function to understand what’s happening.
defer duration(track("time using worker pool"))
The first line uses the `defer` keyword, which postpones a function call until the surrounding function, here `execUsingWorkerPool`, returns. The arguments of the deferred call are evaluated immediately, though, so `track` runs right away and records the start time, while `duration` runs last, after every other statement in the function body. Cool, right…
`duration` and `track` are utility functions that let us measure the execution time. `track` returns a message and a start time, and because Go lets a call that returns multiple values supply the arguments of another call, those two values are passed straight into `duration`. Go does treat functions as first-class values (they can be assigned to variables, passed as parameters, and returned from other functions), but that is not what happens here: `track` is called, and only its results are passed on.
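The evaluation order of a `defer` with a nested call is easy to check with a small experiment. In the sketch below, `start`, `finish`, `work`, and the `events` slice are all hypothetical names standing in for `track`, `duration`, and the pool function; they only record the order in which things happen.

```go
package main

import "fmt"

// events records the order in which the functions below run.
var events []string

// start plays the role of track: it runs when the defer statement is reached.
func start(name string) string {
	events = append(events, "start "+name)
	return name
}

// finish plays the role of duration: it runs when the caller returns.
func finish(name string) {
	events = append(events, "finish "+name)
}

func work() {
	defer finish(start("work")) // start runs now, finish runs on return
	events = append(events, "body")
}

func main() {
	work()
	fmt.Println(events) // prints [start work body finish work]
}
```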
jobs := makeIntChannels(numOfJobs)
results := makeIntChannels(numOfJobs)
Next, we create two buffered int channels, `jobs` and `results`. In order to use our pool of workers, we need to send them jobs and collect their results.
for w := 1; w <= numOfWorkers; w++ {
	go worker(w, jobs, results)
}
This starts the workers. In our example we use 3 workers, which are initially blocked because there are no jobs yet.
for job := 1; job <= numOfJobs; job++ {
	jobs <- job
}
close(jobs) // closing the job channel to indicate that's all the work we have.
Next, we send a total of 5 jobs and then close the `jobs` channel to indicate that’s all the work we have.
for i := 1; i <= numOfJobs; i++ {
	<-results
}
Finally, we collect the results of all the jobs we sent. Because each job produces exactly one result, receiving `numOfJobs` results also ensures that the worker goroutines have finished all the jobs.
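The walkthrough reads from `results` only to wait and throws the values away. If the values matter, a variant along the following lines could keep them. This is a sketch under assumptions: `double` and `runPool` are hypothetical names, the one-second sleep is left out so it runs instantly, and the results are sorted because workers finish in no fixed order.

```go
package main

import (
	"fmt"
	"sort"
)

// double is a simplified worker: it doubles every job it receives.
func double(jobs <-chan int, results chan<- int) {
	for j := range jobs {
		results <- j * 2
	}
}

// runPool (hypothetical helper) runs numOfJobs jobs on numOfWorkers
// workers and returns the collected results in ascending order.
func runPool(numOfJobs, numOfWorkers int) []int {
	jobs := make(chan int, numOfJobs)
	results := make(chan int, numOfJobs)

	for w := 1; w <= numOfWorkers; w++ {
		go double(jobs, results)
	}
	for j := 1; j <= numOfJobs; j++ {
		jobs <- j
	}
	close(jobs) // lets each worker's range loop end

	out := make([]int, 0, numOfJobs)
	for i := 0; i < numOfJobs; i++ {
		out = append(out, <-results) // one receive per job sent
	}
	sort.Ints(out) // completion order is not deterministic
	return out
}

func main() {
	fmt.Println(runPool(5, 3)) // prints [2 4 6 8 10]
}
```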
Output
worker 3 started job 1
worker 1 started job 2
worker 2 started job 3
worker 2 finished job 3
worker 2 started job 4
worker 3 finished job 1
worker 3 started job 5
worker 1 finished job 2
worker 3 finished job 5
worker 2 finished job 4
2021/03/18 09:25:25 time using worker pool: 2.000943787s
worker 1 started job 1
worker 1 finished job 1
worker 1 started job 2
worker 1 finished job 2
worker 1 started job 3
worker 1 finished job 3
worker 1 started job 4
worker 1 finished job 4
worker 1 started job 5
worker 1 finished job 5
2021/03/18 09:25:30 time without using worker pool: 5.001234313s
In conclusion, with the worker pool the execution time drops to about 2 seconds, whereas without it the same 5 jobs take about 5 seconds. Hopefully you now understand what a worker pool is, how to create one in Go, and the benefit of using one.