一些关于协程(Goroutine)和通道(channel)的练习题
一起学习
协程
package main
import (
"fmt"
"time"
)
func f(from string) {
for i := 0; i < 3; i++ {
fmt.Println(from, ":", i)
}
}
func main() {
f("direct")//阻塞式
go f("goroutine")//协程交替输出
go func(msg string) {//协程交替输出
fmt.Println(msg)
}("going")
time.Sleep(time.Second)
fmt.Println("done")
}
direct : 0
direct : 1
direct : 2
going
goroutine : 0
goroutine : 1
goroutine : 2
done
通道
package main
import "fmt"
func main() {
//双向的通道,无容量
messages := make(chan string)
//messages通道接收消息
//容量为0就不能接收多个值,不然会死锁
go func() { messages <- "ping" }()
//mgs变量读取消息
msg := <-messages
//打印读取到的消息
fmt.Println(msg)
}
ping
通道缓冲
package main
import "fmt"
func main() {
//定义双通道,且容量为2
messages := make(chan string, 2)
//接收两个值
messages <- "buffered"
messages <- "channel"
//输出接收到的值
fmt.Println(<-messages)
fmt.Println(<-messages)
}
buffered
channel
通道同步
package main
import (
"fmt"
"time"
)
func worker(done chan bool) {
fmt.Print("working...")
time.Sleep(time.Second)
fmt.Println("done")
done <- true
}
func main() {
done := make(chan bool, 1)
go worker(done)
//程序将一直阻塞,直至收到 worker 使用通道发送的通知。
<-done//阻塞了主协程
}
如果你把 <- done
这行代码从程序中移除, 程序甚至可能在 worker
开始运行前就结束了。
working...done
通道方向
package main
import "fmt"
// chan<-,我<-吃的,吃饭,限定接收通道,也就是只能获拿
func ping(pings chan<- string, msg string) {
pings <- msg
}
// <-chan,吃的<-我,做饭,限定是发送通道,也就是只能给
func pong(pings <-chan string, pongs chan<- string) {
msg := <-pings
pongs <- msg
//pongs <-(<-pings)
}
func main() {
pings := make(chan string, 1)
pongs := make(chan string, 1)
//pings <- "passed message"
ping(pings, "passed message")
//pongs <-(<-pings)
pong(pings, pongs)
fmt.Println(<-pongs)
}
不必死记形式,理解箭头的流向,就可以判断是发送还是接收
passed message
通道选择器
Go 的 选择器(select) 让你可以同时等待多个通道操作。 将协程、通道和选择器结合,是 Go 的一个强大特性。
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
c3 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
go func() {
time.Sleep(3 * time.Second)
c2 <- "three"
}()
for i := 0; i < 3; i++ {
//高级的switch,一次选择一个
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
case msg3 := <-c3:
fmt.Println("received", msg3)
}
}
}
程序总共用了3秒完成,因为sleep那三个协程是并发执行的
received one
received two
received three
超时处理
func function(){
c1 := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
//超时机制,运行时间大于1秒就自动执行
case <-time.After(1 * time.Second):
fmt.Println("timeout 1")
}
c2 := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
c2 <- "result 2"
}()
select {
case res := <-c2:
fmt.Println(res)
//超时机制,运行时间大于3秒就自动执行
case <-time.After(3 * time.Second):
fmt.Println("timeout 2")
}
}
//
// @Title countTime
// @Description 记录函数运行时间
// @Author lido 2022-02-26 15:49:27
// @Param function
//
func countTime(function func()){
startTime := time.Now()
function()
elapsedTime := time.Since(startTime) / time.Millisecond // duration in ms
fmt.Printf("Segment finished in %dms", elapsedTime) //Segment finished in xxms
}
func main() {
countTime(function)
}
通过最后函数运行时间可以判断是并发执行的,不然同时等待连个2秒的sleep最少需要4秒钟
timeout 1
result 2
Segment finished in 3001ms
非阻塞通道操作
常规的通过通道发送和接收数据是阻塞的。 然而,我们可以使用带一个 default
子句的 select
来实现 非阻塞 的发送、接收,甚至是非阻塞的多路 select
。
package main
import "fmt"
func main() {
messages := make(chan string)
signals := make(chan bool)
select {
case msg := <-messages:
fmt.Println("received message", msg)
default:
fmt.Println("no message received")
}
msg := "hi"
select {
case messages <- msg:
fmt.Println("sent message", msg)
default:
fmt.Println("no message sent")
}
select {
case msg := <-messages:
fmt.Println("received message", msg)
case sig := <-signals:
fmt.Println("received signal", sig)
default:
fmt.Println("no activity")
}
}
no message received
sent message #错了,因该是no message sent
no activity
这里,第二个输出是no message sent,因为messages通道没有容量,且有接收者,所以是不能获取值的
但是但是!下面这个程序就可以赋值
func main() {
//双向的通道,无容量
messages := make(chan string)
//messages通道接收消息
//go func() { messages <- "ping" }()
time.Sleep(time.Second)
}
这里的原因其实通过报的错误就可以看的出来,再看看下面这个程序
func in(msg chan string){
msg <- "ping"
}
func main() {
//双向的通道,无容量
messages := make(chan string)
//在主协程执行函数
in(messages)
time.Sleep(time.Second)
}
fatal error: all goroutines are asleep - deadlock!
会报错,所有协程死锁!
那怎样才能不死锁嘞?其实就是开另外一个协程就不会死锁了
func in(msg chan string){
msg <- "ping"
}
func main() {
//双向的通道,无容量
messages := make(chan string)
//再另外一个协程执行这个函数,就不会导致所有的线程死锁了
go in(messages)
time.Sleep(time.Second)
}
通道的关闭
close() 关闭 一个通道意味着不能再向这个通道发送值了。 该特性可以向通道的接收方传达工作已经完成的信息。
package main
import "fmt"
func main() {
jobs := make(chan int, 5)
done := make(chan bool)
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Println("sent job", j)
}
close(jobs)
fmt.Println("sent all jobs")
<-done
}
received job 1
sent job 1
sent job 2
sent job 3
sent all jobs
received job 2
received job 3
received all jobs
通道遍历
package main
import "fmt"
func main() {
queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
for elem := range queue {
fmt.Println(elem)
}
}
one
two
定时器
如果你需要的仅仅是单纯的等待,使用 time.Sleep
就够了。 使用定时器的原因之一就是,你可以在定时器触发之前将其取消。比普通的time.sleep
多了个取消
package main
import (
"fmt"
"time"
)
func main() {
timer1 := time.NewTimer(2 * time.Second)
<-timer1.C//时间没到,没收到消息就一直阻塞着
fmt.Println("Timer 1 fired")
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C//本来也要阻塞等1秒,可是主协程停止了,它也就终止了
fmt.Println("Timer 2 fired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
time.Sleep(2 * time.Second)
}
Timer 1 fired
Timer 2 stopped
//
// @Title countTime
// @Description 记录函数运行时间
// @Author lido 2022-02-26 15:49:27
// @Param function
//
func countTime2(function func()){
startTime := time.Now()
function()
elapsedTime := time.Since(startTime) / time.Millisecond // duration in ms
fmt.Printf("Segment finished in %dms", elapsedTime) //Segment finished in xxms
}
func function2(){
timer1 := time.NewTimer(2 * time.Second)
<-timer1.C
fmt.Println("Timer 1 fired")
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C//这次有了充足的时间,就会执行
fmt.Println("Timer 2 fired")
}()
time.Sleep(2 * time.Second)
}
func main() {
countTime2(function2)
}
Timer 1 fired
Timer 2 fired
Segment finished in 4008ms
打点器
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
//死循环
for {
//并发接收
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()
//主协程停1600ms,也就是运行3个<-ticker.C:
time.Sleep(1600 * time.Millisecond)
//结束打点
ticker.Stop()
//结束协程
done <- true
fmt.Println("Ticker stopped")
}
Tick at 2022-02-26 17:16:17.7310457 +0800 CST m=+0.503368501
Tick at 2022-02-26 17:16:18.2306314 +0800 CST m=+1.002954201
Tick at 2022-02-26 17:16:18.7321656 +0800 CST m=+1.504488401
Ticker stopped
工作池
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
//开启三个协程完成工作
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
//输入
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
//输出
for a := 1; a <= numJobs; a++ {
<-results
}
}
其实就是开了3个协程完成了5个任务,中间协程回停留1秒
worker 1 started job 1
worker 3 started job 3
worker 2 started job 2
worker 3 finished job 3
worker 3 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 1 finished job 1
worker 2 finished job 5
worker 3 finished job 4
WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker2(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
//WaitGroup 用于等待这里启动的所有协程完成
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
//增加计数器
wg.Add(1)
//闭包协程,注意这里的i是引用的
//i在这里是引用着外面for循环的i,是不确定的
go func() {
//释放计数器
defer wg.Done()
worker2(i)
}()
}
//等待
wg.Wait()
}
可以看到闭包协程引用了不确定的i,会出现6,因为for结束了i并没有马上回收,i++后就变成6了
Worker 6 starting
Worker 6 starting
Worker 4 starting
Worker 6 starting
Worker 6 starting
Worker 4 done
Worker 6 done
Worker 6 done
Worker 6 done
Worker 6 done
正确的引用
package main
import (
"fmt"
"sync"
"time"
)
func worker2(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
//深度的拷贝
go func(i int) {
defer wg.Done()
worker2(i)
}(i)
}
wg.Wait()
}
Worker 4 starting
Worker 5 starting
Worker 3 starting
Worker 2 starting
Worker 1 starting
Worker 5 done
Worker 1 done
Worker 4 done
Worker 2 done
Worker 3 done
速度限制
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
//任务速率限制的调度器。
limiter := time.Tick(1000 * time.Millisecond)
//循环读取任务
for req := range requests {
<-limiter//固定速率
fmt.Println("1 request", req, time.Now())
}
//定义爆发速率
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
//基本是同时写入,填充通道,表示允许的爆发(bursts)。
burstyLimiter <- time.Now()
}
go func() {
//第3个后的限制。
for t := range time.Tick(1000 * time.Millisecond) {
burstyLimiter <- t
}
}()
burstyRequests := make(chan int, 7)
for i := 1; i <= 7; i++ {
burstyRequests <- i
}
close(burstyRequests)
//前3个是爆发,后面又200ms限速
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("2 request", req, time.Now())
}
}
1 request 1 2022-02-27 16:33:11.495175 +0800 CST m=+1.004471501
1 request 2 2022-02-27 16:33:12.4950694 +0800 CST m=+2.004365901
1 request 3 2022-02-27 16:33:13.4952412 +0800 CST m=+3.004537701
1 request 4 2022-02-27 16:33:14.4964354 +0800 CST m=+4.005731901
1 request 5 2022-02-27 16:33:15.4950524 +0800 CST m=+5.004348901
2 request 1 2022-02-27 16:33:15.4951094 +0800 CST m=+5.004405901
2 request 2 2022-02-27 16:33:15.4951094 +0800 CST m=+5.004405901
2 request 3 2022-02-27 16:33:15.4951094 +0800 CST m=+5.004405901
2 request 4 2022-02-27 16:33:16.4955178 +0800 CST m=+6.004814301
2 request 5 2022-02-27 16:33:17.4954599 +0800 CST m=+7.004756401
2 request 6 2022-02-27 16:33:18.495542 +0800 CST m=+8.004838501
2 request 7 2022-02-27 16:33:19.4966771 +0800 CST m=+9.005973601
原子计数器
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var ops uint64
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
for c := 0; c < 1000; c++ {
//使用原子操作
atomic.AddUint64(&ops, 1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Println("ops:", ops)
}
ops: 50000
不使用原子计数器,会出现资源协程间竞争
package main
import (
"fmt"
"sync"
)
func main() {
var ops uint64
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
for c := 0; c < 1000; c++ {
ops++ //不使用原子操作
}
wg.Done()
}()
}
wg.Wait()
fmt.Println("ops:", ops)
}
ops: 49227
互斥锁
func main() {
var a []int
var mu sync.Mutex//定义互斥锁
for i := 0; i < 100; i++ {
go func(i int) {
//相当于一个原子操作
mu.Lock()//上锁
a = append(a, i)
mu.Unlock()//解锁
}(i)
}
time.Sleep(2 * time.Second)
fmt.Println(a)
}
[1 0 2 9 7 8 10 4 5 3 11 6 12 14 13 16 15 23 20 21 22 19 25 24 17 26 18 27 28 29 32 30 31 34 35 36 40 33 37 39 38 42 43 41 44 51 45 49 50 55 52 53 48 54 46 47 57 56 58 59 60 64 61 62 63 68 72 70 71 74 69 75 73 65 66 67 76 79 77 78 85 80 81 82 83 84 86 88 87 90 89 91 92 93 96 94 95 97 98 99]
优雅的使用
package main
import (
"fmt"
"sync"
)
//将互斥锁申请到结构体中
type Container struct {
mu sync.Mutex
counters map[string]int
}
//通过方法的性质来巧妙的调用
func (c *Container) inc(name string) {
//上锁和解锁
c.mu.Lock()
defer c.mu.Unlock()
c.counters[name]++
}
func main() {
c := Container{
counters: map[string]int{"a": 0, "b": 0},
}
var wg sync.WaitGroup
doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}
wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)
wg.Wait()
fmt.Println(c.counters)
}
map[a:20000 b:10000]
要是不使用锁呢
package main
import (
"fmt"
"sync"
)
type Container struct {
mu sync.Mutex
counters map[string]int
}
func (c *Container) inc(name string) {
c.counters[name]++ //不使用锁
}
func main() {
c := Container{
counters: map[string]int{"a": 0, "b": 0},
}
var wg sync.WaitGroup
doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}
wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)
wg.Wait()
fmt.Println(c.counters)
}
goroutine 7 [running]:
goroutine running on other thread; stack unavailable
......
状态协程
使用内建协程和通道的同步特性。 Go 共享内存的思想是,通过通信使每个数据仅被单个协程所拥有,即通过通信实现共享内存。 基于通道的方法与该思想完全一致!
package main
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)
type readOp struct {
key int
resp chan int
}
type writeOp struct {
key int
val int
resp chan bool
}
func main() {
//原子计数
var readOps uint64
var writeOps uint64
reads := make(chan readOp)
writes := make(chan writeOp)
//开启协程
go func() {
var state = make(map[int]int)
//不停的并发
for {
select {
case read := <-reads:
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
write.resp <- true
}
}
}()
//100个协程
for r := 0; r < 100; r++ {
go func() {
for {
read := readOp{
key: rand.Intn(5),
resp: make(chan int),
}
//发送构建好的结构体到,state所在的协程中
reads <- read
//获取state所在的协程的返回值
<-read.resp
//原子计数
atomic.AddUint64(&readOps, 1)
}
}()
}
for w := 0; w < 10; w++ {
go func() {
for {
write := writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool),
}
//将写入构建好发给state所在协程中
writes <- write
//获取write的返回值
<-write.resp
//原子计数
atomic.AddUint64(&writeOps, 1)
}
}()
}
time.Sleep(time.Second)
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
}
readOps: 797809
writeOps: 802426