@Override protected void serviceStart() throws Exception { if (job.isUber()) { MRApps.setupDistributedCacheLocal(getConfig()); this.containerAllocator = new LocalContainerAllocator( this.clientService, this.context, nmHost, nmPort, nmHttpPort , containerID); } else { this.containerAllocator = new RMContainerAllocator( this.clientService, this.context); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); super.serviceStart(); }
@Override protected void serviceStart() throws Exception { if (job.isUber()) { MRApps.setupDistributedCacheLocal(getConfig()); this.containerAllocator = new LocalContainerAllocator( this.clientService, this.context, nmHost, nmPort, nmHttpPort , containerID); } else { this.containerAllocator = new RMContainerAllocator( this.clientService, this.context, preemptionPolicy); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); super.serviceStart(); }
@Override protected void serviceStart() throws Exception { if (job.isUber()) { this.containerAllocator = new LocalContainerAllocator( this.clientService, this.context, nmHost, nmPort, nmHttpPort , containerID); } else { this.containerAllocator = new RMContainerAllocator( this.clientService, this.context); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); super.serviceStart(); }
@Test public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), mock(AppContext.class)); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); ApplicationId applicationId = ApplicationId.newInstance(1, 1); ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance( applicationId, 1); ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1); ContainerStatus status = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", 0); ContainerStatus abortedStatus = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", ContainerExitStatus.ABORTED); TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, event.getType()); TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( abortedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); }
@Test public void testReduceScheduling() throws Exception { int totalMaps = 10; int succeededMaps = 1; int scheduledMaps = 10; int scheduledReduces = 0; int assignedMaps = 2; int assignedReduces = 0; int mapResourceReqt = 1024; int reduceResourceReqt = 2*1024; int numPendingReduces = 4; float maxReduceRampupLimit = 0.5f; float reduceSlowStart = 0.2f; RMContainerAllocator allocator = mock(RMContainerAllocator.class); doCallRealMethod().when(allocator). scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat()); // Test slow-start allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, never()).setIsReduceStarted(true); // verify slow-start still in effect when no more maps need to // be scheduled but some have yet to complete allocator.scheduleReduces( totalMaps, succeededMaps, 0, scheduledReduces, totalMaps - succeededMaps, assignedReduces, mapResourceReqt, reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, never()).setIsReduceStarted(true); verify(allocator, never()).scheduleAllReduces(); succeededMaps = 3; allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(1)).setIsReduceStarted(true); // Test reduce ramp-up doReturn(100 * 1024).when(allocator).getMemLimit(); allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampUpReduces(anyInt()); verify(allocator, never()).rampDownReduces(anyInt()); // Test reduce ramp-down scheduledReduces = 3; doReturn(10 * 1024).when(allocator).getMemLimit(); allocator.scheduleReduces( totalMaps, succeededMaps, scheduledMaps, scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt, reduceResourceReqt, numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampDownReduces(anyInt()); }
@Test public void testHeartbeatHandler() throws Exception { LOG.info("Running testHeartbeatHandler"); Configuration conf = new Configuration(); conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1); ControlledClock clock = new ControlledClock(new SystemClock()); AppContext appContext = mock(AppContext.class); when(appContext.getClock()).thenReturn(clock); when(appContext.getApplicationID()).thenReturn( ApplicationId.newInstance(1, 1)); RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), appContext) { @Override protected void register() { } @Override protected ApplicationMasterProtocol createSchedulerProxy() { return mock(ApplicationMasterProtocol.class); } @Override protected synchronized void heartbeat() throws Exception { } }; allocator.init(conf); allocator.start(); clock.setTime(5); int timeToWaitMs = 5000; while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) { Thread.sleep(10); timeToWaitMs -= 10; } Assert.assertEquals(5, allocator.getLastHeartbeatTime()); clock.setTime(7); timeToWaitMs = 5000; while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) { Thread.sleep(10); timeToWaitMs -= 10; } Assert.assertEquals(7, allocator.getLastHeartbeatTime()); final AtomicBoolean callbackCalled = new AtomicBoolean(false); allocator.runOnNextHeartbeat(new Runnable() { @Override public void run() { callbackCalled.set(true); } }); clock.setTime(8); timeToWaitMs = 5000; while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) { Thread.sleep(10); timeToWaitMs -= 10; } Assert.assertEquals(8, allocator.getLastHeartbeatTime()); Assert.assertTrue(callbackCalled.get()); }