1 概述
golang中的并发同步,前置知识——Go的内存模型
2 状态
routine都是主动拉取数据的,例如,单一计数器,共享缓存等等,routine之间共享的数据就是状态。
2.1 非依赖状态
同步的状态,写入的数据不来源于自身。
- sync.Value,在一个routine直接Store,而另外routine直接Load。sync.Value保证读写操作是原子的,也就是不可能出现Value的中间状态。
- sync.Pointer,其实和sync.Value的用法是一样的,只是sync.Value操作的是一个值,而sync.Pointer操作的是一个引用。另外,就绝大部份的情况下,原子读写引用的性能比原子读写值的性能要快得多。
2.2 依赖状态
同步的状态,写入的数据来源于自身
- atomic,在sync.atomic库中,提供了对int32,int64,uint32,uint64的原子add操作(计数器),原子cas操作(乐观锁)。
- 容器,在sync中,目前只提供了Map容器的原子操作,并没有提供像java中的丰富的无锁数据结构,arraylist,linkedlist,queue等等。
- 互斥锁,对多个变量的组合读写操作提供互斥块,只要进入互斥块就会被限制为单一进入,这也被称为悲观锁。
- 读写锁,跟互斥锁类似,只是进一步区分了读和写的操作。只有读时多个读可以同时进入,只要含有写时就只能单一进入。当业务是写少读多的环境时,读写锁的效率就会快得多。
3 通知
routine之间存在着主动提醒和被动通知关系的,例如,生产者消费者模型,订阅发布模型,连接池模型,routine之间共享的提醒管道就是通知。
在普通的java,c模型中,这种同步的通知结构只能通过互斥锁,信号量和条件变量来实现。但是,golang中还额外提供了channel和select的工具来实现,这种工具比起传统的工具简单直观很多。在绝大部份的需要通知的情况下,我们都应该只使用channel和select。
3.1 一次提醒一次通知
一个routine的提醒,对应的只有一个routine的通知。
- 同步:routine的提醒必须存在着另外一个的routine正在等待通知,否则该routine就会一直等待直到。方法就是channel unbuffered。
- 半异步:routine的提醒允许此时没有一个的routine不在等待通知,另外一个routine下次等待通知时直接从channel中获取就可以了。但是,这种方法是大小限制的,当buffer溢出时,routine的提醒就必须要等待。方法就是channel buffered。
- 全异步:routine的提醒允许此时没有一个的routine不在等待通知,而且可以假设buffer的大小是无限大的。routine的提醒在任何时候都是不需要等待。方法就是infinite channel,可以看这里和这里
3.2 多次提醒一次通知
将多次提醒聚合为一次通知,方法其实很简单,将多次通知计数然后输出到另外一个提醒就可以了。
func do(ch chan bool) {
time.Sleep(time.Second * 1)
ch <- true
}
func main() {
ch := make(chan bool)
exit := make(chan bool)
for i := 0; i != 4; i++ {
go do(ch)
}
go func() {
for i := 0; i != 4; i++ {
<-ch
}
exit <- true
}()
fmt.Println(time.Now())
<-exit
fmt.Println(time.Now())
}
ch就是多次提醒的写入channel,exit就是一次通知的读取channel。在这个例子中,你也可以使用sync.Waitgroup来完成同样的操作。
3.3 一次提醒多次通知
一次提醒多次通知是channel中比较巧妙的一个特性,它使用的是close channel的特性。
func do(ch chan bool) {
_, isOk := <-ch
fmt.Println("exit!", isOk)
}
func main() {
ch := make(chan bool)
for i := 0; i != 4; i++ {
go do(ch)
}
close(ch)
time.Sleep(time.Second * 1)
}
对channel进行close操作后,这个channel进行读取操作时就会非阻塞地马上返回,它返回的第二个参数为false,代表channel已经被关闭了。这种一次提醒多次通知的功能常常用于通知关闭多个goroutine,这也是Context中Cancel的实现原理。
但是,要注意的是,对一个已经close的channel进行写入操作时会发生panic。
3.4 select
channel加上select后就能实现很多巧妙的功能
3.4.1 通知组合
func main() {
readMsg := make(chan bool)
writeMsg := make(chan bool)
closeMsg := make(chan bool)
for {
select {
case msg := <-readMsg:
fmt.Println("websocket read msg", msg)
case msg := <-writeMsg:
fmt.Println("websocket write msg", msg)
case <-closeMsg:
break
}
}
}
在websocket的场景中,我们希望可以在一个routine中同时处理读消息,写消息,和主动踢下线的能力,select加channel就能很简洁的解决这个问题。
3.4.2 通知转换为channel
func (r *messageReader) Read(b []byte) (int, error)
但是,在绝大部份的sdk中,我们routine接收提醒的方法都不是通过channel,而是阻塞方式的Read操作。例如在websocket库的接收消息方法Read。这样的Read方法是阻塞方式的,我们根本就不能用select将它和closeMsg,writeMsg组合起来。
for {
select {
case msg := websocket.Read(nil):
fmt.Println("websocket read msg", msg)
case msg := <-writeMsg:
fmt.Println("websocket write msg", msg)
case <-closeMsg:
break
}
}
也就是说,我们不能像上面这样写,因为select并不允许对非channel进行通知组合。
readMsg := make(chan bool)
writeMsg := make(chan bool)
closeMsg := make(chan bool)
var readErr error
go func() {
for {
data := make([]byte, 1024)
size, err := websocket.Read(data)
if err != nil {
close(readMsg)
readErr = err
return
}
readMsg <- data[0:size]
}
}()
for {
select {
case msg, isOk := <-readMsg:
if isOk == false {
close(websocket)
fmt.Println("websocket read error", readErr)
break
} else {
fmt.Println("websocket read msg", msg)
}
case msg := <-writeMsg:
fmt.Println("websocket write msg", msg)
case <-closeMsg:
close(websocket)
break
}
}
解决方法很简单,开启一个单独的routine,将阻塞的读操作转换为readMsg的channel,我们就能重新用回原来的通知组合方法了。要注意的是,上面的代码有两个routine,当阻塞读取发生意外时,和,当主动踢下线时,这两种情况下,上面代码是如何退出程序的,是如何有序地退出两个routine的。并且,它保证了close websocket有且只有发生一次。
select {
case msg, isOk := <-readMsg:
if isOk == false {
close(websocket)
fmt.Println("websocket read error", readErr)
break
} else {
fmt.Println("websocket read msg", msg)
go this.msgListener(msg)
}
}
要注意的是,业务经常是收到消息后回调listener接口。为了避免listener接口返回来调用send或者close导致死锁的问题,在触发listener时都需要用单独的go routine来做。
3.4.3 占位通知
select的写法十分灵活,但它有一点不好,就是需要同时组合的channel数量在编译时就已经被固定下来了。无法实现,在运行时组合任意数量的channel。
解决办法是:
- 对于在运行时可能需要减少组合的channel,你可以使用nil channel来代替。golang保证nil channel无论是读操作还是写操作都不会panic,它在select分支下更是永远都不会触发。具体可以看这里,一个很实用的例子是在这里中的infiniteBuffer函数。
- 对于在运行时可能需要增加组合的channel,你可以使用reflect.Select,这可是最灵活的组合通知的方式。
3.4.4 非阻塞测试
readMsg := make(chan bool, 16)
writeMsg := make(chan bool, 16)
for i := 0; i != 16; i++ {
writeMsg <- true
}
select {
case msg := <-readMsg:
fmt.Println("read Msg", msg)
default:
fmt.Println("read msg empty!")
}
select {
case writeMsg <- true:
fmt.Println("write msg", true)
default:
fmt.Println("write msg full!")
}
在select中加入default,我们就能实现测试通知的目的。read channel与default的组合,我们能实现测试channel是否有信息可读。write channel与default的组合,我们能实现测试channel是否已经满了,不能再写入了。
这个非阻塞测试的方法,被nsq消息队列使用来这样的场景。当对write msg进行写入测试失败时,就把msg写入文件中。这样就能同时兼顾内存有限但是速度快,文件速度慢但空间很大的平衡。
另外,你也可以用在reproxy的实现中,将所有的输入请求都丢入到一个较大的buffer channel中,然后后端服务不断从buffer channel中取出数据来处理。当请求突然增大,甚至远远超出后端服务的计划承受能力时,reproxy作为转发层就可以通过检查buffer channel是否已经满了,来提前把后面的所有请求都拒绝掉,以保护后端服务,避免雪崩。也就是传说中的fail-fast机制。
4 总结
golang中的channel与select是处理并发同步的一大神器。
- 本文作者: fishedee
- 版权声明: 本博客所有文章均采用 CC BY-NC-SA 3.0 CN 许可协议,转载必须注明出处!