Go语言:channel
channel是Go语言中类型安全的内置类型;两个goruntine可以通过channle同步的进行消息通信;想必你一定听过Go 的创始人之一 Rob Pike的那句名言:
Do not communicate by sharing memory; instead, share memory by communicating; 不要通过共享内存的方式通信,而是要通过通信的方式共享数据
channel的设计和使用充分体现了这样Go语言哲学思想;下面我们来了解一下channel的基本类型和常见用法。
无缓冲 Channels
ch := make(chan struct{})
无缓冲chan的特点是发送端和接受端必须同时准备就绪才可以发送数据;因此: 1、如果发送端goroutine发送数据时如果接收端为没有goroutine准备就绪时发送端会阻塞 2、同样的当接收端goroutine从channel中读取数据时,如果发送端没有发送数据,接收端将阻塞。 看一下下面这段代码会发生什么情况?
ch := make(chan struct{})
ch <- struct{}{}
go func() {
<-ch
fmt.Println("receive message")
}()
显然在第二行 写入chan时 接收端的goroutine并没有就绪,因此此处会一直等待,导致接下来接收端无法正常启动。从而造成死锁的情况;这种情况下可以怎么解决呢?一直方式是新启动一个goroutine进行写入操作;例外一种就是将ch的容量改为1;和也就是接下来我们要介绍的有缓冲chan。
有缓冲Channels
ch := make(chan struct{},10)
有缓冲Channel具有容量,因此其行为与无缓冲channel不同。当 goroutine 向缓冲chan发送数据时,如果chan已经写满则该goroutine将阻塞直到chan可写入。如果chan中有空间,可以立即发送goroutine 可以继续执行。当goroutine 从缓冲chan读取数据时,如果缓冲chan为空,则该goroutine阻塞直到有数据写入。
ch := make(chan struct{},1)
ch <- struct{}{}
go func() {
<-ch
fmt.Println("receive message")
}()
单向Channel
上面的示例中我们用到的都是双向channel,既可以写入也可以读取;接下来我们介绍的时单向channel,也就是只能读取或者只能写入的通道; 1、只能读取 <-chan 2、只能写入 chan<-
func read(ch <-chan string) {
fmt.Println("read from channel: " + <-ch)
}
func write(ch chan<- string) {
ch <- "message"
fmt.Println("write to channel ")
}
func main() {
ch1 := make(chan string, 1)
go write(ch1)
go read(ch1)
time.Sleep(5 * time.Second)
}
需要注意的时双向chan可以转成单项chan,而单向chan不能转成双向chan。
常见用法:扇入模式
这里的扇入是指有多个Channel 输入、一个 Channel 输出。 此处引用“Go Concurrency Patterns”中的示例;将多个chan合并到 out chan;通过WaitGroup等待全部chan的数据写入out后关闭out chan;
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(cs))
for _, c := range cs {
go func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
常见用法:扇出模式
扇出模式是和扇入模式相反的。扇出模式只有一个输入 Channel,有多个目标 Channel即一对多的关系。
func fanOut(ch <-chan int, out []chan int) {
var wg sync.WaitGroup
for v := range ch { // 从输入chan中读取数据
for i := 0; i < len(out); i++ {
wg.Add(1)
go func(v int, idx int) {
defer wg.Done()
out[idx] <- v
}(v, i)
}
}
wg.Wait()
for i := 0; i < len(out); i++ {
close(out[i])
}
}
反射操作Channel
一般处理单个channel会用select监听;或者for range结构读取;如果channl数量比较多或者数量不确定,可以使用反射操作channel
func createCases(chs ...chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 创建recv case
for _, ch := range chs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
// 创建send case
for i, ch := range chs {
v := reflect.ValueOf(i)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: v,
})
}
return cases
}
func main() {
var ch1 = make(chan int, 10)
var ch2 = make(chan int, 10)
// 创建SelectCase
var cases = createCases(ch1, ch2)
go func() {
for {
ch1 <- 1
time.Sleep(2 * time.Second)
}
}()
go func() {
for {
ch1 <- 2
time.Sleep(2 * time.Second)
}
}()
for {
chosen, recv, ok := reflect.Select(cases)
if recv.IsValid() { // recv case
fmt.Println("recv:", cases[chosen].Dir, recv, ok)
} else { // send case
fmt.Println("send:", cases[chosen].Dir, ok)
}
time.Sleep(time.Second)
}
}