/** * Update queue usage across all running jobs. * @param mapClusterCapacity * @param reduceClusterCapacity * @param mapScheduler * @param reduceScheduler */ void updateAll(int mapClusterCapacity, int reduceClusterCapacity, TaskSchedulingMgr mapScheduler, TaskSchedulingMgr reduceScheduler) { // Compute new capacities for maps and reduces mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, mapClusterCapacity); reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, reduceClusterCapacity); // reset running/pending tasks, tasks per user resetSlotsUsage(TaskType.MAP); resetSlotsUsage(TaskType.REDUCE); Collection<JobInProgress> jobs = getRunningJobs(); // Safe to iterate since // we get a copy here for (JobInProgress j : jobs) { if (j.getStatus().getRunState() != JobStatus.RUNNING) { continue; } int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j); int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j); int numRunningMapSlots = numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j); int numRunningReduceSlots = numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j); int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j); int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j); int numReservedMapSlotsForThisJob = (mapScheduler.getNumReservedTaskTrackers(j) * mapScheduler.getSlotsPerTask(j)); int numReservedReduceSlotsForThisJob = (reduceScheduler.getNumReservedTaskTrackers(j) * reduceScheduler.getSlotsPerTask(j)); j.setSchedulingInfo( CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, numRunningMapSlots, numReservedMapSlotsForThisJob, numReducesRunningForThisJob, numRunningReduceSlots, numReservedReduceSlotsForThisJob)); update(TaskType.MAP, j, j.getProfile().getUser(), numMapsRunningForThisJob, numMapSlotsForThisJob); update(TaskType.REDUCE, j, j.getProfile().getUser(), numReducesRunningForThisJob, numReduceSlotsForThisJob); if (LOG.isDebugEnabled()) { LOG.debug(String.format(queueName + " - updateQSI: job %s: run(m)=%d, " + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d," + " finished(r)=%d, failed(m)=%d, failed(r)=%d, " + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j .getJobID().toString(), Integer .valueOf(numMapsRunningForThisJob), Integer .valueOf(numMapSlotsForThisJob), Integer .valueOf(numReducesRunningForThisJob), Integer .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer .valueOf(j.failedMapTasks), Integer.valueOf(j.failedReduceTasks), Integer .valueOf(j.speculativeMapTasks), Integer .valueOf(j.speculativeReduceTasks), Integer .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks))); } } }