为什么需要 MapReduce
在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。
比如要查询商品详情:
- 商品服务-查询商品属性
- 库存服务-查询库存属性
- 价格服务-查询价格属性
- 营销服务-查询营销属性
如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。
简单的场景下使用 waitGroup 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 waitGroup 就有点力不从心了,go 的官方库中并没有这种工具(java 中提供了 CompleteFuture),go-zero 作者依据 mapReduce 架构思想实现了进程内的数据批处理 mapReduce 并发工具类。
设计思路
我们尝试把自己代入到作者的角色梳理一下并发工具可能的业务场景:
- 查询商品详情:支持并发调用多个服务来组合产品属性,支持调用错误可以立即结束。
- 商品详情页自动推荐用户卡券:支持并发校验卡券,校验失败自动剔除,返回全部卡券。
以上实际都是在进行对输入数据进行处理最后输出清洗后的数据,针对数据处理有个非常经典的异步模式:生产者消费者模式。于是我们可以抽象一下数据批处理的生命周期,大致可以分为三个阶段:
- 数据生产 generate
- 数据加工 mapper
- 数据聚合 reducer
其中数据生产是不可或缺的阶段,数据加工、数据聚合是可选阶段,数据生产与加工支持并发调用,数据聚合基本属于纯内存操作单协程即可。
再来思考一下不同阶段之间数据应该如何流转,既然不同阶段的数据处理都是由不同 goroutine 执行的,那么很自然的可以考虑采用 channel 来实现 goroutine 之间的通信。
如何实现随时终止流程呢?
很简单,goroutine 中监听一个全局的结束 channel 就行。
go-zero 代码实现
core/mr/mapreduce.go
详细源码可查看 github.com/Ouyangan/go…
前置知识 - channel 基本用法
因为 MapReduce 源码中大量使用 channel 进行通信,大概提一下 channel 基本用法:
- channel 写结束后记得关闭
1 | go复制代码 ch := make(chan interface{}) |
- 已关闭的 channel 依然支持读取
- 限定 channel 读写权限
1 | go复制代码//只读channel |
接口定义
先来看最核心的三个函数定义:
- 数据生产
- 数据加工
- 数据聚合
1 | go复制代码 //数据生产func |
面向用户的方法定义
面向用户的方法比较多,方法主要分为两大类:
- 无返回
- 执行过程发生错误立即终止
- 执行过程不关注错误
- 有返回值
- 手动写入 source,手动读取聚合数据 channel
- 手动写入 source,自动读取聚合数据 channel
- 外部传入 source,自动读取聚合数据 channel
1 | go复制代码//并发执行func,发生任何错误将会立即终止流程 |
核心方法是 MapReduceWithSource 和 Map,其他方法都在内部调用她两。弄清楚了 MapReduceWithSource 方法 Map 也不在话下。
MapReduceWithSource 源码实现
一切都在这张图里面了
1 | go复制代码//支持传入数据源channel,并返回聚合后的数据 |
1 | go复制代码//数据加工 |
总结
mapReduce 的源码我大概看了两个晚上,整体看下来比较累。一方面是我自身 go 语言并不是很熟练尤其是 channel 的用法,导致我需要频繁停下来查询相关文档理解作者的写法,另一方面是多个 goroutine 之间通过 channel 进行通信实现协作真的蛮烧脑(佩服作者的思维能力)。
其次看源码时第一遍看起来肯定会比较懵的,其实没关系找到程序的入口(公共基础组件一般是面向的方法)先沿着主线一路看下去把每一句代码都看懂加上注释,再看支线代码。
如果有实在看不懂的地方就查查这段代码的提交记录非常有可能是解决某个bug改动的,比如下面这段代码我死活看了好多遍都不理解。
1 | go复制代码 //聚合数据channel,需要手动调用write方法写入到output中 |
最后画出流程图基本就能把源码看懂了,对于我而言这方法比较笨但有效。
资料
本文转载自: 掘金