Golang实现一个简单任务池
为什么需要任务池
在讨论这个问题的时候,我们先想下,如果你希望使用Golang的多协程特性,你会怎么做?
你会毫不犹豫的使用go
这个关键字,他可以快速创建一个协程,并将go
关键字后面的函数提交到协程中。
这就引出了一个问题,go
多少个最好?在一些串行代码中,可以通过业务逻辑,启动协程的方式,将同步改为异步,在这种情况下。可以直接使用多个go
,将一系列的任务转为异步,只需要利用通道或者锁的,在最后将结果回收了即可
[main goroutine] (主协程)
│
├─[同步任务1] → [同步任务2] → [同步任务3] (原始同步流程)
│
└─▶ 改造为异步
│
├─go [异步任务1] 🚀
├─go [异步任务2] 🚀
└─go [异步任务3] 🚀
但是问题就出现在这里,这种情况下,是我们有明确分配任务的场景,假如说,是过程重复且不明确分配任务的场景呢?
举个例子,我有一个函数,他会输入一个列表,列表的值在1-n之间,我需要处理列表内的值
此时,如果你用for
+go
这两个关键字的话,就会造成大量的协程被打开,在打开和关闭协程的时候,都会消耗资源,因此,我们最好是使用池模型,即为Golang Worker Pool
需要了解的概念
在使用池模型时,我们需要了解下面的概念
- channel:通道,是一个先入先出的模型,具有并发安全特性,在一个通道在没有关闭且没有值的情况下,会让代码无限阻塞
- Select:选择器,选择器有多个case,每个case对应一个通道操作,select会监听所有的通道,当有一个通道做好准备(输出值)时,就会执行对应的代码块
代码实现
这里展示一个简单的线程池,该线程池会接收传入的列表,并将内部每个值x2,并返回最终结果
直接看我代码就行
func runTask(input []int) []int {
// 定义一个入口通道
taskCh := make(chan int)
// 定义一个结果通道
resultCh := make(chan int)
// 启动一个线程用于将值注入到入通道
go func() {
// 不断将值生产到通道中|这里只能单协程
for _, v := range input {
taskCh <- v
}
// 关闭入参数通道,此时该通道只能读取不能写入
close(taskCh)
}()
var syntax sync.WaitGroup
// 启动协程池
for range 8 {
// 每启动一个线程,让锁数量+1
syntax.Add(1)
go func() {
defer syntax.Done() // 协程退出时,自动将锁解除
for v := range taskCh {
resultCh <- process(v)
}
}()
}
// 启动一个协程用于输出返回值
var res []int // 定义返回值
outChannel := make(chan bool, 1) // 定义返回通道,用于阻塞主进程
// 结果处理协程
go func() {
for v := range resultCh {
res = append(res, v)
}
outChannel <- true // 处理完成后,向通道内输入值,以用来释放通道
}()
// 当所有锁都被释放后,继续运行
syntax.Wait()
// 运行到这里说明所有的锁都释放了,关闭结果通道,只允许读取
close(resultCh)
// 阻塞,直到将结果处理协程(res值准备完毕)
<-outChannel
// 返回res
return res
}
// process 业务逻辑
func process(x int) int {
time.Sleep(time.Second * 2)
return x * 2
}
然后我们启动并运行一下
func Test_runTaks(t *testing.T) {
x := []int{1, 2, 3, 5, 5, 7, 8, 9, 5, 3, 1, 2, 45, 6, 3, 3, 4, 7, 4, 12, 4, 7, 3, 4, 5, 6}
runTask(x)
}
可以注意到,本来单线程情况下,需要等待26*2
秒的进程,由于使用了协程,所以只用了8s就完成了
和理论时间6.5s有差距?这是因为协程切换,写入值,还有运算过程都会消耗时间滴
总结
这就是一个简单的协程池模式,你只需要将process改为你的业务函数,即可实现实现一个简单的协程池,当然这个池子没有高级控制什么的,但是也够用啦
最后,新年快乐!希望新的一年我能顺利转正,快乐工作,不断进步,身体健康,事事顺心