Go is well suited to concurrent programming: put the go keyword in front of a function call, and your function will run in a separate goroutine (like a lightweight thread).
Channels are a very powerful feature in Go for communicating between goroutines.
An unbuffered channel blocks on both send and receive: a send blocks until the receiver on the other side is ready, and a receive blocks until a sender is ready. This makes it useful for synchronising goroutines. It is a very simple way of communicating, and you know exactly when the communication happens.
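As a minimal sketch of that hand-off (the helper name handoff and the message strings are made up for illustration, they are not part of the queue we build later), the send below cannot complete until the goroutine is ready to receive:

```go
package main

import "fmt"

// handoff is a hypothetical helper: it passes msg to a goroutine over an
// unbuffered channel and returns the goroutine's reply.
func handoff(msg string) string {
	ch := make(chan string)    // unbuffered: no capacity argument
	reply := make(chan string) // also unbuffered

	go func() {
		got := <-ch // blocks until the caller sends
		reply <- "received " + got
	}()

	ch <- msg      // blocks until the goroutine above is receiving
	return <-reply // blocks until the goroutine has replied
}

func main() {
	fmt.Println(handoff("ping")) // prints "received ping"
}
```

Because every send waits for its matching receive, the two goroutines are in lock step at each channel operation.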
A buffered channel, by contrast, does not block a send while there is still room in its buffer, even when no receiver is ready; it only blocks once the buffer is full. This fits the idea of a queue quite well: when the queue is full, no new process can come in; when a process is done, another one can come in.
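To see when a buffered channel would block without actually blocking, one option is a non-blocking send with select and default. This is only a sketch: trySend is a hypothetical helper, and the capacity of 2 is chosen arbitrarily.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether the value went into the channel.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default: // the send would block, so the buffer is full
		return false
	}
}

func main() {
	queue := make(chan string, 2) // buffered, capacity 2

	fmt.Println(trySend(queue, "p1")) // true: room in the buffer
	fmt.Println(trySend(queue, "p2")) // true: buffer is now full
	fmt.Println(trySend(queue, "p3")) // false: a plain send would block here

	<-queue                           // take one value out, freeing a slot
	fmt.Println(trySend(queue, "p3")) // true

	fmt.Println(len(queue), cap(queue)) // 2 2
}
```

A plain send (queue <- v) behaves the same way, except that it waits for a free slot instead of returning false.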
In this post, I'll build up a simple queue using the buffered channel feature in Go.
Firstly let's mock out our process function:
func proc(p string) {
	log.Println(p, "is processing")
	time.Sleep(3 * time.Second)
	log.Println("leaving", p)
}
And my main initially looks like this:
func main() {
	jobs := []string{"p1", "p2", "p3", "p4", "p5"}
	for _, p := range jobs {
		proc(p)
	}
	log.Println("All done!")
}
So we mock out our jobs as a string slice of 5 elements, and we simply loop through the jobs and process each one in turn.
Then we need to put them in a queue and run them concurrently, at most n at a time. Whenever one finishes, another one starts:
func main() {
	const n = 2
	jobs := []string{"p1", "p2", "p3", "p4", "p5"}
	queue := make(chan string, n)
	for _, p := range jobs {
		queue <- p
		go func(p string) {
			proc(p)
			<-queue
		}(p)
	}
	log.Println("All done!")
}
Here, we've defined a buffered channel with a capacity of n, wrapped proc in an anonymous function, and run that function in a goroutine. We send p to the queue before starting each job and take one value out of the queue after proc finishes, so at most n jobs are in flight at any time. Dead easy.
But the program will exit before the last jobs have finished, because main won't wait for the goroutines that are still running: it returns as soon as the final job has been handed to a goroutine.
We need to add a wait group to wait for all goroutines to finish. Fortunately Go has a rich standard library, and we can use sync.WaitGroup to do this.
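In isolation, the WaitGroup pattern looks roughly like the sketch below. The helper countDone and its counter are made up for illustration; only Add, Done and Wait come from the sync package.

```go
package main

import (
	"fmt"
	"sync"
)

// countDone is a hypothetical helper: it starts n goroutines, waits for
// all of them, and returns how many actually ran.
func countDone(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	for i := 0; i < n; i++ {
		wg.Add(1) // register the goroutine before starting it
		go func() {
			defer wg.Done() // mark this goroutine as finished on return
			mu.Lock()
			finished++
			mu.Unlock()
		}()
	}

	wg.Wait() // blocks until Done has been called n times
	return finished
}

func main() {
	fmt.Println(countDone(5)) // prints 5
}
```

Calling Add in the loop, before the go statement, matters: if Add ran inside the goroutine, Wait could return before the goroutine had registered itself.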
Here is the full program:
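package main

import (
	"log"
	"os"
	"sync"
	"time"
)

func main() {
	log.SetOutput(os.Stdout)

	const n = 2 // maximum number of jobs running at once
	jobs := []string{"p1", "p2", "p3", "p4", "p5"}

	var wg sync.WaitGroup
	queue := make(chan string, n) // buffered channel used as the queue

	for _, p := range jobs {
		queue <- p // blocks while n jobs are already running
		wg.Add(1)
		go func(p string) {
			proc(p)
			<-queue // free a slot so the next job can start
			wg.Done()
		}(p)
	}

	wg.Wait() // wait for the jobs that are still running
	log.Println("All done!")
}

// proc stands in for real work: it logs, sleeps, and logs again.
func proc(p string) {
	log.Println(p, "is processing")
	time.Sleep(3 * time.Second)
	log.Println("leaving", p)
}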
You can run the code HERE in the browser
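If you want to convince yourself that the queue really caps the number of running jobs, one way is to count them. The sketch below reuses the same queue and wait group pattern; peakConcurrency, the counters and the 10ms sleep (a stand-in for proc) are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency is a hypothetical helper: it runs one job per element of
// jobs, at most n at a time, and returns the highest number of jobs that
// were observed running simultaneously.
func peakConcurrency(jobs []string, n int) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	queue := make(chan string, n)

	for _, p := range jobs {
		queue <- p // blocks while n jobs are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // stand-in for proc

			mu.Lock()
			running--
			mu.Unlock()

			<-queue // free the slot only after the job is counted as done
		}()
	}

	wg.Wait()
	return peak
}

func main() {
	jobs := []string{"p1", "p2", "p3", "p4", "p5"}
	fmt.Println("peak:", peakConcurrency(jobs, 2)) // never more than 2
}
```

Since a job is counted only between its send into the queue and its receive from it, the reported peak can never exceed the channel's capacity.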