go协程专项练习

 

一些关于协程(Goroutine)和通道(channel)的练习题

一起学习:smiley:

协程

package main

import (
    "fmt"
    "time"
)

func f(from string) {
    for i := 0; i < 3; i++ {
        fmt.Println(from, ":", i)
    }
}

func main() {

    f("direct")//阻塞式

    go f("goroutine")//协程交替输出

    go func(msg string) {//协程交替输出
        fmt.Println(msg)
    }("going")

    time.Sleep(time.Second)
    fmt.Println("done")
}
direct : 0
direct : 1
direct : 2
going
goroutine : 0
goroutine : 1
goroutine : 2
done

通道

package main

import "fmt"

func main() {
	//双向的通道,无容量
    messages := make(chan string)

    //messages通道接收消息
    //容量为0就不能接收多个值,不然会死锁
    go func() { messages <- "ping" }()

    //mgs变量读取消息
    msg := <-messages
    
    //打印读取到的消息
    fmt.Println(msg)
}
ping

通道缓冲

package main

import "fmt"

func main() {
	//定义双通道,且容量为2
    messages := make(chan string, 2)

    //接收两个值
    messages <- "buffered"
    messages <- "channel"

    //输出接收到的值
    fmt.Println(<-messages)
    fmt.Println(<-messages)
}
buffered
channel

通道同步

package main

import (
	"fmt"
	"time"
)

func worker(done chan bool) {
	fmt.Print("working...")
	time.Sleep(time.Second)
	fmt.Println("done")

	done <- true
}

func main() {

	done := make(chan bool, 1)
	go worker(done)

	//程序将一直阻塞,直至收到 worker 使用通道发送的通知。
	<-done//阻塞了主协程
}

如果你把 <- done 这行代码从程序中移除, 程序甚至可能在 worker 开始运行前就结束了。

working...done

通道方向

package main

import "fmt"

// chan<-,我<-吃的,吃饭,限定接收通道,也就是只能获拿
func ping(pings chan<- string, msg string) {
    pings <- msg
}
// <-chan,吃的<-我,做饭,限定是发送通道,也就是只能给
func pong(pings <-chan string, pongs chan<- string) {
    msg := <-pings
    pongs <- msg
    //pongs <-(<-pings)
}

func main() {
    pings := make(chan string, 1)
    pongs := make(chan string, 1)
    //pings <- "passed message"
    ping(pings, "passed message")
    //pongs <-(<-pings)
    pong(pings, pongs)
    fmt.Println(<-pongs)
}

不必死记形式,理解箭头的流向,就可以判断是发送还是接收

passed message

通道选择器

Go 的 选择器(select) 让你可以同时等待多个通道操作。 将协程、通道和选择器结合,是 Go 的一个强大特性。

package main

import (
	"fmt"
	"time"
)

func main() {

	c1 := make(chan string)
	c2 := make(chan string)
	c3 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		c1 <- "one"
	}()
	go func() {
		time.Sleep(2 * time.Second)
		c2 <- "two"
	}()
	go func() {
		time.Sleep(3 * time.Second)
		c2 <- "three"
	}()

	for i := 0; i < 3; i++ {
        //高级的switch,一次选择一个
		select {
		case msg1 := <-c1:
			fmt.Println("received", msg1)
		case msg2 := <-c2:
			fmt.Println("received", msg2)
		case msg3 := <-c3:
			fmt.Println("received", msg3)
		}
	}
}

程序总共用了3秒完成,因为sleep那三个协程是并发执行的

received one
received two
received three

超时处理

func function(){
	c1 := make(chan string, 1)
	go func() {
		time.Sleep(2 * time.Second)
		c1 <- "result 1"
	}()

	select {
	case res := <-c1:
		fmt.Println(res)
	//超时机制,运行时间大于1秒就自动执行
	case <-time.After(1 * time.Second):
		fmt.Println("timeout 1")
	}

	c2 := make(chan string, 1)
	go func() {
		time.Sleep(2 * time.Second)
		c2 <- "result 2"
	}()
	select {
	case res := <-c2:
		fmt.Println(res)
    //超时机制,运行时间大于3秒就自动执行
	case <-time.After(3 * time.Second):
		fmt.Println("timeout 2")
	}
}

//
// @Title countTime 
// @Description 记录函数运行时间
// @Author lido 2022-02-26 15:49:27
// @Param function 
//
func countTime(function func()){
	startTime := time.Now()
	
	function()

	elapsedTime := time.Since(startTime) / time.Millisecond  // duration in ms
	fmt.Printf("Segment finished in %dms", elapsedTime) //Segment finished in xxms
}

func main() {
	countTime(function)
}

通过最后函数运行时间可以判断是并发执行的,不然同时等待连个2秒的sleep最少需要4秒钟

timeout 1
result 2
Segment finished in 3001ms

非阻塞通道操作

常规的通过通道发送和接收数据是阻塞的。 然而,我们可以使用带一个 default 子句的 select 来实现 非阻塞 的发送、接收,甚至是非阻塞的多路 select

package main

import "fmt"

func main() {
    messages := make(chan string)
    signals := make(chan bool)

    select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    default:
        fmt.Println("no message received")
    }

    msg := "hi"
    select {
    case messages <- msg:
        fmt.Println("sent message", msg)
    default:
        fmt.Println("no message sent")
    }

    select {
    case msg := <-messages:
        fmt.Println("received message", msg)
    case sig := <-signals:
        fmt.Println("received signal", sig)
    default:
        fmt.Println("no activity")
    }
}
no message received
sent message #错了,因该是no message sent
no activity

这里,第二个输出是no message sent,因为messages通道没有容量,且有接收者,所以是不能获取值的

但是但是!下面这个程序就可以赋值

func main() {
	//双向的通道,无容量
	messages := make(chan string)

	//messages通道接收消息
	//go func() { messages <- "ping" }()

	time.Sleep(time.Second)
}

这里的原因其实通过报的错误就可以看的出来,再看看下面这个程序

func in(msg chan string){
	msg <- "ping"
}

func main() {
	//双向的通道,无容量
	messages := make(chan string)

    //在主协程执行函数
	in(messages)

	time.Sleep(time.Second)
}
fatal error: all goroutines are asleep - deadlock!

会报错,所有协程死锁!

那怎样才能不死锁嘞?其实就是开另外一个协程就不会死锁了

func in(msg chan string){
	msg <- "ping"
}

func main() {
	//双向的通道,无容量
	messages := make(chan string)

    //再另外一个协程执行这个函数,就不会导致所有的线程死锁了
	go in(messages)

	time.Sleep(time.Second)
}

通道的关闭

close() 关闭 一个通道意味着不能再向这个通道发送值了。 该特性可以向通道的接收方传达工作已经完成的信息。

package main

import "fmt"

func main() {
    jobs := make(chan int, 5)
    done := make(chan bool)

    go func() {
        for {
            j, more := <-jobs
            if more {
                fmt.Println("received job", j)
            } else {
                fmt.Println("received all jobs")
                done <- true
                return
            }
        }
    }()

    for j := 1; j <= 3; j++ {
        jobs <- j
        fmt.Println("sent job", j)
    }
    close(jobs)
    fmt.Println("sent all jobs")

    <-done
}
received job 1
sent job 1
sent job 2
sent job 3
sent all jobs
received job 2
received job 3
received all jobs

通道遍历

package main

import "fmt"

func main() {

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)

    for elem := range queue {
        fmt.Println(elem)
    }
}
one
two

定时器

如果你需要的仅仅是单纯的等待,使用 time.Sleep 就够了。 使用定时器的原因之一就是,你可以在定时器触发之前将其取消。比普通的time.sleep多了个取消

package main

import (
    "fmt"
    "time"
)

func main() {

    timer1 := time.NewTimer(2 * time.Second)

    <-timer1.C//时间没到,没收到消息就一直阻塞着
    fmt.Println("Timer 1 fired")

    timer2 := time.NewTimer(time.Second)
    go func() {
        <-timer2.C//本来也要阻塞等1秒,可是主协程停止了,它也就终止了
        fmt.Println("Timer 2 fired")
    }()
    stop2 := timer2.Stop()
    if stop2 {
        fmt.Println("Timer 2 stopped")
    }

    time.Sleep(2 * time.Second)
}
Timer 1 fired
Timer 2 stopped
//
// @Title countTime
// @Description 记录函数运行时间
// @Author lido 2022-02-26 15:49:27
// @Param function
//
func countTime2(function func()){
	startTime := time.Now()

	function()

	elapsedTime := time.Since(startTime) / time.Millisecond  // duration in ms
	fmt.Printf("Segment finished in %dms", elapsedTime) //Segment finished in xxms
}

func function2(){
	timer1 := time.NewTimer(2 * time.Second)

	<-timer1.C
	fmt.Println("Timer 1 fired")

	timer2 := time.NewTimer(time.Second)
	go func() {
		<-timer2.C//这次有了充足的时间,就会执行
		fmt.Println("Timer 2 fired")
	}()

	time.Sleep(2 * time.Second)
}

func main() {
	countTime2(function2)
}
Timer 1 fired
Timer 2 fired
Segment finished in 4008ms

打点器

package main

import (
    "fmt"
    "time"
)

func main() {

    ticker := time.NewTicker(500 * time.Millisecond)
    done := make(chan bool)

    go func() {
        //死循环
        for {
            //并发接收
            select {
            case <-done:
                return
            case t := <-ticker.C:
                fmt.Println("Tick at", t)
            }
        }
    }()

    //主协程停1600ms,也就是运行3个<-ticker.C:
    time.Sleep(1600 * time.Millisecond)
    //结束打点
    ticker.Stop()
    //结束协程
    done <- true
    fmt.Println("Ticker stopped")
}
Tick at 2022-02-26 17:16:17.7310457 +0800 CST m=+0.503368501
Tick at 2022-02-26 17:16:18.2306314 +0800 CST m=+1.002954201
Tick at 2022-02-26 17:16:18.7321656 +0800 CST m=+1.504488401
Ticker stopped

工作池

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {

    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    //开启三个协程完成工作
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    //输入
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    //输出
    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

其实就是开了3个协程完成了5个任务,中间协程回停留1秒

worker 1 started  job 1
worker 3 started  job 3
worker 2 started  job 2
worker 3 finished job 3
worker 3 started  job 4
worker 2 finished job 2
worker 2 started  job 5
worker 1 finished job 1
worker 2 finished job 5
worker 3 finished job 4

WaitGroup

package main

import (
   "fmt"
   "sync"
   "time"
)

func worker2(id int) {
   fmt.Printf("Worker %d starting\n", id)

   time.Sleep(time.Second)
   fmt.Printf("Worker %d done\n", id)
}

func main() {

   //WaitGroup 用于等待这里启动的所有协程完成
   var wg sync.WaitGroup

   for i := 1; i <= 5; i++ {
      //增加计数器
      wg.Add(1)

      //闭包协程,注意这里的i是引用的
      //i在这里是引用着外面for循环的i,是不确定的
      go func() {
         //释放计数器
         defer wg.Done()
         worker2(i)
      }()
   }

   //等待
   wg.Wait()

}

可以看到闭包协程引用了不确定的i,会出现6,因为for结束了i并没有马上回收,i++后就变成6了

Worker 6 starting
Worker 6 starting
Worker 4 starting
Worker 6 starting
Worker 6 starting
Worker 4 done
Worker 6 done
Worker 6 done
Worker 6 done
Worker 6 done

正确的引用

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker2(id int) {
	fmt.Printf("Worker %d starting\n", id)

	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)
}

func main() {

	var wg sync.WaitGroup

	for i := 1; i <= 5; i++ {
		wg.Add(1)

        //深度的拷贝
		go func(i int) {
			defer wg.Done()
			worker2(i)
		}(i)
	}

	wg.Wait()

}
Worker 4 starting
Worker 5 starting
Worker 3 starting
Worker 2 starting
Worker 1 starting
Worker 5 done
Worker 1 done
Worker 4 done
Worker 2 done
Worker 3 done

速度限制

package main

import (
	"fmt"
	"time"
)

func main() {

	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	//任务速率限制的调度器。
	limiter := time.Tick(1000 * time.Millisecond)

	//循环读取任务
	for req := range requests {
		<-limiter//固定速率
		fmt.Println("1 request", req, time.Now())
	}

	//定义爆发速率
	burstyLimiter := make(chan time.Time, 3)

	for i := 0; i < 3; i++ {
		//基本是同时写入,填充通道,表示允许的爆发(bursts)。
		burstyLimiter <- time.Now()
	}

	go func() {
		//第3个后的限制。
		for t := range time.Tick(1000 * time.Millisecond) {
			burstyLimiter <- t
		}
	}()

	burstyRequests := make(chan int, 7)
	for i := 1; i <= 7; i++ {
		burstyRequests <- i
	}
	close(burstyRequests)

	//前3个是爆发,后面又200ms限速
	for req := range burstyRequests {
		<-burstyLimiter
		fmt.Println("2 request", req, time.Now())
	}
}
1 request 1 2022-02-27 16:33:11.495175 +0800 CST m=+1.004471501
1 request 2 2022-02-27 16:33:12.4950694 +0800 CST m=+2.004365901
1 request 3 2022-02-27 16:33:13.4952412 +0800 CST m=+3.004537701
1 request 4 2022-02-27 16:33:14.4964354 +0800 CST m=+4.005731901
1 request 5 2022-02-27 16:33:15.4950524 +0800 CST m=+5.004348901
2 request 1 2022-02-27 16:33:15.4951094 +0800 CST m=+5.004405901
2 request 2 2022-02-27 16:33:15.4951094 +0800 CST m=+5.004405901
2 request 3 2022-02-27 16:33:15.4951094 +0800 CST m=+5.004405901
2 request 4 2022-02-27 16:33:16.4955178 +0800 CST m=+6.004814301
2 request 5 2022-02-27 16:33:17.4954599 +0800 CST m=+7.004756401
2 request 6 2022-02-27 16:33:18.495542 +0800 CST m=+8.004838501
2 request 7 2022-02-27 16:33:19.4966771 +0800 CST m=+9.005973601

原子计数器

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {

    var ops uint64

    var wg sync.WaitGroup

    for i := 0; i < 50; i++ {
        wg.Add(1)

        go func() {
            for c := 0; c < 1000; c++ {
                //使用原子操作
                atomic.AddUint64(&ops, 1)
            }
            wg.Done()
        }()
    }

    wg.Wait()

    fmt.Println("ops:", ops)
}
ops: 50000

不使用原子计数器,会出现资源协程间竞争

package main

import (
	"fmt"
	"sync"
)

func main() {

	var ops uint64

	var wg sync.WaitGroup

	for i := 0; i < 50; i++ {
		wg.Add(1)

		go func() {
			for c := 0; c < 1000; c++ {
				ops++ //不使用原子操作
			}
			wg.Done()
		}()
	}

	wg.Wait()

	fmt.Println("ops:", ops)
}
ops: 49227

互斥锁

func main() {
    var a []int
    var mu sync.Mutex//定义互斥锁
    for i := 0; i < 100; i++ {
        go func(i int) {
            //相当于一个原子操作
            mu.Lock()//上锁
            a = append(a, i)
            mu.Unlock()//解锁
        }(i)
    }
    time.Sleep(2 * time.Second)
    fmt.Println(a)
}
[1 0 2 9 7 8 10 4 5 3 11 6 12 14 13 16 15 23 20 21 22 19 25 24 17 26 18 27 28 29 32 30 31 34 35 36 40 33 37 39 38 42 43 41 44 51 45 49 50 55 52 53 48 54 46 47 57 56 58 59 60 64 61 62 63 68 72 70 71 74 69 75 73 65 66 67 76 79 77 78 85 80 81 82 83 84 86 88 87 90 89 91 92 93 96 94 95 97 98 99]

优雅的使用

package main

import (
	"fmt"
	"sync"
)

//将互斥锁申请到结构体中
type Container struct {
	mu       sync.Mutex
	counters map[string]int
}

//通过方法的性质来巧妙的调用
func (c *Container) inc(name string) {

    //上锁和解锁
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counters[name]++
}

func main() {
	c := Container{
		counters: map[string]int{"a": 0, "b": 0},
	}

	var wg sync.WaitGroup

	doIncrement := func(name string, n int) {
		for i := 0; i < n; i++ {
			c.inc(name)
		}
		wg.Done()
	}

	wg.Add(3)
	go doIncrement("a", 10000)
	go doIncrement("a", 10000)
	go doIncrement("b", 10000)

	wg.Wait()
	fmt.Println(c.counters)
}
map[a:20000 b:10000]

要是不使用锁呢

package main

import (
	"fmt"
	"sync"
)

type Container struct {
	mu       sync.Mutex
	counters map[string]int
}

func (c *Container) inc(name string) {
	c.counters[name]++ //不使用锁
}

func main() {
	c := Container{
		counters: map[string]int{"a": 0, "b": 0},
	}

	var wg sync.WaitGroup

	doIncrement := func(name string, n int) {
		for i := 0; i < n; i++ {
			c.inc(name)
		}
		wg.Done()
	}

	wg.Add(3)
	go doIncrement("a", 10000)
	go doIncrement("a", 10000)
	go doIncrement("b", 10000)

	wg.Wait()
	fmt.Println(c.counters)
}
goroutine 7 [running]:
	goroutine running on other thread; stack unavailable
......

状态协程

使用内建协程和通道的同步特性。 Go 共享内存的思想是,通过通信使每个数据仅被单个协程所拥有,即通过通信实现共享内存。 基于通道的方法与该思想完全一致!

package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

type readOp struct {
	key  int
	resp chan int
}
type writeOp struct {
	key  int
	val  int
	resp chan bool
}

func main() {

	//原子计数
	var readOps uint64
	var writeOps uint64

	reads := make(chan readOp)
	writes := make(chan writeOp)

	//开启协程
	go func() {
		var state = make(map[int]int)
		//不停的并发
		for {
			select {
			case read := <-reads:
				read.resp <- state[read.key]
			case write := <-writes:
				state[write.key] = write.val
				write.resp <- true
			}
		}
	}()

	//100个协程
	for r := 0; r < 100; r++ {
		go func() {
			for {
				read := readOp{
					key:  rand.Intn(5),
					resp: make(chan int),
				}
				//发送构建好的结构体到,state所在的协程中
				reads <- read
				//获取state所在的协程的返回值
				<-read.resp
				//原子计数
				atomic.AddUint64(&readOps, 1)
			}
		}()
	}

	for w := 0; w < 10; w++ {
		go func() {
			for {
				write := writeOp{
					key:  rand.Intn(5),
					val:  rand.Intn(100),
					resp: make(chan bool),
				}
				//将写入构建好发给state所在协程中
				writes <- write
				//获取write的返回值
				<-write.resp
				//原子计数
				atomic.AddUint64(&writeOps, 1)
			}
		}()
	}

	time.Sleep(time.Second)

	readOpsFinal := atomic.LoadUint64(&readOps)
	fmt.Println("readOps:", readOpsFinal)
	writeOpsFinal := atomic.LoadUint64(&writeOps)
	fmt.Println("writeOps:", writeOpsFinal)
}

readOps: 797809
writeOps: 802426

参考资料

Go by Example 中文版