Architecture

YARN separates resource management from job scheduling and monitoring.
It defines three roles:

  • ResourceManager
  • NodeManager
  • ApplicationMaster

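Scheduling flow:

  1. The client submits an application (Spark, Flink, MapReduce, etc.) to YARN.
  2. The ResourceManager communicates with a NodeManager to allocate the first container for the application, and the application's ApplicationMaster is started in that container.
  3. Once started, the ApplicationMaster splits the job into tasks, which may run in one or more containers. It then requests containers for those tasks from the ResourceManager and sends it periodic heartbeats.
  4. After the containers are granted, the ApplicationMaster contacts the NodeManager hosting each container and dispatches the work to it. For MapReduce, the split job is distributed so that a given container may run either a Map task or a Reduce task.
  5. Tasks running in containers send heartbeats to the ApplicationMaster to report their status. When the program finishes, the ApplicationMaster unregisters from the ResourceManager and releases its containers.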
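
The five steps above can be traced with a toy model. This is only a sketch that records who talks to whom and in what order; the class `YarnFlowSketch`, the method `runFlow`, and the container and task names are hypothetical and are not part of the YARN API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy walk-through of the five steps; none of these names are real YARN APIs. */
class YarnFlowSketch {

    /** Returns the ordered list of messages exchanged for one application. */
    static List<String> runFlow(String appName, int taskCount) {
        List<String> events = new ArrayList<>();
        // Step 1: the client submits the application
        events.add("Client -> RM: submit " + appName);
        // Step 2: the RM allocates the first container and the AM starts in it
        events.add("RM -> NM: allocate container_0, start ApplicationMaster");
        // Step 3: the AM splits the job and asks the RM for one container per task
        events.add("AM: split job into " + taskCount + " tasks");
        for (int i = 1; i <= taskCount; i++) {
            events.add("AM -> RM: request container_" + i);
        }
        // Step 4: the AM asks the hosting NodeManagers to launch the tasks
        for (int i = 1; i <= taskCount; i++) {
            events.add("AM -> NM: launch task_" + i + " in container_" + i);
        }
        // Step 5: tasks report back, then the AM unregisters
        for (int i = 1; i <= taskCount; i++) {
            events.add("task_" + i + " -> AM: heartbeat, finished");
        }
        events.add("AM -> RM: unregister, release containers");
        return events;
    }

    public static void main(String[] args) {
        runFlow("spark-job", 2).forEach(System.out::println);
    }
}
```

In a real cluster the heartbeats are periodic and the container requests are asynchronous; the linear log here only shows the ordering of the steps.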

Scheduler

Capacity scheduler: a simple example

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>a,b,c</value>
  <description>The queues at this level (root is the root queue).
  </description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.a.queues</name>
  <value>a1,a2</value>
  <description>The queues at this level (root is the root queue).
  </description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.b.queues</name>
  <value>b1,b2,b3</value>
  <description>The queues at this level (root is the root queue).
  </description>
</property>
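
To see which queues this configuration actually produces, the `.queues` properties can be walked recursively. The following is a minimal sketch that treats the configuration as a plain map; `QueueTreeSketch` and `leafQueues` are hypothetical helper names, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Walks yarn.scheduler.capacity.<path>.queues entries to list leaf queues. */
class QueueTreeSketch {
    static final String PREFIX = "yarn.scheduler.capacity.";

    /** Returns the full paths of all leaf queues at or below the given queue path. */
    static List<String> leafQueues(Map<String, String> conf, String path) {
        List<String> leaves = new ArrayList<>();
        String children = conf.get(PREFIX + path + ".queues");
        if (children == null || children.trim().isEmpty()) {
            leaves.add(path); // no ".queues" entry, so this queue is a leaf
            return leaves;
        }
        for (String child : children.split(",")) {
            leaves.addAll(leafQueues(conf, path + "." + child.trim()));
        }
        return leaves;
    }

    /** The three properties from the example above. */
    static Map<String, String> exampleConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(PREFIX + "root.queues", "a,b,c");
        conf.put(PREFIX + "root.a.queues", "a1,a2");
        conf.put(PREFIX + "root.b.queues", "b1,b2,b3");
        return conf;
    }

    public static void main(String[] args) {
        // prints [root.a.a1, root.a.a2, root.b.b1, root.b.b2, root.b.b3, root.c]
        System.out.println(leafQueues(exampleConf(), "root"));
    }
}
```

Applications are submitted to leaf queues only, so `root.c` is a leaf while `root.a` and `root.b` are parents.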

Mapping policies

 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:%user:%primary_group.%user</value>
    <description>Maps users to queue with the same name as user but
    parent queue name should be same as primary group of the user</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:%user:%secondary_group.%user</value>
    <description>Maps users to queue with the same name as user but
    parent queue name should be same as any secondary group of the user</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:%user:%user</value>
    <description>Maps users to queues with the same name as user</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:user2:%primary_group</value>
    <description>user2 is mapped to queue name same as primary group</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:user3:%secondary_group</value>
    <description>user3 is mapped to queue name same as secondary group</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:user1:queue1</value>
    <description>user1 is mapped to queue1</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>g:group1:queue2</value>
    <description>group1 is mapped to queue2</description>
 </property>
 ...
 <property>
    <name>yarn.scheduler.capacity.queue-mappings</name>
    <value>u:user1:queue1,u:user2:queue2</value>
    <description>Here, user1 is mapped to queue1 and user2 is mapped to queue2, respectively</description>
 </property>

  <property>
    <name>yarn.scheduler.queue-placement-rules.app-name</name>
    <value>appName1:queue1,%application:%application</value>
    <description>
      Here, appName1 is mapped to queue1, and any other application is mapped
      to the queue with the same name as the application. The mappings are
      evaluated from left to right, and the first valid mapping is used.
    </description>
  </property>
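
The left-to-right, first-match evaluation of a mapping list can be sketched as below. This is a simplified model under stated assumptions: it handles only `u:` and `g:` entries and the `%user` and `%primary_group` placeholders, it matches `g:` entries against the primary group only, and it does not check that the target queue exists. `QueueMappingSketch` and `resolve` are hypothetical names, not the scheduler's own code.

```java
/** Simplified first-match resolution of a queue-mappings value. */
class QueueMappingSketch {

    /**
     * Returns the queue for the given user, or null if no entry matches.
     * Entries have the form u:<user>:<queue> or g:<group>:<queue>.
     */
    static String resolve(String mappings, String user, String primaryGroup) {
        for (String entry : mappings.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 3) {
                continue; // skip malformed entries in this sketch
            }
            String type = parts[0];
            String source = parts[1];
            String queue = parts[2];
            boolean matches;
            if (type.equals("u")) {
                // %user matches every user; otherwise the name must be equal
                matches = source.equals("%user") || source.equals(user);
            } else if (type.equals("g")) {
                matches = source.equals(primaryGroup);
            } else {
                matches = false;
            }
            if (matches) {
                // Expand placeholders in the target queue name
                return queue.replace("%primary_group", primaryGroup)
                            .replace("%user", user);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String mappings = "u:user1:queue1,u:%user:%primary_group.%user";
        System.out.println(resolve(mappings, "user1", "dev")); // queue1
        System.out.println(resolve(mappings, "alice", "dev")); // dev.alice
    }
}
```

Because the first matching entry wins, specific entries such as `u:user1:queue1` should come before catch-all entries such as `u:%user:%user`.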

Automatically creating leaf queues

 <property>
   <name>yarn.scheduler.capacity.root.parent1.auto-create-child-queue.enabled</name>
   <value>true</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.leaf-queue-template.capacity</name>
    <value>5</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.leaf-queue-template.maximum-capacity</name>
    <value>100</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.leaf-queue-template.user-limit-factor</name>
    <value>3.0</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.leaf-queue-template.ordering-policy</name>
    <value>fair</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.accessible-node-labels.GPU.capacity</name>
    <value>50</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.accessible-node-labels</name>
    <value>GPU,SSD</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.leaf-queue-template.accessible-node-labels</name>
    <value>GPU</value>
 </property>
 <property>
    <name>yarn.scheduler.capacity.root.parent1.leaf-queue-template.accessible-node-labels.GPU.capacity</name>
    <value>5</value>
 </property>

HA

Configuring HA for the ResourceManager

<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>master1</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>master2</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>master1:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>master2:8088</value>
</property>
<property>
  <name>hadoop.zk.address</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

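Other components, such as the ApplicationMaster and the NodeManager, rely on YARN's built-in mechanisms for HA.
An ApplicationMaster can provide its own checkpoint mechanism so that it recovers quickly,
or it can make its writes idempotent.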
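
As an illustration of the idempotent-write idea, the sketch below keys every committed result by task id, so a task that is re-run after an ApplicationMaster restart cannot produce a duplicate. It is an in-memory model only; `IdempotentSinkSketch` and its methods are hypothetical, and a real application would keep this state in durable storage such as HDFS.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of an idempotent output sink keyed by task id. */
class IdempotentSinkSketch {
    // Doubles as a checkpoint: the keys are the tasks that already finished
    private final Map<String, String> committed = new HashMap<>();

    /** Commits the output once; a repeated commit for the same task is a no-op. */
    boolean commit(String taskId, String output) {
        return committed.putIfAbsent(taskId, output) == null;
    }

    /** Lets a restarted ApplicationMaster skip tasks that are already done. */
    boolean isDone(String taskId) {
        return committed.containsKey(taskId);
    }

    String get(String taskId) {
        return committed.get(taskId);
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        System.out.println(sink.commit("task_1", "result-a")); // true
        System.out.println(sink.commit("task_1", "result-b")); // false, first write kept
        System.out.println(sink.get("task_1"));                // result-a
    }
}
```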

Node Label

Similar to Kubernetes labels: nodes can be tagged with labels.
For example, suppose only node5 is tagged with the GPU label.

yarn.scheduler.capacity.root.queues=engineering,marketing,sales
yarn.scheduler.capacity.root.engineering.capacity=33
yarn.scheduler.capacity.root.marketing.capacity=34
yarn.scheduler.capacity.root.sales.capacity=33

yarn.scheduler.capacity.root.engineering.accessible-node-labels=GPU
yarn.scheduler.capacity.root.marketing.accessible-node-labels=GPU

yarn.scheduler.capacity.root.engineering.accessible-node-labels.GPU.capacity=50
yarn.scheduler.capacity.root.marketing.accessible-node-labels.GPU.capacity=50

yarn.scheduler.capacity.root.engineering.default-node-label-expression=GPU
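
The effect of these properties on the GPU partition can be worked out with a little arithmetic. The sketch below is a simplification: it assumes the GPU partition (node5) has 16384 MB of memory, which is a made-up figure, and it ignores inheritance of `accessible-node-labels` from parent queues. `NodeLabelShareSketch` and `guaranteedMb` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Computes each queue's guaranteed share of a labelled partition. */
class NodeLabelShareSketch {
    static final String PREFIX = "yarn.scheduler.capacity.root.";

    /** Guaranteed memory (MB) of the partition for a queue, or 0 without access. */
    static long guaranteedMb(Map<String, String> conf, String queue,
                             String label, long partitionMb) {
        String accessible = conf.getOrDefault(PREFIX + queue + ".accessible-node-labels", "");
        if (!Arrays.asList(accessible.split(",")).contains(label)) {
            return 0L; // the queue cannot use nodes carrying this label
        }
        String pct = conf.getOrDefault(
                PREFIX + queue + ".accessible-node-labels." + label + ".capacity", "0");
        return (long) (partitionMb * Double.parseDouble(pct) / 100.0);
    }

    /** The label-related properties from the example above. */
    static Map<String, String> exampleConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(PREFIX + "engineering.accessible-node-labels", "GPU");
        conf.put(PREFIX + "marketing.accessible-node-labels", "GPU");
        conf.put(PREFIX + "engineering.accessible-node-labels.GPU.capacity", "50");
        conf.put(PREFIX + "marketing.accessible-node-labels.GPU.capacity", "50");
        return conf;
    }

    public static void main(String[] args) {
        long gpuPartitionMb = 16384; // assumed size of node5, for illustration only
        for (String q : new String[] {"engineering", "marketing", "sales"}) {
            System.out.println(q + " -> " + guaranteedMb(exampleConf(), q, "GPU", gpuPartitionMb) + " MB");
        }
    }
}
```

With these settings engineering and marketing each get half of the GPU partition, sales gets none of it, and only engineering uses the GPU partition by default because of its `default-node-label-expression`.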

Timeline

Timeline Structure

Hierarchy

  • Timeline Domain
  • Timeline Entity
  • Timeline Events

V2 architecture

YARN code

Writing a YARN application


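import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Logger;

public class MyYarnClient {
    private static final Logger log = Logger.getLogger("my-yarn-client");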

    public static void main(String[] args) throws IOException, YarnException {
        YarnConfiguration conf = new YarnConfiguration();
        // Set the address of the YARN ResourceManager
        conf.set(YarnConfiguration.RM_ADDRESS, "YARN_HOST:8050");
        conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, "YARN_HOST:8030");
        conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "YARN_HOST:8025");
        conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, "YARN_HOST:8141");
        conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "YARN_HOST:8088");

        // 1. Create and start the YarnClient
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        /*
        Internally, YarnClient talks to the ResourceManager through ApplicationClientProtocol.
        Stepping into YarnClientImpl shows where the RPC proxy is created:
        this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
         */
        yarnClient.start();


        // 2. Create the application through the YarnClient
        YarnClientApplication app = yarnClient.createApplication();
        // GetNewApplicationResponse carries the ApplicationId, the maximum resource capability, etc.
        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
        System.out.println(appResponse.getApplicationId());

        // 3. Fill in the ApplicationSubmissionContext
        // (application id, queue, resource, priority, and so on)
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        ApplicationId applicationId = appContext.getApplicationId();

        // 3.1 Set the application name
        appContext.setApplicationName("my-test-app");
        // 3.2 Set the ContainerLaunchContext (localResources, env, commands, etc.)
        // The ApplicationMaster jar goes into localResources
        ContainerLaunchContext amContainerCtx = createAMContainerLaunchContext(conf, applicationId);
        appContext.setAMContainerSpec(amContainerCtx);
        // 3.3 Set the priority
        Priority pri = Priority.newInstance(0);
        appContext.setPriority(pri);
        // 3.4 Set the queue
        appContext.setQueue("default");

        // 3.5 Set the ApplicationMaster resources (memory in MB, virtual cores)
        int amMemory = 1024;
        int amVCores = 1;
        Resource capability = Resource.newInstance(amMemory, amVCores);
        appContext.setResource(capability);

        // 4. Submit the application
        // YarnClientImpl calls rmClient.submitApplication(request), which goes to the RM over the ApplicationClientProtocol RPC
        ApplicationId appId = yarnClient.submitApplication(appContext);

        // 5. Poll the application report
        monitorApplicationReport(yarnClient, appId);
    }

    private static ContainerLaunchContext createAMContainerLaunchContext(
            Configuration conf, ApplicationId appId) throws IOException {
        // localResources holds the files the container needs (AM jar, log4j properties, etc.)
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
        // TODO: register the ApplicationMaster jar here

        FileSystem fs = FileSystem.get(conf);
        String thisJar = ClassUtil.findContainingJar(MyYarnClient.class);
        String thisJarBaseName = FilenameUtils.getName(thisJar);
        log.info("thisJar is " + thisJar);

        // addToLocalResources(fs, thisJar, thisJarBaseName, appId.toString(), localResources);

        //Set CLASSPATH environment
        Map<String, String> env = new HashMap<String, String>();
        StringBuilder classPathEnv = new StringBuilder(
                ApplicationConstants.Environment.CLASSPATH.$$());
        classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
        classPathEnv.append("./*");
        for (String c : conf.getStrings(
                        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
            classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
            classPathEnv.append(c.trim());
        }

        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
            classPathEnv.append(':');
            classPathEnv.append(System.getProperty("java.class.path"));
        }
        env.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());

        //Build the execute command
        List<String> commands = new LinkedList<String>();
        StringBuilder command = new StringBuilder();
        command.append(ApplicationConstants.Environment.JAVA_HOME.$$()).append("/bin/java  ");
        command.append("-Dlog4j.configuration=container-log4j.properties ");
        command.append("-Dyarn.app.container.log.dir=" +
                ApplicationConstants.LOG_DIR_EXPANSION_VAR + " ");
        command.append("-Dyarn.app.container.log.filesize=0 ");
        command.append("-Dhadoop.root.logger=INFO,CLA ");
        // Fully qualified main class of the ApplicationMaster; adjust it to your own class
        command.append("trumanz.yarnExample.ApplicationMaster ");
        command.append("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout ");
        command.append("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr ");
        commands.add(command.toString());

        ContainerLaunchContext amContainer = ContainerLaunchContext
                .newInstance(localResources, env, commands, null, null, null);

        // Set up security tokens (authentication)
        if (UserGroupInformation.isSecurityEnabled()) {
            Credentials credentials = new Credentials();
            String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
            if (tokenRenewer == null || tokenRenewer.length() == 0) {
                throw new IOException(
                        "Can't get Master Kerberos principal for the RM to use as renewer");
            }

            // For now, only getting tokens for the default file-system.
            final org.apache.hadoop.security.token.Token<?>[] tokens = fs.addDelegationTokens(tokenRenewer, credentials);
            if (tokens != null) {
                for (Token<?> token : tokens) {
                    log.info("Got dt for " + fs.getUri() + "; " + token);
                }
            }
            DataOutputBuffer dob = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(dob);
            ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
            amContainer.setTokens(fsTokens);
        }
        return amContainer;
    }

    private static void addToLocalResources(FileSystem fs, String fileSrcPath,
                                            String fileDstPath, String appId,
                                            Map<String, LocalResource> localResources)
            throws IllegalArgumentException, IOException {
        String suffix = "mytest" + "/" + appId + "/" + fileDstPath;
        Path dst = new Path(fs.getHomeDirectory(), suffix);
        log.info("hdfs copyFromLocalFile " + fileSrcPath + " =>" + dst);
        fs.copyFromLocalFile(new Path(fileSrcPath), dst);
        FileStatus scFileStatus = fs.getFileStatus(dst);
        LocalResource scRsrc = LocalResource.newInstance(
                ConverterUtils.getYarnUrlFromPath(dst), LocalResourceType.FILE,
                LocalResourceVisibility.APPLICATION, scFileStatus.getLen(),
                scFileStatus.getModificationTime());

        localResources.put(fileDstPath, scRsrc);

    }

    private static void monitorApplicationReport(YarnClient yarnClient, ApplicationId appId) throws YarnException, IOException {
        while (true) {
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling
                Thread.currentThread().interrupt();
                return;
            }
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            System.out.println("Got application report"
                    + ", clientToAMToken=" + report.getClientToAMToken()
                    + ", appDiagnostics=" + report.getDiagnostics()
                    + ", appMasterHost=" + report.getHost()
                    + ", appQueue=" + report.getQueue()
                    + ", appMasterRpcPort=" + report.getRpcPort()
                    + ", appStartTime=" + report.getStartTime()
                    + ", yarnAppState=" + report.getYarnApplicationState().toString()
                    + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
                    + ", appTrackingUrl=" + report.getTrackingUrl()
                    + ", appUser=" + report.getUser());
            // Stop polling once the application reaches a terminal state
            YarnApplicationState state = report.getYarnApplicationState();
            if (state == YarnApplicationState.FINISHED
                    || state == YarnApplicationState.KILLED
                    || state == YarnApplicationState.FAILED) {
                return;
            }
        }
    }

}

Writing an ApplicationMaster


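import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class MyApplicationMaster {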

    private static final Log log = LogFactory.getLog(MyApplicationMaster.class);

    @SuppressWarnings("rawtypes")
    AMRMClientAsync amRMClient = null;
    NMClientAsyncImpl amNMClient = null;

    AtomicInteger numTotalContainers = new AtomicInteger(10);
    AtomicInteger numCompletedContainers = new AtomicInteger(0);
    ExecutorService exeService = Executors.newCachedThreadPool();
    Map<ContainerId, Container> runningContainers = new ConcurrentHashMap<ContainerId, Container>();

    private final AtomicInteger sleepSeconds = new AtomicInteger(0);


    public static void main(String[] args) throws Exception {
        MyApplicationMaster am = new MyApplicationMaster();
        am.run();
        am.waitComplete();
    }


    @SuppressWarnings("unchecked")
    void run() throws YarnException, IOException {

        // logInformation();
        Configuration conf = new Configuration();

        // 1 create amRMClient
        amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
        amRMClient.init(conf);
        amRMClient.start();

        // 2 Create nmClientAsync
        amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());
        amNMClient.init(conf);
        amNMClient.start();

        // 3. Register with the RM; this also starts the heartbeat to the RM
        RegisterApplicationMasterResponse response = amRMClient
                .registerApplicationMaster(NetUtils.getHostname(), -1, "");

        // 4. Request containers
        // Containers from previous attempts are available here but ignored in this example
        response.getContainersFromPreviousAttempts();
        // 4.1 Check the maximum resources a single container may request
        long maxMem = response.getMaximumResourceCapability().getMemorySize();
        int maxVCores = response.getMaximumResourceCapability().getVirtualCores();

        // 4.2 Request the containers
        for (int i = 0; i < numTotalContainers.get(); i++) {
            ContainerRequest containerAsk = new ContainerRequest(
                    // 100 MB of memory + 1 vcore per container
                    Resource.newInstance(100, 1), null, null,
                    Priority.newInstance(0));
            amRMClient.addContainerRequest(containerAsk);
        }
    }

    void waitComplete() throws YarnException, IOException{
        while(numTotalContainers.get() != numCompletedContainers.get()){
            try{
                Thread.sleep(1000);
                log.info("waitComplete" +
                        ", numTotalContainers=" + numTotalContainers.get() +
                        ", numCompletedContainers=" + numCompletedContainers.get());
            } catch (InterruptedException ex){
                // Restore the interrupt flag and stop waiting so that shutdown can proceed
                Thread.currentThread().interrupt();
                break;
            }
        }
        log.info("ShutDown exeService Start");
        exeService.shutdown();
        log.info("ShutDown exeService Complete");
        amNMClient.stop();
        log.info("amNMClient stop Complete");
        amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "dummy Message", null);
        log.info("unregisterApplicationMaster Complete");
        amRMClient.stop();
        log.info("amRMClient stop Complete");
    }

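    // Earlier versions (e.g. 2.6) implemented an interface here; later versions extend an abstract class instead.
    // The added onContainersUpdated method is presumably there to support changes to a container's resources.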
    private class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
        @Override
        public void onContainersCompleted(List<ContainerStatus> statuses) {
            for (ContainerStatus status : statuses) {
                log.info("Container completed: " + status.getContainerId().toString()
                        + " exitStatus=" + status.getExitStatus());
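                if (status.getExitStatus() != 0) {
                    log.error("Container returned error status: " + status.getExitStatus());
                    log.warn("Need to rerun container!");
                    // TODO: request a replacement container here. A failed container is not
                    // counted as completed, so without a rerun waitComplete() never returns.
                    continue;
                }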
                ContainerId containerId = status.getContainerId();
                runningContainers.remove(containerId);
                numCompletedContainers.addAndGet(1);
            }
        }

        @Override
        public void onContainersAllocated(List<Container> containers) {
            for (Container c : containers) {
                log.info("Container Allocated, id = " + c.getId() + ", containerNode = " + c.getNodeId());
                exeService.submit(new LaunchContainerTask(c));
            }

        }

        @Override
        public void onContainersUpdated(List<UpdatedContainer> containers) {

        }

        @Override
        public void onShutdownRequest() {

        }

        @Override
        public void onNodesUpdated(List<NodeReport> updatedNodes) {

        }

        @Override
        public float getProgress() {
            return 0;
        }

        @Override
        public void onError(Throwable e) {
            amRMClient.stop();
        }
    }

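    // Likewise, the NM callback handler should extend the abstract class, which adds
    // onContainerResourceIncreased to support growing a container.
    // Why an abstract class rather than an interface? Presumably to avoid inheritance between interfaces.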
    private class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
        @Override
        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
            log.info("Container Started " + containerId.toString());
        }

        @Override
        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {

        }

        @Override
        public void onContainerStopped(ContainerId containerId) {

        }

        @Override
        public void onStartContainerError(ContainerId containerId, Throwable t) {

        }

        @Override
        public void onContainerResourceIncreased(ContainerId containerId, Resource resource) {

        }

        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {

        }

        @Override
        public void onIncreaseContainerResourceError(ContainerId containerId, Throwable t) {

        }

        @Override
        public void onStopContainerError(ContainerId containerId, Throwable t) {

        }
    }

    private class LaunchContainerTask implements Runnable {
        Container container;

        public LaunchContainerTask(Container container) {
            this.container = container;
        }

        @Override
        public void run() {
            LinkedList<String> commands = new LinkedList<>();
            commands.add("sleep " + sleepSeconds.addAndGet(1));
            ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(null, null, commands, null, null, null);
            // Starting the container triggers the amNMClient callbacks (NMCallbackHandler)
            amNMClient.startContainerAsync(container, ctx);
        }
    }
}

Timeline code

Code

    public void go() {
        // Create and start the Timeline client
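        TimelineClient client = TimelineClient.createTimelineClient();
        // init() rejects a null configuration, so pass a real one
        // (org.apache.hadoop.yarn.conf.YarnConfiguration)
        client.init(new YarnConfiguration());
        client.start();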

        try {
            TimelineDomain myDomain = new TimelineDomain();
            myDomain.setId("MyDomain");
            // Compose other Domain info ....

            client.putDomain(myDomain);

            TimelineEntity myEntity = new TimelineEntity();
            myEntity.setDomainId(myDomain.getId());
            myEntity.setEntityType("APPLICATION");
            myEntity.setEntityId("MyApp1");
            // Compose other entity info

            TimelinePutResponse response = client.putEntities(myEntity);

            TimelineEvent event = new TimelineEvent();
            event.setEventType("APP_FINISHED");
            event.setTimestamp(System.currentTimeMillis());
            event.addEventInfo("Exit Status", "SUCCESS");
            // Compose other Event info ....

            myEntity.addEvent(event);
            TimelinePutResponse response2 = client.putEntities(myEntity);

            response2.toString();

        } catch (IOException e) {
            // Handle the exception
        } catch (RuntimeException e) {
            // In Hadoop 2.6, if attempts submit information to the Timeline Server fail more than the retry limit,
            // a RuntimeException will be raised. This may change in future releases, being
            // replaced with a IOException that is (or wraps) that which triggered retry failures.
        } catch (YarnException e) {
            // Handle the exception
        } finally {
            // Stop the Timeline client
            client.stop();
        }
    }

References