0%

Golang通过Channel实现N个任务并发执行并统一返回结果

最近在项目中想要实现一个需求:

  1. 在一个过程中,可以实现多个任务并发执行,并同时返回结果
  2. 多个任务的入参相同或入参类型相同,返回结果类型相同
  3. 设置默认的超时时间,当某一任务执行超时,则返回默认值

举一个例子如:根据一个用户的 userId任务1 查询用户信息;任务2 查询用户所有订单信息;任务3 查询用户关注的商品信息;三个任务并发执行,并将三个任务的执行结果统一返回。


一个🌰

参考自:Golang 中的并发限制与超时控制 - 简书 中的 “超时控制” 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func main(){
Test()
}


func Run(task_id, sleeptime, timeout int, ch chan string) {
ch_run := make(chan string)
go run(task_id, sleeptime, ch_run)
select {
case re := <-ch_run:
ch <- re
case <-time.After(time.Duration(timeout) * time.Second):
re := fmt.Sprintf("task id %d, timeout ", task_id)
ch <- re
}
}

func run(task_id, sleeptime int, ch chan string) {
time.Sleep(time.Duration(sleeptime) * time.Second)
ch <- fmt.Sprintf("task id %d,sleep %d second", task_id, sleeptime)
return
}

func Test() {
input := []int{3, 5, 8}
timeout := 8
// 创建N个任务管道,用来接收各个并发任务的完成结果
chs := make([]chan string, len(input))
sTime := time.Now()
fmt.Println("start")

for i, sleeptime := range input {
chs[i] = make(chan string)
go Run(i, sleeptime, timeout, chs[i])
}

// 获取结果
for _, ch := range chs {
fmt.Println(<-ch)
}

eTime := time.Now()
fmt.Printf("finished,Process time %s. Number of task is %d \n", eTime.Sub(sTime), len(input))
}

我的改造

我将上面的示例改造成了:将要执行的方法作为参数传入,这样就可以随意传入不同的任务了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
func main(){
Test()
}


func Run(f func(s string, ch chan string), s string, timeout int, cOut chan string) {
ch_run := make(chan string)
// go run(s, ch_run)
go f(s, ch_run)

select {
case re := <-ch_run:
cOut <- re
case <-time.After(time.Duration(timeout) * time.Second):
re := fmt.Sprintf("task timeout ")
cOut <- re
}
}

// func run(s string, ch chan string) {
// time.Sleep(time.Duration(3) * time.Second)

// ch <- fmt.Sprintf("task input %s,sleep %d second", s, 3)
// return
// }

func aa1(s string, ch chan string) {
time.Sleep(time.Duration(3) * time.Second)
ch <- fmt.Sprintf("task1 input %s,sleep %d second", s, 3)

}

func aa2(s string, ch chan string) {
time.Sleep(time.Duration(5) * time.Second)
ch <- fmt.Sprintf("task2 input %s,sleep %d second", s, 5)

}

func aa3(s string, ch chan string) {
time.Sleep(time.Duration(10) * time.Second)
ch <- fmt.Sprintf("task3 input %s,sleep %d second", s, 10)

}

func Test() {
a := synchron(20, "aaa", aa1, aa2, aa3)
fmt.Printf("result: %v \n", a)
}

// timeout: 超时时间
// input: 统一入参
// args: 方法
func synchron(timeout int, input string, args ...func(s string, ch chan string)) []string {
// input := []string{"aaa", "bbb", "ccc"}
// timeout := 8
// 创建N个任务管道,用来接收各个并发任务的完成结果
chs := make([]chan string, len(args))

defer func() {
for _, c := range chs {
if c != nil {
close(c)
}
}
}()

sTime := time.Now()
fmt.Println("start")

for i, f := range args {
chs[i] = make(chan string)
go Run(f, input, timeout, chs[i])
}

resList := []string{}
// 获取结果
for _, ch := range chs {
v := <-ch
fmt.Println(v)
resList = append(resList, v)
}

eTime := time.Now()
fmt.Printf("finished,Process time %s. Number of task is %d \n", eTime.Sub(sTime), len(args))
// 将多个异步任务同时返回
return resList
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# timeout 20 input "aaa"
➜ go run main.go
start
task1 input aaa,sleep 3 second
task2 input aaa,sleep 5 second
task3 input aaa,sleep 10 second
finished,Process time 10.0028964s. Number of task is 3
result: [task1 input aaa,sleep 3 second task2 input aaa,sleep 5 second task3 input aaa,sleep 10 second]


# timeout 7 input "aaa"
➜ go run main.go
start
task1 input aaa,sleep 3 second
task2 input aaa,sleep 5 second
task timeout
finished,Process time 7.001273493s. Number of task is 3
result: [task1 input aaa,sleep 3 second task2 input aaa,sleep 5 second task timeout ]

你的扩展

当然,我上面只是实现了一种情况,还可以修改成为不同的方法传入相应类型的参数等等。

目前我对Golang的Channel了解的不深,只是想要实现类似的效果就在网上找了一下,后期会再深入去学习。


相关参考


如有疑问或需要技术讨论,请留言或发邮件到 service@itfanr.cc