「这是我参与11月更文挑战的第3天,活动详情查看:2021最后一次更文挑战」
YARN HA方案 EmbeddedElector +StandByTransitionRunnable
执行原理 :
YARN HA ResourceManager
的选举,首先由EmbeddedElector
来做最多3次的同步尝试选举,如果没有选举成功,则交给StandByTransitionRunnable
线程来维护选举
首先来研究YARN HA的实现方案:
先来简要的说明上图:
- 灰色部分代表的是zookeeper中的部分,
/
是根目录,在选举方案中根目录下面有一个znode = yarn-leader-election,在该znode下面还有一个znode代表的clusterID- 当开启了HA模式的时候,集群启动,两个ResourceManager就会争相竞选,的在clusterID的znode下面创建ActiveStandbyElectorLock锁节点,两个ResourceManager谁首先创建了该节点谁就是active竞选成功,然后将自己的 ResourceManagerID 写到此节点中,并且创建另一个znode(ActiveBreadCrumb)来存储active节点的一些信息
- 另一个竞选失败的RM将会监听ActiveStandbyElectorLock,当集群中active发展故障下线的时候,ActiveStandbyElectorLock锁节点就会被删除,然后standby节点就会开始争抢创建锁节点
下面从源码中来印证,开启了HA,集群启动时候,会走ResourceManager.serviceInit() 方法的elector服务来进行竞选,也就是 ActiveStandbyElectorBasedElectorService
类,其serviceInit()方法之前的文章(Yarn ResourceManager启动流程源码解读)已经叙述,下面直接从serviceStart()方法入手:
进行选举的入口
localActiveNodeInfo = clusterid + rmid
对应zookeeper上面znode所存储的数据的内容
1 | java复制代码protected void serviceStart() throws Exception { |
下面详看 joinElection(localActiveNodeInfo) 方法
首先是拷贝数据,,然后转到另一个方法joinElectionInternal()
1 | ini复制代码appData = new byte[data.length]; |
详看 joinElectionInternal() 方法
确保Client的存在
1 | ini复制代码if (zkClient == null) { |
通过异步的方式创建分布式锁节点
1 | javascript复制代码createLockNodeAsync(){ |
上述代码的cb为一个回调函数(第五个参数),当zookeeper创建节点后转到回调函数进行相关的处理
回调函数cb为:callback = ActiveStandbyElector.processResult()
下面详细的查看函数processResult()
- 当创建锁节点成功的时候,当前的节点就会变为active状态
1 | scss复制代码processResult(){ |
becomeActive() 方法的具体内容为:
1 | scss复制代码ActiveStandbyElector.becomeActive(){ |
跳转到 ActiveStandbyElectorBasedElectorService.becomeActive() 方法
将active节点才能启动的Service都进行启动
1 | scss复制代码rm.getRMContext().getRMAdminService().transitionToActive(req){ |
走到此处的时候,就跳转到另一个Service activeServices
,直接看该Service的serviceStart()方法即可
- 当没有成功创建的时候
isSuccess(code) = false
此时有两种情况:
1 | markdown复制代码1. 节点已经存在,别人已经创建成功 |
第一种,别人已经创建成功锁节点,isNodeExists(code) = true
,此时当前节点直接成为standby状态
1 | scss复制代码if (isNodeExists(code)) { |
becomeStandby() 方法的具体内容为:
1 | scss复制代码ActiveStandbyElectorBasedElectorService.becomeStandby(){ |
第二种,没有成功创建锁节点并且锁节点到目前为止还没有被创建成功
- 当重试的次数小于最大的重试次数的时候就重新进行竞选,createLockNodeAsync() 就又返回到创建锁节点的地方
1 | ini复制代码if (createRetryCount < maxRetryNum) { |
- 当重试的次数大于了 maxRetryNum 的时候
最后交给线程StandByTransitionRunnable来选举 使用一个守护线程来进行选举
不要阻塞ResourceManager主线程其它服务的启动
1 | scss复制代码fatalError(errorMessage){ |
上面的方法用到了AsyncDispatcher,跳转到 RMFatalEventDispatcher.handle()
1 | typescript复制代码RMFatalEventDispatcher.handle(){ |
此处就转到了原来的方法 joinElectionInternal()
重新进行选举,之后的流程就完全相同了
至此,YARN HA 的ResourceManager的选举就全部结束
本人在学习的路上,上述文章如有错误还请指教批评。
本文转载自: 掘金