Golang的通道技巧

整理近期遇到的Golang中通道和协程的技巧。

计时器及系统负荷均衡

  • time 包中的 time.Ticker 结构体,该对象以指定的时间间隔重复向结构体中的通道C发送时间值,通道C对用户只读,该对象可通过工厂函数 time.NewTicker(dur int64) 创建, dur是指定的时间间隔,单位为纳秒(ns)。在使协程周期性执行任务(打印状态日志,输出等)时使用。调用 Stop() 使计时器停止,与 select 结合如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
...
case v:= <-ch2:
...
case <-ticker.C:
logState(status) // call some logging function logState
default: // no value ready to be received
...
}
  • time.Tick() 函数声明为 Tick(d Duration) <-chan Time,该函数返回的通道不需要关闭,它以 d 为周期给返回的通道发送时间,d 是纳秒数。可以通过此函数限制处理频率,如果应对的请求不平稳,可以增加一个带缓冲的可读写通道,从 chRate 中读取处理时钟,在请求暴增时可以快速处理与缓冲数相等的请求,之后处理速度会下降到和 chRate 一样的速率。
1
2
3
4
5
6
7
8
import "time"
rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
<- chRate // rate limit our Service.Method RPC calls
go client.Call("Service.Method", req, ...)
}
  • 定时器(Timer)定时器和计时器(Ticker)结构体类似(构造函数为 NewTimer(d Duration)),但它只发送一次时间,在 Dration d 之后。

  • time.After(d) 函数声明为 func After(d Duration) <-chan Time,在 Duration d 之后,当前时间被发到返回的通道;因此它和 NewTimer(d).C 等价;它类似 Tick(),但 After() 只发送一次时间。可以使用此函数应对简单的超时模式,以下为三种形式。

    • 要执行某个任务(如从通道 ch 中读取数据),但最多等待1秒。先创建一个信号通道,之后启动一个 lambda 协程,协程在给通道发送数据前休眠:
1
2
3
4
5
6
7
8
9
10
11
12
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()
select {
case <-ch:
// a read from ch has occured
case <-timeout:
// the read from ch has timed out
break
}
  • time.After() 函数替换 timeout-channel。可以在 select 中使用来让发送信号超时或停止协程的执行。以下代码,在 timeoutNs 纳秒后执行 selecttimeout 分支后,包含client.Calllambda 协程也随之结束,不会给通道 ch 返回值。缓冲大小设置为 1 是必要的,可以避免协程死锁以及确保超时的通道可以被垃圾回收。需要注意如果 select 中的某些非定时器选项的通道读写密集,则可能无法结束这些进程。这种情况如果将 select 放到一个 for 循环中,也无法精确地在定时器通道写入时就结束,因为 select 对可以执行的多个 case 采取伪随机算法选择,可能结束进程的时间要比定时器发出信号略晚一些。
1
2
3
4
5
6
7
8
9
ch := make(chan error, 1)
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case <-time.After(timeoutNs):
// call timed out
break
case resp := <-ch
// use resp and reply
}
  • 假设程序从多个复制的数据库同时读取,只需要接收首先到达的答案,Query 函数获取数据库的连接切片,并行请求每一个数据库并返回收到的第一个响应。结果通道 ch 必须是带缓冲的,以保证第一个发送进来的数据有地方可以存放,确保放入的首个数据总会成功:
1
2
3
4
5
6
7
8
9
10
11
12
func Query(conns []conn, query string) Result {
ch := make(chan Result, 1)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <- ch
}

缓冲通道实现信号量模式

*使用缓冲通道模拟信号量,需满足

  • 带缓冲通道的容量和要同步的资源容量相同
  • 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
  • 容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)
  • 创建一个可缓冲通道表示单一信号量。
1
2
type Empty interface {}
type semaphore chan Empty
  • 将可用资源的数量N来初始化信号量 semaphore:sem = make(semaphore, N),提供方法从信号量通道中读取、写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
// acquire n resources
func (s semaphore) P(n int) {
e := new(Empty)
for i := 0; i < n; i++ {
s <- e
}
}
// release n resouces
func (s semaphore) V(n int) {
for i:= 0; i < n; i++{
<- s
}
}
  • 一个互斥的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* mutexes */
func (s semaphore) Lock() {
s.P(1)
}
func (s semaphore) Unlock(){
s.V(1)
}
/* signal-wait */
func (s semaphore) Wait(n int) {
s.P(n)
}
func (s semaphore) Signal() {
s.V(1)
}

管道过滤

  • 从通道接收的数据并发送给输出通道,可过滤符合条件的数据。
1
2
3
4
5
6
7
8
9
10
sendChan := make(chan int)
reciveChan := make(chan string)
go filter(sendChan, receiveChan)

func filter(in <-chan int, out chan<- string) {
for inValue := range in {
result := ... /// processing inValue
out <- result
}
}

协程的恢复

  • 以下代码停掉了服务器内部一个失败的协程而不影响其他协程的工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}

func safelyDo(work *Work) {
defer func {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}

参考文献:

原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(https://forec.github.io/2016/09/08/gochan-accumulate/) 、作者信息(Forec)和本声明。

分享到