@SuppressWarnings("hiding") private TaskAttemptInfo scaleInfo(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality, double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) { TaskInfo taskInfo = getTaskInfo(loggedTask); double[] factors = new double[] { 1.0, rackLocalOverNodeLocal, rackRemoteOverNodeLocal }; double scaleFactor = factors[locality] / factors[loggedLocality]; State state = convertState(loggedAttempt.getResult()); if (loggedTask.getTaskType() == Values.MAP) { long taskTime = 0; if (loggedAttempt.getStartTime() == 0) { taskTime = makeUpMapRuntime(state, locality); } else { taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime(); } taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID()); taskTime *= scaleFactor; return new MapTaskAttemptInfo (state, taskInfo, taskTime, loggedAttempt.allSplitVectors()); } else { throw new IllegalArgumentException("taskType can only be MAP: " + loggedTask.getTaskType()); } }
private long doMakeUpReduceRuntime(State state) { long reduceTime; try { if (state == State.SUCCEEDED) { reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF()); } else if (state == State.FAILED) { reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF()); } else { throw new IllegalArgumentException( "state is neither SUCCEEDED nor FAILED: " + state); } return reduceTime; } catch (NoValueToMakeUpRuntime e) { return 0; } }
private State makeUpState(int taskAttemptNumber, double[] numAttempts) { // if numAttempts == null we are returning FAILED. if(numAttempts == null) { return State.FAILED; } if (taskAttemptNumber >= numAttempts.length - 1) { // always succeed return State.SUCCEEDED; } else { double pSucceed = numAttempts[taskAttemptNumber]; double pFail = 0; for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) { pFail += numAttempts[i]; } return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED : State.FAILED; } }
@SuppressWarnings({ "deprecation", "incomplete-switch" }) @Override public TaskAttemptInfo getTaskAttemptInfo( TaskType taskType, int taskNumber, int taskAttemptNumber) { switch (taskType) { case MAP: return new MapTaskAttemptInfo( State.SUCCEEDED, new TaskInfo( m_bytesIn[taskNumber], m_recsIn[taskNumber], m_bytesOut[taskNumber], m_recsOut[taskNumber], -1), 100); case REDUCE: return new ReduceTaskAttemptInfo( State.SUCCEEDED, new TaskInfo( r_bytesIn[taskNumber], r_recsIn[taskNumber], r_bytesOut[taskNumber], r_recsOut[taskNumber], -1), 100, 100, 100); } throw new UnsupportedOperationException(); }
@Override public TaskAttemptInfo getTaskAttemptInfo( TaskType taskType, int taskNumber, int taskAttemptNumber) { switch (taskType) { case MAP: return new MapTaskAttemptInfo( State.SUCCEEDED, new TaskInfo( m_bytesIn[taskNumber], m_recsIn[taskNumber], m_bytesOut[taskNumber], m_recsOut[taskNumber], -1), 100); case REDUCE: return new ReduceTaskAttemptInfo( State.SUCCEEDED, new TaskInfo( r_bytesIn[taskNumber], r_recsIn[taskNumber], r_bytesOut[taskNumber], r_recsOut[taskNumber], -1), 100, 100, 100); } throw new UnsupportedOperationException(); }
@SuppressWarnings("hiding") private TaskAttemptInfo scaleInfo(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality, double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) { TaskInfo taskInfo = getTaskInfo(loggedTask); double[] factors = new double[] { 1.0, rackLocalOverNodeLocal, rackRemoteOverNodeLocal }; double scaleFactor = factors[locality] / factors[loggedLocality]; State state = convertState(loggedAttempt.getResult()); if (loggedTask.getTaskType() == Values.MAP) { long taskTime = 0; if (loggedAttempt.getStartTime() == 0) { taskTime = makeUpMapRuntime(state, locality); } else { taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime(); } taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID()); taskTime *= scaleFactor; return new MapTaskAttemptInfo(state, taskInfo, taskTime); } else { throw new IllegalArgumentException("taskType can only be MAP: " + loggedTask.getTaskType()); } }
private long makeUpMapRuntime(State state, int locality) { long runtime; // make up runtime if (state == State.SUCCEEDED || state == State.FAILED) { List<LoggedDiscreteCDF> cdfList = state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job .getFailedMapAttemptCDFs(); // XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and // the last group is "distance cannot be determined". All pig jobs // would have only the 4th group, and pig tasks usually do not have // any locality, so this group should count as "distance=2". // However, setup/cleanup tasks are also counted in the 4th group. // These tasks do not make sense. try { runtime = makeUpRuntime(cdfList.get(locality)); } catch (NoValueToMakeUpRuntime e) { runtime = makeUpRuntime(cdfList); } } else { throw new IllegalArgumentException( "state is neither SUCCEEDED nor FAILED: " + state); } return runtime; }
@Override public TaskAttemptInfo getTaskAttemptInfo( TaskType taskType, int taskNumber, int taskAttemptNumber) { switch (taskType) { case MAP: return new MapTaskAttemptInfo( State.SUCCEEDED, new TaskInfo( m_bytesIn[taskNumber], m_recsIn[taskNumber], m_bytesOut[taskNumber], m_recsOut[taskNumber], -1),100); case REDUCE: return new ReduceTaskAttemptInfo( State.SUCCEEDED, new TaskInfo( r_bytesIn[taskNumber], r_recsIn[taskNumber], r_bytesOut[taskNumber], r_recsOut[taskNumber], -1),100,100,100); } throw new UnsupportedOperationException(); }
/** * Frees up bookkeping memory used by completed tasks. * Has no effect on the events or logs produced by the SimulatorTaskTracker. * We need this in order not to report completed task multiple times and * to ensure that we do not run out of Java heap memory in larger * simulations. */ private void garbageCollectCompletedTasks() { for (Iterator<TaskAttemptID> iter = tasks.keySet().iterator(); iter.hasNext();) { TaskAttemptID taskId = iter.next(); SimulatorTaskInProgress tip = tasks.get(taskId); if (tip.getTaskStatus().getRunState() != State.RUNNING) { iter.remove(); if (LOG.isDebugEnabled()) { LOG.debug("Garbage collected SimulatorTIP, taskId=" + taskId); } // We don't have to / must not touch usedMapSlots and usedReduceSlots // as those were already updated by processTaskAttemptCompletionEvent() // when the task switched its state from running } } }
private List<TaskStatus> collectAndCloneTaskStatuses() { ArrayList<TaskStatus> statuses = new ArrayList<TaskStatus>(); Set<TaskAttemptID> mark = new HashSet<TaskAttemptID>(); for (SimulatorTaskInProgress tip : tasks.values()) { statuses.add((TaskStatus) tip.getTaskStatus().clone()); if (tip.getFinalRunState() == State.SUCCEEDED) { mark.add(tip.getTaskStatus().getTaskID()); } } for (TaskAttemptID taskId : mark) { tasks.remove(taskId); } return statuses; }