Yarn ResourceManager启动流程源码解读

「这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战

本文的阅读需要一些YARN Service的知识,相关内容请查看:

YARN源码不知从何入手,那是你不知道Service

下面就ResourceManager的启动流程做详细的源码解读

1 serviceInit()

ResourceManager是一个Service,所以ResourceManager的启动当然是要找init()方法,该类本身没有实现Init()方法,所以使用父类方法,父类的Init()方法调用了serviceInit(),并且在ResourceManager 重写,下面详细看ResourceManager .serviceInit()

该方法主要做了下面三件事情:

  1. 创建各种子service实例
1
2
3
> java复制代码new xxx();createxxx();
>
>
  1. 将该实例加入到serviceList集合中
1
2
3
4
> java复制代码addService(service); 
> addIfService(object);
>
>
  1. 执行各个子service的serviceInit()方法
1
2
3
> java复制代码super.init();
>
>

该类的serviceInit方法从上至下依次为:

生成上下文对象

1
java复制代码this.rmContext = new RMContextImpl()

加载配置文件core-site.xml

1
java复制代码loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);

加载配置文件yarn-site.xml

1
java复制代码loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);

HA的一些设置和相关服务地址的检查 默认:yarn.resourcemanager.ha.enabled = true

1
java复制代码this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));

初始化和添加AsyncDispatchen(重点)

1
2
3
java复制代码rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);

adminService

1
2
3
java复制代码adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);

选举相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码if (this.rmContext.isHAEnabled()) {
/**
* yarn.resourcemanager.ha.automatic-failover.emabled = true
* yarn.resourcemanager.ha.automatic-failover.embedde = true
* embedde = true 表明使用原生的zookeeperAPI来执行选举 否则使用curator框架来实现
* 不同点:zookeeperAPI复杂一点
*/
if (HAUtil.isAutomaticFailoverEnabled(conf)
&& HAUtil.isAutomaticFailoverEmbedded(conf)) {
//创建选举器 ActiveStandbyElectorBasedElectorService
// 阻塞的进行最大三次的尝试选举 不成功交给线程
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}

当前节点为active RM才启动的服务 (service)

1
java复制代码createAndInitActiveServices

初始化各种服务(子service) 调用各种service实例的serviceInit( )方法

1
java复制代码super.serviceInit(this.conf);

初始化的内容很多,选取选取几个重要的内容进行详细说明:

2 createEmbeddedElector

下面详细看一下选举初始化的相关内容

createEmbeddedElector()方法作为入口

  1. 默认情况下是使用的zookeeperAPI进行选举,所以走else分支
1
2
3
4
5
6
7
8
java复制代码createEmbeddedElector(){
if (curatorEnabled) {
this.zkManager = createAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
}
}
  1. ActiveStandbyElectorBasedElectorService返回了该类型的选举实例,该类也是一个Service,所以该类的初始化找该类下面的serviceInit()方法

该方法主要做下面三件事情:

1
2
3
4
5
> 复制代码1.选举实例的创建
> 2.zookeeper的链接
> 3.创建选举过程中需要创建的路径
>
>

ActiveStandbyElectorBasedElectorService.serviceInit()具体为:

获取zookeeper地址

1
java复制代码String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);

获取cluster id 和rescourcemanager id

1
2
java复制代码String rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);

完成 zookeeper的链接 ,创建一个znode字节数组 localActiveNodeInfo对应zookeeperper znode上面所存储的数据

1
java复制代码localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

创建选举过程中需要的路径

zkBasePath = /yarn-leader-election (yarn-leader-election节点)

electionZNode = /yarn-leader-election/clusterId

1
2
java复制代码String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;

选举的尝试次数 默认为3次

1
2
3
4
java复制代码int maxRetryNum =
conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
.getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));

创建选举实例

1
java复制代码elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);

确保父节点 zkBasePath 存在

1
java复制代码elector.ensureParentZNode();

初始化服务,调用相关子服务的serviceInit()方法

1
java复制代码super.serviceInit(conf)

3 createAndInitActiveServices

createAndInitActiveServices是只要当前RecourceManager是active状态才需要启动的一些服务

核心入口就是下面这句代码,它的内部创建了一个RMActiveServices对象,并且调用了该实例的init)方法。其实init()方法的内部就是调用servicelnit()方法

1
2
3
4
5
java复制代码protected void createAndInitActiveServices(boolean fromActive) {
activeServices = new RMActiveServices(this);
activeServices.fromActive = fromActive;
activeServices.init(conf);
}

接下来看RMActiveServices的serviceInit()方法:

serviceInit()

创建一个选举线程来完成选举

  1. 之前创建了一个 选举实例
  2. 此时创建了一个 选举线程

选举实例,会尝试最多3次选举,如果没有成功, 则启动这个线程来执行选举
最终都是使用选举实例的选举方法实现选举的

1
java复制代码standByTransitionRunnable = new StandByTransitionRunnable();

做token管理 指纹的一些工作

1
2
ini复制代码rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);

在初始化方法中有好多好多的Service被创建并且添加到了serviceList列表中,后面就不在明确写出addService()方法


过期处理

当ApplicationMster 收到ResourceManager 新分配到一个container后,必须在一定的时间内(10min)在对应的NM上启动该container,否则就会回收

1
ini复制代码containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);

资源调度策略 resourceScheduler 默认实现的是 CapacityScheduler

1
java复制代码scheduler = createScheduler();

除了有各种各样的Service被创建外,还有很多的eventHandler被注册到AsyncDispatcher

注册方法有两个参数:

  • 第一个参数为: 事件类型
  • 第二个参数为: 事件处理器

如果提交一个事件到 AsyncDispatcher AsyncDispatcher就会找到之前注册的事件所对应的EventHandler 的handle 方法执行事件的处理

EventHandler 有可能是状态机,也可能就是一个简单的 EventHandler

1
2
3
4
5
6
7
8
9
10
java复制代码rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
// Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(rmContext));
// Register event handler for RmAppAttemptEvents
rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(rmContext));
// Register event handler for RmNodes
rmDispatcher.register(
RMNodeEventType.class, new NodeEventDispatcher(rmContext));

2.4 serviceStart()

接下来看ResourceManager的serviceStart()方法

刚启动当前的节点就成为standby

1
2
3
java复制代码if (this.rmContext.isHAEnabled()) {
transitionToStandby(false);
}

启动用户webapp 用户认证服务

1
2
3
4
5
6
java复制代码startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}

起初始化创建的所有子Service进行启动

1
java复制代码super.serviceStart();

如果没有启动HA 当前节点直接成为active,只有一台ResourceManager它肯定为active状态

1
2
3
java复制代码if (!this.rmContext.isHAEnabled()) {
transitionToActive();
}

此处注意,当启动了HA模式的时候,该怎么选举active节点呢?

还记得在ResourceManager.serviceInit()方法有下面一段代码

1
2
3
java复制代码EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);

创建了一个选举的实例并且加入了serviceList中,现在所有的子service启动当然也包含了这个Service的启动,关于HARN HA 的选举机制将会在下一篇文章整理讲述。

本人在学习的路上,上述文章如有错误还请指教批评,谢谢。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%