聊聊Go的并发编程 (二)
时间:2022-02-24 10:28
相关文章推荐:《聊聊Go的并发编程 (一)》 以上就是聊聊Go的并发编程 (二)的详细内容,更多请关注gxlsystem.com其它相关文章!聊聊Go的goroutine和Channel
一、使用channel等待任务结束
package mainimport (
"fmt"
"time")func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c}func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d receive %c\n", id, n)
}}func channelDemo() {
var channels [10]chan<- int
for i := 0; i < 10; i++ {
channels[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i }
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i }
time.Sleep(time.Millisecond)}func main() {
channelDemo()}
package mainimport (
"fmt")type worker struct {
in chan int
done chan bool}func createWorker(id int) worker {
w := worker{
in: make(chan int),
done: make(chan bool),
}
go doWorker(id, w.in, w.done)
return w}func doWorker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d receive %c\n", id, n)
done <- true
}}func channelDemo() {
var workers [10]worker for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i <-workers[i].done }
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i <-workers[i].done }}func main() {
channelDemo()}
package mainimport (
"fmt")type worker struct {
in chan int
done chan bool}func createWorker(id int) worker {
w := worker{
in: make(chan int),
done: make(chan bool),
}
go doWorker(id, w.in, w.done)
return w}func doWorker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d receive %c\n", id, n)
done <- true
}}func channelDemo() {
var workers [10]worker for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i, worker := range workers {
worker.in <- 'a' + i }
for i, worker := range workers {
worker.in <- 'A' + i }
for _, worker := range workers {
<-worker.done <-worker.done }}func main() {
channelDemo()}
sync.WaitGroup的用法
package mainimport (
"fmt"
"sync")type worker struct {
in chan int
wg *sync.WaitGroup}func createWorker(id int, wg *sync.WaitGroup) worker {
w := worker{
in: make(chan int),
wg: wg,
}
go doWorker(id, w.in, wg)
return w}func doWorker(id int, c chan int, wg *sync.WaitGroup) {
for n := range c {
fmt.Printf("Worker %d receive %c\n", id, n)
wg.Done()
}}func channelDemo() {
var wg sync.WaitGroup var workers [10]worker for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}
// 添加20个任务
wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + i }
for i, worker := range workers {
worker.in <- 'A' + i }
wg.Wait()}func main() {
channelDemo()}
抽象代码
package mainimport (
"fmt"
"sync")type worker struct {
in chan int
done func()}func createWorker(id int, wg *sync.WaitGroup) worker {
w := worker{
in: make(chan int),
done: func() {
wg.Done()
},
}
go doWorker(id, w)
return w}func doWorker(id int, w worker) {
for n := range w.in {
fmt.Printf("Worker %d receive %c\n", id, n)
w.done()
}}func channelDemo() {
var wg sync.WaitGroup var workers [10]worker for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}
// 添加20个任务
wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + i }
for i, worker := range workers {
worker.in <- 'A' + i }
wg.Wait()}func main() {
channelDemo()}
二、使用select进行调度
package mainimport (
"fmt"
"math/rand"
"time")func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机睡眠1500毫秒以内
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
// 往out这个channel发送i值
out <- i
i++
}
}()
return out}func main() {
// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil
// 在 select里面也是可以使用的,只不过是堵塞状态!
var c1, c2 = generator(), generator()
for {
/**
select 方式进行调度
使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁
这个select 可以是并行执行 channel管道
*/
select {
case n := <-c1:
fmt.Printf("receive from c1 %d\n", n)
case n := <-c2:
fmt.Printf("receive from c2 %d\n", n)
}
}}
package mainimport (
"fmt"
"math/rand"
"time")func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d receive %d\n", id, n)
}}func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c}func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机睡眠1500毫秒以内
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
// 往out这个channel发送i值
out <- i
i++
}
}()
return out}func main() {
// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil
// 在 select里面也是可以使用的,只不过是堵塞状态!
var c1, c2 = generator(), generator()
// 直接调用createWorker方法,返回的就是一个channel
w := createWorker(0)
for {
/**
select 方式进行调度
使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁
这个select 可以是并行执行 channel管道
*/
select {
case n := <-c1:
w <- n case n := <-c2:
w <- n }
}}
package mainimport (
"fmt"
"math/rand"
"time")func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d receive %d\n", id, n)
}}func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c}func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机睡眠1500毫秒以内
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
// 往out这个channel发送i值
out <- i
i++
}
}()
return out}func main() {
// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil
// 在 select里面也是可以使用的,只不过是堵塞状态!
var c1, c2 = generator(), generator()
// 直接调用createWorker方法,返回的就是一个channel
var worker = createWorker(0)
// 这个n如果放在for循环里边,就会一直打印0,因为从c1和c2收数据需要时间,所以会把0直接传给worker
n := 0
// 使用这个标识告诉有没有值
hasValue := false
for {
// 利用nil channel的特性
var activeWorker chan<- int
if hasValue {
activeWorker = worker }
/**
select 方式进行调度
使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁
这个select 可以是并行执行 channel管道
*/
select {
case n = <-c1:
// 收到值的话就标记为true
hasValue = true
case n = <-c2:
// 收到值的话就标记为true
hasValue = true
case activeWorker <- n:
hasValue = false
}
}}
package mainimport (
"fmt"
"math/rand"
"time")func worker(id int, c chan int) {
for n := range c {
// 手动让消耗速度变慢
time.Sleep(5 * time.Second)
fmt.Printf("Worker %d receive %d\n", id, n)
}}func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c}func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机睡眠1500毫秒以内
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
// 往out这个channel发送i值
out <- i
i++
}
}()
return out}func main() {
// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil
// 在 select里面也是可以使用的,只不过是堵塞状态!
var c1, c2 = generator(), generator()
// 直接调用createWorker方法,返回的就是一个channel
var worker = createWorker(0)
// 用来收n的值
var values []int
for {
// 利用nil channel的特性
var activeWorker chan<- int
var activeValue int
// 判断当values中有值时
if len(values) > 0 {
activeWorker = worker // 取出索引为0的值
activeValue = values[0]
}
/**
select 方式进行调度
使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁
这个select 可以是并行执行 channel管道
*/
select {
case n := <-c1:
// 将收到的数据存到values中
values = append(values, n)
case n := <-c2:
// 将收到的数据存到values中
values = append(values, n)
case activeWorker <- activeValue:
// 送出去后就需要把values中的第一个值拿掉
values = values[1:]
}
}}
计时器的使用
package mainimport (
"fmt"
"math/rand"
"time")func worker(id int, c chan int) {
for n := range c {
// 手动让消耗速度变慢
time.Sleep(time.Second)
fmt.Printf("Worker %d receive %d\n", id, n)
}}func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c}func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机睡眠1500毫秒以内
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
// 往out这个channel发送i值
out <- i
i++
}
}()
return out}func main() {
// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil
// 在 select里面也是可以使用的,只不过是堵塞状态!
var c1, c2 = generator(), generator()
// 直接调用createWorker方法,返回的就是一个channel
var worker = createWorker(0)
// 用来收n的值
var values []int
// 返回的是一个channel
tm := time.After(10 * time.Second)
for {
// 利用nil channel的特性
var activeWorker chan<- int
var activeValue int
// 判断当values中有值时
if len(values) > 0 {
activeWorker = worker // 取出索引为0的值
activeValue = values[0]
}
/**
select 方式进行调度
使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁
这个select 可以是并行执行 channel管道
*/
select {
case n := <-c1:
// 将收到的数据存到values中
values = append(values, n)
case n := <-c2:
// 将收到的数据存到values中
values = append(values, n)
case activeWorker <- activeValue:
// 送出去后就需要把values中的第一个值拿掉
values = values[1:]
case <-tm:
fmt.Println("Bye")
return
}
}}
package mainimport (
"fmt"
"math/rand"
"time")func worker(id int, c chan int) {
for n := range c {
// 手动让消耗速度变慢
time.Sleep(time.Second)
fmt.Printf("Worker %d receive %d\n", id, n)
}}func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c}func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机睡眠1500毫秒以内
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
// 往out这个channel发送i值
out <- i
i++
}
}()
return out}func main() {
// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil
// 在 select里面也是可以使用的,只不过是堵塞状态!
var c1, c2 = generator(), generator()
// 直接调用createWorker方法,返回的就是一个channel
var worker = createWorker(0)
// 用来收n的值
var values []int
// 返回的是一个channel
tm := time.After(10 * time.Second)
tick := time.Tick(time.Second)
for {
// 利用nil channel的特性
var activeWorker chan<- int
var activeValue int
// 判断当values中有值时
if len(values) > 0 {
activeWorker = worker // 取出索引为0的值
activeValue = values[0]
}
/**
select 方式进行调度
使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁
这个select 可以是并行执行 channel管道
*/
select {
case n := <-c1:
// 将收到的数据存到values中
values = append(values, n)
case n := <-c2:
// 将收到的数据存到values中
values = append(values, n)
case activeWorker <- activeValue:
// 送出去后就需要把values中的第一个值拿掉
values = values[1:]
case <-time.After(800 * time.Millisecond):
// 如果在800毫秒没有收到数据则提示超时
fmt.Println("timeout")
case <-tick:
// 每秒获取一下values中队列的长度
fmt.Println("queue len = ", len(values))
case <-tm:
fmt.Println("Bye")
return
}
}}
三、总结