说到channel,就一定要说一说线程了。任何实际项目,无论大小,并发是必然存在的。并发的存在,就涉及到线程通信。在当下的开发语言中,线程通讯主要有两种,共享内存与消息传递。共享内存一定都很熟悉,通过共同操作同一对象,实现线程间通讯。消息传递即通过类似聊天的方式。golang对并发的处理采用了协程的技术。golang的goroutine就是协程的实现。协程的概念很早就有,简单的理解为轻量级线程,goroutine就是为了解决并发任务间的通信而设计的。golang解决通信的理念是:不要通过共享内存来通信,而应该通过通信来共享内存。golang解决方案是消息传递机制,消息的传递就是通过channel来实现的。
Channal是什么?Channal就是用来通信的,就像Unix下的管道一样,在Go中是这样使用Channel的。
下面的程序演示了一个goroutine和主程序通信的例程。这个程序足够简单了。
1 2 3 4 5 6 7 8 9 10
| package main import "fmt" func main() { //创建一个string类型的channel channel := make(chan string) //创建一个goroutine向channel里发一个字符串 go func() { channel <- "hello" }() msg := <- channel fmt.Println(msg) }
|
指定channel的buffer
指定buffer的大小很简单,看下面的程序:
1 2 3 4 5 6 7 8 9 10 11 12
| package main import "fmt" func main() { channel := make(chan string, 2) go func() { channel <- "hello" channel <- "World" }() msg1 := <-channel msg2 := <-channel fmt.Println(msg1, msg2) }
|
Channel的阻塞
现在谈一谈对channe阻塞l的理解。
发送者角度:对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的。如果chan中的数据无人接收,就无法再给通道传入其他数据。因为新的输入无法在通道非空的情况下传入。所以发送操作会等待 chan 再次变为可用状态:就是通道值被接收时(可以传入变量)。
接收者角度:对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。
案例1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package main
import ( "fmt" )
func f1(in chan int) { fmt.Println(<-in) }
func main() { out := make(chan int) out <- 2 //13行 go f1(out) }
|
运行结果:fatal error: all goroutines are asleep - deadlock!
这是由于第13行之前不存在对out的接收,所以,对于out <- 2来说,永远是阻塞的,即一直会等下去。
1
| 这很明显,因为out<-2插入后并没有被使用,就一直阻塞在等待被使用,走不到go fl(out)。所以将go fl(out)放在out<-2前面即可。
|
案例2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package main
import ( "fmt" )
func main() { c1 := make(chan int) func(){ time.Sleep(time.Second*2) c1<-"result 1" }() fmt.Println("c1 is",<-c1) }
|
结果:deadlock,因为push和pull永远不可能同时发生,这就是阻塞channel的不当用法。
解决方法
1 2 3 4 5 6 7 8 9
| func main() { c1 := make(chan int) go func(){ time.Sleep(time.Second*2) c1<-"result 1" }() fmt.Println("c1 is",<-c1) } #通过在另一个协程中run push代码,使得channel的生产和消费可以同时对接,正常的阻塞使用方式。
|
另外的解决方法
1 2 3 4 5 6 7 8 9 10
| func main() { c1 := make(chan int,1) func(){ time.Sleep(time.Second*2) c1<-"result 1" }() fmt.Println("c1 is",<-c1) }
# 给channel加一个buffer,只要buffer没用尽,大家就不用阻塞。
|
案例3:
注意,channel默认上是阻塞的,也就是说,如果Channel满了,就阻塞写,如果Channel空了,就阻塞读。于是,我们就可以使用这种特性来同步我们的发送和接收端。
下面这个例程说明了这一点,代码有点乱,不过我觉得不难理解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package main
import "fmt" import "time"
func main() { channel := make(chan string)
go func() { channel <- "hello" //发送方等待接收 fmt.Println("write \"hello\" done!")
channel <- "World" //Reader在Sleep,这里在阻塞 fmt.Println("write \"World\" done!")
fmt.Println("Write go sleep...") time.Sleep(3*time.Second) channel <- "channel" fmt.Println("write \"channel\" done!") }()
time.Sleep(2*time.Second) fmt.Println("Reader Wake up...") //首先打印
msg := <-channel fmt.Println("Reader: ", msg)
msg = <-channel fmt.Println("Reader: ", msg)
msg = <-channel //Writer在Sleep,这里在阻塞 fmt.Println("Reader: ", msg) }
|
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| Reader Wake up... Reader: hello write "hello" done! write "World" done! Write go sleep... Reader: World write "channel" done! Reader: channel
解释: Reader Wake up... //没有问题,不解释
Reader: hello 因为go func()的代码在前,所以会先执行”channel <- "hello"“,这时候被没有消费,阻塞。 当主程序运行到msg := <-channel,打印了Reader: hello,然后执行到下面的msg = <-channel,没有数据,被阻塞。
write "hello" done! write "World" done! Write go sleep... go func()执行channel<-"hello"阻塞被释放,执行代码fmt.Println("write \"hello\" done!"); 然后执行channel <- "World"的时候,插入数据,因为主程序已有读取请求,所以没有阻塞,并打印write "World" done!和Write go sleep...; 然后,channel <- "channel"因为需要等待3s未马上执行到,所以会比主程序慢。
Reader: World 主程序里面msg = <-channel因channel <- "World",有数据了,解除阻塞打印Reader: World,并再执行下面代码msg = <-channel,这时候因为channel <- "channel"还在等待中,再次被阻塞。
write "channel" done! 直到3s后,channel <- "channel"解除阻塞,打印write "channel" done!
Reader: channel 主程序解除阻塞,再打印Reader: channel
|
详解go语言 make(chan int, 1) 和 make (chan int) 的区别
无缓冲区channel
用make(chan int) 创建的chan, 是无缓冲区的, send 数据到chan 时,在没有协程取出数据的情况下, 会阻塞当前协程的运行。ch <- 后面的代码就不会再运行,直到channel 的数据被接收,当前协程才会继续往下执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package main
import ( "fmt" "time" )
func main() { ch := make(chan int) // 创建无缓冲channel go func() { fmt.Println("time sleep 5 second...") time.Sleep(5 * time.Second) <-ch fmt.Println("read...") }() fmt.Println("即将阻塞...") ch <-1 // 协程将会阻塞,等待数据被读取 fmt.Println("ch 数据被消费,主协程退出") }
#结果: 即将阻塞... time sleep 5 second... send... ch 数据被消费,主协程退出
##这边可以看到接收方的代码先打印,然后打印发送方下面的代码,这个符合逻辑,接收解锁执行,发送解锁执行。
|
有缓冲区channel
channel 的缓冲区为1,向channel 发送第一个数据,主协程不会退出。发送第二个时候,缓冲区已经满了, 此时阻塞主协程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package main
import ( "fmt" "time" )
func main() { ch := make(chan int, 1) // 创建有缓冲channel go func() { fmt.Println("time sleep 5 second...") time.Sleep(5 * time.Second) <-ch fmt.Println("read...") }() ch <-1 // 协程不会阻塞,等待数据被读取 fmt.Println("第二次发送数据到channel, 即将阻塞") ch <-1 // 第二次发送数据到channel, 在数据没有被读取之前,因为缓冲区满了, 所以会阻塞主协程。 fmt.Println("ch 数据被消费,主协程退出") }
结果: 第二次发送数据到channel, 即将阻塞 //阻塞了,因为buffer为1,等待前面的消费 time sleep 5 second... read... //消费了 ch 数据被消费,主协程退出 //插入,执行
|
多个Channel的select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package main import "time" import "fmt" func main() { //创建两个channel - c1 c2 c1 := make(chan string) c2 := make(chan string) //创建两个goruntine来分别向这两个channel发送数据 go func() { time.Sleep(time.Second * 1) c1 <- "Hello" }() go func() { time.Sleep(time.Second * 1) c2 <- "World" }() //使用select来侦听两个channel for i := 0; i < 2; i++ { select { case msg1 := <-c1: fmt.Println("received", msg1) case msg2 := <-c2: fmt.Println("received", msg2) } } }
|
注意:上面的select是阻塞的,所以,才搞出ugly的for i <2这种东西。
Channel select阻塞的Timeout
解决上述那个for循环的问题,一般有两种方法:一种是阻塞但有timeout,一种是无阻塞。我们来看看如果给select设置上timeout的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| for { timeout_cnt := 0 select { case msg1 := <-c1: fmt.Println("msg1 received", msg1) case msg2 := <-c2: fmt.Println("msg2 received", msg2) case <-time.After(time.Second * 30): fmt.Println("Time Out") time_cnt++ } if time_cnt > 1 { break } }
|
上面代码中高亮的代码主要是用来让select返回的,注意 case中的time.After事件。
Channel的无阻塞
好,我们再来看看无阻塞的channel,其实也很简单,就是在select中加入default,如下所示:
1 2 3 4 5 6 7 8 9 10 11
| for { select { case msg1 := <-c1: fmt.Println("received", msg1) case msg2 := <-c2: fmt.Println("received", msg2) default: //default会导致无阻塞 fmt.Println("nothing received!") time.Sleep(time.Second) } }
|
Channel的关闭
关闭Channel可以通知对方内容发送完了,不用再等了。参看下面的例程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package main import "fmt" import "time" import "math/rand" func main() { channel := make(chan string) rand.Seed(time.Now().Unix()) //向channel发送随机个数的message go func () { cnt := rand.Intn(10) fmt.Println("message cnt :", cnt) for i:=0; i<cnt; i++{ channel <- fmt.Sprintf("message-%2d", i) } close(channel) //关闭Channel }() var more bool = true var msg string for more { select{ //channel会返回两个值,一个是内容,一个是还有没有内容 case msg, more = <- channel: if more { fmt.Println(msg) }else{ fmt.Println("channel closed!") } } } }
|
参考文章:
golang协程——通道channel阻塞
golang channel阻塞与非阻塞用法
详解go语言 make(chan int, 1) 和 make (chan int) 的区别
GO 语言简介(下)— 特性