MapReduce分解
- 参考连接
- MapReduce
引入官网的介绍
- 在微服务中开发中,api网关扮演对外提供restful api的角色,而api的数据往往会依赖其他服务,复杂的api更是会依赖多个甚至数十个服务。虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个api的耗时将会大大增加。
- 那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go基础库中为我们提供了 WaitGroup 工具用来进行并发控制,但实际业务场景中多个依赖如果有一个出错我们期望能立即返回而不是等所有依赖都执行完再返回结果,而且WaitGroup中对变量的赋值往往需要加锁,每个依赖函数都需要添加Add和Done对于新手来说比较容易出错
- 基于以上的背景,go-zero框架中为我们提供了并发处理工具MapReduce,该工具开箱即用,不需要做什么初始化,我们通过下图看下使用MapReduce和没使用的耗时对比:
使用端案例
/**
* CheckLegal
* @Description: 场景二: 很多时候我们需要对一批数据进行处理,比如对一批用户id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的结果为效验合法的用户id
* @receiver l
* @param uids
* @return []int64
* @return error
*/
func (l defaultMapReduce) CheckLegal(uids []int64) ([]int64, error) {
r, err := dlmapreduce.MapReduce(func(source chan<- interface{}) {
//这里参数是一个管道(同步管道),写进入1个就会就会阻塞,直到消费端消费后才会继续range,往里面写数据
//然后循环uids把数据写入到管道中
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer dlmapreduce.Writer, cancel func(error)) {
//数据断言
uid := item.(int64)
ok, err := check(uid)
if err != nil {
cancel(err)
}
if ok {
//因为 guardedWriter 结构体实现了 Writer interface 所以这里接口的形式调用会自动映射到对应的实现
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer dlmapreduce.Writer, cancel func(error)) {
//pipe :参数为一个只读管道,只能往pipe 里面写入数据不能取出数据
var uids []int64
//遍历管道,把管道中的数据写入到切片中
for p := range pipe {
uids = append(uids, p.(int64))
}
//取出管道中的数据写入
writer.Write(uids)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}
return r.([]int64), nil
}
func check(uid int64) (bool, error) {
// do something check user legal
return true, nil
}
查看源码
//需要用户手动将生产数据写入 source ,并返回聚合后的数据
//generate 生产
//mapper 加工
//reducer 聚合
//opts - 可选参数,目前包含:数据加工阶段协程数量
// 映射从给定的 generate func 生成的所有元素,
// 并使用给定的 reducer 减少输出元素。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
//创建一个无缓存管道,然后走协程,把函数使用异步执行 generate 函数,这个时候
source := buildSource(generate)
//buildSource 内部会异步执行无需执行完成就会走 MapReduceWithSource 这个函数
return MapReduceWithSource(source, mapper, reducer, opts...)
}
1.把数据写入原始数据管道管道(source管道)
在实现方法体中是循环把用户写入管道,因为管道是同步管道,所以这时候写入一个就要等待消费端去消费这个管道,未消费玩的时候会是阻塞写入,这个时候循环就会暂停
/**
* buildSource
* @Description: MapReduce中首先通过buildSource方法通过执行generate(参数为无缓冲channel)产生数据,并返回无缓冲的channel,mapper会从该channel中读取数据
* Go语言函数也是一种类型,可以保存在变量中,
* @param generate type GenerateFunc func(source chan<- interface{}) 声明个匿名函数作为 参数.
* source chan<- interface{}) 表示的是参数是一个单向管道,保存管道的变量是 source 可以管道中可以写入的数据类型是 interface{}
* @return chan
*/
func buildSource(generate GenerateFunc) chan interface{} {
//先执行的操作就会阻塞等待,直到另一个相对应的操作准备好为止。这种无缓冲的通道我们也称之为同步通道。
source := make(chan interface{})
//threading.GoSafe 是启动一个协程去处理 里面的匿名函数,而这个匿名函数里面执行 generate(source)
threading.GoSafe(func() {
defer close(source)
//type GenerateFunc func(source chan<- interface{})
generate(source)
})
return source
}
开始读取数据源管道(source管道)中的数据进行业务处理
// 映射源中的所有元素,并使用给定的 reducer 减少输出元素。第一个参数 source 是一个管道类型,且这个管道是只读管道,只能从管道中读数据,不能写入数据,防止保证数据安全
//支持传入数据源channel,并返回聚合后的数据
//source - 数据源channel
//mapper - 读取source内容并处理
//reducer - 数据处理完毕发送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
//可选参数设置,这个可以暂时忽略
options := buildOptions(opts...)
//创建一个无缓存管道output,任务执行结束通知信号
//后面会聚合数据channel,会调用write方法写入到output中
output := make(chan interface{})
//这个后置函数保证 output只会被读取一次
defer func() {
//如果有多次写入的话则会造成阻塞从而导致协程泄漏
//这里用 for range检测是否可以读出数据,读出数据说明多次写入了
//为什么这里使用panic呢?显示的提醒用户用法错了会比自动修复掉好一些
for range output {
panic("more than one element written in reducer")
}
}()
//创建有缓冲的管道 collector,容量为workers,意味着最多允许 workers 个协程同时处理数据
//创建一个缓冲管道,大小默认是 16
//后面会将mapper处理完的数据写入collector管道中
collector := make(chan interface{}, options.workers)
//type DoneChan struct { done chan lang.PlaceholderType once sync.Once }
// 取消操作信号
//数据聚合任务完成标志
//done 用于执行异样的时候通知其它goroutine退出执行
done := syncx.NewDoneChan()
//支持阻塞写入chan的writer
//实例化 guardedWriter 结构体,因为这个结构体实现了 Writer接口(结构体实现了接口,那么可以用接口声明这个结构体的类型后面会说明),writer 保存的是
writer := newGuardedWriter(output, done.Done())
//单例关闭
//sync.Once 是 Go 标准库提供的使函数只执行一次的实现,常应用于单例模式,例如初始化配置、保持数据库连接等。作用与 init 函数类似,但有区别。
//sync.Once 仅提供了一个方法 Do,参数 f 是对象初始化函数。
var closeOnce sync.Once
var retErr errorx.AtomicError
//定义一个函数变量并初始化
//数据聚合任务已结束,发送完成标志
finish := func() {
closeOnce.Do(func() {
done.Close()
close(output)
})
}
//定义一个变量保存匿名函数,并初始化,这样定义一个
//取消操作
//调用once函数 func once(fn func(error)) func(error)
//once 里面的函数是调用的
cancel := once(func(err error) {
//设置error
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
//清空source channel
drain(source)
//调用完成方法
finish()
})
//开启协程
go func() {
defer func() {
//清空聚合任务channel
drain(collector)
//捕获panic
if r := recover(); r != nil {
//调用cancel方法,立即结束
//fmt.Errorf("%v", r)
cancel(fmt.Errorf("%v", r))
} else {
//正常结束
finish()
}
}()
//执行数据加工
//注意writer.write将加工后数据写入了output
// 将mapper处理完的数据写入collector管道中
reducer(collector, writer, cancel)
}()
// 真正从生成器通道取数据执行Mapper
//异步执行数据加工
//source - 数据生产
//collector - 数据收集
//done - 结束标志
//workers - 并发数
go executeMappers(func(item interface{}, w Writer) {
//func(item interface{}, writer dlmapreduce.Writer, cancel func(error)) {
// //数据断言
// uid := item.(int64)
// ok, err := check(uid)
// if err != nil {
// cancel(err)
// }
// if ok {
// //因为 guardedWriter 结构体实现了 Writer interface 所以这里接口的形式调用会自动映射到对应的实现
// writer.Write(uid)
// }
//}
// MapperFunc func(item interface{}, writer Writer, cancel func(error))
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
//reducer将加工后的数据写入了output,
//需要数据返回时读取output即可
//假如output被写入了超过两次
//则开始的defer func那里将还可以读到数据
//由此可以检测到用户调用了多次write方法
value, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
}