Go Worker pool 工作池实现详解
Channel缓冲的重要用途之一是实现工作池。
通常,工作池是等待分配给它们的任务的线程的集合。 一旦他们完成分配的任务,他们就会再次为下一个任务提供服务。
我们将使用缓冲通道实现一个工作池。 我们的工作池将执行计算输入数字的数字总和的任务。 例如,如果传递了 234,则输出将为 9 (2 + 3 + 4)。 工作池的输入将是一个伪随机整数列表。
下面是我们的工作池的核心功能
- 创建一个 Goroutines 池,它侦听输入缓冲通道,等待分配作业
- 将作业添加到输入缓冲通道
- 作业完成后将结果写入输出缓冲通道
- 从输出缓冲通道读取和打印结果
我们将逐步实现这个程序,以使其更易于理解。
第一步将是创建表示作业和结果的结构体
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
每个 Job
结构体都有一个 id 和一个 randomno ,必须为其计算各个数字的总和。
Result
结构体有一个 job 字段,该字段是在 sumofdigits 字段中保存结果(单个数字的总和)的作业。
下一步是创建用于接收作业和写入输出的缓冲通道。
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
Worker Goroutines 在 jobs 缓冲通道上监听新任务。 任务完成后,将结果写入 results 缓冲通道。
下面我们定义一个 digits
函数,该函数的主要功能是计算各个数字的总和并返回它。 我们将给这个函数添加一个 2 秒的睡眠,只是为了模拟这个函数需要一些时间来计算结果的事实。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
接下来我们定义一个 worker
函数。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
上述函数创建一个从 jobs 通道读取数据的工作协程,使用当前作业和 digits
函数的返回值创建一个 Result 结构体,然后将结果写入 results 缓冲通道。 此函数将 WaitGroup wg
作为参数,当所有作业完成后,它将调用 Done()
方法。
createWorkerPool
函数将创建一个 worker Goroutines 池。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
上面的函数将要创建的 worker 数量作为参数。 它在创建 Goroutine 之前调用 wg.Add(1)
来增加 WaitGroup 计数器。 然后它通过将 WaitGroup wg 的指针传递给 worker
函数来创建 worker Goroutines。 在创建所需的 Goroutines 后,它通过调用 wg.Wait()
等待所有 Goroutines 完成它们的执行。 在所有 Goroutines 执行完毕后,它关闭 results 通道,因为所有 Goroutines 都已完成它们的执行,并且没有其他的协程会进一步写入 results 通道。
现在我们已经准备好工作池,让我们继续定义一个用来进行分配的函数。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
上面的 allocate
函数以要创建的任务的数量作为输入参数,生成最大值为998的伪随机数,以随机数和for循环计数器i为id创建Job结构体,然后将它们写入 jobs 通道。 它在写入所有作业后关闭 jobs 通道。
下一步是创建读取 results 通道并打印输出的函数。
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
result
函数读取 results 通道并打印 Job ID,输入的随机数,以及随机数的数字总和。 result 函数还将 接收done
通道作为参数,一旦它打印了所有结果,它就会写入该通道。
我们现在已经准备好了一切。 让我们继续完成最后一步,编写 main()
函数调用所有这些函数。
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
我们首先保存程序的执行开始时间,在最后我们计算 endTime 和 startTime 之间的时间差并显示程序花费的总时间。 这是必要的,因为我们将通过改变 Goroutines 的数量来做一些基准测试。
noOfJobs
设置为 100,然后调用 allocate
将作业添加到 jobs 通道。
然后创建 done
通道并将其传递给 result Goroutine,以便它可以开始打印输出并在打印完所有内容后通知主协程。
最后,通过调用 createWorkerPool
函数创建了一个包含 10 个 job Goroutines 的池,然后 main 在 done 通道上等待所有要打印的结果。
下面是完整的程序。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
请在本地机器上运行此程序,以获得更准确的总时间计算。
上面程序执行结果如下
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.025220391 seconds
上面程序总共将打印 100 行,对应 100 个作业,最后将在最后一行打印程序运行所需的总时间。 没个人的输出将与这里的结果不同,因为 Goroutine 可以以任何顺序运行,并且总时间也会因硬件而异。 就我而言,程序完成大约需要 20 秒。
现在让我们将主函数中的 noOfWorkers 增加到 20。我们已经将 worker 的数量增加了一倍。 由于 worker Goroutines 增加了(准确地说是翻了一番),程序完成所需的总时间应该减少(准确地说是减少一半)。
在我的机器上运行时间是 10.00877495。
现在我们可以理解,随着 worker Goroutine 数量的增加,完成作业所需的总时间减少。我们可以将 main 函数中的 noOfJobs 和 noOfWorkers 设置为不同的值并分析运行结果。
相关文章
在 JavaScript 中验证 Google ReCaptcha 第 2 版
发布时间:2024/03/23 浏览次数:193 分类:JavaScript
-
本文演示了如何在 JavaScript 中验证 Google Recaptcha。
C# 中的 goto 语句
发布时间:2024/02/02 浏览次数:184 分类:编程语言
-
本教程演示了如何在 C# 中使用 goto 以及何时使用它会有所帮助本教程将演示如何在 C# 中使用 goto 语法,并提供一些代码中的实际使用示例。
在 Python 中是否存在 goto 语句
发布时间:2023/12/20 浏览次数:197 分类:Python
-
本文为你提供了 Python 中是否存在 goto 语句的答案。本文为你提供了 Python 中是否存在 goto 语句的答案。基本上,Python 不支持 goto 语句。
避免 Python中的 TypeError: Input Expected at Most 1 Argument, Got 3 错误
发布时间:2023/07/08 浏览次数:671 分类:Python
-
Python 中避免 TypeError: input Expected atmost 1 argument, got 3 Error在Python编程中,我们有两个内置方法来获取用户的输入:input(prompt)和 raw_input(prompt)。
使用 Python 将文件上传到 Google 云端硬盘
发布时间:2023/06/15 浏览次数:544 分类:Python
-
本文将介绍我们如何使用 Python 将文件上传到云盘,以 Google Drive 为例。 为此,我们将使用 Google Drive API。
Python 错误 Valueerror: Expected 2d Array, Got 1d Array Instead
发布时间:2023/05/30 浏览次数:293 分类:Python
-
当我们在 numpy 中传递一维数组而不是二维数组时,会发生错误 ValueError: Expected 2D array, got 1D array instead 。如您所知,每种编程语言都会遇到很多错误,有些是在运行时,有些是在编译时。 Pyth