go channel原理及使用场景

源码解析

type hchan struct {
	qcount   uint           // Channel 中的元素个数
	dataqsiz uint           // Channel 中的循环队列的长度
	buf      unsafe.Pointer // Channel 的缓冲区数据指针
	elemsize uint16         // 当前 Channel 能够收发的元素大小
	closed   uint32
	elemtype *_type // 当前 Channel 能够收发的元素类型
	sendx    uint   // Channel 的发送操作处理到的位置
	recvx    uint   // Channel 的接收操作处理到的位置
	recvq    waitq  // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)
	sendq    waitq  // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662024619.png

创建 channel

channel 的初始化有 2 种,一种是没有缓冲区的 channel,一种是有缓冲区的 channel。对应的初始化之后 hchan 也是有区别的。

无缓冲区的 channel,初始化的时候只为 channel 分配内存,缓冲区 dataqsiz 的长度为 0

有缓冲的 channel,初始化时会为 channel 和缓冲区分配内存,dataqsiz 长度大于 0

同时 channel 的元素大小和缓冲区的长度都是有大小限制的

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// 如果内存超了,或者分配的内存大于channel最大分配内存,或者分配的size小于0,直接Panic
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// 如果没有缓冲区,分配一段内存
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 有缓冲时,如果元素不包含指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 有缓冲区,且元素包含指针类型,channel和buf数组各自分配内存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	// 元素大小,元素类型,循环数组长度,更新到channel
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

发送数据(ch <- i)

接收数据(<- ch)

关闭 channel

使用场景

报错情形

1、一个经典的算法题

有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出自己的编号,要求写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4...的顺序打印出来

package main

import (
	"fmt"
	"time"
)

func main() {
	// 4个channel
	chs := make([]chan int, 4)
	for i, _ := range chs {
		chs[i] = make(chan int)
		// 开4个协程
		go func(i int) {
			for {
				// 获取当前channel值并打印
				v := <-chs[i]
				fmt.Println(v + 1)
				time.Sleep(time.Second)
				// 把下一个值写入下一个channel,等待下一次消费
				chs[(i+1)%4] <- (v + 1) % 4
			}

		}(i)
	}

	// 往第一个塞入0
	chs[0] <- 0
	select {}
}

2、限流器

package main

import (
	"fmt"
	"time"
)

func main() {
	// 每次处理3个请求
	chLimit := make(chan struct{}, 3)
	for i := 0; i < 20; i++ {
		chLimit <- struct{}{}
		go func(i int) {
			fmt.Println("下游服务处理逻辑...", i)
			time.Sleep(time.Second * 3)
			<-chLimit
		}(i)
	}
	time.Sleep(30 * time.Second)
}

如果觉得 sleep 太丑太暴力,可以用 waitGroup 控制结束时机

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	// 每次处理3个请求
	chLimit := make(chan struct{}, 3)
	for i := 0; i < 20; i++ {
		chLimit <- struct{}{}
		wg.Add(1)
		go func(i int) {
			fmt.Println("下游服务处理逻辑...", i)
			time.Sleep(time.Second * 3)
			<-chLimit
			wg.Done()
		}(i)
	}
	wg.Wait()
}

3、优雅退出

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	var closing = make(chan struct{})
	var closed = make(chan struct{})

	go func() {
		for {
			select {
			case <-closing:
				return
			default:
				fmt.Println("业务逻辑...")
				time.Sleep(1 * time.Second)
			}
		}
	}()

	termChan := make(chan os.Signal)
	// 监听退出信号
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

	// 退出中
	close(closing)

	// 退出之前清理一下
	go doCleanup(closed)

	select {
	case <-closed:
	case <-time.After(time.Second):
		log.Println("清理超时不等了")
	}

	log.Println("优雅退出")
}

func doCleanup(closed chan struct{}) {
	time.Sleep(time.Minute)
	// 清理完后退出
	close(closed)
}

4、实现互斥锁

初始化一个缓冲区为 1 的 channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,释放锁的时候再把这个元素放回 channel

package main

import (
	"log"
	"time"
)

type Mutex struct {
	ch chan struct{}
}

// 初始化锁
func NewMutex() *Mutex {
	mu := &Mutex{make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}

// 加锁,阻塞获取
func (m *Mutex) Lock() {
	<-m.ch
}

// 释放锁
func (m *Mutex) Unlock() {
	select {
	// 成功写入channel代表释放成功
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
	select {
	case <-m.ch:
		return true
	default:

	}
	return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)

	select {
	case <-m.ch:
		// 成功获取锁关闭定时器
		timer.Stop()
		return true
	case <-timer.C:

	}
	// 获取锁超时
	return false
}

// 是否上锁
func (m *Mutex) IsLocked() bool {
	return len(m.ch) == 0
}

func main() {
	m := NewMutex()
	ok := m.TryLock()
	log.Printf("locked v %v\n", ok)
	ok = m.TryLock()
	log.Printf("locked v %v\n", ok)

	go func() {
		time.Sleep(5 * time.Second)
		m.Unlock()
	}()

	ok = m.LockTimeout(10 * time.Second)
	log.Printf("LockTimeout v %v\n", ok)
}

参考:

极刻时间《go 并发编程实战》