Channels & concurrency
Concurrency/Parallelism vs Synchronous/Sequential
How does concurrency work in Go?
Concurrency is as simple as using the go keyword when calling a function:
go doSomething()
In the example above, doSomething() will be executed concurrently with the rest of the code in the function. The go keyword is used to spawn a new goroutine.
Example
func sendEmail(message string) {
    // Simulate delivery in a separate goroutine so the caller is not blocked.
    go func() {
        time.Sleep(time.Millisecond * 250)
        fmt.Printf("Email received: '%s'\n", message)
    }()
    fmt.Printf("Email sent: '%s'\n", message)
}
Channel
Channels are typed, thread-safe queues. Channels allow different goroutines to communicate with each other.
Create a channel
Like maps and slices, channels must be created before use. They also use the same built-in make function:
ch := make(chan int)
Send data to a channel
ch <- 69
The <- operator is called the channel operator. Data flows in the direction of the arrow. On an unbuffered channel like this one, the send will block until another goroutine is ready to receive the value.
Receive data from a channel
v := <-ch
This reads and removes a value from the channel and saves it into the variable v. This operation will block until there is a value in the channel to be read.
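A minimal sketch of a send and a receive meeting on an unbuffered channel. The squareAsync helper is a made-up name for illustration and is not part of the examples in these notes.

```go
package main

import "fmt"

// squareAsync is a hypothetical helper: it computes n*n in a new goroutine
// and delivers the result over an unbuffered channel.
func squareAsync(n int) chan int {
    ch := make(chan int)
    go func() {
        ch <- n * n // blocks until the caller receives
    }()
    return ch
}

func main() {
    ch := squareAsync(7)
    v := <-ch // blocks until the goroutine sends
    fmt.Println(v) // prints 49
}
```

The send and the receive each wait for the other side, so the channel doubles as a synchronization point between the two goroutines.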
Blocking and deadlocks
A deadlock is when a group of goroutines are all blocking so none of them can continue. This is a common bug that you need to watch out for in concurrent programming.
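As a rough illustration, the sketch below shows the simplest deadlock (a send on an unbuffered channel with nobody to receive) left commented out, next to one way of avoiding it. The function name sendAndReceive is an assumption made for this example. When every goroutine is blocked, the Go runtime aborts with "fatal error: all goroutines are asleep - deadlock!".

```go
package main

import "fmt"

// sendAndReceive is a hypothetical helper that passes v through an
// unbuffered channel without deadlocking.
func sendAndReceive(v int) int {
    ch := make(chan int)

    // Writing `ch <- v` right here would deadlock: this goroutine would block
    // on the send, and no other goroutine exists to receive the value.

    // Sending from a second goroutine lets the receive below unblock it.
    go func() {
        ch <- v
    }()
    return <-ch
}

func main() {
    fmt.Println(sendAndReceive(1)) // prints 1
}
```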
Assignment
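func filterOldEmails(emails []email) {
    isOldChan := make(chan bool)

    // Classify each email in a separate goroutine and report the result on the channel.
    go func() {
        // Emails dated before 1 January 2020 count as old.
        cutoff := time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC)
        for _, e := range emails {
            if e.date.Before(cutoff) {
                isOldChan <- true
                continue
            }
            isOldChan <- false
        }
    }()

    // Receive one result per email; this assumes emails holds exactly three entries.
    isOld := <-isOldChan
    fmt.Println("email 1 is old:", isOld)
    isOld = <-isOldChan
    fmt.Println("email 2 is old:", isOld)
    isOld = <-isOldChan
    fmt.Println("email 3 is old:", isOld)
}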
Token
Empty structs are often used as tokens in Go programs. In this context, a token is a unary value. In other words, we don't care what is passed through the channel. We care when and if it is passed.
We can block and wait until something is sent on a channel using the following syntax:
<-ch
This will block until it pops a single item off the channel, then continue, discarding the item.
Assignment
func waitForDbs(numDBs int, dbChan chan struct{}) {
    // Block until one token per database has arrived; the values are discarded.
    for i := 0; i < numDBs; i++ {
        <-dbChan
    }
}
func test(numDBs int) {
    dbChan := getDatabasesChannel(numDBs)
    fmt.Printf("Waiting for %v databases...\n", numDBs)
    waitForDbs(numDBs, dbChan)
    fmt.Println("All databases are online!")
    fmt.Println("====================================")
    time.Sleep(time.Millisecond * 10) // give the sender's last print time to finish
}
func main() {
    test(3)
    test(4)
    test(5)
}
func getDatabasesChannel(numDBs int) chan struct{} {
    ch := make(chan struct{})
    go func() {
        for i := 0; i < numDBs; i++ {
            ch <- struct{}{}
            fmt.Printf("Database %v is online\n", i+1)
        }
    }()
    return ch
}
Buffered channels
Channels can optionally be buffered.
Creating a channel with a buffer
You can provide a buffer length as the second argument to make() to create a buffered channel:
ch := make(chan int, 100)
Sending on a buffered channel only blocks when the buffer is full.
Receiving blocks only when the buffer is empty.
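The following sketch, with the made-up helper fillBuffer, shows that sends on a buffered channel succeed without any receiver as long as the buffer has room. It relies on the built-in len and cap functions, which report the number of queued values and the buffer size of a channel.

```go
package main

import "fmt"

// fillBuffer is a hypothetical helper: it creates a channel with room for n
// values, fills it without any receiver, and reports its length and capacity.
func fillBuffer(n int) (length, capacity int) {
    ch := make(chan int, n)
    for i := 0; i < n; i++ {
        ch <- i // does not block: the buffer still has room
    }
    // One more send here would block, because the buffer is now full.
    return len(ch), cap(ch)
}

func main() {
    length, capacity := fillBuffer(3)
    fmt.Println(length, capacity) // prints 3 3
}
```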
Assignment
func addEmailsToQueue(emails []string) chan string {
    // Create a channel whose buffer holds as many values as there are emails,
    // so every send below succeeds without a receiver.
    emailsToSend := make(chan string, len(emails))
    for _, email := range emails {
        emailsToSend <- email
    }
    return emailsToSend
}
func sendEmails(batchSize int, ch chan string) {
    for i := 0; i < batchSize; i++ {
        email := <-ch
        fmt.Println("Sending email:", email)
    }
}
func test(emails ...string) {
    fmt.Printf("Adding %v emails to queue...\n", len(emails))
    ch := addEmailsToQueue(emails)
    fmt.Println("Sending emails...")
    sendEmails(len(emails), ch)
    fmt.Println("====================================")
}
func main() {
    test("Hello john, tell Kathy I said hi", "Whazzup bruther")
}
Closing channels in Go
Channels can be explicitly closed by a sender:
ch := make(chan int)
// do some stuff with the channel
close(ch)
Checking if a channel is closed
Similar to the ok value when accessing data in a map, receivers can check the ok value when receiving from a channel to test if a channel was closed.
v, ok := <-ch
ok is false if the channel is empty and closed.
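A small sketch of how the ok value behaves on a closed channel that still holds a buffered value. The describeReceive helper is hypothetical and only exists to make the two receives easy to compare.

```go
package main

import "fmt"

// describeReceive is a hypothetical helper that receives once from ch and
// reports the value together with the ok flag.
func describeReceive(ch chan int) string {
    v, ok := <-ch
    return fmt.Sprintf("%d %t", v, ok)
}

func main() {
    ch := make(chan int, 1)
    ch <- 42
    close(ch)

    fmt.Println(describeReceive(ch)) // prints "42 true": the buffered value is still delivered
    fmt.Println(describeReceive(ch)) // prints "0 false": closed and empty, so the zero value
}
```

Closing does not discard values already in the buffer; ok only turns false once the channel is both closed and drained.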
Don't send on a closed channel
Sending on a closed channel will cause a panic. An unrecovered panic in any goroutine, not just the main one, will crash the entire program.
Closing isn't necessary. There's nothing wrong with leaving channels open; they'll still be garbage collected if they're unused. You should close channels to indicate explicitly to a receiver that nothing else is going to come across.
Assignment
func countReports(numSentCh chan int) int {
    total := 0
    for {
        numSent, ok := <-numSentCh
        if !ok {
            // The sender closed the channel: no more batches are coming.
            break
        }
        total += numSent
    }
    return total
}
func test(numBatches int) {
    numSentCh := make(chan int)
    go sendReports(numBatches, numSentCh)
    fmt.Println("Start counting...")
    numReports := countReports(numSentCh)
    fmt.Printf("%v reports sent!\n", numReports)
    fmt.Println("====================================")
}
func main() {
    test(3)
    test(4)
    test(5)
    test(6)
}
func sendReports(numBatches int, ch chan int) {
    for i := 0; i < numBatches; i++ {
        numReports := i*23 + 32%17
        ch <- numReports
        fmt.Printf("Sent batch of %v reports\n", numReports)
        time.Sleep(time.Millisecond * 100)
    }
    close(ch)
}
Range
Similar to slices and maps, channels can be ranged over.
for item := range ch {
    // item is the next value received from the channel
}
This example will receive values over the channel (blocking at each iteration if nothing new is there) and will exit only when the channel is closed.
Assignment
func concurrentFib(n int) {
    chInts := make(chan int)
    go func() {
        fibonacci(n, chInts)
    }()
    // The loop ends when fibonacci closes the channel.
    for v := range chInts {
        fmt.Println(v)
    }
}
func test(n int) {
    fmt.Printf("Printing %v numbers...\n", n)
    concurrentFib(n)
    fmt.Println("====================================")
}
func main() {
    test(10)
    test(5)
    test(20)
    test(13)
}
func fibonacci(n int, ch chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        ch <- x
        x, y = y, x+y
        time.Sleep(time.Millisecond * 10)
    }
    close(ch)
}
Select
Sometimes we have a single goroutine listening to multiple channels and want to process data in the order it comes through each channel.
A select statement is used to listen to multiple channels at the same time. It is similar to a switch statement but for channels.
select {
case i, ok := <-chInts:
    fmt.Println(i, ok)
case s, ok := <-chStrings:
    fmt.Println(s, ok)
}
The first channel with a value ready to be received will fire and its body will execute. If multiple channels are ready at the same time, one is chosen randomly. The ok variable in the example above reports whether the value came from a real send (true) or the channel has been closed and drained (false).
Assignment
func logMessages(chEmails, chSms chan string) {
    // Log whichever message arrives first; stop as soon as either channel is closed.
    for {
        select {
        case email, ok := <-chEmails:
            if !ok {
                return
            }
            logEmail(email)
        case sms, ok := <-chSms:
            if !ok {
                return
            }
            logSms(sms)
        }
    }
}
func logSms(sms string) {
    fmt.Println("SMS:", sms)
}
func logEmail(email string) {
    fmt.Println("Email:", email)
}
func test(sms []string, emails []string) {
    fmt.Println("Starting...")
    chSms, chEmails := sendToLogger(sms, emails) // the sendToLogger definition is missing from these notes
    logMessages(chEmails, chSms)
    fmt.Println("====================================")
}
func main() {
    test(
        []string{
            "hi friend sms",
        },
        []string{
            "hi friend email",
        },
    )
}
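The assignment calls sendToLogger without showing it. The sketch below is one possible stand-in, written for these notes and not the original helper: it sends every SMS, then every email, and closes both channels at the end. logMessages is repeated in condensed form so the program runs on its own.

```go
package main

import "fmt"

// sendToLogger is a hypothetical stand-in for the helper missing from these notes.
// It sends every SMS, then every email, and finally closes both channels.
func sendToLogger(sms, emails []string) (chSms, chEmails chan string) {
    chSms = make(chan string)
    chEmails = make(chan string)
    go func() {
        for _, s := range sms {
            chSms <- s
        }
        for _, e := range emails {
            chEmails <- e
        }
        close(chSms)
        close(chEmails)
    }()
    return chSms, chEmails
}

// logMessages mirrors the assignment: it logs until either channel is closed.
func logMessages(chEmails, chSms chan string) {
    for {
        select {
        case email, ok := <-chEmails:
            if !ok {
                return
            }
            fmt.Println("Email:", email)
        case sms, ok := <-chSms:
            if !ok {
                return
            }
            fmt.Println("SMS:", sms)
        }
    }
}

func main() {
    chSms, chEmails := sendToLogger(
        []string{"hi friend sms"},
        []string{"hi friend email"},
    )
    logMessages(chEmails, chSms)
}
```

Because the channels are unbuffered and closed only after the last send has been received, every message is logged before logMessages returns.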
Select default case
The default case in a select statement executes immediately if no other channel has a value ready. A default case stops the select statement from blocking.
select {
case v := <-ch:
    // use v
default:
    // receiving from ch would block
    // so do something else
}
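One common use of the default case is a non-blocking receive. The sketch below wraps it in a helper called tryReceive, a name chosen here for illustration.

```go
package main

import "fmt"

// tryReceive is a hypothetical helper: it returns a value from ch if one is
// ready right now, and reports false instead of blocking when none is.
func tryReceive(ch chan int) (int, bool) {
    select {
    case v := <-ch:
        return v, true
    default:
        return 0, false
    }
}

func main() {
    ch := make(chan int, 1)

    v, ok := tryReceive(ch)
    fmt.Println(v, ok) // prints 0 false: nothing buffered yet

    ch <- 5
    v, ok = tryReceive(ch)
    fmt.Println(v, ok) // prints 5 true
}
```

Note that a closed channel is always ready to receive, so on a closed and empty channel this helper would take the first case and return the zero value with true.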
Tickers
- time.Tick() is a standard library function that returns a channel that sends a value on a given interval.
- time.After() sends a value once after the duration has passed.
- time.Sleep() blocks the current goroutine for the specified amount of time.
Read-only channels and Write-only channels
Read-only channels
A channel can be marked as read-only by converting it from a chan to a <-chan type. The conversion happens implicitly when the channel is passed to a function that takes a <-chan parameter. For example:
func main() {
    ch := make(chan int)
    readCh(ch)
}
func readCh(ch <-chan int) {
    // ch can only be read from
    // in this function
}
Write-only channels
The same goes for write-only channels, but the arrow's position moves.
func writeCh(ch chan<- int) {
    // ch can only be written to
    // in this function
}
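A short sketch that puts both directions together. The function names produce and sum are made up for this example; the point is that the compiler rejects a receive inside produce and a send inside sum.

```go
package main

import "fmt"

// produce can only send on out; it closes the channel once it is done.
func produce(n int, out chan<- int) {
    for i := 1; i <= n; i++ {
        out <- i
    }
    close(out)
}

// sum can only receive from in; it adds values until the channel is closed.
func sum(in <-chan int) int {
    total := 0
    for v := range in {
        total += v
    }
    return total
}

func main() {
    ch := make(chan int) // bidirectional; narrowed implicitly at each call
    go produce(3, ch)
    fmt.Println(sum(ch)) // prints 6
}
```

Restricting the direction in the signature documents who owns the channel: here only the sender is able to close it, since closing a receive-only channel is a compile error.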
Example
func saveBackups(snapshotTicker, saveAfter <-chan time.Time) {
    for {
        select {
        case <-snapshotTicker:
            takeSnapshot()
        case <-saveAfter:
            saveSnapshot()
            return
        default:
            // Neither channel is ready: report it and pause before polling again.
            waitForData()
            time.Sleep(time.Millisecond * 500)
        }
    }
}
func takeSnapshot() {
    fmt.Println("Taking a backup snapshot...")
}
func saveSnapshot() {
    fmt.Println("All backups saved!")
}
func waitForData() {
    fmt.Println("Nothing to do, waiting...")
}
func test() {
    snapshotTicker := time.Tick(800 * time.Millisecond)
    saveAfter := time.After(2800 * time.Millisecond)
    saveBackups(snapshotTicker, saveAfter)
    fmt.Println("====================================")
}
Channels review
Here are a few extra things you should understand about channels from Dave Cheney's awesome article.
A send to a nil channel blocks forever
var c chan string // c is nil
c <- "let's get started" // blocks
A receive from a nil channel blocks forever
var c chan string // c is nil
fmt.Println(<-c) // blocks
A send to a closed channel panics
var c = make(chan int, 100)
close(c)
c <- 1 // panic: send on closed channel
A receive from a closed channel returns the zero value immediately
var c = make(chan int, 100)
close(c)
fmt.Println(<-c) // 0