YARN SLS: Complete Source Code Analysis of SLSRunner

xiaoxiao, 2021-03-01

The whole of SLSRunner starts from the main function:

public static void main(String[] argv) throws Exception {
  exitAtTheFinish = true;
  ToolRunner.run(new Configuration(), new SLSRunner(), argv);
}

Through ToolRunner.run, main calls the run function and passes the command-line arguments along in argv:

SLSRunner.java

public int run(final String[] argv) throws IOException,
    InterruptedException, ParseException, ClassNotFoundException,
    YarnException {
  Options options = new Options();

  // Left for compatibility
  options.addOption("inputrumen", true, "input rumen files");
  options.addOption("inputsls", true, "input sls files");

  // New more general format
  options.addOption("tracetype", true, "the type of trace");
  options.addOption("tracelocation", true, "input trace files");

  options.addOption("nodes", true, "input topology");
  options.addOption("output", true, "output directory");
  options.addOption("trackjobs", true,
      "jobs to be tracked during simulating");
  options.addOption("printsimulation", false,
      "print out simulation information");

  CommandLineParser parser = new GnuParser();
  CommandLine cmd = parser.parse(options, argv);

  String traceType = null;
  String traceLocation = null;

  // compatibility with old commandline
  if (cmd.hasOption("inputrumen")) {
    traceType = "RUMEN";
    traceLocation = cmd.getOptionValue("inputrumen");
  }
  if (cmd.hasOption("inputsls")) {
    traceType = "SLS";
    traceLocation = cmd.getOptionValue("inputsls");
  }

  if (cmd.hasOption("tracetype")) {
    traceType = cmd.getOptionValue("tracetype");
    traceLocation = cmd.getOptionValue("tracelocation");
  }

  String output = cmd.getOptionValue("output");

  File outputFile = new File(output);
  if (!outputFile.exists() && !outputFile.mkdirs()) {
    System.err.println("ERROR: Cannot create output directory "
        + outputFile.getAbsolutePath());
    throw new YarnException("Cannot create output directory");
  }

  Set<String> trackedJobSet = new HashSet<String>();
  if (cmd.hasOption("trackjobs")) {
    String trackjobs = cmd.getOptionValue("trackjobs");
    String jobIds[] = trackjobs.split(",");
    trackedJobSet.addAll(Arrays.asList(jobIds));
  }

  String tempNodeFile =
      cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";

  TraceType tempTraceType = TraceType.SLS;
  switch (traceType) {
  case "SLS":
    tempTraceType = TraceType.SLS;
    break;
  case "RUMEN":
    tempTraceType = TraceType.RUMEN;
    break;
  case "SYNTH":
    tempTraceType = TraceType.SYNTH;
    break;
  default:
    printUsage();
    throw new YarnException("Misconfigured input");
  }

  String[] inputFiles = traceLocation.split(",");

  setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
      trackedJobSet, cmd.hasOption("printsimulation"));

  start();

  return 0;
}

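First, a set of options is registered so that the parser can recognize these arguments, for example inputsls and nodes.

Once they are registered, the parser matches them against the arguments that were actually passed in, which yields the selected options and their values, for example:

// compatibility with old commandline
if (cmd.hasOption("inputrumen")) {
  traceType = "RUMEN";
  traceLocation = cmd.getOptionValue("inputrumen");
}
if (cmd.hasOption("inputsls")) {
  traceType = "SLS";
  traceLocation = cmd.getOptionValue("inputsls");
}

If an SLS trace is passed in, traceType is "SLS" and traceLocation is the JSON file given by the inputsls option. tempTraceType then becomes TraceType.SLS.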
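The precedence between the legacy options and the newer tracetype/tracelocation pair can be sketched without commons-cli. This is only an illustration: the class TraceOptionSketch and its resolve method are hypothetical, and a plain Map stands in for the parsed CommandLine.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the option handling in SLSRunner.run();
// a plain Map replaces the parsed commons-cli CommandLine.
final class TraceOptionSketch {

  /** Returns {traceType, traceLocation}; later checks override earlier ones. */
  static String[] resolve(Map<String, String> opts) {
    String traceType = null;
    String traceLocation = null;
    if (opts.containsKey("inputrumen")) {
      traceType = "RUMEN";
      traceLocation = opts.get("inputrumen");
    }
    if (opts.containsKey("inputsls")) {
      traceType = "SLS";
      traceLocation = opts.get("inputsls");
    }
    // The general form wins over both legacy options when present.
    if (opts.containsKey("tracetype")) {
      traceType = opts.get("tracetype");
      traceLocation = opts.get("tracelocation");
    }
    return new String[] {traceType, traceLocation};
  }

  public static void main(String[] args) {
    Map<String, String> opts = new LinkedHashMap<>();
    opts.put("inputsls", "sls-jobs.json");
    String[] resolved = resolve(opts);
    System.out.println(resolved[0] + " <- " + resolved[1]);
  }
}
```

One consequence worth noting: if none of the three options is given, traceType stays null, and a Java switch on a null String throws a NullPointerException before the default branch is reached.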

At the end, run calls the start method:

public void start() throws IOException, ClassNotFoundException,
    YarnException, InterruptedException {
  // start resource manager
  startRM();
  // start node managers
  startNM();
  // start application masters
  startAM();
  // set queue & tracked apps information
  ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
      .setQueueSet(this.queueAppNumMap.keySet());
  ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
      .setTrackedAppSet(this.trackedApps);
  // print out simulation info
  printSimulationInfo();
  // blocked until all nodes RUNNING
  waitForNodesRunning();
  // starting the runner once everything is ready to go,
  runner.start();
}

start is the core method. It starts the RM, the NMs and the AMs in turn, waits until every node is RUNNING, and then starts the runner. The following sections go through these functions one by one:

startRM:

private void startRM() throws ClassNotFoundException, YarnException {
  Configuration rmConf = new YarnConfiguration(getConf());
  String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);

  if (Class.forName(schedulerClass) == CapacityScheduler.class) {
    rmConf.set(YarnConfiguration.RM_SCHEDULER,
        SLSCapacityScheduler.class.getName());
    rmConf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
    rmConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
        ProportionalCapacityPreemptionPolicy.class.getName());
  } else if (Class.forName(schedulerClass) == FairScheduler.class) {
    rmConf.set(YarnConfiguration.RM_SCHEDULER,
        SLSFairScheduler.class.getName());
  } else if (Class.forName(schedulerClass) == FifoScheduler.class) {
    // TODO add support for FifoScheduler
    throw new YarnException("Fifo Scheduler is not supported yet.");
  }

  rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);

  final SLSRunner se = this;
  rm = new ResourceManager() {
    @Override
    protected ApplicationMasterLauncher createAMLauncher() {
      return new MockAMLauncher(se, this.rmContext, amMap);
    }
  };

  // Across runs of parametrized tests, the JvmMetrics objects is retained,
  // but is not registered correctly
  JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
  jvmMetrics.registerIfNeeded();

  // Init and start the actual ResourceManager
  rm.init(rmConf);
  rm.start();
}

First, a YarnConfiguration is built to read the parameters from yarn-site.xml, and the scheduler class is looked up in it: Capacity, FIFO or Fair. CapacityScheduler and FairScheduler are swapped for their SLS counterparts (SLSCapacityScheduler and SLSFairScheduler), while FifoScheduler is rejected with an exception. Fair is the default here (presumably because of the yarn-site.xml used with SLS; stock YARN itself defaults to the CapacityScheduler).
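The scheduler substitution in startRM boils down to a small mapping. The sketch below illustrates it with simple class names as plain strings; SchedulerSwapSketch and wrap are hypothetical names, and the real code compares Class objects and writes the result back into the RM configuration.

```java
// Hypothetical illustration of the scheduler substitution in startRM().
// Simple class names stand in for the fully qualified ones.
final class SchedulerSwapSketch {

  static String wrap(String schedulerSimpleName) {
    switch (schedulerSimpleName) {
      case "CapacityScheduler":
        return "SLSCapacityScheduler";
      case "FairScheduler":
        return "SLSFairScheduler";
      case "FifoScheduler":
        // Mirrors the YarnException thrown by startRM().
        throw new IllegalArgumentException(
            "Fifo Scheduler is not supported yet.");
      default:
        // Any other scheduler class is left untouched.
        return schedulerSimpleName;
    }
  }

  public static void main(String[] args) {
    System.out.println(wrap("FairScheduler"));
  }
}
```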

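Next, a ResourceManager object is created with its createAMLauncher method overridden. That method supplies the component responsible for launching AMs; the override returns a new MockAMLauncher, which is implemented in MockAMLauncher.java.

Finally, the RM is brought up through its own rm.init and rm.start.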

startNM:

private void startNM() throws YarnException, IOException,
    InterruptedException {
  // nm configuration
  int heartbeatInterval = getConf().getInt(
      SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
      SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
  float resourceUtilizationRatio = getConf().getFloat(
      SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
      SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
  // nm information (fetch from topology file, or from sls/rumen json file)
  Map<String, Resource> nodeResourceMap = new HashMap<>();
  Set<? extends String> nodeSet;
  if (nodeFile.isEmpty()) {
    for (String inputTrace : inputTraces) {
      switch (inputType) {
      case SLS:
        nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
        for (String node : nodeSet) {
          nodeResourceMap.put(node, null);
        }
        break;
      case RUMEN:
        nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
        for (String node : nodeSet) {
          nodeResourceMap.put(node, null);
        }
        break;
      case SYNTH:
        stjp = new SynthTraceJobProducer(getConf(),
            new Path(inputTraces[0]));
        nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
            stjp.getNumNodes()/stjp.getNodesPerRack());
        for (String node : nodeSet) {
          nodeResourceMap.put(node, null);
        }
        break;
      default:
        throw new YarnException("Input configuration not recognized, "
            + "trace type should be SLS, RUMEN, or SYNTH");
      }
    }
  } else {
    nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile,
        nodeManagerResource);
  }

  if (nodeResourceMap.size() == 0) {
    throw new YarnException("No node! Please configure nodes.");
  }

  // create NM simulators
  Random random = new Random();
  Set<String> rackSet = new ConcurrentHashSet<>();
  int threadPoolSize = Math.max(poolSize,
      SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
  ExecutorService executorService = Executors.
      newFixedThreadPool(threadPoolSize);
  for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
    executorService.submit(new Runnable() {
      @Override public void run() {
        try {
          // we randomize the heartbeat start time from zero to 1 interval
          NMSimulator nm = new NMSimulator();
          Resource nmResource = nodeManagerResource;
          String hostName = entry.getKey();
          if (entry.getValue() != null) {
            nmResource = entry.getValue();
          }
          nm.init(hostName, nmResource,
              random.nextInt(heartbeatInterval),
              heartbeatInterval, rm, resourceUtilizationRatio);
          nmMap.put(nm.getNode().getNodeID(), nm);
          runner.schedule(nm);
          rackSet.add(nm.getNode().getRackName());
        } catch (IOException | YarnException e) {
          LOG.error("Got an error while adding node", e);
        }
      }
    });
  }
  executorService.shutdown();
  executorService.awaitTermination(10, TimeUnit.MINUTES);
  numRacks = rackSet.size();
  numNMs = nmMap.size();
}

When a topology file is supplied with --nodes, nodeResourceMap = SLSUtils.parseNodesFromNodeFile takes that file (nodeFile) together with nodeManagerResource and returns the parameters of every node. Without a node file, the node names are parsed from the trace instead and each is mapped to null, so the default resource is used later. nodeManagerResource holds the memory and CPU of a node. So when is this information read from sls-runner.xml into nodeManagerResource?

SLSRunner.java contains an init function:

private void init(Configuration tempConf) throws ClassNotFoundException {
  nmMap = new ConcurrentHashMap<>();
  queueAppNumMap = new HashMap<>();
  amMap = new ConcurrentHashMap<>();
  amClassMap = new HashMap<>();

  // runner configuration
  setConf(tempConf);

  // runner
  poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
      SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
  SLSRunner.runner.setQueueSize(poolSize);
  // <AMType, Class> map
  for (Map.Entry e : tempConf) {
    String key = e.getKey().toString();
    if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
      String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
      amClassMap.put(amType, Class.forName(tempConf.get(key)));
    }
  }

  nodeManagerResource = getNodeManagerResource();
}

The last line of this function initializes nodeManagerResource from getNodeManagerResource, which looks like this:

private Resource getNodeManagerResource() {
  Resource resource = Resources.createResource(0);
  ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
  for (ResourceInformation info : infors) {
    long value;
    if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
      value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
          SLSConfiguration.NM_MEMORY_MB_DEFAULT);
    } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
      value = getConf().getInt(SLSConfiguration.NM_VCORES,
          SLSConfiguration.NM_VCORES_DEFAULT);
    } else {
      value = getConf().getLong(SLSConfiguration.NM_PREFIX + info.getName(),
          SLSConfiguration.NM_RESOURCE_DEFAULT);
    }

    resource.setResourceValue(info.getName(), value);
  }

  return resource;
}

As you can see, this method reads each node's memory and CPU settings from sls-runner.xml, falling back to the built-in defaults when a key is not set.
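The lookup-with-default pattern used by getNodeManagerResource can be sketched with java.util.Properties in place of Hadoop's Configuration. The key names and default values below are assumptions standing in for the SLSConfiguration constants, and NodeResourceSketch is a hypothetical class.

```java
import java.util.Properties;

// Hypothetical sketch: Properties replaces Hadoop's Configuration.
final class NodeResourceSketch {
  // Assumed key names and defaults; the real ones are SLSConfiguration constants.
  static final String NM_MEMORY_MB = "yarn.sls.nm.memory.mb";
  static final int NM_MEMORY_MB_DEFAULT = 10240;
  static final String NM_VCORES = "yarn.sls.nm.vcores";
  static final int NM_VCORES_DEFAULT = 10;

  /** Reads an int setting, using the default when the key is absent. */
  static int getInt(Properties conf, String key, int defaultValue) {
    String raw = conf.getProperty(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }

  /** Returns {memoryMb, vcores} shared by every simulated node. */
  static long[] nodeResource(Properties conf) {
    long memoryMb = getInt(conf, NM_MEMORY_MB, NM_MEMORY_MB_DEFAULT);
    long vcores = getInt(conf, NM_VCORES, NM_VCORES_DEFAULT);
    return new long[] {memoryMb, vcores};
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty(NM_MEMORY_MB, "8192");
    long[] res = nodeResource(conf);
    System.out.println("memory=" + res[0] + "MB, vcores=" + res[1]);
  }
}
```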

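Back to startNM: once nodeResourceMap = SLSUtils.parseNodesFromNodeFile has collected the node information, the NM simulators are created. nm.init initializes each node's parameters, such as hostName and nmResource (nodeManagerResource, i.e. memory and CPU, unless the node file gives that node its own value), and runner.schedule(nm) then adds the simulator to the delay queue.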
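The third argument of nm.init, random.nextInt(heartbeatInterval), gives each NM a first heartbeat somewhere in [0, heartbeatInterval), so the simulated nodes do not all report at the same instant. A minimal sketch of that staggering follows; HeartbeatOffsetSketch and the fixed seed are illustrative assumptions, not part of SLS.

```java
import java.util.Random;

// Hypothetical sketch of the randomized first-heartbeat offsets in startNM().
final class HeartbeatOffsetSketch {

  /** One start offset per node, each in [0, intervalMs). */
  static int[] firstHeartbeats(int nodes, int intervalMs, long seed) {
    Random random = new Random(seed); // seeded only to make runs repeatable
    int[] offsets = new int[nodes];
    for (int i = 0; i < nodes; i++) {
      offsets[i] = random.nextInt(intervalMs);
    }
    return offsets;
  }

  public static void main(String[] args) {
    for (int offset : firstHeartbeats(5, 1000, 42L)) {
      System.out.println("first heartbeat after " + offset + " ms");
    }
  }
}
```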

startAM:

private void startAM() throws YarnException, IOException {
  switch (inputType) {
  case SLS:
    for (String inputTrace : inputTraces) {
      startAMFromSLSTrace(inputTrace);
    }
    break;
  case RUMEN:
    long baselineTimeMS = 0;
    for (String inputTrace : inputTraces) {
      startAMFromRumenTrace(inputTrace, baselineTimeMS);
    }
    break;
  case SYNTH:
    startAMFromSynthGenerator();
    break;
  default:
    throw new YarnException("Input configuration not recognized, "
        + "trace type should be SLS, RUMEN, or SYNTH");
  }

  numAMs = amMap.size();
  remainingApps = numAMs;
}

This simply dispatches on the input format, and each format has its own function. Here we look at the SLS one, startAMFromSLSTrace:

/**
 * Parse workload from a SLS trace file.
 */
@SuppressWarnings("unchecked")
private void startAMFromSLSTrace(String inputTrace) throws IOException {
  JsonFactory jsonF = new JsonFactory();
  ObjectMapper mapper = new ObjectMapper();

  try (Reader input = new InputStreamReader(
      new FileInputStream(inputTrace), "UTF-8")) {
    Iterator<Map> jobIter = mapper.readValues(
        jsonF.createParser(input), Map.class);

    while (jobIter.hasNext()) {
      try {
        createAMForJob(jobIter.next());
      } catch (Exception e) {
        LOG.error("Failed to create an AM: {}", e.getMessage());
      }
    }
  }
}

The workload, that is, the job and task information, is read from the file given after --input-sls (the inputsls option seen in run).

while (jobIter.hasNext()) {
  try {
    createAMForJob(jobIter.next());
  } catch (Exception e) {
    LOG.error("Failed to create an AM: {}", e.getMessage());
  }
}

The while loop iterates over the jobs in the trace; with the official example there should be two. Each job is passed to createAMForJob:

private void createAMForJob(Map jsonJob) throws YarnException {
  long jobStartTime = Long.parseLong(
      jsonJob.get(SLSConfiguration.JOB_START_MS).toString());

  long jobFinishTime = 0;
  if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) {
    jobFinishTime = Long.parseLong(
        jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
  }

  String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
  if (user == null) {
    user = "default";
  }

  String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString();
  increaseQueueAppNum(queue);

  String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE);
  if (amType == null) {
    amType = SLSUtils.DEFAULT_JOB_TYPE;
  }

  int jobCount = 1;
  if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) {
    jobCount = Integer.parseInt(
        jsonJob.get(SLSConfiguration.JOB_COUNT).toString());
  }
  jobCount = Math.max(jobCount, 1);

  String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID);
  // Job id is generated automatically if this job configuration allows
  // multiple job instances
  if(jobCount > 1) {
    oldAppId = null;
  }

  for (int i = 0; i < jobCount; i++) {
    runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
        getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
  }
}

jobStartTime, jobFinishTime, user, queue, amType and similar fields are read from sls-jobs.json.

runNewAM is then called to start the AM, with the values above passed along. It takes two further arguments: getTaskContainers(jsonJob) and getAMContainerResource(jsonJob).

getTaskContainers:

private List<ContainerSimulator> getTaskContainers(Map jsonJob)
    throws YarnException {
  List<ContainerSimulator> containers = new ArrayList<>();
  List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
  if (tasks == null || tasks.size() == 0) {
    throw new YarnException("No task for the job!");
  }

  for (Object o : tasks) {
    Map jsonTask = (Map) o;

    String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);

    long duration = 0;
    if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
      duration = Integer.parseInt(
          jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
    } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
      // Also support "duration.ms" for backward compatibility
      duration = Integer.parseInt(
          jsonTask.get(SLSConfiguration.DURATION_MS).toString());
    } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
        jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
      long taskStart = Long.parseLong(
          jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
      long taskFinish = Long.parseLong(
          jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
      duration = taskFinish - taskStart;
    }
    if (duration <= 0) {
      throw new YarnException("Duration of a task shouldn't be less or equal"
          + " to 0!");
    }

    Resource res = getResourceForContainer(jsonTask);

    int priority = DEFAULT_MAPPER_PRIORITY;
    if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
      priority = Integer.parseInt(
          jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
    }

    String type = "map";
    if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
      type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
    }

    int count = 1;
    if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
      count = Integer.parseInt(
          jsonTask.get(SLSConfiguration.COUNT).toString());
    }
    count = Math.max(count, 1);

    ExecutionType executionType = ExecutionType.GUARANTEED;
    if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
      executionType = ExecutionType.valueOf(
          jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
    }
    for (int i = 0; i < count; i++) {
      containers.add(
          new ContainerSimulator(res, duration, hostname, priority, type,
              executionType));
    }
  }

  return containers;
}

This function turns the tasks listed under one job in sls-jobs.json into the list of containers that the job will request.

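List<ContainerSimulator> containers = new ArrayList<>();

The return value is this List named containers. Each task contributes one entry, or count entries when the task sets count, so the size of the list follows the number of tasks.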

List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
if (tasks == null || tasks.size() == 0) {
  throw new YarnException("No task for the job!");
}

for (Object o : tasks) {
  Map jsonTask = (Map) o;

  String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);

  long duration = 0;
  if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
    duration = Integer.parseInt(
        jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
  } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
    // Also support "duration.ms" for backward compatibility
    duration = Integer.parseInt(
        jsonTask.get(SLSConfiguration.DURATION_MS).toString());
  } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
      jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
    long taskStart = Long.parseLong(
        jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
    long taskFinish = Long.parseLong(
        jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
    duration = taskFinish - taskStart;
  }
  if (duration <= 0) {
    throw new YarnException("Duration of a task shouldn't be less or equal"
        + " to 0!");
  }

  Resource res = getResourceForContainer(jsonTask);

  int priority = DEFAULT_MAPPER_PRIORITY;
  if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
    priority = Integer.parseInt(
        jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
  }

  String type = "map";
  if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
    type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
  }

  int count = 1;
  if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
    count = Integer.parseInt(
        jsonTask.get(SLSConfiguration.COUNT).toString());
  }
  count = Math.max(count, 1);

  ExecutionType executionType = ExecutionType.GUARANTEED;
  if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
    executionType = ExecutionType.valueOf(
        jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
  }
  for (int i = 0; i < count; i++) {
    containers.add(
        new ContainerSimulator(res, duration, hostname, priority, type,
            executionType));
  }
}

The tasks are then taken one at a time, and the for loop builds the containers for each of them.

Inside the loop, the task's information is read first: its duration (given directly or derived from the start and end times), priority, and task type (map by default). The task entry is also passed to getResourceForContainer, which determines the resource that the container will ask for:
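The order in which the duration is resolved matters when a task carries more than one of these fields. The sketch below reproduces that precedence with a plain Map; TaskDurationSketch is a hypothetical class, and the key strings are assumptions standing in for the SLSConfiguration constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the duration precedence in getTaskContainers().
final class TaskDurationSketch {
  // Assumed key names; the real values come from SLSConfiguration.
  static final String TASK_DURATION_MS = "container.duration.ms";
  static final String DURATION_MS = "duration.ms";
  static final String TASK_START_MS = "container.start.ms";
  static final String TASK_END_MS = "container.end.ms";

  static long durationMs(Map<String, Object> task) {
    long duration = 0;
    if (task.containsKey(TASK_DURATION_MS)) {
      duration = Long.parseLong(task.get(TASK_DURATION_MS).toString());
    } else if (task.containsKey(DURATION_MS)) {
      // Older key, kept for backward compatibility.
      duration = Long.parseLong(task.get(DURATION_MS).toString());
    } else if (task.containsKey(TASK_START_MS)
        && task.containsKey(TASK_END_MS)) {
      long start = Long.parseLong(task.get(TASK_START_MS).toString());
      long end = Long.parseLong(task.get(TASK_END_MS).toString());
      duration = end - start;
    }
    if (duration <= 0) {
      throw new IllegalArgumentException("Task duration must be positive");
    }
    return duration;
  }

  public static void main(String[] args) {
    Map<String, Object> task = new HashMap<>();
    task.put(TASK_START_MS, 6664);
    task.put(TASK_END_MS, 23707);
    System.out.println(durationMs(task) + " ms");
  }
}
```

A task with none of these fields ends up with a duration of 0 and is rejected, which matches the exception thrown in the original loop.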

Resource res = getResourceForContainer(jsonTask);

private Resource getResourceForContainer(Map jsonTask) {
  Resource res = getDefaultContainerResource();
  ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
  for (ResourceInformation info : infors) {
    if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
      long value = Long.parseLong(
          jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
              .toString());
      res.setResourceValue(info.getName(), value);
    }
  }

  return res;
}

At the end of each iteration, the resource res is wrapped in ContainerSimulator objects and added to the list of containers to be allocated:

for (int i = 0; i < count; i++) {
  containers.add(
      new ContainerSimulator(res, duration, hostname, priority, type,
          executionType));
}

After the loop, containers is returned and becomes one of the arguments to runNewAM.

Next comes getAMContainerResource, the other argument to runNewAM:

private Resource getAMContainerResource(Map jsonJob) {
  Resource amContainerResource =
      SLSConfiguration.getAMContainerResource(getConf());

  if (jsonJob == null) {
    return amContainerResource;
  }

  ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
  for (ResourceInformation info : infors) {
    String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
    if (jsonJob.containsKey(key)) {
      long value = Long.parseLong(jsonJob.get(key).toString());
      amContainerResource.setResourceValue(info.getName(), value);
    }
  }

  return amContainerResource;
}

This determines the resource for the AM itself, i.e. the size of the AM container: the configured default, overridden by any per-job values.

Back in runNewAM, each job now has the resource for its AM as well as the containers for all of its tasks.

runNewAM:

private void runNewAM(String jobType, String user,
    String jobQueue, String oldJobId, long jobStartTimeMS,
    long jobFinishTimeMS, List<ContainerSimulator> containerList,
    ReservationId reservationId, long deadline, Resource amContainerResource,
    Map<String, String> params) {

  AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
      amClassMap.get(jobType), new Configuration());

  if (amSim != null) {
    int heartbeatInterval = getConf().getInt(
        SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
        SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
    boolean isTracked = trackedApps.contains(oldJobId);

    if (oldJobId == null) {
      oldJobId = Integer.toString(AM_ID);
    }
    AM_ID++;
    amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
        jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
        runner.getStartTimeMS(), amContainerResource, params);
    if(reservationId != null) {
      // if we have a ReservationId, delegate reservation creation to
      // AMSim (reservation shape is impl specific)
      UTCClock clock = new UTCClock();
      amSim.initReservation(reservationId, deadline, clock.getTime());
    }
    runner.schedule(amSim);
    maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
    numTasks += containerList.size();
    amMap.put(oldJobId, amSim);
  }
}

An AMSimulator object is created, amSim.init initializes it, and runner.schedule adds it to the delay queue, just as with the NMs. Note that the version shown here takes eleven parameters while createAMForJob passes eight; presumably a shorter overload fills in the reservation-related arguments and delegates to this one.
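How application ids come out of createAMForJob and runNewAM together can be sketched as follows: a job with a single instance keeps its own id, while a job with count greater than one has its id cleared and receives ids from the running AM_ID counter. AppIdSketch is a hypothetical class, and the counter starting at 0 is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the id handling in createAMForJob() and runNewAM().
final class AppIdSketch {
  private int amId = 0; // counterpart of AM_ID; initial value assumed

  /** Returns the ids that the AMs created for one job entry would be stored under. */
  List<String> idsFor(String jobId, int jobCount) {
    jobCount = Math.max(jobCount, 1);
    // With several instances, the configured id is dropped.
    String oldJobId = jobCount > 1 ? null : jobId;
    List<String> ids = new ArrayList<>();
    for (int i = 0; i < jobCount; i++) {
      String id = oldJobId == null ? Integer.toString(amId) : oldJobId;
      amId++; // incremented for every AM, named or not
      ids.add(id);
    }
    return ids;
  }

  public static void main(String[] args) {
    AppIdSketch sketch = new AppIdSketch();
    System.out.println(sketch.idsFor("job_1", 1));
    System.out.println(sketch.idsFor("job_2", 3));
  }
}
```

Because the counter advances for every AM, generated ids are not necessarily contiguous from zero when named and unnamed jobs are mixed.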

Once the RM, NMs and AMs have been initialized, runner.start() is called and the simulation begins:

TaskRunner.java:

@SuppressWarnings("unchecked")
public void start() {
  if (executor != null && !executor.isTerminated()) {
    throw new IllegalStateException("Executor already running");
  }
  DelayQueue preStartQueue = queue;

  queue = new DelayQueue();
  executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0,
      TimeUnit.MILLISECONDS, queue);
  executor.prestartAllCoreThreads();

  startTimeMS = System.currentTimeMillis();
  for (Object d : preStartQueue) {
    schedule((Task) d, startTimeMS);
  }
}

In other words, every task registered before the start is handed to schedule, and from then on the tasks run in a loop.
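The role of the DelayQueue can be seen in isolation: elements become available only once their delay has expired, and the one due earliest comes out first. The sketch below uses a single consumer loop instead of a ThreadPoolExecutor to keep it short; DelayQueueSketch and Tick are hypothetical names, not classes from SLS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: tasks leave a DelayQueue in order of their due time.
final class DelayQueueSketch {

  static final class Tick implements Delayed {
    final String name;
    final long dueAtMs;

    Tick(String name, long dueAtMs) {
      this.name = name;
      this.dueAtMs = dueAtMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(dueAtMs - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Only Tick instances are ever placed in this queue.
      return Long.compare(dueAtMs, ((Tick) other).dueAtMs);
    }
  }

  /** Queues three tasks out of order and returns the order they run in. */
  static List<String> drainInOrder() {
    long now = System.currentTimeMillis();
    DelayQueue<Tick> queue = new DelayQueue<>();
    queue.add(new Tick("am", now + 60));
    queue.add(new Tick("nm-1", now + 20));
    queue.add(new Tick("nm-2", now + 40));

    List<String> order = new ArrayList<>();
    try {
      while (!queue.isEmpty()) {
        order.add(queue.take().name); // blocks until the head is due
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return order;
  }

  public static void main(String[] args) {
    System.out.println(drainInOrder());
  }
}
```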

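Calling start does not execute any task by itself. The worker threads of the thread pool take each task out of the DelayQueue once its delay has expired and invoke that task's run method:

@Override
public final void run() {
  try {
    if (nextRun == startTime) {
      firstStep();
      nextRun += repeatInterval;
      if (nextRun <= endTime) {
        queue.add(this);
      }
    } else if (nextRun < endTime) {
      middleStep();
      nextRun += repeatInterval;
      queue.add(this);
    } else {
      lastStep();
    }
  } catch (Exception e) {
    e.printStackTrace();
    Thread.getDefaultUncaughtExceptionHandler()
        .uncaughtException(Thread.currentThread(), e);
  }
}

So run is executed again and again. Each time, it checks nextRun to determine which stage the task has reached and calls the corresponding function; unless the task has finished, it puts itself back into the queue.

firstStep, middleStep and lastStep are overridden by the AM and NM simulator classes, so the implementation belonging to the task's class is the one that runs.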
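The sequence of steps that run produces can be traced with a logical clock, with no threads or real delays involved. StepSketch is a hypothetical class that copies only the branching on nextRun from the code above; a true return from the loop condition stands for queue.add(this).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the branching in Task.run(), driven by a logical clock.
final class StepSketch {

  static List<String> steps(long startTime, long endTime, long repeatInterval) {
    if (repeatInterval <= 0) {
      throw new IllegalArgumentException("repeatInterval must be positive");
    }
    List<String> trace = new ArrayList<>();
    long nextRun = startTime;
    boolean requeued = true; // true means the task went back into the queue
    while (requeued) {
      requeued = false;
      if (nextRun == startTime) {
        trace.add("firstStep");
        nextRun += repeatInterval;
        if (nextRun <= endTime) {
          requeued = true;
        }
      } else if (nextRun < endTime) {
        trace.add("middleStep");
        nextRun += repeatInterval;
        requeued = true;
      } else {
        trace.add("lastStep");
      }
    }
    return trace;
  }

  public static void main(String[] args) {
    System.out.println(steps(0, 3000, 1000));
  }
}
```

Tracing the code as shown, a task whose endTime is less than one interval after startTime runs firstStep only and is never re-queued, so lastStep is not reached in that case.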

For an AM task, for example, firstStep runs first:

AMSimulator.java

/**
 * register with RM
 */
@Override
public void firstStep() throws Exception {
  simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS;

  ReservationId reservationId = null;

  // submit a reservation if one is required, exceptions naturally happen
  // when the reservation does not fit, catch, log, and move on running job
  // without reservation.
  try {
    reservationId = submitReservationWhenSpecified();
  } catch (UndeclaredThrowableException y) {
    LOG.warn("Unable to place reservation: " + y.getMessage());
  }

  // submit application, waiting until ACCEPTED
  submitApp(reservationId);

  // track app metrics
  trackApp();
}

This step registers with the RM and submits the application (submitApp). That is similar to what the client does in YARN; SLS simplifies things by letting the AM simulator do it.

In other words, it simulates the client sending the application to the RM.

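Next, middleStep:

@Override
public void middleStep() throws Exception {
  if (isAMContainerRunning) {
    // process responses in the queue
    processResponseQueue();

    // send out request
    sendContainerRequest();

    // check whether finish
    checkStop();
  }
}

It first checks whether the AM container is running, and continues only if it is:

processResponseQueue inspects the state of the containers reported in the response queue: completed, not yet completed, newly allocated, and so on.

sendContainerRequest sends the requests, that is, the concrete container requests for the job's tasks, map or reduce.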

sendContainerRequest (MRAMSimulator.java):

@Override
protected void sendContainerRequest()
    throws YarnException, IOException, InterruptedException {
  if (isFinished) {
    return;
  }

  // send out request
  List<ResourceRequest> ask = null;
  if (mapFinished != mapTotal) {
    // map phase
    if (!pendingMaps.isEmpty()) {
      ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
          PRIORITY_MAP);
      LOG.debug("Application {} sends out request for {} mappers.",
          appId, pendingMaps.size());
      scheduledMaps.addAll(pendingMaps);
      pendingMaps.clear();
    } else if (!pendingFailedMaps.isEmpty()) {
      ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
          PRIORITY_MAP);
      LOG.debug("Application {} sends out requests for {} failed mappers.",
          appId, pendingFailedMaps.size());
      scheduledMaps.addAll(pendingFailedMaps);
      pendingFailedMaps.clear();
    }
  } else if (reduceFinished != reduceTotal) {
    // reduce phase
    if (!pendingReduces.isEmpty()) {
      ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
          PRIORITY_REDUCE);
      LOG.debug("Application {} sends out requests for {} reducers.",
          appId, pendingReduces.size());
      scheduledReduces.addAll(pendingReduces);
      pendingReduces.clear();
    } else if (!pendingFailedReduces.isEmpty()) {
      ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
          PRIORITY_REDUCE);
      LOG.debug("Application {} sends out request for {} failed reducers.",
          appId, pendingFailedReduces.size());
      scheduledReduces.addAll(pendingFailedReduces);
      pendingFailedReduces.clear();
    }
  }
  if (ask == null) {
    ask = new ArrayList<>();
  }

  final AllocateRequest request = createAllocateRequest(ask);
  if (totalContainers == 0) {
    request.setProgress(1.0f);
  } else {
    request.setProgress((float) finishedContainers / totalContainers);
  }

  UserGroupInformation ugi =
      UserGroupInformation.createRemoteUser(appAttemptId.toString());
  Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
      .get(appAttemptId.getApplicationId())
      .getRMAppAttempt(appAttemptId).getAMRMToken();
  ugi.addTokenIdentifier(token.decodeIdentifier());
  AllocateResponse response = ugi.doAs(
      new PrivilegedExceptionAction<AllocateResponse>() {
        @Override
        public AllocateResponse run() throws Exception {
          return rm.getApplicationMasterService().allocate(request);
        }
      });
  if (response != null) {
    responseQueue.put(response);
  }
}

The tasks' container requests are packaged into ask by the packageRequests function.
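The phase logic that decides what goes into ask can be sketched with plain lists: reduce containers are requested only after every map has finished, and pending entries move to the scheduled list once they have been asked for. RequestPhaseSketch is a hypothetical class; the failed-task lists are omitted for brevity, and container names stand in for ResourceRequest objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the map/reduce phases in sendContainerRequest().
final class RequestPhaseSketch {
  final List<String> pendingMaps = new ArrayList<>();
  final List<String> scheduledMaps = new ArrayList<>();
  final List<String> pendingReduces = new ArrayList<>();
  final List<String> scheduledReduces = new ArrayList<>();
  int mapFinished;
  int mapTotal;
  int reduceFinished;
  int reduceTotal;

  /** Containers to ask for in this heartbeat; empty when nothing is pending. */
  List<String> nextAsk() {
    List<String> ask = new ArrayList<>();
    if (mapFinished != mapTotal) {
      // map phase
      if (!pendingMaps.isEmpty()) {
        ask.addAll(pendingMaps);   // same idea as mergeLists(pending, scheduled)
        ask.addAll(scheduledMaps);
        scheduledMaps.addAll(pendingMaps);
        pendingMaps.clear();
      }
    } else if (reduceFinished != reduceTotal) {
      // reduce phase
      if (!pendingReduces.isEmpty()) {
        ask.addAll(pendingReduces);
        ask.addAll(scheduledReduces);
        scheduledReduces.addAll(pendingReduces);
        pendingReduces.clear();
      }
    }
    return ask;
  }

  /** Progress reported with every allocate request. */
  static float progress(int finishedContainers, int totalContainers) {
    return totalContainers == 0
        ? 1.0f : (float) finishedContainers / totalContainers;
  }

  public static void main(String[] args) {
    RequestPhaseSketch am = new RequestPhaseSketch();
    am.pendingMaps.add("m1");
    am.pendingReduces.add("r1");
    am.mapTotal = 1;
    am.reduceTotal = 1;
    System.out.println(am.nextAsk()); // map phase
    am.mapFinished = 1;
    System.out.println(am.nextAsk()); // reduce phase
  }
}
```

An empty ask is still sent in the original code, so the allocate call doubles as the AM heartbeat.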

packageRequests is the key method for packaging the request data, that is, for building the protocol records that are transmitted. A separate article will cover it in detail.

request = createAllocateRequest(ask) then produces a request in the standard format (class AllocateRequest), which is sent to the RM:

UserGroupInformation ugi =
    UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
    .get(appAttemptId.getApplicationId())
    .getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
AllocateResponse response = ugi.doAs(
    new PrivilegedExceptionAction<AllocateResponse>() {
      @Override
      public AllocateResponse run() throws Exception {
        return rm.getApplicationMasterService().allocate(request);
      }
    });
if (response != null) {
  responseQueue.put(response);
}

The reply is response (class AllocateResponse), which is put into responseQueue. That completes the request cycle.

As you can see, the request (AllocateRequest) and the reply (AllocateResponse) are both classes under org.apache.hadoop.yarn.api.protocolrecords, in other words protocol records.

lastStep:

@Override
public void lastStep() throws Exception {
  LOG.info("Application {} is shutting down.", appId);
  // unregister tracking
  if (isTracked) {
    untrackApp();
  }

  // Finish AM container
  if (amContainer != null) {
    LOG.info("AM container = {} reported to finish", amContainer.getId());
    se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
        amContainer.getId());
  } else {
    LOG.info("AM container is null");
  }

  if (null == appAttemptId) {
    // If appAttemptId == null, AM is not launched from RM's perspective, so
    // it's unnecessary to finish am as well
    return;
  }

  // unregister application master
  final FinishApplicationMasterRequest finishAMRequest = recordFactory
      .newRecordInstance(FinishApplicationMasterRequest.class);
  finishAMRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);

  UserGroupInformation ugi =
      UserGroupInformation.createRemoteUser(appAttemptId.toString());
  Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
      .getRMAppAttempt(appAttemptId).getAMRMToken();
  ugi.addTokenIdentifier(token.decodeIdentifier());
  ugi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      rm.getApplicationMasterService()
          .finishApplicationMaster(finishAMRequest);
      return null;
    }
  });

  simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS;
  // record job running information
  SchedulerMetrics schedulerMetrics =
      ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
  if (schedulerMetrics != null) {
    schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
        simulateStartTimeMS, simulateFinishTimeMS);
  }
}

It first finishes the AM's container and then unregisters the AM from the RM through finishApplicationMaster(finishAMRequest).

That completes the walk through SLSRunner. Looking back at the source, once runner.start() has been called the program no longer executes in SLSRunner.java; it keeps looping inside the task scheduler.

When reposting, please cite the original URL: https://www.6miu.com/read-3100158.html
