Golang并发绕不开的重要组件之Goroutine详解

并发指的是程序由若干个独立运行的代码组成,主要依赖于多核CPU的并行运算和调度能力。 Golang在并发方面的能力较为突出,通过Goroutinue实现了典型的

并发指的是程序由若干个独立运行的代码组成,主要依赖于多核CPU的并行运算和调度能力。

Golang在并发方面的能力较为突出,通过Goroutinue实现了典型的协程的概念。Golang的并发理念是:通过通信来共享内存,而不是通过共享内存来通信。

goroutinue与传统线程的区别

主要体现在四个方面:

1.内存占用不同:Goroutinue的创建消耗2kb内存,并且在栈空间不足时会自动扩容;而线程默认会占用较大的栈空间(1-8MB),且栈空间大小不变,会有溢出风险

2.开销不同:goroutinue创建和销毁消耗都非常小,是用户态线程;而线程的创建和销毁都会有巨大的消耗,是内核级交互

3.调度切换不同:goroutinue切换消耗200ns,在1.4后优化至20ns;而线程切换消耗1000-15000纳秒

4.复杂性不同:goroutinue简单易用,M个线程托管N个goroutinue;线程创建和退出复杂,多个线程间通讯复杂,使用网络多路复用,应用服务线程门槛高

如果想实现一个并发程序,要考虑几个方面:

程序代码如何独立运行?

独立运行的代码如何进行通信?

如何做到数据同步、调度同步?

这就引出了Golang并发编程中的几个重要组件:Goroutinue、Channel、Context、Sync

Goroutine

Golang中并发执行的单元称为Goroutinue,也就是Go协程

使用方法非常简单,使用go关键字即可启动新的Goroutinue

示例代码:

func main() {
    // 输出奇数
    printOdd := func() {
        for i := 1; i <= 10; i += 2 {
            fmt.Println(i)
            time.Sleep(100 * time.Millisecond)
        }
    }

    // 输出偶数
    printEven := func() {
        for i := 2; i <= 10; i += 2 {
            fmt.Println(i)
            time.Sleep(100 * time.Millisecond)
        }
    }

    go printOdd()
    go printEven()

    // 阻塞等待
    time.Sleep(time.Second)
}

执行结果:

1 2 4 3 6 5 7 8 10 9 

我们只需要一个go关键字就可以非常简便的启动一个Goroutinue协程。最后程序睡眠1秒,原因是主Goroutinue(main函数)需要等待内部Goroutinue运行结束才能结束,否则子Goroutinue程序可能执行一半会被强制停止。

调度的随机性

通过结果可以看到数字的输出顺序并不是按照一定顺序,因为Goroutinue的调度执行是随机的。

Goroutinue的并发规模

goroutinue本身的数量是无上限的,但是一定会受到栈内存空间以及操作系统的资源限制,可以通过函数 runtime.NumGoroutine()获取当前Goroutinue数量。前面也提到过,一个Goroutinue初始的栈内存只有2KB,用于保存Goroutinue中的执行数据,且栈内存可以扩容,按需增大或缩小,单个Goroutinue最大可以扩展到1GB。

上面通过time sleep的方法太傻了,我们可以通过官方提供的 sync.WaitGroup 来实现Goroutinue的协同调度。

sync.WaitGroup

sync.WaitGroup用于等待一组Goroutinue执行完毕,其实是一个计数器思想的实现方案,它的核心方法有三个:

  • Add():调用此函数用于增加等待的Goroutinue数量,原子操作保证并发安全
  • Done():调用此函数用于减去一个计数,原子操作保证并发安全
  • Wait():调用此函数用于阻塞,直到所有的Goroutinue完成,也就是计数器归0时,才会解除阻塞状态

现在我们将上面的代码最后一行的 time.Sleep(time.Second) 去掉,再次执行会得到空的输出,原因就是主Goroutinue直接结束,两个子Goroutinue没来得及执行就已经退出了,让我们用 WaitGroup 来改造一下

示例代码:

func main() {
    wg := sync.WaitGroup{}
    // 输出奇数
    printOdd := func() {
        defer wg.Done()
        for i := 1; i <= 10; i += 2 {
            fmt.Printf("%d ", i)
            time.Sleep(100 * time.Millisecond)
        }
    }
    // 输出偶数
    printEven := func() {
        defer wg.Done()
        for i := 2; i <= 10; i += 2 {
            fmt.Printf("%d ", i)
            time.Sleep(100 * time.Millisecond)
        }
    }
    wg.Add(2)
    go printOdd()
    go printEven()
    // 阻塞等待
    fmt.Println("waiting...")
    wg.Wait()
    fmt.Println("\nfinish...")
}

执行结果:

waiting...
2 1 3 4 6 5 7 8 9 10 
finish...

这个简单的例子可以比较直观的展示waitGroup的基础用法。waitGroup 适用于一个主Goroutinue需要等待其他Goroutinue全部运行结束后才结束的这种场景,不适用于主Goroutinue需要结束,而通知其他Goroutinue结束的情景。

在这里有个使用上的注意事项,那就是 waitGroup 不要复制使用,因为内部维护的计数器不能修改,否则会造成Goroutinue的泄露,在传值时需要用指针类型来进行传递。

waitGroup的内部结构

可以进入源码查看内部结构:

type WaitGroup struct {
   noCopy noCopy
   // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
   // 64-bit atomic operations require 64-bit alignment, but 32-bit
   // compilers only guarantee that 64-bit fields are 32-bit aligned.
   // For this reason on 32 bit architectures we need to check in state()
   // if state1 is aligned or not, and dynamically "swap" the field order if
   // needed.
   state1 uint64
   state2 uint32
}

可以看到并不是一个复杂的结构,其中含义:

  • noCopy: 用于保证不会被复制
  • state1: 以64bit计算机为例,高32bit是计数器
  • state2: 以64bit计算机为例,低32bit是等待的Goroutinue

三大关键函数核心代码:

func (wg *WaitGroup) Add(delta int) {
   ...
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   ...
}
func (wg *WaitGroup) Done() {
   wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
   ...
   for {
      state := atomic.LoadUint64(statep)
      v := int32(state >> 32)
      w := uint32(state)
      if v == 0 {
         // Counter is 0, no need to wait.
         if race.Enabled {
            race.Enable()
            race.Acquire(unsafe.Pointer(wg))
         }
         return
      }
      // Increment waiters count.
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         if race.Enabled && w == 0 {
            // Wait must be synchronized with the first Add.
            // Need to model this is as a write to race with the read in Add.
            // As a consequence, can do the write only for the first waiter,
            // otherwise concurrent Waits will race with each other.
            race.Write(unsafe.Pointer(semap))
         }
         runtime_Semacquire(semap)
         if *statep != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
         }
         if race.Enabled {
            race.Enable()
            race.Acquire(unsafe.Pointer(wg))
         }
         return
      }
   }
}

到此这篇关于Golang并发绕不开的重要组件之Goroutine详解的文章就介绍到这了,更多相关Golang Goroutine内容请搜索好代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持好代码网!

标签: Go Goroutine