singleFlight源码解析

是什么?

SingleFlight 作用是将并发请求合并成一个请求,以减少对下层服务的压力

应用场景

  1. 大量的请求同时过来,查询一个已经失效的缓存,导致大量请求打到数据库(缓存击穿)

如何使用?

现在有一个查询接口,从 Redis 中读取数据,如果没有数据,就会从数据库读并设置到 Redis 中。假如现在有大量请求过来,同时发现 Redis 中没有数据,导致请求都打到下层的数据库,对数据库造成压力。

实际上这些请求可以共享一个结果。

这个时候就需要使用 singleFlight 合并这些请求。

设置一个读取缓存的接口

package main

import (
	"github.com/gin-gonic/gin"
	_ "github.com/go-sql-driver/mysql"
	"golang.org/x/sync/singleflight"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"log"
	"math/rand"
	"net/http"
	"time"
)

type Demo struct {
	Id        int
	Name      string
	UpdatedAt int
	CreatedAt int
	IsDeleted int
}

var demo Demo
var db *gorm.DB

var singleFlight singleflight.Group

// 获取并设置数据到缓存
func getAndSetCache(requestId int64, cacheKey string) (Demo, error) {
	log.Printf("request %v start to get and set cache...", requestId)

	// do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
	value, _, _ := singleFlight.Do(cacheKey, func() (ret interface{}, err error) {

		db.Table("demo").First(&demo)
		log.Println("只有一个协程获取到了数据库值:", demo)

		return demo, nil

	})

	return value.(Demo), nil
}

func init() {
	dsn := "root:liufutian@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local"
	db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	db = db1
	sqlDB, err := db.DB()
	sqlDB.SetMaxIdleConns(10)
	sqlDB.SetMaxOpenConns(100)
	sqlDB.SetConnMaxLifetime(time.Hour)
	sqlDB.SetConnMaxIdleTime(time.Hour)
}

func main() {
	router := gin.New()
	router.Use(gin.Recovery())

	router.GET("/demo/list", func(c *gin.Context) {
		cache, _ := getAndSetCache(rand.Int63(), "cacheKey")
		c.JSON(http.StatusOK, cache)
	})

	router.Run(":9501")
}

开启服务

go run single_flight.go
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:	export GIN_MODE=release
 - using code:	gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /demo/list                --> main.main.func1 (2 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9501

并发测试

ab -c 10 -n 20 http://localhost:9501/demo/list

输出结果

2022/08/12 22:54:57 request 5577006791947779410 start to get and set cache...
2022/08/12 22:54:57 只有一个协程获取到了数据库值 {1 liu 33 1 0}
2022/08/12 22:54:57 request 8674665223082153551 start to get and set cache...
2022/08/12 22:54:57 request 6129484611666145821 start to get and set cache...
2022/08/12 22:54:57 request 4037200794235010051 start to get and set cache...
2022/08/12 22:54:57 request 3916589616287113937 start to get and set cache...
2022/08/12 22:54:57 request 6334824724549167320 start to get and set cache...
2022/08/12 22:54:57 request 1443635317331776148 start to get and set cache...
2022/08/12 22:54:57 request 605394647632969758 start to get and set cache...
2022/08/12 22:54:57 request 2775422040480279449 start to get and set cache...
2022/08/12 22:54:57 request 894385949183117216 start to get and set cache...
2022/08/12 22:54:57 request 4751997750760398084 start to get and set cache...
2022/08/12 22:54:57 只有一个协程获取到了数据库值 {1 liu 33 1 0}
2022/08/12 22:54:57 request 7504504064263669287 start to get and set cache...
2022/08/12 22:54:57 request 1976235410884491574 start to get and set cache...
2022/08/12 22:54:57 request 3510942875414458836 start to get and set cache...
2022/08/12 22:54:57 request 2933568871211445515 start to get and set cache...
2022/08/12 22:54:57 request 4324745483838182873 start to get and set cache...
2022/08/12 22:54:57 request 2703387474910584091 start to get and set cache...
2022/08/12 22:54:57 request 2610529275472644968 start to get and set cache...
2022/08/12 22:54:57 request 6263450610539110790 start to get and set cache...
2022/08/12 22:54:57 request 2015796113853353331 start to get and set cache...
2022/08/12 22:54:57 只有一个协程获取到了数据库值 {1 liu 33 1 0}

可以看到同时开了 10 个协程,共 20 个并发请求,但是真正读数据库的只有 3 个。达到了我们的预期效果。

源码解读

上面的使用中可以看到,我们用到了 singleFlightDo 方法

var singleFlight singleflight.Group

// 获取并设置数据到缓存
func getAndSetCache(requestId int64, cacheKey string) (Demo, error) {
	log.Printf("request %v start to get and set cache...", requestId)

	// do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
	value, _, _ := singleFlight.Do(cacheKey, func() (ret interface{}, err error) {

第一行初始化了一个 Group 结构体

type Group struct {
	// 互斥锁,用来保护m的
	mu sync.Mutex
	// 一个懒加载的map,保存的是call的引用
	m map[string]*call
}

我们再看这个 call 里面都有什么

// 正在flight,或者已经flight完成的结构体
type call struct {
	// 并发控制,等待一组goroutine完成,然后返回
	wg sync.WaitGroup

	// 回调函数的返回参数
	// 这些字段在 WaitGroup 完成之前写入一次
	// 并且仅在 WaitGroup 完成后读取。
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key
	// while the call was still in flight.
	forgotten bool

	// 重复调用次数的计数器
	dups int
	// 针对DoChan函数的channel数组
	chans []chan<- Result
}

// 保存了DoChan的结果
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

Do 方法

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	// 并发进来先上锁
	g.mu.Lock()
	// 如果没有初始化,在这里初始化m
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	// 如果m中已经有值
	if c, ok := g.m[key]; ok {
		// 数量+1,释放锁并等待
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		// 唤醒后返回
		return c.val, c.err, true
	}
	// 初始化call
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	// 同一批gorutine中只会执行一次
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

doCall 方法

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	defer func() {
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

    // 执行完成
		c.wg.Done()
		g.mu.Lock()
		defer g.mu.Unlock()
		if !c.forgotten {
      // 安全删m中的key
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			...
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			// Normal return
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		...
  
    // 执行回调函数
		c.val, c.err = fn()
    // 正常返回设置为true
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

当执行 Do 方法的时候

  1. 因为要对 map 操作所以先上锁,检查是否初始化 m,没有就初始化 m。
  2. 检查 m 中是否有对应的 key,如果没有,初始化 call,wg+1(因为这一批请求进来只会有一个真正执行,所以 wg 设置为 1),并存到 m 中,注意此时锁会释放掉
  3. 锁释放对应上面 27 行,假如有 10 个请求同时进来,有 1 个请求执行了前 2 步,并释放锁。剩下的 9 个请求会进入 if 分支,对应上面的 15 行。所以从这里开始,会有两条分支出去。第一条分支对应真正执行 doCall 的 goroutine,第二条分支是剩下的 9 个 goroutine 等待第一条分支执行 c.wg.Done()。下面我们具体说下这两条分支:
  4. 第二条分支在第一条分支执行 doCall 的同时,剩下的 9 个 goroutine 获取锁,重复调用次数 c.dups 每次加 1,释放锁,等待第一条分支执行 c.wg.Done()
  5. 第一条分支会执行 doCall 方法,进去之后执行回调函数,返回值保存到 call 中的 valerr 中,设置 normalReturntrue。然后进入到 defer,执行 c.wg.Done(),并删除 m 中对应的 key
  6. 当第一条分支执行 c.wg.Done(),第二条分支唤醒并返回已经被赋值的 c.val, c.err
  7. 此时整个流程执行结束,只请求了一次数据库操作,剩下的 goroutine 复用返回值

do 方法核心示例

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type Group struct {
	m  map[string]string
	mu sync.Mutex
}

var wg sync.WaitGroup
var i int
var k = "key"
var ch = make(chan struct{})

func (g *Group) do() {
	for i = 0; i < 10; i++ {
		go func(i int) {
			g.mu.Lock()
			if g.m == nil {
				g.m = make(map[string]string)
			}
			if _, ok := g.m[k]; ok {
				g.mu.Unlock()
				wg.Wait()
				fmt.Println("完成了烙铁", i)
			} else {
				g.m[k] = ""
				g.mu.Unlock()
				wg.Add(1)
				fmt.Println("控制并发流程+1")
				// 处理redis或者mysql,耗时操作
				time.Sleep(time.Second * 3)
				wg.Done()
			}

		}(i)

	}

}

func main() {
	g := &Group{}
	g.do()
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGUSR2)
	<-sig
}

返回结果:

go run waitgroup.go
控制并发流程+1
完成了烙铁 1
完成了烙铁 0
完成了烙铁 8
完成了烙铁 7
完成了烙铁 9
完成了烙铁 5
完成了烙铁 6
完成了烙铁 2
完成了烙铁 4