/**
 * Runs the allocator loop once with a heartbeat that throws
 * {@link RMContainerAllocationException} on its first invocation.
 *
 * <p>The mocked clock hands out a single value and raises an
 * {@link AssertionError} on any further {@code getTime()} call, so the test
 * only passes if {@code run()} returns without asking for the time again.
 */
@Test(timeout = 2000)
public void testRMContainerAllocatorExceptionIsHandled() throws Exception {
  ClientService clientService = mock(ClientService.class);
  AppContext appContext = mock(AppContext.class);

  // The communicator is built before the clock is stubbed on the context.
  RMCommunicator communicator =
      spy(new MockRMCommunicator(clientService, appContext));

  Clock clock = mock(Clock.class);
  when(appContext.getClock()).thenReturn(clock);

  // First heartbeat fails, any later heartbeat is a no-op.
  doThrow(new RMContainerAllocationException("Test"))
      .doNothing()
      .when(communicator)
      .heartbeat();

  AssertionError unexpectedClockRead = new AssertionError(
      "GetClock called second time, when it should not have since the "
          + "thread should have quit");
  when(clock.getTime()).thenReturn(1L).thenThrow(unexpectedClockRead);

  AllocatorRunnable allocatorLoop = communicator.new AllocatorRunnable();
  allocatorLoop.run();
}
/**
 * Creates the allocator and records the supplied endpoint details.
 *
 * @param clientService passed through to the superclass constructor
 * @param context application context; also the source of the event handler
 * @param nmHost host name stored in {@code nmHost}
 * @param nmPort port stored in {@code nmPort}
 * @param nmHttpPort HTTP port stored in {@code nmHttpPort}
 * @param cId container id stored in {@code containerId}
 */
public LocalContainerAllocator(
    ClientService clientService,
    AppContext context,
    String nmHost,
    int nmPort,
    int nmHttpPort,
    ContainerId cId) {
  super(clientService, context);

  // Plain copies of the constructor arguments.
  this.containerId = cId;
  this.nmHost = nmHost;
  this.nmPort = nmPort;
  this.nmHttpPort = nmHttpPort;

  // Looked up once from the context.
  this.eventHandler = context.getEventHandler();
}
/**
 * Creates the communicator under the service name {@code "RMCommunicator"}.
 *
 * <p>Besides storing its arguments, the constructor reads the event handler
 * and application id from the context, starts in the not-stopped state with
 * an empty heartbeat-callback queue, and initialises the scheduler resource
 * types to {@code MEMORY} only.
 *
 * @param clientService client service kept for later use
 * @param context application context kept for later use
 */
public RMCommunicator(ClientService clientService, AppContext context) {
  super("RMCommunicator");

  // Collaborators handed in by the caller.
  this.context = context;
  this.clientService = clientService;

  // Values derived from the context.
  this.applicationId = context.getApplicationID();
  this.eventHandler = context.getEventHandler();

  // Fresh internal state.
  this.schedulerResourceTypes = EnumSet.of(SchedulerResourceTypes.MEMORY);
  this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
  this.stopped = new AtomicBoolean(false);
}
@Test public void testMRWebAppSSLDisabled() throws Exception { MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true) { @Override protected ClientService createClientService(AppContext context) { return new MRClientService(context); } }; Configuration conf = new Configuration(); // MR is explicitly disabling SSL, even though setting as HTTPS_ONLY conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, Policy.HTTPS_ONLY.name()); Job job = app.submit(conf); String hostPort = NetUtils.getHostPortString(((MRClientService) app.getClientService()) .getWebApp().getListenerAddress()); // http:// should be accessible URL httpUrl = new URL("http://" + hostPort); HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection(); InputStream in = conn.getInputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream(); IOUtils.copyBytes(in, out, 1024); Assert.assertTrue(out.toString().contains("MapReduce Application")); // https:// is not accessible. URL httpsUrl = new URL("https://" + hostPort); try { HttpURLConnection httpsConn = (HttpURLConnection) httpsUrl.openConnection(); httpsConn.getInputStream(); Assert.fail("https:// is not accessible, expected to fail"); } catch (Exception e) { Assert.assertTrue(e instanceof SSLException); } app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); }
/**
 * For both the HTTP and HTTPS scheme prefixes, submits a job with a proxy
 * address configured and checks that a request to {@code /mapreduce} is
 * answered with a temporary redirect whose {@code Location} header points at
 * the proxy path for the application, using the scheme under test.
 *
 * <p>Each iteration's connection is disconnected in a {@code finally} block
 * so that a failing assertion does not leave the socket open.
 */
@Test
public void testMRWebAppRedirection() throws Exception {
  String[] schemePrefix =
      { WebAppUtils.HTTP_PREFIX, WebAppUtils.HTTPS_PREFIX };
  for (String scheme : schemePrefix) {
    MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true) {
      @Override
      protected ClientService createClientService(AppContext context) {
        return new MRClientService(context);
      }
    };
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.PROXY_ADDRESS, "9.9.9.9");
    conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, scheme
        .equals(WebAppUtils.HTTPS_PREFIX) ? Policy.HTTPS_ONLY.name()
        : Policy.HTTP_ONLY.name());
    webProxyBase = "/proxy/" + app.getAppID();
    conf.set("hadoop.http.filter.initializers",
        TestAMFilterInitializer.class.getName());
    Job job = app.submit(conf);

    String hostPort =
        NetUtils.getHostPortString(((MRClientService) app.getClientService())
            .getWebApp().getListenerAddress());
    URL httpUrl = new URL("http://" + hostPort + "/mapreduce");

    HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
    try {
      // The redirect itself is under test, so it must not be followed.
      conn.setInstanceFollowRedirects(false);
      conn.connect();
      String expectedURL =
          scheme + conf.get(YarnConfiguration.PROXY_ADDRESS)
              + ProxyUriUtils.getPath(app.getAppID(), "/mapreduce");
      Assert.assertEquals(expectedURL,
          conn.getHeaderField(HttpHeaders.LOCATION));
      Assert.assertEquals(HttpStatus.SC_MOVED_TEMPORARILY,
          conn.getResponseCode());
    } finally {
      conn.disconnect();
    }

    app.waitForState(job, JobState.SUCCEEDED);
    app.verifyCompleted();
  }
}
/**
 * Builds an {@code MRClientService} whose bind address is always
 * {@code localhost:9876} and whose HTTP port is always reported as
 * {@code -1}.
 */
@Override
protected ClientService createClientService(AppContext context) {
  ClientService fixedEndpointService = new MRClientService(context) {
    @Override
    public InetSocketAddress getBindAddress() {
      InetSocketAddress address = NetUtils.createSocketAddr("localhost:9876");
      return address;
    }

    @Override
    public int getHttpPort() {
      return -1;
    }
  };
  return fixedEndpointService;
}
/**
 * Builds an {@code MRClientService} that, when stopped, increments
 * {@code numStops}, copies the new value into {@code clientServiceStopped},
 * and then delegates to the regular {@code serviceStop()}.
 */
@Override
protected ClientService createClientService(AppContext context) {
  ClientService stopTrackingService = new MRClientService(context) {
    @Override
    public void serviceStop() throws Exception {
      // Record the stop before the real shutdown runs.
      numStops++;
      clientServiceStopped = numStops;

      super.serviceStop();
    }
  };
  return stopTrackingService;
}
/**
 * Creates a {@code CustomContainerAllocator} around a spied copy of the
 * context whose {@code getEventHandler()} and {@code getApplicationID()}
 * are stubbed to return {@code null}.
 *
 * @param clientService not used by this override
 * @param context the context to spy on
 */
@Override
protected ContainerAllocator createContainerAllocator(
    ClientService clientService, AppContext context) {
  AppContext spiedContext = spy(context);

  when(spiedContext.getEventHandler()).thenReturn(null);
  when(spiedContext.getApplicationID()).thenReturn(null);

  ContainerAllocator customAllocator =
      new CustomContainerAllocator(this, spiedContext);
  return customAllocator;
}
/**
 * Chooses the container allocator to use.
 *
 * <p>If {@code allocator} is already set it is returned unchanged. Otherwise
 * a {@code CustomContainerAllocator} is created when
 * {@code crushUnregistration} is true, and the superclass implementation is
 * used when it is false.
 */
@Override
protected ContainerAllocator createContainerAllocator(
    final ClientService clientService, final AppContext context) {
  // A preset allocator always wins.
  if (allocator != null) {
    return allocator;
  }

  if (!crushUnregistration) {
    return super.createContainerAllocator(clientService, context);
  }

  return new CustomContainerAllocator(context);
}
/**
 * Creates a mocked {@code ClientService} that reports
 * {@code localhost:4567} as its bind address and {@code 890} as its HTTP
 * port.
 *
 * @return the stubbed mock
 */
private static ClientService createMockClientService() {
  ClientService mockedService = mock(ClientService.class);

  InetSocketAddress bindAddress = NetUtils.createSocketAddr("localhost:4567");
  when(mockedService.getBindAddress()).thenReturn(bindAddress);

  int httpPort = 890;
  when(mockedService.getHttpPort()).thenReturn(httpPort);

  return mockedService;
}
/**
 * Creates the allocator in the not-stopped state.
 *
 * @param clientService passed through to the superclass constructor
 * @param context application context; also the source of the clock
 * @param preemptionPolicy preemption policy stored for later use
 */
public RMContainerAllocator(
    ClientService clientService,
    AppContext context,
    AMPreemptionPolicy preemptionPolicy) {
  super(clientService, context);

  this.clock = context.getClock();
  this.stopped = new AtomicBoolean(false);
  this.preemptionPolicy = preemptionPolicy;
}
/**
 * Runs the allocator loop with a heartbeat that throws
 * {@link YarnRuntimeException} on its first invocation and does nothing
 * afterwards.
 *
 * <p>The mocked clock drives the scenario: the first {@code getTime()} call
 * returns 1, the second stops the communicator and returns 2, and a third
 * call raises an {@link AssertionError}. The test finally verifies that the
 * clock was read exactly twice.
 */
@Test(timeout = 2000)
public void testRMContainerAllocatorYarnRuntimeExceptionIsHandled()
    throws Exception {
  ClientService mockClientService = mock(ClientService.class);
  AppContext mockContext = mock(AppContext.class);
  MockRMCommunicator mockRMCommunicator =
      new MockRMCommunicator(mockClientService, mockContext);
  final RMCommunicator communicator = spy(mockRMCommunicator);
  Clock mockClock = mock(Clock.class);
  when(mockContext.getClock()).thenReturn(mockClock);

  doThrow(new YarnRuntimeException("Test")).doNothing()
      .when(communicator).heartbeat();

  // getTime() returns long, so the answer must produce a Long; an Integer
  // answer does not match the stubbed method's return type.
  when(mockClock.getTime()).thenReturn(1L).thenAnswer(new Answer<Long>() {
    @Override
    public Long answer(InvocationOnMock invocation) throws Throwable {
      communicator.stop();
      return 2L;
    }
  }).thenThrow(new AssertionError(
      "GetClock called third time, when it should not have since the thread "
          + "should have quit"));

  AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
  testRunnable.run();

  verify(mockClock, times(2)).getTime();
}