private void createMapTasks(JobImpl job, long inputLength, TaskSplitMetaInfo[] splits) { for (int i=0; i < job.numMapTasks; ++i) { TaskImpl task = new MapTaskImpl(job.jobId, i, job.eventHandler, job.remoteJobConfFile, job.conf, splits[i], job.taskAttemptListener, job.jobToken, job.jobCredentials, job.clock, job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext); job.addTask(task); } LOG.info("Input size for job " + job.jobId + " = " + inputLength + ". Number of splits = " + splits.length); }
@Override protected Task createRemoteTask() { TaskSplitIndex splitIndex[] = new TaskSplitIndex[splitInfos.length]; int i=0; for(TaskSplitMetaInfo splitInfo:splitInfos){ splitIndex[i] = splitInfo.getSplitIndex(); i++; } MapTask mapTask = new MultiMapTask("", TypeConverter.fromYarn(getID()), partition,splitIndex, 1); // YARN doesn't have the concept of slots per task, set it as 1. //new MultiMapTask(); mapTask.setUser(conf.get(MRJobConfig.USER_NAME)); mapTask.setConf(conf); mapTask.setTaskType(TaskType.MULTI_MAP); return mapTask; }
private void createMapTasks(JobImpl job, long inputLength, TaskSplitMetaInfo[] splits) { for (int i=0; i < job.numMapTasks; ++i) { TaskImpl task = new MapTaskImpl(job.jobId, i, job.eventHandler, job.remoteJobConfFile, job.conf, splits[i], job.taskAttemptListener, job.jobToken, job.jobCredentials, job.clock, job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext); job.addTask(task); LOG.info("split info for task: "+i+"get split location: "); } }
SplitInfo(TaskSplitMetaInfo taskSplitMetaInfo,String []topologyPath){ this.taskSplitMetaInfo = taskSplitMetaInfo; this.length = taskSplitMetaInfo.getInputDataLength(); this.hosts = taskSplitMetaInfo.getLocations(); assert(hosts.length==topologyPath.length||topologyPath.length==0); //if this fs does not have any rack information,use default rack if(topologyPath==null||topologyPath.length==0){ topologyPath = new String[hosts.length]; for(int i=0;i<hosts.length;i++){ topologyPath[i]=(new NodeBase(hosts[i],NetworkTopology.DEFAULT_RACK)).toString(); } } //the topology pahts have the host name as the last component,strip it this.racks = new String[hosts.length]; for(int i=0;i<racks.length;i++){ this.racks[i]=(new NodeBase(topologyPath[i])).getNetworkLocation(); } }
public MapTaskAttemptImpl(TaskId taskId, int attempt, EventHandler eventHandler, Path jobFile, int partition, TaskSplitMetaInfo splitInfo, JobConf conf, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, AppContext appContext) { super(taskId, attempt, eventHandler, taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(), jobToken, credentials, clock, appContext); this.splitInfo = splitInfo; }
public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, TaskSplitMetaInfo taskSplitMetaInfo, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, int appAttemptId, MRAppMetrics metrics, AppContext appContext) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, appAttemptId, metrics, appContext); this.taskSplitMetaInfo = taskSplitMetaInfo; }
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { TaskSplitMetaInfo[] allTaskSplitMetaInfo; try { allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo( job.oldJobId, job.fs, job.conf, job.remoteJobSubmitDir); } catch (IOException e) { throw new YarnRuntimeException(e); } return allTaskSplitMetaInfo; }
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) { ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); int partitions = 2; Path remoteJobConfFile = mock(Path.class); JobConf conf = new JobConf(); TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class); Token<JobTokenIdentifier> jobToken = (Token<JobTokenIdentifier>) mock(Token.class); Credentials credentials = null; Clock clock = new SystemClock(); int appAttemptId = 3; MRAppMetrics metrics = mock(MRAppMetrics.class); Resource minContainerRequirements = mock(Resource.class); when(minContainerRequirements.getMemory()).thenReturn(1000); ClusterInfo clusterInfo = mock(ClusterInfo.class); AppContext appContext = mock(AppContext.class); when(appContext.getClusterInfo()).thenReturn(clusterInfo); TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions, eh, remoteJobConfFile, conf, taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock, appAttemptId, metrics, appContext); return mapTask; }
@Test public void testSingleRackRequest() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = new TaskAttemptImpl.RequestContainerTransition(false); EventHandler eventHandler = mock(EventHandler.class); String[] hosts = new String[3]; hosts[0] = "host1"; hosts[1] = "host2"; hosts[2] = "host3"; TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImplForTest(eventHandler, splitInfo); TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); rct.transition(mockTaskAttempt, mockTAEvent); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(2)).handle(arg.capture()); if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) { Assert.fail("Second Event not of type ContainerRequestEvent"); } ContainerRequestEvent cre = (ContainerRequestEvent) arg.getAllValues().get(1); String[] requestedRacks = cre.getRacks(); //Only a single occurrence of /DefaultRack assertEquals(1, requestedRacks.length); }
@Test public void testHostResolveAttempt() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = new TaskAttemptImpl.RequestContainerTransition(false); EventHandler eventHandler = mock(EventHandler.class); String[] hosts = new String[3]; hosts[0] = "192.168.1.1"; hosts[1] = "host2"; hosts[2] = "host3"; TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImplForTest(eventHandler, splitInfo); TaskAttemptImpl spyTa = spy(mockTaskAttempt); when(spyTa.resolveHost(hosts[0])).thenReturn("host1"); spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations()); TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); rct.transition(spyTa, mockTAEvent); verify(spyTa).resolveHost(hosts[0]); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(2)).handle(arg.capture()); if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) { Assert.fail("Second Event not of type ContainerRequestEvent"); } Map<String, Boolean> expected = new HashMap<String, Boolean>(); expected.put("host1", true); expected.put("host2", true); expected.put("host3", true); ContainerRequestEvent cre = (ContainerRequestEvent) arg.getAllValues().get(1); String[] requestedHosts = cre.getHosts(); for (String h : requestedHosts) { expected.remove(h); } assertEquals(0, expected.size()); }
private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) { ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptListener taListener = mock(TaskAttemptListener.class); Path jobFile = mock(Path.class); JobConf jobConf = new JobConf(); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, taskSplitMetaInfo, jobConf, taListener, null, null, clock, null); return taImpl; }
private static InitTransition getInitTransition(final int numSplits) { InitTransition initTransition = new InitTransition() { @Override protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits]; for (int i = 0; i < numSplits; ++i) { splits[i] = new TaskSplitMetaInfo(); } return splits; } }; return initTransition; }
@Before @SuppressWarnings("unchecked") public void setup() { dispatcher = new InlineDispatcher(); ++startCount; conf = new JobConf(); taskAttemptListener = mock(TaskAttemptListener.class); jobToken = (Token<JobTokenIdentifier>) mock(Token.class); remoteJobConfFile = mock(Path.class); credentials = null; clock = new SystemClock(); metrics = mock(MRAppMetrics.class); dataLocations = new String[1]; appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); jobId = Records.newRecord(JobId.class); jobId.setId(1); jobId.setAppId(appId); appContext = mock(AppContext.class); taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); taskAttempts = new ArrayList<MockTaskAttemptImpl>(); }
@Override protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps]; for (int i = 0; i < maps ; i++) { splits[i] = new TaskSplitMetaInfo(); } return splits; }
public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) { this.info = info; this.taskId = taskId; this.mapOutputFiles = mapOutputFiles; this.jobId = jobId; this.localConf = new JobConf(job); }
/** * Create Runnables to encapsulate map tasks for use by the executor * service. * @param taskInfo Info about the map task splits * @param jobId the job id * @param mapOutputFiles a mapping from task attempts to output files * @return a List of Runnables, one per map task. */ protected List<RunnableWithThrowable> getMapTaskRunnables( TaskSplitMetaInfo [] taskInfo, JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) { int numTasks = 0; ArrayList<RunnableWithThrowable> list = new ArrayList<RunnableWithThrowable>(); for (TaskSplitMetaInfo task : taskInfo) { list.add(new MapTaskRunnable(task, numTasks++, jobId, mapOutputFiles)); } return list; }