Go学习笔记(四):并发
2020-04-15 18:42:50协程机制(Groutine)
Thead VS Groutine
- 创建时默认的 stack 的⼤⼩
- JDK5 以后 Java Thread stack 默认为1M
- Groutine 的 Stack 初始化⼤⼩为2K
- 和 KSE (Kernel Space Entity) 的对应关系
- Java Thread 是 1:1
- Groutine 是 M:N
MPG模型
- M 代表着一个内核线程,也可以称为一个工作线程。goroutine就是跑在M之上的
- P代表着(Processor)处理器 它的主要用途就是用来执行goroutine的,一个P代表执行一个Go代码片段的基础(可以理解为上下文环境),所以它也维护了一个可运行的goroutine队列,和自由的goroutine队列,里面存储了所有需要它来执行的goroutine
- G 代表着goroutine 实际的数据结构(就是你封装的那个方法),并维护者goroutine 需要的栈、程序计数器以及它所在的M等信息
我们在看上面这个图,图中P正在执行的Goroutine为蓝色的,处于待执行状态的Goroutine为灰色的,灰色的Goroutine形成了一个队列runqueues 。 我们再看一下三者的宏观图:
由上图可以看出Groutine与KSE是M:N的多对多关系。在这里,当一个P关联多个G时,就会处理G的执行顺序,就是并发,当一个P在执行一个协程工作时,其他的会在等待,当正在执行的协程遇到阻塞情况,例如IO操作等,go的处理器就会去执行其他的协程,因为对于类似IO的操作,处理器不知道你需要多久才能执行结束,所以他不回去等你执行完。
正是因为是非抢占式的,所以才轻松的构造上万的协程,如果是抢占式,那么就会在切换任务时,保存当前的上下文环境,因为当前线程如果正在做一件事,做到一半,我们就强制停止,这时我们就必须多保存很多信息,避免再次切换回来时任务出错。
线程是操作系统层面的多任务,而go的协程属于编译器层面的多任务,go有自己的调度器来调度。一个协程在哪个线程上是不确定的,这个是由调度器来决定的,多个协程可能在一个或多个线程上运行。
编写一个Groutine
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(i)
}(i)
}
time.Sleep(time.Millisecond * 50)
}
上面程序中通过 go 语句很清楚的创建一个groutine,因为协程的执行是无序的,所以每次调用上面程序,print的结果都不一样。初学者经常会犯如下的错误,
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i) //错误的写法
}()
}
time.Sleep(time.Millisecond * 50)
}
//打印结果 10,10,10,10,10,10,10,10,10,10
在go 语句里面因为可以直接访问变量i,初学者可能就直接使用,这时打印的结果可能就不是预期的结果了。造成这样的原因是因为协程中 i 是一个共享变量,第一个程序不会有问题是因为go里面的参数传递的都是值类型,每一个协程中都有一份值拷贝。既然出现了共享变量的问题,接下来我们讨论下如何处理?
共享内存并发机制
要解决共享内存问题,就需要用到锁了,看下面的实例:
func TestCounter(t *testing.T) {
counter := 0
for i := 0; i < 5000; i++ {
go func() {
counter++
}()
}
time.Sleep(1 * time.Second)
t.Logf("counter = %d", counter) //小于5000,如4759
}
counter是一个共享内存,该程序最终打印的值可能是一个小于5000的值。我们加锁后的代码如下:
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex //锁对象
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock() //解锁
}()
mut.Lock() //加锁
counter++
}()
}
time.Sleep(1 * time.Second)
t.Logf("counter = %d", counter) // 5000
}
这时就能正常打印出5000的结果了,细心的同学可能看到例子中我多加了一句 sleep的等待,如果去掉这句代码时,发现最终的打印结果依然可能小于5000。发生这种原因是因为go语句中的5000次协程调用可能还没执行完,主程序的打印就结束了,虽然加了锁后最终counter的值是5000,但主程序却提前打印了。要处理这个问题,我们还需要多一个程序等待,如下代码:
func TestCounterWaitGroup(t *testing.T) {
var mut sync.Mutex
var wg sync.WaitGroup
counter := 0
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
wg.Done()
}()
}
wg.Wait()
t.Logf("counter = %d", counter)
}
到这里一个简单的共享内存并发控制机制调用就完成了,接下来我们学习更高效的CSP并发机制。
CSP 并发机制
CSP是Communicating Sequential Process的缩写, CSP的设计理念,是依赖一个信息通道来完成两个通信实体之间的通信协调。 一些编程语言中使用的一种典型并发机制是Actor Model:
在Actor Model中,会维护一个容量无限的MailBox(Message Queue),消息发送实体A会将Message添加到MailBox中,其EventLoop会循环检查MailBox中的信息,当监听到新消息的时候发送给接受实体B的MailBox中,接受实体B在其EventLoop的驱动下被动的处理消息。
CSP与Actor Model的不同点:
- CSP模式中,消息是通过Channel来通讯的,Channel相当于一个消息通讯的中间人,这样可以让两个通讯实体的耦合更松一些
- Golang的Channel容量是可以设置的,并独立于协程的
- Actor Model中,接受进程是被动的处理消息,Golang中的协程可以主动的获取Channel中的信息,并进行相关的处理
阻塞式通讯Channel
通信双方必须同时在Channel上,才能够完成本次交互,要是任意一方不在Channel上,那么一方就会被阻塞的等待,直到另一方完成本次交互。
对于消息发送者A,要是消息接收者B不在Channel上,那么发送者A会被一直阻塞在那里,直到接收者B出现,才能够完成本次交互,发送者A才能够继续执行后续的代码;对于消息接收者B,要是发送者A不在Channel上,那么接收者B会被一直阻塞在那里等待,直到发送者A出现,完成本次交互,接收者B才能够继续执行下面的任务。
Buffer Channel
Buffer Channel使得消息发送者和接收者之间有一种更为松的耦合性,Channel可以设置一个消息容量。
在Channel容量没有满的情况下,消息发送者A是可以一直往Channel中非阻塞的放入消息的,而不需要等待接收者B必须同时出现在Channel上。一旦Channel的容量满了,那么消息发送者A就必须阻塞的等待,直到接收者B取出消息,将Channel空出来,发送者A才可以继续非阻塞的放入消息。
同样的,对于接收者B,只要Channel中有消息,接收者B就可以非阻塞的取消息,不需要阻塞的等待消息发送者A同时出现在Channel上。但要是Channel上是空的,那接收者B就必须阻塞的等待发送者A放入消息,接收者B才能够取到消息,并继续执行下面的任务。
下面我们看来一下Channel应用的实例:
import (
"fmt"
"testing"
"time"
)
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("Task is done.")
}
func AsyncService() chan string {
retCh := make(chan string) //创建的是一个阻塞式通讯Channel
//retCh := make(chan string, 1) //创建的是一个容量为1的Buffer Channel
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret //获取service的结果放入Channel中
fmt.Println("service exited.")
}()
return retCh //将用来返回结果的Channel返回
}
func TestAsynService(t *testing.T) {
retCh := AsyncService() //获取service结果的Channel
otherTask() //运行其他Task
fmt.Println(<-retCh) //从Channel中等待获取service的结果
time.Sleep(time.Second * 1)
}
面的代码我们可以看到,AsyncService启动了一个协程异步执行Service的任务,并返回了一个Channel,在主协程中执行OtherTask的时候,并发的执行Service的任务,经过协程并发处理的两个Task执行的总时间缩短了。 但是需要注意的是,AsyncService中的子协程执行完任务之后(打印了returned result), 之后便可以立即将结果放到Channel中,但由于主协程(接收者)还在阻塞的执行其他任务,不在Channel上,所以子协程(发送者)会被一直阻塞的等待在向Channel中放入消息的地方,直到主协程(接收者)通过<-retCh从Channel中获取到消息的时候(打印 Done. )之后,子协程才被释放继续执行下面的代码(打印 service exited)
执行结果:
多路选择和超时控制
基于select的多路复用:
- 解决如果一个channel中没有事件发过来,程序会立即阻塞,无法接收到第二个channel中的事件
- 一般每一个case都代表一个通信操作,多个case会选一个能执行的
- default会默认执行,因此可以作为轮询channel来用
- 无default时,可以其做超时控制
//多渠道的选择
select {
case ret := <-retCh1:
t.Logf("result %s", ret)
case ret :=<-retCh2:
t.Logf("result %s", ret)
default:
t.Error(“No one returned”)
}
//超时控制
select {
case ret := <-AsyncService():
t.Log(ret)
case <-time.After(time.Millisecond * 100):
t.Error("time out")
}
channel 的关闭和⼴播
- 向关闭的 channel 发送数据,会导致 panic
- v, ok <-ch; ok 为 bool 值,true 表示正常接受,false 表示通道关闭
- 所有的 channel 接收者都会在 channel 关闭时,⽴刻从阻塞等待中返回且上述 ok 值为 false。这个⼴播机制常被利⽤,进⾏向多个订阅者同时发送信号。如:退出信号
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
wg.Done()
}()
}
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if data, ok := <-ch; ok {
fmt.Println(data)
} else {
break
}
}
wg.Done()
}()
}
func TestCloseChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataReceiver(ch, &wg)
// wg.Add(1)
// dataReceiver(ch, &wg)
wg.Wait()
}
任务的取消
获取取消通知
func isCancelled(cancelChan chanstruct{}) bool {
select {
case <-cancelChan:
return true
default:
return false
}
}
发送取消任务
func cancel_1(cancelChan chanstruct{}) {
ancelChan <- struct{}{}
}
通过关闭Channel取消
func cancel_2(cancelChan chan struct{}) {
close(cancelChan)
}
测试
func TestCancel(t *testing.T) {
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5; i++ {
go func(i int, cancelCh chan struct{}) {
for {
if isCancelled(cancelCh) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
cancel_1(cancelChan)
//cancel_2(cancelChan)
time.Sleep(time.Second * 1)
}
Context 与任务取消
上面介绍的取消任务只能取消当前任务,加入当前任务包括了子任务,取消时希望同时取消所有的子任务,如下图所示应该如何取消了?这里我们就要用到Context对象
Context
- 根 Context:通过 context.Background () 创建
- ⼦ Context:context.WithCancel(parentContext) 创建
- ctx, cancel := context.WithCancel(context.Background())
- 当前 Context 被取消时,基于他的⼦ context 都会被取消
- 接收取消通知 <-ctx.Done()
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}