Spiga

Go语言实践(二):Goroutine

2020-11-30 20:05:56

Processes and Threads

操作系统会为该应用程序创建一个进程。作为一个应用程序,它像一个为所有资源而运行的容器。这些资源包括内存地址空间、文件句柄、设备和线程。

线程是操作系统调度的一种执行路径,用于在处理器执行我们在函数中编写的代码。一个进程从一个线程开始,即主线程,当该线程终止时,进程终止。这是因为主线程是应用程序的原点。然后,主线程可以依次启动更多的线程,而这些线程可以启动更多的线程。

无论线程属于哪个进程,操作系统都会安排线程在可用处理器上运行。每个操作系统都有自己的算法来做出这些决定。

Goroutines and Parallelism

Go 语言层面支持的 go 关键字,可以快速的让一个函数创建为 goroutine,我们可以认为 main 函数就是作为 goroutine 执行的。操作系统调度线程在可用处理器上运行,Go运行时调度 goroutines 在绑定到单个操作系统线程的逻辑处理器中运行(P)。即使使用这个单一的逻辑处理器和操作系统线程,也可以调度数十万 goroutine 以惊人的效率和性能并发运行。

并发不是并行。并行是指两个或多个线程同时在不同的处理器执行代码。如果将运行时配置为使用多个逻辑处理器,则调度程序将在这些逻辑处理器之间分配 goroutine,这将导致 goroutine 在不同的操作系统线程上运行。但是,要获得真正的并行性,您需要在具有多个物理处理器的计算机上运行程序。否则,goroutines 将针对单个物理处理器并发运行,即使 Go 运行时使用多个逻辑处理器。

Keep yourself busy or do the work yourself

比如我们想监听一个端口,但并不知道它什么时候返回,我们可能会使用一个select来永远阻塞,如下代码:

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request)) {
		fmt.FprintIn(w, "Hello, GrpherCon SG")
	}
	go func() {
		if(err := http.listenAndServe(":8080", nil); err != nil) {
			log.Fatal(err)
		}
	}()

	select {}
}

上面代码main函数无法控制内部的goroutine,假设监听退出了,main函数并不知道,它将一直阻塞。

如果你的 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,那么通常情况下,你自己去做这项工作比委托它( go func() )更简单。

这通常消除了将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request)) {
		fmt.FprintIn(w, "Hello, GrpherCon SG")
	}
	if(err := http.listenAndServe(":8080", nil); err != nil) {
		log.Fatal(err)
	}
}

上面的写法没有太多问题,但log.Fatal并不推荐,假设监听程序退出了,log.Fatal函数内部会调用os.exit进行强制退出,而os.exit并不会被defer。mian函数依然会一直等待。

Never start a goroutine without knowning when it will stop

接下来假设我们要监听2个端口,如下代码

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	go http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
	http.ListenAndServe("0.0.0.0:8080", mux)
}

为了不让程序阻塞,我们会启动一个goroutine,同样假设go出去的代码报错了,main函数也并不会知道。

通常,每次启动一个goroutine时,应该弄清楚下面两个问题:

  • 什么时候结束
  • 如何能让它结束

怎么改进呢?

上面简单的应用程序在两个不同的端口上提供 http 流量,假设端口8080用于应用程序流量,端口8001用于访问 /debug/pprof 端点。

通过将 serveApp 和 serveDebug 处理程序分解为各自的函数,我们将它们与main.main 解耦,我们还遵循了上面的建议,并确保 serveApp 和 serveDebug 将它们的并发性留给调用者。

如果 serveApp 返回,则 main.main 将返回导致程序关闭,只能靠类似 supervisor 进程管理来重新启动。

func serverApp() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() {
	http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main(){
	go serveDebug()
	serverApp()
}

然而,serveDebug 是在一个单独的 goroutine 中运行的,如果它返回,那么所在的 goroutine 将退出,而程序的其余部分继续运行。由于 /debug 处理程序很久以前就停止工作了,所以其他同学会很不高兴地发现他们无法在需要时从您的应用程序中获取统计信息。

func serverApp() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	if err := http.ListenAndServe("0.0.0.0:8080", mux); err != nil {
		log.Fatal(err)
	}
}

func serveDebug() {
    if err := http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux); err != nil {
		log.Fatal(err)
	}
}

func main(){
	go serveDebug()
	go serverApp()
	select {}
}

ListenAndServer 返回 nil error,最终 main.main 无法退出。同样log.Fatal 调用了 os.Exit,会无条件终止程序;defers 不会被调用到。

解决方法

func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
	s := http.Server{
		Addr : addr,
		Handler : handler,
	}

	go func() {
		<-stop //wait for stop signal
		s.Shutdown(context.Background())
	}()

	return s.ListenAndServe()
}

func main() {
	done := make(chan error, 2)
	stop := make(chan struct{})
	go func() {
		done <- serveDebug(stop)
	}()
	go func() {
		done <= serveApp(stop)
	}

	var stopped bool
	for i :=0; i< cap(done); i++ {
		if err := <-done; err != nil {
			fmt.Println("error: %v", err)
		}
		if !stopped {
			stopped = true
			close(stop)
		}
	}
}

参考:https://github.com/da440dil/go-workgroup

还有一些其他的情况,看如下实例

func leak() {
	ch := make(chan int)

	go func() {
		val := <-ch
		fmt.Println("We received a value:", val)
	}()
}

上面代码ch是一个局部对象,外部没有任何地方可以管理ch,因而go出去的程序是不可控的。

func search(term string)(string, error) {
	time.Sleep(200 * time.Millisecond)
}

func process(term sting) error {
	record, err := search(term)
	if err != nil {
		return err
	}

	fmt.Println("Received:", record)
	return nil
}

这里search 函数是一个模拟实现,用于模拟长时间运行的操作,如数据库查询或 rpc 调用。在本例中,硬编码为200ms。定义了一个名为 process 的函数,接受字符串参数,传递给 search。对于某些应用程序,顺序调用产生的延迟可能是不可接受的。处理方法:

type result struct {
	record string
	err error
}

func process(term string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	ch := make(chan result)

	go func() {
		record, err := search(term)
		ch <- result{record: , err}
	}()

	select {
	case <-ctx.Done():
		return errors.New("search canceled")
	case result := <-ch:
		if result.err != nil {
			return result.err
		}
		fmt.PrintIn("Received:", result.record)
		return nil
	}
}

Leave concurrency to the caller

这两个 API 有什么区别?

func ListDirectoty(dir string)([]string, error)
func ListDirectoty(dir string) chan string

将目录读取到一个 slice 中,然后返回整个切片,或者如果出现错误,则返回错误。这是同步调用的,ListDirectory 的调用方会阻塞,直到读取所有目录条目。根据目录的大小,这可能需要很长时间,并且可能会分配大量内存来构建目录条目名称的 slice。

ListDirectory 返回一个 chan string,将通过该 chan 传递目录。当通道关闭时,这表示不再有目录。由于在 ListDirectory 返回后发生通道的填充,ListDirectory 可能内部启动 goroutine 来填充通道。

ListDirectory chan 版本还有两个问题:

  • 通过使用一个关闭的通道作为不再需要处理的项目的信号,ListDirectory 无法告诉调用者通过通道返回的项目集不完整,因为中途遇到了错误。调用方无法区分空目录与完全从目录读取的错误之间的区别。这两种方法都会导致从 ListDirectory 返回的通道会立即关闭。
  • 调用者必须继续从通道读取,直到它关闭,因为这是调用者知道开始填充通道的 goroutine 已经停止的唯一方法。这对 ListDirectory 的使用是一个严重的限制,调用者必须花时间从通道读取数据,即使它可能已经收到了它想要的答案。对于大中型目录,它可能在内存使用方面更为高校,但这种方法并不比原始的基于 slice 的方法快。

我们可以尝试把API改成这样

func ListDirectoty(dir string, fn func(string))

显示是指定一个回调方法,这也是借助了filepath.WalkDir 的设计思想。如果函数启动 goroutine,则必须向调用方提供显式停止该goroutine 的方法。通常,将异步执行函数的决定权交给该函数的调用方通常更容易。

Incomplete Work

我们使用服务端埋点来跟踪记录一些事件。

type Tracker struct{}

func (t *Tracker) Event(data string) {
	time.Sleep(time.Microsecond)
	log.Panicln(data)
}

无法保证创建的 goroutine 生命周期管理,会导致最场景的问题,就是在服务关闭时候,有一些事件丢失。

type App struct{
	track Tracker
}

func (a *App) Handle(w http.ResponseWriter, r *http.Request) {
	// Do some actual work.

	// Respond to the client.
	w.WriteHeader(http.StatusCreated)

	// Fire and Hope
	// BUG: We are not managing this goroutine
	go a.track.Event("this event")
}

使用 sync.WaitGroup 来追踪每一个创建的 goroutine。

type Tracker struct{
	wg sync.WaitGroup
}

func (t *Tracker) Event(data string) {
	t.wa.add(1)

	go.func() {
		defer t.wg.Done()

		time.Sleep(time.Microsecond)
		log.Panicln(data)
	}()
}

func (t *Tracker) Shutdown() {
	t.wg.Wait()
}

type App struct{
	track Tracker
}

func (a *App) Handle(w http.ResponseWriter, r *http.Request) {
	// Do some actual work.

	// Respond to the client.
	w.WriteHeader(http.StatusCreated)

	// Fire and Hope
	// BUG: We are not managing this goroutine
	go a.track.Event("this event")
}

func main() {
	var a App

	a.track.Shutdown()
}

将 wg.Wait() 操作托管到其他 goroutine,owner goroutine 使用 context 处理超时。

我仍然不喜欢这个 demo,大量创建goroutine 来处理任务,代价高。可以继续优化:

func main() {
	tr := NewTracker()
	go tr.Run()
	_ = tr.Event(context.Background(), "test")
	_ = tr.Event(context.Background(), "test")
	_ = tr.Event(context.Background(), "test")
	ctx. cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
	defer cancel()
	tr.Shutdown(ctx)
}

func NewTracker() *Tracker {
	return &Tracker{
		ch: make(chan string, 10),
	}
}

type Tracker struct{
	ch chan string
	stop chan struct{}
}

func (t *Tracker) Event(ctx context.Context, data string) error {
	select {
	case t.ch <- data:
		return nil
	case <- ctx.Done():
		return ctx.Err()
	}
}

func (t *Tracker) Run() {
	for data := range t.ch {
		time.Sleep(1 * time.Second)
		fmt.Println(data)
	}
	t.stop <- struct{}{}
}

func (t *Tracker) Shutdown() {
	close(t.ch)
	select {
	case <- t.stop:
	case <- ctx.Done():
	}
}