Java 类org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType 实例源码

项目:hadoop    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hadoop    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:big-c    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:big-c    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:big-c    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:big-c    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:hadoop-plus    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-plus    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-plus    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hadoop-plus    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:FlexMap    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:FlexMap    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:FlexMap    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:FlexMap    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:hops    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hops    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hops    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      mock(ContainerManagementProtocolClient.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hops    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocolClient mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:hadoop-TCP    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-TCP    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-TCP    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hadoop-TCP    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:hardfs    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hardfs    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hardfs    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hardfs    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}
项目:hadoop-on-lustre2    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testHandle() throws Exception {
  LOG.info("STARTING testHandle");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
  String cmAddress = "127.0.0.1:8000";
  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-on-lustre2    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
  LOG.info("STARTING testOutOfOrder");
  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
  } finally {
    ut.stop();
  }
}
项目:hadoop-on-lustre2    文件:TestContainerLauncherImpl.java   
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
  LOG.info("in test Shutdown");

  AppContext mockContext = mock(AppContext.class);
  @SuppressWarnings("rawtypes")
  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      mock(ContainerManagementProtocol.class);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);

    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent =
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    ut.waitForPoolToIdle();

    verify(mockCM).startContainers(any(StartContainersRequest.class));

    // skip cleanup and make sure stop kills the container

  } finally {
    ut.stop();
    verify(mockCM).stopContainers(any(StopContainersRequest.class));
  }
}
项目:hadoop-on-lustre2    文件:TestContainerLauncherImpl.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
  LOG.info("STARTING testContainerCleaned");

  CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
  CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);

  AppContext mockContext = mock(AppContext.class);

  EventHandler mockEventHandler = mock(EventHandler.class);
  when(mockContext.getEventHandler()).thenReturn(mockEventHandler);

  ContainerManagementProtocol mockCM =
      new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
  ContainerLauncherImplUnderTest ut =
      new ContainerLauncherImplUnderTest(mockContext, mockCM);

  Configuration conf = new Configuration();
  ut.init(conf);
  ut.start();
  try {
    ContainerId contId = makeContainerId(0l, 0, 0, 1);
    TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
    String cmAddress = "127.0.0.1:8000";
    StartContainersResponse startResp =
      recordFactory.newRecordInstance(StartContainersResponse.class);
    startResp.setAllServicesMetaData(serviceResponse);


    LOG.info("inserting launch event");
    ContainerRemoteLaunchEvent mockLaunchEvent = 
      mock(ContainerRemoteLaunchEvent.class);
    when(mockLaunchEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
    when(mockLaunchEvent.getContainerID())
      .thenReturn(contId);
    when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    when(mockLaunchEvent.getContainerToken()).thenReturn(
        createNewContainerToken(contId, cmAddress));
    ut.handle(mockLaunchEvent);

    startLaunchBarrier.await();


    LOG.info("inserting cleanup event");
    ContainerLauncherEvent mockCleanupEvent = 
      mock(ContainerLauncherEvent.class);
    when(mockCleanupEvent.getType())
      .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
    when(mockCleanupEvent.getContainerID())
      .thenReturn(contId);
    when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
    when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
    ut.handle(mockCleanupEvent);

    completeLaunchBarrier.await();

    ut.waitForPoolToIdle();

    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
    verify(mockEventHandler, atLeast(2)).handle(arg.capture());
    boolean containerCleaned = false;

    for (int i =0; i < arg.getAllValues().size(); i++) {
      LOG.info(arg.getAllValues().get(i).toString());
      Event currentEvent = arg.getAllValues().get(i);
      if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
        containerCleaned = true;
      }
    }
    assert(containerCleaned);

  } finally {
    ut.stop();
  }
}