「这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战」
本文的阅读需要一些YARN Service的知识,相关内容请查看:
下面就ResourceManager的启动流程做详细的源码解读
1 serviceInit()
ResourceManager是一个Service,所以ResourceManager的启动当然是要找init()方法,该类本身没有实现Init()方法,所以使用父类方法,父类的Init()方法调用了serviceInit(),并且在ResourceManager 重写,下面详细看ResourceManager .serviceInit()
该方法主要做了下面三件事情:
- 创建各种子service实例
1
2
3 > java复制代码new xxx();createxxx();
>
>
- 将该实例加入到serviceList集合中
1
2
3
4 > java复制代码addService(service);
> addIfService(object);
>
>
- 执行各个子service的serviceInit()方法
1
2
3 > java复制代码super.init();
>
>
该类的serviceInit方法从上至下依次为:
生成上下文对象
1 | java复制代码this.rmContext = new RMContextImpl() |
加载配置文件core-site.xml
1 | java复制代码loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE); |
加载配置文件yarn-site.xml
1 | java复制代码loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); |
HA的一些设置和相关服务地址的检查 默认:yarn.resourcemanager.ha.enabled = true
1 | java复制代码this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); |
初始化和添加AsyncDispatchen(重点)
1 | java复制代码rmDispatcher = setupDispatcher(); |
adminService
1 | java复制代码adminService = createAdminService(); |
选举相关
1 | java复制代码if (this.rmContext.isHAEnabled()) { |
当前节点为active RM才启动的服务 (service)
1 | java复制代码createAndInitActiveServices |
初始化各种服务(子service) 调用各种service实例的serviceInit( )方法
1 | java复制代码super.serviceInit(this.conf); |
初始化的内容很多,选取选取几个重要的内容进行详细说明:
2 createEmbeddedElector
下面详细看一下选举初始化的相关内容
createEmbeddedElector()方法作为入口
- 默认情况下是使用的zookeeperAPI进行选举,所以走else分支
1 | java复制代码createEmbeddedElector(){ |
- ActiveStandbyElectorBasedElectorService返回了该类型的选举实例,该类也是一个Service,所以该类的初始化找该类下面的serviceInit()方法
该方法主要做下面三件事情:
1
2
3
4
5 > 复制代码1.选举实例的创建
> 2.zookeeper的链接
> 3.创建选举过程中需要创建的路径
>
>
ActiveStandbyElectorBasedElectorService.serviceInit()具体为:
获取zookeeper地址
1 | java复制代码String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS); |
获取cluster id 和rescourcemanager id
1 | java复制代码String rmId = HAUtil.getRMHAId(conf); |
完成 zookeeper的链接 ,创建一个znode字节数组 localActiveNodeInfo对应zookeeperper znode上面所存储的数据
1 | java复制代码localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId); |
创建选举过程中需要的路径
zkBasePath = /yarn-leader-election (yarn-leader-election节点)
electionZNode = /yarn-leader-election/clusterId
1 | java复制代码String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); |
选举的尝试次数 默认为3次
1 | java复制代码int maxRetryNum = |
创建选举实例
1 | java复制代码elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,electionZNode, zkAcls, zkAuths, this, maxRetryNum, false); |
确保父节点 zkBasePath 存在
1 | java复制代码elector.ensureParentZNode(); |
初始化服务,调用相关子服务的serviceInit()方法
1 | java复制代码super.serviceInit(conf) |
3 createAndInitActiveServices
createAndInitActiveServices是只要当前RecourceManager是active状态才需要启动的一些服务
核心入口就是下面这句代码,它的内部创建了一个RMActiveServices对象,并且调用了该实例的init)方法。其实init()方法的内部就是调用servicelnit()方法
1 | java复制代码protected void createAndInitActiveServices(boolean fromActive) { |
接下来看RMActiveServices的serviceInit()方法:
serviceInit()
创建一个选举线程来完成选举
- 之前创建了一个 选举实例
- 此时创建了一个 选举线程
选举实例,会尝试最多3次选举,如果没有成功, 则启动这个线程来执行选举
最终都是使用选举实例的选举方法实现选举的
1 | java复制代码standByTransitionRunnable = new StandByTransitionRunnable(); |
做token管理 指纹的一些工作
1 | ini复制代码rmSecretManagerService = createRMSecretManagerService(); |
在初始化方法中有好多好多的Service被创建并且添加到了serviceList列表中,后面就不在明确写出addService()方法
过期处理
当ApplicationMster 收到ResourceManager 新分配到一个container后,必须在一定的时间内(10min)在对应的NM上启动该container,否则就会回收
1 | ini复制代码containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); |
资源调度策略 resourceScheduler 默认实现的是 CapacityScheduler
1 | java复制代码scheduler = createScheduler(); |
除了有各种各样的Service被创建外,还有很多的eventHandler被注册到AsyncDispatcher
注册方法有两个参数:
- 第一个参数为: 事件类型
- 第二个参数为: 事件处理器
如果提交一个事件到 AsyncDispatcher AsyncDispatcher就会找到之前注册的事件所对应的EventHandler 的handle 方法执行事件的处理
EventHandler 有可能是状态机,也可能就是一个简单的 EventHandler
1 | java复制代码rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); |
2.4 serviceStart()
接下来看ResourceManager的serviceStart()方法
刚启动当前的节点就成为standby
1 | java复制代码if (this.rmContext.isHAEnabled()) { |
启动用户webapp 用户认证服务
1 | java复制代码startWepApp(); |
起初始化创建的所有子Service进行启动
1 | java复制代码super.serviceStart(); |
如果没有启动HA 当前节点直接成为active,只有一台ResourceManager它肯定为active状态
1 | java复制代码if (!this.rmContext.isHAEnabled()) { |
此处注意,当启动了HA模式的时候,该怎么选举active节点呢?
还记得在ResourceManager.serviceInit()方法有下面一段代码
1 | java复制代码EmbeddedElector elector = createEmbeddedElector(); |
创建了一个选举的实例并且加入了serviceList中,现在所有的子service启动当然也包含了这个Service的启动,关于HARN HA 的选举机制将会在下一篇文章整理讲述。
本人在学习的路上,上述文章如有错误还请指教批评,谢谢。
本文转载自: 掘金