Spiga

Go学习笔记(四):并发

2020-04-15 18:42:50

协程机制(Groutine)

Thead VS Groutine

  1. 创建时默认的 stack 的⼤⼩
    • JDK5 以后 Java Thread stack 默认为1M
    • Groutine 的 Stack 初始化⼤⼩为2K
  2. 和 KSE (Kernel Space Entity) 的对应关系
    • Java Thread 是 1:1
    • Groutine 是 M:N

MPG模型

  • M 代表着一个内核线程,也可以称为一个工作线程。goroutine就是跑在M之上的
  • P代表着(Processor)处理器 它的主要用途就是用来执行goroutine的,一个P代表执行一个Go代码片段的基础(可以理解为上下文环境),所以它也维护了一个可运行的goroutine队列,和自由的goroutine队列,里面存储了所有需要它来执行的goroutine
  • G 代表着goroutine 实际的数据结构(就是你封装的那个方法),并维护者goroutine 需要的栈、程序计数器以及它所在的M等信息

我们在看上面这个图,图中P正在执行的Goroutine为蓝色的,处于待执行状态的Goroutine为灰色的,灰色的Goroutine形成了一个队列runqueues 。 我们再看一下三者的宏观图:

由上图可以看出Groutine与KSE是M:N的多对多关系。在这里,当一个P关联多个G时,就会处理G的执行顺序,就是并发,当一个P在执行一个协程工作时,其他的会在等待,当正在执行的协程遇到阻塞情况,例如IO操作等,go的处理器就会去执行其他的协程,因为对于类似IO的操作,处理器不知道你需要多久才能执行结束,所以他不回去等你执行完。

正是因为是非抢占式的,所以才轻松的构造上万的协程,如果是抢占式,那么就会在切换任务时,保存当前的上下文环境,因为当前线程如果正在做一件事,做到一半,我们就强制停止,这时我们就必须多保存很多信息,避免再次切换回来时任务出错。

线程是操作系统层面的多任务,而go的协程属于编译器层面的多任务,go有自己的调度器来调度。一个协程在哪个线程上是不确定的,这个是由调度器来决定的,多个协程可能在一个或多个线程上运行。

编写一个Groutine

func TestGroutine(t *testing.T) {
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Println(i)
        }(i)
    }
    time.Sleep(time.Millisecond * 50)
}

上面程序中通过 go 语句很清楚的创建一个groutine,因为协程的执行是无序的,所以每次调用上面程序,print的结果都不一样。初学者经常会犯如下的错误,

func TestGroutine(t *testing.T) {
    for i := 0; i < 10; i++ {
        go func() {
            fmt.Println(i)  //错误的写法
        }()
    }
    time.Sleep(time.Millisecond * 50)
}
//打印结果  10,10,10,10,10,10,10,10,10,10

在go 语句里面因为可以直接访问变量i,初学者可能就直接使用,这时打印的结果可能就不是预期的结果了。造成这样的原因是因为协程中 i 是一个共享变量,第一个程序不会有问题是因为go里面的参数传递的都是值类型,每一个协程中都有一份值拷贝。既然出现了共享变量的问题,接下来我们讨论下如何处理?

共享内存并发机制

要解决共享内存问题,就需要用到锁了,看下面的实例:

func TestCounter(t *testing.T) {
    counter := 0
    for i := 0; i < 5000; i++ {
        go func() {
            counter++
        }()
    }
    time.Sleep(1 * time.Second)
    t.Logf("counter = %d", counter) //小于5000,如4759
}

counter是一个共享内存,该程序最终打印的值可能是一个小于5000的值。我们加锁后的代码如下:

func TestCounterThreadSafe(t *testing.T) {
    var mut sync.Mutex  //锁对象
    counter := 0
    for i := 0; i < 5000; i++ {
        go func() {
            defer func() {
                mut.Unlock()    //解锁
            }()
            mut.Lock()  //加锁
            counter++
        }()
    }
    time.Sleep(1 * time.Second)
    t.Logf("counter = %d", counter) // 5000
}

这时就能正常打印出5000的结果了,细心的同学可能看到例子中我多加了一句 sleep的等待,如果去掉这句代码时,发现最终的打印结果依然可能小于5000。发生这种原因是因为go语句中的5000次协程调用可能还没执行完,主程序的打印就结束了,虽然加了锁后最终counter的值是5000,但主程序却提前打印了。要处理这个问题,我们还需要多一个程序等待,如下代码:

func TestCounterWaitGroup(t *testing.T) {
    var mut sync.Mutex
    var wg sync.WaitGroup
    counter := 0
    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer func() {
                mut.Unlock()
            }()
            mut.Lock()
            counter++
            wg.Done()
        }()
    }
    wg.Wait()
    t.Logf("counter = %d", counter)
}

到这里一个简单的共享内存并发控制机制调用就完成了,接下来我们学习更高效的CSP并发机制。

CSP 并发机制

CSP是Communicating Sequential Process的缩写, CSP的设计理念,是依赖一个信息通道来完成两个通信实体之间的通信协调。 一些编程语言中使用的一种典型并发机制是Actor Model:

在Actor Model中,会维护一个容量无限的MailBox(Message Queue),消息发送实体A会将Message添加到MailBox中,其EventLoop会循环检查MailBox中的信息,当监听到新消息的时候发送给接受实体B的MailBox中,接受实体B在其EventLoop的驱动下被动的处理消息。

CSP与Actor Model的不同点:

  • CSP模式中,消息是通过Channel来通讯的,Channel相当于一个消息通讯的中间人,这样可以让两个通讯实体的耦合更松一些
  • Golang的Channel容量是可以设置的,并独立于协程的
  • Actor Model中,接受进程是被动的处理消息,Golang中的协程可以主动的获取Channel中的信息,并进行相关的处理

阻塞式通讯Channel

通信双方必须同时在Channel上,才能够完成本次交互,要是任意一方不在Channel上,那么一方就会被阻塞的等待,直到另一方完成本次交互。

对于消息发送者A,要是消息接收者B不在Channel上,那么发送者A会被一直阻塞在那里,直到接收者B出现,才能够完成本次交互,发送者A才能够继续执行后续的代码;对于消息接收者B,要是发送者A不在Channel上,那么接收者B会被一直阻塞在那里等待,直到发送者A出现,完成本次交互,接收者B才能够继续执行下面的任务。

Buffer Channel

Buffer Channel使得消息发送者和接收者之间有一种更为松的耦合性,Channel可以设置一个消息容量。

在Channel容量没有满的情况下,消息发送者A是可以一直往Channel中非阻塞的放入消息的,而不需要等待接收者B必须同时出现在Channel上。一旦Channel的容量满了,那么消息发送者A就必须阻塞的等待,直到接收者B取出消息,将Channel空出来,发送者A才可以继续非阻塞的放入消息。

同样的,对于接收者B,只要Channel中有消息,接收者B就可以非阻塞的取消息,不需要阻塞的等待消息发送者A同时出现在Channel上。但要是Channel上是空的,那接收者B就必须阻塞的等待发送者A放入消息,接收者B才能够取到消息,并继续执行下面的任务。

下面我们看来一下Channel应用的实例:

import (
    "fmt"
    "testing"
    "time"
)

func service() string {
    time.Sleep(time.Millisecond * 50)
    return "Done"
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Millisecond * 100)
    fmt.Println("Task is done.")
}

func AsyncService() chan string {
    retCh := make(chan string)  //创建的是一个阻塞式通讯Channel
    //retCh := make(chan string, 1) //创建的是一个容量为1的Buffer Channel
    go func() {
        ret := service()
        fmt.Println("returned result.")
        retCh <- ret    //获取service的结果放入Channel中
        fmt.Println("service exited.")
    }()
    return retCh    //将用来返回结果的Channel返回
}

func TestAsynService(t *testing.T) {
    retCh := AsyncService() //获取service结果的Channel
    otherTask() //运行其他Task
    fmt.Println(<-retCh)    //从Channel中等待获取service的结果
    time.Sleep(time.Second * 1)
}

面的代码我们可以看到,AsyncService启动了一个协程异步执行Service的任务,并返回了一个Channel,在主协程中执行OtherTask的时候,并发的执行Service的任务,经过协程并发处理的两个Task执行的总时间缩短了。 但是需要注意的是,AsyncService中的子协程执行完任务之后(打印了returned result), 之后便可以立即将结果放到Channel中,但由于主协程(接收者)还在阻塞的执行其他任务,不在Channel上,所以子协程(发送者)会被一直阻塞的等待在向Channel中放入消息的地方,直到主协程(接收者)通过<-retCh从Channel中获取到消息的时候(打印 Done. )之后,子协程才被释放继续执行下面的代码(打印 service exited)

执行结果:

多路选择和超时控制

基于select的多路复用:

  1. 解决如果一个channel中没有事件发过来,程序会立即阻塞,无法接收到第二个channel中的事件
  2. 一般每一个case都代表一个通信操作,多个case会选一个能执行的
  3. default会默认执行,因此可以作为轮询channel来用
  4. 无default时,可以其做超时控制
//多渠道的选择
select {
    case ret := <-retCh1:
        t.Logf("result %s", ret)
    case ret :=<-retCh2: 
        t.Logf("result %s", ret) 
    default:
        t.Error(“No one returned”)
}
//超时控制
select {
case ret := <-AsyncService():
    t.Log(ret)
case <-time.After(time.Millisecond * 100):
    t.Error("time out")
}

channel 的关闭和⼴播

  • 向关闭的 channel 发送数据,会导致 panic
  • v, ok <-ch; ok 为 bool 值,true 表示正常接受,false 表示通道关闭
  • 所有的 channel 接收者都会在 channel 关闭时,⽴刻从阻塞等待中返回且上述 ok 值为 false。这个⼴播机制常被利⽤,进⾏向多个订阅者同时发送信号。如:退出信号
func dataProducer(ch chan int, wg *sync.WaitGroup) {
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        close(ch)
        wg.Done()
    }()

}

func dataReceiver(ch chan int, wg *sync.WaitGroup) {
    go func() {
        for {
            if data, ok := <-ch; ok {
                fmt.Println(data)
            } else {
                break
            }
        }
        wg.Done()
    }()

}

func TestCloseChannel(t *testing.T) {
    var wg sync.WaitGroup
    ch := make(chan int)
    wg.Add(1)
    dataProducer(ch, &wg)
    wg.Add(1)
    dataReceiver(ch, &wg)
    // wg.Add(1)
    // dataReceiver(ch, &wg)
    wg.Wait()
}

任务的取消

获取取消通知

func isCancelled(cancelChan chanstruct{}) bool {
    select {
        case <-cancelChan:
        return true
    default:
        return false
    }
}

发送取消任务

func cancel_1(cancelChan chanstruct{}) {
    ancelChan <- struct{}{}
}

通过关闭Channel取消

func cancel_2(cancelChan chan struct{}) {
    close(cancelChan)
}

测试

func TestCancel(t *testing.T) {
    cancelChan := make(chan struct{}, 0)
    for i := 0; i < 5; i++ {
        go func(i int, cancelCh chan struct{}) {
            for {
                if isCancelled(cancelCh) {
                    break
                }
                time.Sleep(time.Millisecond * 5)
            }
            fmt.Println(i, "Cancelled")
        }(i, cancelChan)
    }
    cancel_1(cancelChan)
    //cancel_2(cancelChan)
    time.Sleep(time.Second * 1)
}

Context 与任务取消

上面介绍的取消任务只能取消当前任务,加入当前任务包括了子任务,取消时希望同时取消所有的子任务,如下图所示应该如何取消了?这里我们就要用到Context对象

Context

  • 根 Context:通过 context.Background () 创建
  • ⼦ Context:context.WithCancel(parentContext) 创建
    • ctx, cancel := context.WithCancel(context.Background())
  • 当前 Context 被取消时,基于他的⼦ context 都会被取消
  • 接收取消通知 <-ctx.Done()
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}

func TestCancel(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < 5; i++ {
        go func(i int, ctx context.Context) {
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5)
            }
            fmt.Println(i, "Cancelled")
        }(i, ctx)
    }
    cancel()
    time.Sleep(time.Second * 1)
}