goroutine调度

0.1、索引

https://blog.waterflow.link/articles/1662974432717

1、进程

一个进程包含可以由任何进程分配的公共资源。这些资源包括但不限于内存地址空间、文件句柄、设备和线程。

一个进程会包含下面一些属性:

2、线程

线程是轻量级的进程,一个线程将在进程内的所有线程之间共享进程的资源,如代码、数据、全局变量、文件和内存地址空间。但是栈和寄存器不会共享,每个线程都有自己的栈和寄存器

线程的优点:

3、用户级线程

用户级线程也称为绿色线程,如:C 中的 coroutine、Go 中的 goroutine 和 Ruby 中的 Fiber
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974591.png

该进程维护一个内存地址空间,处理文件,以及正在运行的应用程序的设备和线程。操作系统调度程序决定哪些线程将在任何给定的 CPU 上接收时间

因此,与耗时和资源密集型的进程创建相比,在一个进程中创建多个用户线程(goroutine)效率更高。

4、goroutine

在 Go 中用户级线程被称作 Goroutine,在创建 goroutine 时需要做到:

其中阻塞调用可能是下面一些原因:

为什么 go 需要调度 goroutine?

Go 使用称为 goroutine 的用户级线程,它比内核级线程更轻且更便宜。 例如,创建一个初始 goroutine 将占用 2KB 的堆栈大小,而内核级线程将占用 8KB 的堆栈大小。 还有,goroutine 比内核线程有更快的创建、销毁和上下文切换,所以 go 调度器 需要退出来调度 goroutine。OS 不能调度用户级线程,OS 只知道内核级线程。 Go 调度器 将 goroutine 多路复用到内核级线程,这些线程将在不同的 CPU 内核上运行

什么时候会调度 goroutine?

如果有任何操作应该或将会影响 goroutine 的执行,比如 goroutine 的启动、等待执行和阻塞调用等……

go 调度 如何将 goroutine 多路复用到内核线程中?

1、1:1 调度(1 个线程对应一个 goroutine)

2、N:1 调度(在单个内核线程上多路复用所有 goroutine)

我们看下下面的例子,只为 go 分配了 1 个 processer 去处理 2 个 goroutine:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	// 分配 1 个逻辑处理器供调度程序使用
	runtime.GOMAXPROCS(1)
	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Println("Starting Goroutines")

	// 开一个go协程打印字母
	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		// 打印3次字母
		for count := 0; count < 3; count++ {
			for ch := 'a'; ch < 'a'+26; ch++ {
				fmt.Printf("%c ", ch)
			}
			fmt.Println()
		}
	}()

	// 开一个go协程打印数字
	go func() {
		defer wg.Done()
		// 打印3次数字
		for count := 0; count < 3; count++ {
			for n := 1; n <= 26; n++ {
				fmt.Printf("%d ", n)
			}
			fmt.Println()
		}
	}()

	// 等待返回
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nTerminating Program")
}

看下结果:

go run main.go
Starting Goroutines
Waiting To Finish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v w x y z 

Terminating Program

可以看到这俩个 goroutine 是串行执行的,要么先完成第一个 goroutine,要么先完成第二个 goroutine,并不是并发执行的。

那如何去实现并发执行呢?

我们同样设置 runtime.GOMAXPROCS 为 1,但是在 goroutine 中我们在不同的时机加入阻塞 goroutine 的时间函数 time.Sleep,我们看下会有什么不同的结果。

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	// 分配 1 个逻辑处理器供调度程序使用
	runtime.GOMAXPROCS(1)
	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Println("Starting Goroutines")

	// 开一个go协程打印字母
	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		// 打印3次字母
		for count := 0; count < 3; count++ {
			for ch := 'a'; ch < 'a'+26; ch++ {
				if count == 0 {
					time.Sleep(10 * time.Millisecond)
				}
				if count == 1 {
					time.Sleep(30 * time.Millisecond)
				}
				if count == 2 {
					time.Sleep(50 * time.Millisecond)
				}
				fmt.Printf("%c ", ch)
			}
			fmt.Println()
		}
	}()

	// 开一个go协程打印数字
	go func() {
		defer wg.Done()
		// 打印3次数字
		for count := 0; count < 3; count++ {
			for n := 1; n <= 26; n++ {
				if count == 0 {
					time.Sleep(20 * time.Millisecond)
				}
				if count == 1 {
					time.Sleep(40 * time.Millisecond)
				}
				if count == 2 {
					time.Sleep(60 * time.Millisecond)
				}
				fmt.Printf("%d ", n)
			}
			fmt.Println()
		}
	}()

	// 等待返回
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nTerminating Program")
}

看下结果:

go run main.go
Starting Goroutines
Waiting To Finish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 a 12 b c d e 13 f g h i 14 j k l m 15 n o p 16 q r s t 17 u v w x 18 y z 
19 a b 20 c 21 d 22 e f 23 g 24 h 25 i j 26 
k l 1 m n 2 o p 3 q r 4 s t 5 u v 6 w x 7 y z 
8 a 9 b 10 c 11 d 12 e f 13 g 14 h 15 i 16 j 17 k l 18 m 19 n 20 o 21 p 22 q r 23 s 24 t 25 u 26 
v w x y z 

Terminating Program

通过上面的结果我们可以看到,当 goroutine1 阻塞时,go 调度器会调度 goroutine2 执行。

我们可以得出:

3、线程池

4、M: N 线程共享运行队列调度(GMP)

我们上面提到过导致 goroutine 阻塞调用可能是下面一些原因:

下面看一些 goroutine 阻塞的例子:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"sync"
	"time"
)

// 全局变量
var worker int

func writeToFile(wg *sync.WaitGroup) {
	defer wg.Done()

	file, _ := os.OpenFile("file.txt", os.O_RDWR|os.O_CREATE, 0755)           // 系统调用阻塞
	resp, _ := http.Get("https://blog.waterflow.link/articles/1662706601117") // 网络IO阻塞
	body, _ := ioutil.ReadAll(resp.Body)                                      // 系统调用阻塞

	file.WriteString(string(body))
}

func workerCount(wg *sync.WaitGroup, m *sync.Mutex, ch chan string) {
	// Lock() 给共享资源上锁
	// 独占访问状态,
	// 增加worker的值,
	// Unlock() 释放锁
	m.Lock() // Mutex阻塞
	worker = worker + 1
	ch <- fmt.Sprintf("Worker %d is ready", worker)
	m.Unlock()

	// 返回, 通知WaitGroup完成
	wg.Done()
}

func printWorker(wg *sync.WaitGroup, done chan bool, ch chan string) {

	for i := 0; i < 100; i++ {
		fmt.Println(<-ch) // Channel阻塞
	}
	wg.Done()
	done <- true
}

func main() {

	ch := make(chan string)
	done := make(chan bool)

	var mu sync.Mutex

	var wg sync.WaitGroup

	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go workerCount(&wg, &mu, ch)
	}

	wg.Add(2)
	go writeToFile(&wg)
	go printWorker(&wg, done, ch)

	wg.Wait()

	<-done // Channel阻塞

	<-time.After(1 * time.Second) // Timer阻塞
	close(ch)
	close(done)
}

下面我们看看 go 调度器在上面这些例子中是如何工作的:

  1. M1,M2,M3 尝试从全局 G 队列中获取 G
  2. M1 获取锁并拿到 G1,然后释放锁
  3. M3 获取锁拿到 G2,然后释放锁
  4. M2 获取锁拿到 G3,然后释放锁
  5. G1 在 ch1 的 channel 中阻塞,然后添加到 ch1 的等待队列。导致 M1 空闲
  6. M1 不能闲着,从全局队列获取锁拿到 G4,然后释放锁
  7. G3 阻塞在 ch2 的 channel 中,然后被放到 ch2 的等待队列。导致 M2 空闲
  8. M2 获取锁拿到 G5,然后释放锁
  9. 此时 G3 在 ch2 结束阻塞,被放到全局队列尾部等待执行
  10. G1 在 ch1 结束阻塞,被放到全局队列尾部等待执行
  11. G4,G5,G2 执行完成
  12. M1,M2,M3 重复步骤 1-4

操作系统可以支持多少内核线程?

在 Linux 内核中,此参数在文件 /proc/sys/kernel/threads-max 中定义,该文件用于特定内核。

sh:~$ cat /proc/sys/kernel/threads-max 94751
这里输出 94751 表示内核最多可以执行 94751 个线程

每个 Go 程序可以支持多少个 goroutine?

调度中没有内置对 goroutine 数量的限制。

每个 GO 程序 可以支持多少个内核线程?

默认情况下,运行时将每个程序限制为最多 10,000 个线程。可以通过调用 runtime/debug 包中的 SetMaxThreads 函数来更改此值。

总结:

  1. 内核线程数可以多于内核数
  2. 轻量级 goroutine
  3. 处理 IO 和系统调用
  4. goroutine 并行执行
  5. 不可扩展(所有内核级线程都尝试使用互斥锁访问全局运行队列。因此,由于竞争,这不容易扩展)

5、M:N 线程分布式运行队列调度器

为了解决每个线程同时尝试访问互斥锁的可扩展问题,维护每个线程的本地运行队列

  1. M1,M2,M3,M4 扫描本地可运行队列
  2. M1,M2,M3,M4 从各自的本地队列取出 G4,G6,G1,G3

从上面的动图可以看到:

结论:

  1. 轻量级 goroutine
  2. 处理 IO 和 SystemCalls
  3. goroutine 并行执行
  4. 可扩展
  5. 高效

如果线程数大于内核数,那么会有什么问题呢?

在分布式运行队列调度中,我们知道每个线程都有自己的本地运行队列,其中包含有关接下来将执行哪个 goroutine 的信息。 同样由于系统调用,线程数会增加,并且大多数时候它们的本地运行队列是空的。 因此,如果线程数大于核心数,则每个线程必须扫描所有线程本地运行队列,并且大部分时间它们是空的,所以如果线程过多,这个过程是耗时的并且解决方案 效率不高,因此我们需要将线程扫描限制为使用 M:P:N 线程模型求解的常数。

6、M:P: N 线程

如何检查逻辑处理器的数量?

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println(runtime.NumCPU())
}

分布式 M:P:N 调度例子
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974709.gif

  1. M1,M2 各自扫描 P1,P2 的队列
  2. M1,M2 从各自的 P1,P2 中取出 G3,G1 执行

在系统调用期间执行 P 的切换
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974724.gif

  1. M1,M2 各自扫描 P1,P2 的队列
  2. M1,M2 从各自的 P1,P2 中取出 G3,G1 执行
  3. G1 即将进入系统调用,所以在这之前 G1 会唤醒另一个线程 M3,并将 P2 切换到 M3
  4. M3 扫描 P2 并取出 G2 运行
  5. 一旦 G1 变为非阻塞,它将被推送到全局队列等待运行

在 work-stealing 期间,只需要扫描固定数量的队列,因为逻辑处理器的数量是有限的。

如何选择下一个要运行的 goroutine ?

Go 调度器 将按以下顺序检查以选择下一个要执行的 goroutine

总结:

Go 调度的局限性

翻译自:

https://mukeshpilaniya.github.io/posts/Go-Schedular/