Go学习笔记(五):常见并发任务
2020-04-18 23:11:40仅执行一次
C#中的单例模式(懒汉式,线程安全)
public class Singleton
{
private static volatile Singleton instance;
private static readonly object syncRoot = new object();
private Singleton() { }
public static Singleton GetInstance()
{
if (instance == null)
{
lock (syncRoot)
{
if (instance == null)
{
instance = new Singleton();
}
}
}
return instance;
}
}
Go的实现
type Singleton struct {
data string
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}
测试
func TestGetSingletonObj(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
obj := GetSingletonObj()
fmt.Printf("%X\n", unsafe.Pointer(obj))
wg.Done()
}()
}
wg.Wait()
}
//打印结果
Create Obj
C00008A000
C00008A000
C00008A000
C00008A000
C00008A000
仅需任意任务完成
要实现所有协程中仅需任意一个协程完成。我们可以在启动协程的方法中直接返回任意一个Channel的结果来达到效果。如下代码
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string)
// 启动多个协程
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
// 返回任意协程的结果
return <-ch
}
测试
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(FirstResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
//打印结果
first_response_test.go:28: Before: 2
first_response_test.go:29: The result is from 6
first_response_test.go:31: After: 11
上面测试结果我们看到执行FirstResponse方法前后的协程数量分别的2和11,也就是说虽然我们实现了仅需任意任务完成,但一个协程返回后其他协程还在等待完成,这样就会带来内存隐患。我们分析原因是因为我们使用的是 阻塞式通讯Channel ,通信只有双方同时在Channel上才能交互,如何换成 Buffer Channel 就没有这个问题了。如下
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner) //创建buffer channel
// 启动多个协程
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
// 返回任意协程的结果
return <-ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(FirstResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
//打印结果
first_response_test.go:28: Before: 2
first_response_test.go:29: The result is from 6
first_response_test.go:31: After: 2
必需所有任务完成
和上一节不同,平时我们可能还会遇到必须所有任务都完成时才返回,我们要怎么实现呢?我们采用上一节的例子,只需要修改上一节创建任务方法的返回结果即可完成我们的任务:
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func AllResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
finalRet := ""
//等待所有协程完成
for j := 0; j < numOfRunner; j++ {
finalRet += <-ch + "\n"
}
//返回所有结果
return finalRet
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(AllResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
对象池
我们在平常的编程中经常会遇到对象池,例如创建一些代价比较高的数据库连接、网络连接等,我们通常会把这些对象进行池化,避免重复创建。那在go语言中我们可以使用buffered channel实现对象池,我们可以设置buffered channel的大小,也就是池的大小,所有对象可以现在channel里面放好,然后每次从get一个对象,用完之后可以再还到这个channel上。我们来看一些代码:
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj //用于缓冲可重用对象
}
//创建一个对象池
func NewObjPool(numOfObj int) *ObjPool {
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numOfObj) //numOfObj是我们池的大小
for i := 0; i < numOfObj; i++ {
//提前初始化了我们的对象,我们也可以先预制空对象,等具体用到时再初始化
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}
//取对象, 取的对象必须是我们的结构的方法,也就是池的方法,即返回我们池对象的指针
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): //超时控制,这里必须要加,如果不做超时会引起长时间阻塞
return nil, errors.New("time out")
}
}
//方法对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
case p.bufChan <- obj:
return nil
default:
//如果放不进去的时候,利用多路选择的default立即范围异常,防止出现错误时发生阻塞
return errors.New("overflow")
}
}
测试
func TestObjPool(t *testing.T) {
pool := NewObjPool(10)
// if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象
// t.Error(err)
// }
for i := 0; i < 11; i++ {
if v, err := pool.GetObj(time.Second * 1); err != nil {
t.Error(err)
} else {
fmt.Printf("%T\n", v)
if err := pool.ReleaseObj(v); err != nil {
t.Error(err)
}
}
}
fmt.Println("Done")
}
注意:对象池不一定都会提高性能,我们要考虑buffered channel里面带来的锁的机制,然后为了保证线程安全这种同步机制对性能的影响。所以在这种情况下如果我们的对象相对简单,易于创建的话,这是一个权衡,需要做好性能测试来评估要不要使用对象池。
sync.pool|对象缓存
sync.pool从命名看好像是一个go语言天然支持的对象池工具类,但实际上它并不,它更像是一个对象缓存。它更Processor有一定关系,我们来了解一下
sync.pool对象获取
- 尝试从私有对象获取
- 私有对象不存在,尝试从当前 Processor 的共享池获取
- 如果当前 Processor 共享池也是空的,那么就尝试去其他Processor 的共享池获取
- 如果所有⼦池都是空的,最后就⽤⽤户指定的 New 函数产⽣⼀个新的对象返回
sync.pool对象放回
- 如果私有对象不存在则保存为私有对象
- 如果私有对象存在,放⼊当前 Processor ⼦池的共享池中
sync.pool对象的生命周期
- GC 会清除 sync.pool 缓存的对象
- 对象的缓存有效期为下⼀次GC 之前
正因为会被GC回收,这也是sync.pool不能当对象池来使用的原因。
使用方法
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 100
},
}
v := pool.Get().(int)
fmt.Println(v)
pool.Put(3)
//runtime.GC() //手动调用GC,来看看效果。 会清除sync.pool中缓存的对象
v1, _ := pool.Get().(int) //GC后,这里获取的值会是初始值100,而不是3
fmt.Println(v1)
}
多协程环境下测试sync.pool的缓存机制
func TestSyncPoolInMultiGroutine(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 10
},
}
pool.Put(100)
pool.Put(100)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
fmt.Println(pool.Get())
wg.Done()
}(i)
}
wg.Wait()
}
//打印结果
100
Create a new object.
10
100
Create a new object.
10
Create a new object.
10
我们可以看到默认放入到缓存中的100被取出来了,而另外的3个对象获取时需要调用创建方法。
总结
- 适合于通过复⽤,降低复杂对象的创建和 GC 代价
- 协程安全,会有锁的开销
- ⽣命周期受 GC 影响,不适合于做连接池等,需⾃⼰管理⽣命周期的资源的池化