有这样一个需求,有一批学生,现在只有Name字段,需要根据Name字段做参数远程请求获取Score字段的值。
常规写法
type User struct { | |
Name string | |
Score int64 | |
} | |
//模拟远程调用数据 | |
func Dodata(user *User) { | |
user.Score = int64(len(user.Name)) | |
} | |
func main() { | |
s := CreatData() | |
var wg sync.WaitGroup | |
wg.Add(len(s)) //每个user开启一个协程处理 | |
for _, item := range s { | |
go func(i *User) { | |
defer wg.Done() | |
Dodata(i) | |
}(item) | |
} | |
wg.Wait() | |
//得到数据后下一步处理 | |
for _, val := range s { | |
fmt.Println(val.Score) | |
} | |
} |
当user数据为n时,需要开启n个协程去处理。代码不可控。能不能指定m个协程并发处理。
func main() { | |
s := CreatData() | |
var wg sync.WaitGroup | |
wg.Add(10) | |
Handler(10, &wg, s, Dodata)//只开启10协程去处理 | |
wg.Wait() | |
//得到数据后下一步处理 | |
for _, val := range s { | |
fmt.Println(val.Score) | |
} | |
} | |
func Handler(number int, wg *sync.WaitGroup, s []*User, workerFun func(*User)) { | |
inch := make(chan *User, 0) | |
//协程1:把需要处理的参数写入inch | |
go func() { | |
for _, item := range s { | |
inch <- item | |
} | |
close(inch) | |
}() | |
//协程2:开启number个协程,同时读取inch的参数 | |
for i := 0; i < number; i++ { | |
go func() { | |
defer wg.Done() | |
for item := range inch { | |
workerFun(item) | |
} | |
}() | |
} | |
} |
假如需要读取的结果是一个回执Receipt,则需要创建一个resCh通道,worker请求数据后把结果写入这个通道。
//开启指定个协程处理数组,使用扇出方式处理 | |
type User struct { | |
Name string | |
} | |
type Receipt struct { | |
Name string | |
Score int64 | |
} | |
//模拟远程调用数据 | |
func Dodata(user *User) *Receipt { | |
var res Receipt | |
res.Name = user.Name | |
res.Score = int64(len(user.Name)) | |
return &res | |
} | |
func main() { | |
s := CreatData() | |
var wg sync.WaitGroup | |
var resWg sync.WaitGroup | |
resCh := make(chan *Receipt) | |
//协程3:开启一个协程读取结果 | |
resWg.Add(1) | |
go func() { | |
defer resWg.Done() | |
for item := range resCh { | |
fmt.Println(item.Score) | |
} | |
}() | |
wg.Add(10) | |
Handler(10, &wg, s, resCh, Dodata) | |
wg.Wait() | |
close(resCh) //worker结束后需要及时关闭resCh | |
resWg.Wait() //保证读取结果完整 | |
} | |
func Handler(number int, wg *sync.WaitGroup, s []*User, resCh chan<- *Receipt, workerFun func(*User) *Receipt) { | |
inch := make(chan *User, 0) | |
//协程1:把需要处理的参数写入inch | |
go func() { | |
for _, item := range s { | |
inch <- item | |
} | |
close(inch) | |
}() | |
//协程2:开启number个协程,同时读取参数并把结果写入resCh | |
for i := 0; i < number; i++ { | |
go func() { | |
defer wg.Done() | |
for item := range inch { | |
res := workerFun(item) | |
resCh <- res | |
} | |
}() | |
} | |
} |
FAN-OUT模式:
多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式,所以又被称为扇出,可以用来分发任务。
FAN-IN模式:
1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。