Java 类org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator 实例源码

项目:hadoop    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:aliyun-oss-hadoop-fs    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context, preemptionPolicy);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:big-c    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hadoop-2.6.0-cdh5.4.3    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hadoop-plus    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hadoop-plus    文件:TestRMContainerAllocator.java   
@Test
public void testCompletedContainerEvent() {
  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), mock(AppContext.class));

  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
      MRBuilderUtils.newTaskId(
          MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
      applicationId, 1);
  ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1);
  ContainerStatus status = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "", 0);

  ContainerStatus abortedStatus = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "",
      ContainerExitStatus.ABORTED);

  TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
      attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
      event.getType());

  TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
      abortedStatus, attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
}
项目:FlexMap    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hops    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    MRApps.setupDistributedCacheLocal(getConfig());
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hadoop-TCP    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hadoop-TCP    文件:TestRMContainerAllocator.java   
@Test
public void testCompletedContainerEvent() {
  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), mock(AppContext.class));

  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
      MRBuilderUtils.newTaskId(
          MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
      applicationId, 1);
  ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1);
  ContainerStatus status = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "", 0);

  ContainerStatus abortedStatus = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "",
      ContainerExitStatus.ABORTED);

  TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
      attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
      event.getType());

  TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
      abortedStatus, attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
}
项目:hardfs    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hardfs    文件:TestRMContainerAllocator.java   
@Test
public void testCompletedContainerEvent() {
  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), mock(AppContext.class));

  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
      MRBuilderUtils.newTaskId(
          MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
      applicationId, 1);
  ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1);
  ContainerStatus status = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "", 0);

  ContainerStatus abortedStatus = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "",
      ContainerExitStatus.ABORTED);

  TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
      attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
      event.getType());

  TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
      abortedStatus, attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
}
项目:hadoop-on-lustre2    文件:MRAppMaster.java   
@Override
protected void serviceStart() throws Exception {
  if (job.isUber()) {
    this.containerAllocator = new LocalContainerAllocator(
        this.clientService, this.context, nmHost, nmPort, nmHttpPort
        , containerID);
  } else {
    this.containerAllocator = new RMContainerAllocator(
        this.clientService, this.context);
  }
  ((Service)this.containerAllocator).init(getConfig());
  ((Service)this.containerAllocator).start();
  super.serviceStart();
}
项目:hadoop-on-lustre2    文件:TestRMContainerAllocator.java   
@Test
public void testCompletedContainerEvent() {
  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), mock(AppContext.class));

  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
      MRBuilderUtils.newTaskId(
          MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(
      applicationId, 1);
  ContainerId containerId = ContainerId.newInstance(applicationAttemptId, 1);
  ContainerStatus status = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "", 0);

  ContainerStatus abortedStatus = ContainerStatus.newInstance(
      containerId, ContainerState.RUNNING, "",
      ContainerExitStatus.ABORTED);

  TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
      attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
      event.getType());

  TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
      abortedStatus, attemptId);
  Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
}
项目:hadoop-plus    文件:TestRMContainerAllocator.java   
@Test
public void testReduceScheduling() throws Exception {
  int totalMaps = 10;
  int succeededMaps = 1;
  int scheduledMaps = 10;
  int scheduledReduces = 0;
  int assignedMaps = 2;
  int assignedReduces = 0;
  int mapResourceReqt = 1024;
  int reduceResourceReqt = 2*1024;
  int numPendingReduces = 4;
  float maxReduceRampupLimit = 0.5f;
  float reduceSlowStart = 0.2f;

  RMContainerAllocator allocator = mock(RMContainerAllocator.class);
  doCallRealMethod().when(allocator).
      scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
          anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());

  // Test slow-start
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, never()).setIsReduceStarted(true);

  // verify slow-start still in effect when no more maps need to
  // be scheduled but some have yet to complete
  allocator.scheduleReduces(
      totalMaps, succeededMaps,
      0, scheduledReduces,
      totalMaps - succeededMaps, assignedReduces,
      mapResourceReqt, reduceResourceReqt,
      numPendingReduces,
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, never()).setIsReduceStarted(true);
  verify(allocator, never()).scheduleAllReduces();

  succeededMaps = 3;
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, times(1)).setIsReduceStarted(true);

  // Test reduce ramp-up
  doReturn(100 * 1024).when(allocator).getMemLimit();
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator).rampUpReduces(anyInt());
  verify(allocator, never()).rampDownReduces(anyInt());

  // Test reduce ramp-down
  scheduledReduces = 3;
  doReturn(10 * 1024).when(allocator).getMemLimit();
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator).rampDownReduces(anyInt());
}
项目:hadoop-plus    文件:TestRMContainerAllocator.java   
@Test
public void testHeartbeatHandler() throws Exception {
  LOG.info("Running testHeartbeatHandler");

  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
  ControlledClock clock = new ControlledClock(new SystemClock());
  AppContext appContext = mock(AppContext.class);
  when(appContext.getClock()).thenReturn(clock);
  when(appContext.getApplicationID()).thenReturn(
      ApplicationId.newInstance(1, 1));

  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), appContext) {
        @Override
        protected void register() {
        }
        @Override
        protected ApplicationMasterProtocol createSchedulerProxy() {
          return mock(ApplicationMasterProtocol.class);
        }
        @Override
        protected synchronized void heartbeat() throws Exception {
        }
  };
  allocator.init(conf);
  allocator.start();

  clock.setTime(5);
  int timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(5, allocator.getLastHeartbeatTime());
  clock.setTime(7);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(7, allocator.getLastHeartbeatTime());

  final AtomicBoolean callbackCalled = new AtomicBoolean(false);
  allocator.runOnNextHeartbeat(new Runnable() {
    @Override
    public void run() {
      callbackCalled.set(true);
    }
  });
  clock.setTime(8);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(8, allocator.getLastHeartbeatTime());
  Assert.assertTrue(callbackCalled.get());
}
项目:hadoop-TCP    文件:TestRMContainerAllocator.java   
@Test
public void testReduceScheduling() throws Exception {
  int totalMaps = 10;
  int succeededMaps = 1;
  int scheduledMaps = 10;
  int scheduledReduces = 0;
  int assignedMaps = 2;
  int assignedReduces = 0;
  int mapResourceReqt = 1024;
  int reduceResourceReqt = 2*1024;
  int numPendingReduces = 4;
  float maxReduceRampupLimit = 0.5f;
  float reduceSlowStart = 0.2f;

  RMContainerAllocator allocator = mock(RMContainerAllocator.class);
  doCallRealMethod().when(allocator).
      scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
          anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());

  // Test slow-start
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, never()).setIsReduceStarted(true);

  // verify slow-start still in effect when no more maps need to
  // be scheduled but some have yet to complete
  allocator.scheduleReduces(
      totalMaps, succeededMaps,
      0, scheduledReduces,
      totalMaps - succeededMaps, assignedReduces,
      mapResourceReqt, reduceResourceReqt,
      numPendingReduces,
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, never()).setIsReduceStarted(true);
  verify(allocator, never()).scheduleAllReduces();

  succeededMaps = 3;
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, times(1)).setIsReduceStarted(true);

  // Test reduce ramp-up
  doReturn(100 * 1024).when(allocator).getMemLimit();
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator).rampUpReduces(anyInt());
  verify(allocator, never()).rampDownReduces(anyInt());

  // Test reduce ramp-down
  scheduledReduces = 3;
  doReturn(10 * 1024).when(allocator).getMemLimit();
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator).rampDownReduces(anyInt());
}
项目:hadoop-TCP    文件:TestRMContainerAllocator.java   
@Test
public void testHeartbeatHandler() throws Exception {
  LOG.info("Running testHeartbeatHandler");

  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
  ControlledClock clock = new ControlledClock(new SystemClock());
  AppContext appContext = mock(AppContext.class);
  when(appContext.getClock()).thenReturn(clock);
  when(appContext.getApplicationID()).thenReturn(
      ApplicationId.newInstance(1, 1));

  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), appContext) {
        @Override
        protected void register() {
        }
        @Override
        protected ApplicationMasterProtocol createSchedulerProxy() {
          return mock(ApplicationMasterProtocol.class);
        }
        @Override
        protected synchronized void heartbeat() throws Exception {
        }
  };
  allocator.init(conf);
  allocator.start();

  clock.setTime(5);
  int timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(5, allocator.getLastHeartbeatTime());
  clock.setTime(7);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(7, allocator.getLastHeartbeatTime());

  final AtomicBoolean callbackCalled = new AtomicBoolean(false);
  allocator.runOnNextHeartbeat(new Runnable() {
    @Override
    public void run() {
      callbackCalled.set(true);
    }
  });
  clock.setTime(8);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(8, allocator.getLastHeartbeatTime());
  Assert.assertTrue(callbackCalled.get());
}
项目:hardfs    文件:TestRMContainerAllocator.java   
@Test
public void testReduceScheduling() throws Exception {
  int totalMaps = 10;
  int succeededMaps = 1;
  int scheduledMaps = 10;
  int scheduledReduces = 0;
  int assignedMaps = 2;
  int assignedReduces = 0;
  int mapResourceReqt = 1024;
  int reduceResourceReqt = 2*1024;
  int numPendingReduces = 4;
  float maxReduceRampupLimit = 0.5f;
  float reduceSlowStart = 0.2f;

  RMContainerAllocator allocator = mock(RMContainerAllocator.class);
  doCallRealMethod().when(allocator).
      scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
          anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());

  // Test slow-start
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, never()).setIsReduceStarted(true);

  // verify slow-start still in effect when no more maps need to
  // be scheduled but some have yet to complete
  allocator.scheduleReduces(
      totalMaps, succeededMaps,
      0, scheduledReduces,
      totalMaps - succeededMaps, assignedReduces,
      mapResourceReqt, reduceResourceReqt,
      numPendingReduces,
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, never()).setIsReduceStarted(true);
  verify(allocator, never()).scheduleAllReduces();

  succeededMaps = 3;
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator, times(1)).setIsReduceStarted(true);

  // Test reduce ramp-up
  doReturn(100 * 1024).when(allocator).getMemLimit();
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator).rampUpReduces(anyInt());
  verify(allocator, never()).rampDownReduces(anyInt());

  // Test reduce ramp-down
  scheduledReduces = 3;
  doReturn(10 * 1024).when(allocator).getMemLimit();
  allocator.scheduleReduces(
      totalMaps, succeededMaps, 
      scheduledMaps, scheduledReduces, 
      assignedMaps, assignedReduces, 
      mapResourceReqt, reduceResourceReqt, 
      numPendingReduces, 
      maxReduceRampupLimit, reduceSlowStart);
  verify(allocator).rampDownReduces(anyInt());
}
项目:hardfs    文件:TestRMContainerAllocator.java   
@Test
public void testHeartbeatHandler() throws Exception {
  LOG.info("Running testHeartbeatHandler");

  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
  ControlledClock clock = new ControlledClock(new SystemClock());
  AppContext appContext = mock(AppContext.class);
  when(appContext.getClock()).thenReturn(clock);
  when(appContext.getApplicationID()).thenReturn(
      ApplicationId.newInstance(1, 1));

  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), appContext) {
        @Override
        protected void register() {
        }
        @Override
        protected ApplicationMasterProtocol createSchedulerProxy() {
          return mock(ApplicationMasterProtocol.class);
        }
        @Override
        protected synchronized void heartbeat() throws Exception {
        }
  };
  allocator.init(conf);
  allocator.start();

  clock.setTime(5);
  int timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(5, allocator.getLastHeartbeatTime());
  clock.setTime(7);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(7, allocator.getLastHeartbeatTime());

  final AtomicBoolean callbackCalled = new AtomicBoolean(false);
  allocator.runOnNextHeartbeat(new Runnable() {
    @Override
    public void run() {
      callbackCalled.set(true);
    }
  });
  clock.setTime(8);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(8, allocator.getLastHeartbeatTime());
  Assert.assertTrue(callbackCalled.get());
}
项目:hadoop-on-lustre2    文件:TestRMContainerAllocator.java   
@Test
public void testHeartbeatHandler() throws Exception {
  LOG.info("Running testHeartbeatHandler");

  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
  ControlledClock clock = new ControlledClock(new SystemClock());
  AppContext appContext = mock(AppContext.class);
  when(appContext.getClock()).thenReturn(clock);
  when(appContext.getApplicationID()).thenReturn(
      ApplicationId.newInstance(1, 1));

  RMContainerAllocator allocator = new RMContainerAllocator(
      mock(ClientService.class), appContext) {
        @Override
        protected void register() {
        }
        @Override
        protected ApplicationMasterProtocol createSchedulerProxy() {
          return mock(ApplicationMasterProtocol.class);
        }
        @Override
        protected synchronized void heartbeat() throws Exception {
        }
  };
  allocator.init(conf);
  allocator.start();

  clock.setTime(5);
  int timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(5, allocator.getLastHeartbeatTime());
  clock.setTime(7);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(7, allocator.getLastHeartbeatTime());

  final AtomicBoolean callbackCalled = new AtomicBoolean(false);
  allocator.runOnNextHeartbeat(new Runnable() {
    @Override
    public void run() {
      callbackCalled.set(true);
    }
  });
  clock.setTime(8);
  timeToWaitMs = 5000;
  while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
    Thread.sleep(10);
    timeToWaitMs -= 10;
  }
  Assert.assertEquals(8, allocator.getLastHeartbeatTime());
  Assert.assertTrue(callbackCalled.get());
}