Go语言中的并发模式你了解了吗

工作中查看项目代码,发现会存在使用 GO 语言做并发的时候出现各种各样的异常情况,有的输出结果和自己期望和设计的不一致,有的是程序直接阻塞住,更有甚者直接是程序

工作中查看项目代码,发现会存在使用 GO 语言做并发的时候出现各种各样的异常情况,有的输出结果和自己期望和设计的不一致,有的是程序直接阻塞住,更有甚者直接是程序 crash 掉。

实际上,出现上述的情况,还是因为我们对于 GO 语言的并发模型和涉及的 GO 语言基础不够扎实,误解了语言的用法。

那么,对于 GO 语言的并发模式,我们一起来梳理一波。 GO 语言常见的并发模式有这些:

  • 创建模式
  • 退出模式
  • 管道模式
  • 超时模式和取消模式

在 GO 语言里面,咱们使用使用并发,自然离不开使用 GO 语言的协程 goroutine,通道 channel 和 多路复用 select,接下来就来看看各种模式都是如何去搭配使用这三个关键原语的

创建模式

使用过通道和协程的朋友对于创建模式肯定不会模式,这是一个非常常用的方式,也是一个非常简单的使用方式:

  • 主协程中调用 help 函数,返回一个通道 ch 变量
  • 通道 ch 用于主协程和 子协程之间的通信,其中通道的数据类型完全可以自行定义
type XXX struct{...}
func help(fn func()) chan XXX {
    ch := make(chan XXX)
    // 开启一个协程
    go func(){
        // 此处的协程可以控制和外部的 主协程 通过 ch 来进行通信,达到一定逻辑便可以执行自己的 fn 函数
        fn()
        ch <- XXX
    }()
}
func main(){
    ch := help(func(){
        fmt.Println("这是GO 语言 并发模式之 创建模式")
    })
    <- ch
}

退出模式

程序的退出我们应该也不会陌生,对于一些常驻的服务,如果是要退出程序,自然是不能直接就断掉,此时会有一些连接和业务并没有关闭,直接关闭程序会导致业务异常,例如在关闭过程中最后一个 http 请求没有正常响应等等等

此时,就需要做优雅关闭了,对于协程 goroutine 退出有 3 种模式

  • 分离模式
  • join 模式
  • notify-and-wait 模式

分离模式

此处的分离模式,分离这个术语实际上是线程中的术语,pthread detached

分离模式可以理解为,咱们创建的协程 goroutine,直接分离,创建子协程的父协程不用关心子协程是如何退出的,子协程的生命周期主要与它执行的主函数有关,咱们 return 之后,子协程也就结束了

对于这类分离模式的协程,咱们需要关注两类,一种是一次性的任务,咱们 go 出来后,执行简单任务完毕后直接退出,一种是常驻程序,需要优雅退出,处理一些垃圾回收的事情

例如这样:

  • 主程序中设置一个通道变量 ch ,类型为 os.Signal
  • 然后主程序就开始各种创建协程执行自己的各种业务
  • 直到程序收到了 syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT 任意一个信号的时候,则会开始进行垃圾回收等清理工作,执行完毕后,程序再进行退出
func main(){
     ch := make(chan os.Signal)
     signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
     // ...
     // go 程序执行其他业务
     // ...
    for i := range ch {
        switch i {
        case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
            // 做一些清理工作
            os.Exit(0)
        }
    }
}

join 模式

看到这个关键字,是不是也似曾相识,和线程貌似很像,例如 线程中 父线程可以通过 pthread_join 来等待子线程结束,并且还可以获取子线程的结束状态

GO 语言中等待子协程退出并且获取子协程的退出状态,咱们就可以使用通道 channel 的方式来进行处理

例子1

等待一个子协程退出,并获取退出状态

  • 主协程中调用 help 方法得到一个 ch 通道变量,主协程阻塞着读 ch
  • help 中开辟一个子协程去执行传入的 fn 回调函数,并传参为 ok bool
  • 实际 fn 函数判断传参 ok 是否是 true,若不是则返回具体的错误信息,若是 true 则返回 nil
func help(f func(bool) error, ok bool) <-chan error {
	ch := make(chan error)
	go func() {
		ch <- f(ok)
	}()
	return ch
}
func fn(ok bool) error {
	if !ok {
		return errors.New("not ok ... ")
	}
	return nil
}
func main() {
	ch := help(fn, true)
	fmt.Println("help 111")
	err := <-ch
	fmt.Println("help 111 done ", err)
	ch = help(fn, false)
	fmt.Println("help 222")
	err = <-ch
	fmt.Println("help 222 done ", err)
}

看上如上程序,我们就可以知道,第一次调用 help(fn , true) ,主协程等待子协程退出的时候,会得到一个错误信息,为 not ok ... , 第二次调用 help(fn , false) 的时候,返回的 err 是一个 nil

通过上述这种方式,主协程不仅可以轻易的等待一个子协程退出,还可以获取到子协程退出的状态

那么,主协程如果是等待多个协程退出呢?需要如何处理?

例子2

主协程等待多个协程退出咱们就需要使用到 GO 中的 sync.WaitGroup

  • 使用 help 函数,传入回调函数,参数1 bool,参数2 int ,其中参数 2 表示开辟子协程的个数,返回值为一个无缓冲的 channel 变量,数据类型是 struct{}
  • 使用 var wg sync.WaitGroup ,开辟子协程的时候记录一次 wg.Add(1),当子协程退出时 ,记录退出 wg.Done()
  • help 中再另起一个协程 wg.Wait() 等待所有子协程退出,并将 ch 变量写入值
  • 主协程阻塞读取 ch 变量的值,待所有子协程都退出之后,help 中写入到 ch 中的数据,主协程就能马上收到 ch 中的数据,并退出程序
func help(f func(bool)error, ok bool, num int)chan struct{}{
    ch := make(chan struct{})
    var wg sync.WaitGroup
    for i:=0; i<num; i++ {
        wg.Add(1)
        go func(){
            f(ok)
            fmt.Println(" f done ")
            wg.Done()
        }() 
    }
    go func(){
        // 等待所有子协程退出
        wg.Wait()
        ch <- struct{}{}
    }()
    return ch
}
func fn(ok bool) error{
    time.Sleep(time.Second * 1)
    if !ok{
        return errors.New("not ok ... ")
    }
    return nil
}
func main(){
    ch := help(fn , true)
    fmt.Println("help 111")
     <- ch 
    fmt.Println("help 111 done ",err)
}

notify-and-wait 模式

可以看到上述模式,都是主协程等待一个子协程,或者多个子协程结束后,主协程再进行退出,或者处理完垃圾回收后退出

那么如果主协程要主动通知子协程退出,我们应该要如何处理呢?

同样的问题,如果主协程自己退出了,而没有通知其他子协程退出,这是会导致业务数据异常或者丢失的,那么此刻我们就可以使用到 notify-and-wait 模式 来进行处理

我们就直接来写一个主协程通知并等待多个子协程退出的 demo:

  • 主协程调用 help 函数,得到一个 quit chan struct{} 类型的通道变量,主协程阻塞读取 quit 的值
  • help 函数根据传入的参数 num 来创建 num 个子协程,并且使用 sync.WaitGroup 来控制
  • 当主协程在 quit 通道中写入数据时,主动通知所有子协程退出
  • help 中的另外一个协程读取到 quit 通道中的数据,便 close 掉 j 通道,触发所有的子协程读取 j 通道值的时候,得到的 ok 为 false,进而所有子协程退出
  • wg.Wait() 等待所有子协程退出后,再在 quit 中写入数据
  • 主协程此时从 quit 中读取到数据,则知道所有子协程全部退出,自己的主协程即刻退出
func fn(){
   // 模拟在处理业务
   time.Sleep(time.Second * 1)
}
func help(num int, f func()) chan struct{}{
   quit := make(chan struct{})
   j := make(chan int)
   var wg sync.WaitGroup
   // 创建子协程处理业务
   for i:=0;i<num;i++{
      wg.Add(1)
      go func(){
         defer wg.Done()
         _,ok:=<-j
         if !ok{
            fmt.Println("exit child goroutine .")
            return
         }
         // 子协程 正常执行业务
         f()
      }()
   }
   go func(){
      <-quit
      close(j)
      // 等待子协程全部退出
      wg.Wait()
      quit <- struct{}{}
   }()
   return quit
}
func main(){
   quit := help(10, fn)
   // 模拟主程序处理在处理其他事项
   // ...
   time.Sleep(time.Second * 10)
   quit <- struct{}{}
   // 此处等待所有子程序退出
   select{
   case <- quit:
      fmt.Println(" programs exit. ")
   }
}

上述程序执行结果如下,可以看到 help 函数创建了 10 个子协程,主协程主动通知子协程全部退出,退出的时候也是 10 个子协程退出了,主协程才退出

上述程序,如果某一个子协程出现了问题,导致子协程不能完全退出,也就是说某些子协程在 f 函数中阻塞住了,那么这个时候主协程岂不是一直无法退出???

那么此时,在主协程通知子协程退出的时候,我们加上一个超时时间,表达意思为,超过某个时间,如果子协程还没有全部退出完毕,那么主协程仍然主动关闭程序,可以这样写:

设定一个定时器, 3 秒后会触发,即可以从 t.C 中读取到数据

t := time.NewTimer(time.Second * 3)
defer t.Stop()
// 此处等待所有子程序退出
select{
case <-t.C:
   fmt.Println("timeout programs exit. ")
case <- quit:
   fmt.Println(" 111 programs exit. ")
}

管道模式

说到管理,或许大家对 linux 里面的管道更加熟悉吧,例如使用 linux 命令找到文件中的 golang 这个字符串

cat xxx.txt |grep "golang"

那么对于 GO 语言并发模式中的管道模式也是类似的效果,我们就可以用这个管道模式来过滤数据

例如我们可以设计这样一个程序,兄弟们可以动起手来写一写,评论区见哦:

  • 整个程序总共使用 2 个通道
  • help 函数中传输数据量 50 ,逻辑计算能够被 5 整除的数据写到第一个通道 ch1 中
  • 另一个协程阻塞读取 ch1 中的内容,并将取出的数据乘以 3 ,将结果写入到 ch2 中
  • 主协程就阻塞读取 ch2 的内容,读取到内容后,挨个打印出来

管道模式有两种模式,扇出模式 和 扇入模式,这个比较好理解

  • 扇出模式:多种类型的数据从同一个通道 channel 中读取数据,直到通道关闭
  • 扇入模式:输入的时候有多个通道channel,程序将所有的通道内数据汇聚,统一输入到另外一个通道channel A 里面,另外一个程序则从这个通道channel A 中读取数据,直到这个通道A关闭为止

超时模式和取消模式化

超时模式

上述例子中有专门说到如何去使用他,实际上我们还可以这样用:

select{
case <- time.Afer(time.Second * 2):
   fmt.Println("timeout programs exit. ")
case <- quit:
   fmt.Println(" 111 programs exit. ")
}

取消模式

则是使用了 GO 语言的 context 包中的提供了上下文机制,可以在协程 goroutine 之间传递 deadline,取消等信号

我们使用的时候例如可以这样:

  • 使用 context.WithCancel 创建一个可以被取消的上下文,启动一个协程 在 3 秒后关闭上下文
  • 使用 for 循环模拟处理业务,默认会走 select 的 default 分支
  • 3 秒后 走到 select 的 ctx.Done(),则进入到了取消模式,程序退出
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
   time.Sleep(time.Second * 3)
   cancelFunc()
}()
for {
   select {
   case <-ctx.Done():
      fmt.Println("program exit .")
      return
   default:
      fmt.Println("I'm still here.")
      time.Sleep(time.Second)
   }
}

总的来说,今天分享了 GO 语言中常见的几种并发模式:创建模式,退出模式,管道模式,超时模式和取消模式,更多的,还是要我们要思考其原理和应用起来,学习他们才能更加的有效

以上就是Go语言中的并发模式你了解了吗的详细内容,更多关于go并发模式的资料请关注好代码网其它相关文章!

标签: go 并发模式