@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // Tell any speculator that we're requesting a container taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1)); //request for container if (rescheduled) { taskAttempt.eventHandler.handle( ContainerRequestEvent.createContainerRequestEventForFailedContainer( taskAttempt.attemptId, taskAttempt.resourceCapability)); } else { taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt.dataLocalHosts.toArray( new String[taskAttempt.dataLocalHosts.size()]), taskAttempt.dataLocalRacks.toArray( new String[taskAttempt.dataLocalRacks.size()]))); } }
private List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId, int taskAttemptId, int memory, String[] hosts, MockNM mockNM, DrainDispatcher dispatcher, MyContainerAllocator allocator) throws Exception { ContainerRequestEvent reqEvent = createReq(jobId, taskAttemptId, memory, hosts); allocator.sendRequest(reqEvent); // Send the request to the RM List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); dispatcher.await(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Heartbeat from the required nodeManager mockNM.nodeHeartbeat(true); dispatcher.await(); assigned = allocator.schedule(); dispatcher.await(); return assigned; }
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { TaskId taskId; if (reduce) { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); } else { taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); Resource containerNeed = Resource.newInstance(memory, 1); if (earlierFailedAttempt) { return ContainerRequestEvent .createContainerRequestEventForFailedContainer(attemptId, containerNeed); } return new ContainerRequestEvent(attemptId, containerNeed, hosts, new String[] { NetworkTopology.DEFAULT_RACK }); }
@SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // Tell any speculator that we're requesting a container taskAttempt.eventHandler.handle (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1)); //request for container if (rescheduled) { taskAttempt.eventHandler.handle( ContainerRequestEvent.createContainerRequestEventForFailedContainer( taskAttempt.attemptId, taskAttempt.resourceCapability)); } else { taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability,true,true,true, taskAttempt.dataLocalHosts.toArray( new String[taskAttempt.dataLocalHosts.size()]), taskAttempt.dataLocalRacks.toArray( new String[taskAttempt.dataLocalRacks.size()]))); } }
@Test public void testSingleRackRequest() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = new TaskAttemptImpl.RequestContainerTransition(false); EventHandler eventHandler = mock(EventHandler.class); String[] hosts = new String[3]; hosts[0] = "host1"; hosts[1] = "host2"; hosts[2] = "host3"; TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImplForTest(eventHandler, splitInfo); TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); rct.transition(mockTaskAttempt, mockTAEvent); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(2)).handle(arg.capture()); if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) { Assert.fail("Second Event not of type ContainerRequestEvent"); } ContainerRequestEvent cre = (ContainerRequestEvent) arg.getAllValues().get(1); String[] requestedRacks = cre.getRacks(); //Only a single occurrence of /DefaultRack assertEquals(1, requestedRacks.length); }
@Test public void testHostResolveAttempt() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = new TaskAttemptImpl.RequestContainerTransition(false); EventHandler eventHandler = mock(EventHandler.class); String[] hosts = new String[3]; hosts[0] = "192.168.1.1"; hosts[1] = "host2"; hosts[2] = "host3"; TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l); TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImplForTest(eventHandler, splitInfo); TaskAttemptImpl spyTa = spy(mockTaskAttempt); when(spyTa.resolveHost(hosts[0])).thenReturn("host1"); spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations()); TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class); rct.transition(spyTa, mockTAEvent); verify(spyTa).resolveHost(hosts[0]); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(2)).handle(arg.capture()); if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) { Assert.fail("Second Event not of type ContainerRequestEvent"); } Map<String, Boolean> expected = new HashMap<String, Boolean>(); expected.put("host1", true); expected.put("host2", true); expected.put("host3", true); ContainerRequestEvent cre = (ContainerRequestEvent) arg.getAllValues().get(1); String[] requestedHosts = cre.getHosts(); for (String h : requestedHosts) { expected.remove(h); } assertEquals(0, expected.size()); }
private void checkAssignment(ContainerRequestEvent request, TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) { Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(), assigned); Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(), assigned.getTaskAttemptID()); if (checkHostMatch) { Assert.assertTrue("Not assigned to requested host", Arrays.asList( request.getHosts()).contains( assigned.getContainer().getNodeId().getHost())); } }
@Override public void handle(ContainerAllocatorEvent event) { if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ && ((ContainerRequestEvent)event).getEarlierAttemptFailed()) { failedMapContainerReqEventCnt.incrementAndGet(); } }
private List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId, int taskAttemptId, int memory, String[] hosts, MockNM mockNM, DrainDispatcher dispatcher, MyContainerAllocator allocator, int expectedAdditions1, int expectedRemovals1, int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { ContainerRequestEvent reqEvent = createReq(jobId, taskAttemptId, memory, hosts); allocator.sendRequest(reqEvent); // Send the request to the RM List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); dispatcher.await(); assertBlacklistAdditionsAndRemovals( expectedAdditions1, expectedRemovals1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Heartbeat from the required nodeManager mockNM.nodeHeartbeat(true); dispatcher.await(); assigned = allocator.schedule(); dispatcher.await(); assertBlacklistAdditionsAndRemovals( expectedAdditions2, expectedRemovals2, rm); return assigned; }