Spiga

Go学习笔记(五):常见并发任务

2020-04-18 23:11:40

仅执行一次

C#中的单例模式(懒汉式,线程安全)

public class Singleton
{
    private static  volatile Singleton instance;
    private static readonly object syncRoot = new object();
 
    private Singleton() { }
 
    public static Singleton GetInstance()
    {
        if (instance == null)
        {
            lock (syncRoot)
            {
                if (instance == null)
                {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

Go的实现

type Singleton struct {
    data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
    once.Do(func() {
        fmt.Println("Create Obj")
        singleInstance = new(Singleton)
    })
    return singleInstance
}

测试

func TestGetSingletonObj(t *testing.T) {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            obj := GetSingletonObj()
            fmt.Printf("%X\n", unsafe.Pointer(obj))
            wg.Done()
        }()
    }
    wg.Wait()
}
//打印结果
Create Obj
C00008A000
C00008A000
C00008A000
C00008A000
C00008A000

仅需任意任务完成

要实现所有协程中仅需任意一个协程完成。我们可以在启动协程的方法中直接返回任意一个Channel的结果来达到效果。如下代码

func runTask(id int) string {
    time.Sleep(10 * time.Millisecond)
    return fmt.Sprintf("The result is from %d", id)
}

func FirstResponse() string {
    numOfRunner := 10
    ch := make(chan string)
    // 启动多个协程
    for i := 0; i < numOfRunner; i++ {  
        go func(i int) {
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    // 返回任意协程的结果
    return <-ch
}

测试

func TestFirstResponse(t *testing.T) {
    t.Log("Before:", runtime.NumGoroutine())
    t.Log(FirstResponse())
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())
}
//打印结果
first_response_test.go:28: Before: 2
first_response_test.go:29: The result is from 6
first_response_test.go:31: After: 11

上面测试结果我们看到执行FirstResponse方法前后的协程数量分别的2和11,也就是说虽然我们实现了仅需任意任务完成,但一个协程返回后其他协程还在等待完成,这样就会带来内存隐患。我们分析原因是因为我们使用的是 阻塞式通讯Channel ,通信只有双方同时在Channel上才能交互,如何换成 Buffer Channel 就没有这个问题了。如下

func FirstResponse() string {
    numOfRunner := 10
    ch := make(chan string, numOfRunner)    //创建buffer channel
    // 启动多个协程
    for i := 0; i < numOfRunner; i++ {  
        go func(i int) {
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    // 返回任意协程的结果
    return <-ch
}

func TestFirstResponse(t *testing.T) {
    t.Log("Before:", runtime.NumGoroutine())
    t.Log(FirstResponse())
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())
}
//打印结果
first_response_test.go:28: Before: 2
first_response_test.go:29: The result is from 6
first_response_test.go:31: After: 2

必需所有任务完成

和上一节不同,平时我们可能还会遇到必须所有任务都完成时才返回,我们要怎么实现呢?我们采用上一节的例子,只需要修改上一节创建任务方法的返回结果即可完成我们的任务:

func runTask(id int) string {
    time.Sleep(10 * time.Millisecond)
    return fmt.Sprintf("The result is from %d", id)
}

func AllResponse() string {
    numOfRunner := 10
    ch := make(chan string, numOfRunner)
    for i := 0; i < numOfRunner; i++ {
        go func(i int) {
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    finalRet := ""
    //等待所有协程完成
    for j := 0; j < numOfRunner; j++ {
        finalRet += <-ch + "\n"
    }
    //返回所有结果
    return finalRet
}

func TestFirstResponse(t *testing.T) {
    t.Log("Before:", runtime.NumGoroutine())
    t.Log(AllResponse())
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())
}

对象池

我们在平常的编程中经常会遇到对象池,例如创建一些代价比较高的数据库连接、网络连接等,我们通常会把这些对象进行池化,避免重复创建。那在go语言中我们可以使用buffered channel实现对象池,我们可以设置buffered channel的大小,也就是池的大小,所有对象可以现在channel里面放好,然后每次从get一个对象,用完之后可以再还到这个channel上。我们来看一些代码:

type ReusableObj struct {
}

type ObjPool struct {
    bufChan chan *ReusableObj //用于缓冲可重用对象
}

//创建一个对象池
func NewObjPool(numOfObj int) *ObjPool {
    objPool := ObjPool{}
    objPool.bufChan = make(chan *ReusableObj, numOfObj) //numOfObj是我们池的大小
    for i := 0; i < numOfObj; i++ {
        //提前初始化了我们的对象,我们也可以先预制空对象,等具体用到时再初始化
        objPool.bufChan <- &ReusableObj{}   
    }
    return &objPool
}

//取对象, 取的对象必须是我们的结构的方法,也就是池的方法,即返回我们池对象的指针
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
    select {
    case ret := <-p.bufChan:
        return ret, nil
    case <-time.After(timeout): //超时控制,这里必须要加,如果不做超时会引起长时间阻塞
        return nil, errors.New("time out")
    }

}

//方法对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
    select {
    case p.bufChan <- obj:
        return nil
    default:
        //如果放不进去的时候,利用多路选择的default立即范围异常,防止出现错误时发生阻塞
        return errors.New("overflow")   
    }
}

测试

func TestObjPool(t *testing.T) {
    pool := NewObjPool(10)
    // if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象
    //  t.Error(err)
    // }
    for i := 0; i < 11; i++ {
        if v, err := pool.GetObj(time.Second * 1); err != nil {
            t.Error(err)
        } else {
            fmt.Printf("%T\n", v)
            if err := pool.ReleaseObj(v); err != nil {
                t.Error(err)
            }
        }
    }
    fmt.Println("Done")
}

注意:对象池不一定都会提高性能,我们要考虑buffered channel里面带来的锁的机制,然后为了保证线程安全这种同步机制对性能的影响。所以在这种情况下如果我们的对象相对简单,易于创建的话,这是一个权衡,需要做好性能测试来评估要不要使用对象池。

sync.pool|对象缓存

sync.pool从命名看好像是一个go语言天然支持的对象池工具类,但实际上它并不,它更像是一个对象缓存。它更Processor有一定关系,我们来了解一下

sync.pool对象获取

  • 尝试从私有对象获取
  • 私有对象不存在,尝试从当前 Processor 的共享池获取
  • 如果当前 Processor 共享池也是空的,那么就尝试去其他Processor 的共享池获取
  • 如果所有⼦池都是空的,最后就⽤⽤户指定的 New 函数产⽣⼀个新的对象返回

sync.pool对象放回

  • 如果私有对象不存在则保存为私有对象
  • 如果私有对象存在,放⼊当前 Processor ⼦池的共享池中

sync.pool对象的生命周期

  • GC 会清除 sync.pool 缓存的对象
  • 对象的缓存有效期为下⼀次GC 之前

正因为会被GC回收,这也是sync.pool不能当对象池来使用的原因。

使用方法

func TestSyncPool(t *testing.T) {
    pool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Create a new object.")
            return 100
        },
    }

    v := pool.Get().(int)
    fmt.Println(v)
    pool.Put(3)
    //runtime.GC() //手动调用GC,来看看效果。 会清除sync.pool中缓存的对象
    v1, _ := pool.Get().(int)   //GC后,这里获取的值会是初始值100,而不是3
    fmt.Println(v1)
}

多协程环境下测试sync.pool的缓存机制

func TestSyncPoolInMultiGroutine(t *testing.T) {
    pool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Create a new object.")
            return 10
        },
    }

    pool.Put(100)
    pool.Put(100)

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            fmt.Println(pool.Get())
            wg.Done()
        }(i)
    }
    wg.Wait()
}
//打印结果
100
Create a new object.
10
100 
Create a new object.
10
Create a new object.
10

我们可以看到默认放入到缓存中的100被取出来了,而另外的3个对象获取时需要调用创建方法。

总结

  • 适合于通过复⽤,降低复杂对象的创建和 GC 代价
  • 协程安全,会有锁的开销
  • ⽣命周期受 GC 影响,不适合于做连接池等,需⾃⼰管理⽣命周期的资源的池化