private void testMRAppHistory(MRApp app) throws Exception { Configuration conf = new Configuration(); Job job = app.submit(conf); app.waitForState(job, JobState.FAILED); Map<TaskId, Task> tasks = job.getTasks(); Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); Assert.assertEquals("Task state not correct", TaskState.FAILED, task .getReport().getTaskState()); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next() .getAttempts(); Assert.assertEquals("Num attempts is not correct", 4, attempts.size()); Iterator<TaskAttempt> it = attempts.values().iterator(); TaskAttemptReport report = it.next().getReport(); Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, report.getTaskAttemptState()); Assert.assertEquals("Diagnostic Information is not Correct", "Test Diagnostic Event", report.getDiagnosticInfo()); report = it.next().getReport(); Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, report.getTaskAttemptState()); }
@Test public void testLogsView2() throws IOException { LOG.info("HsLogsPage with data"); MockAppContext ctx = new MockAppContext(0, 1, 1, 1); Map<String, String> params = new HashMap<String, String>(); params.put(CONTAINER_ID, MRApp.newContainerId(1, 1, 333, 1) .toString()); params.put(NM_NODENAME, NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(APP_OWNER, "owner"); Injector injector = WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx, params); PrintWriter spyPw = WebAppTests.getPrintWriter(injector); verify(spyPw).write( "Aggregation is not enabled. Try the nodemanager at " + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT); }
@Test public void testLogsViewBadStartEnd() throws IOException { LOG.info("HsLogsPage with bad start/end params"); MockAppContext ctx = new MockAppContext(0, 1, 1, 1); Map<String, String> params = new HashMap<String, String>(); params.put("start", "foo"); params.put("end", "bar"); params.put(CONTAINER_ID, MRApp.newContainerId(1, 1, 333, 1) .toString()); params.put(NM_NODENAME, NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(APP_OWNER, "owner"); Injector injector = WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx, params); PrintWriter spyPw = WebAppTests.getPrintWriter(injector); verify(spyPw).write("Invalid log start value: foo"); verify(spyPw).write("Invalid log end value: bar"); }
@Test public void testMRWebAppSSLDisabled() throws Exception { MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true) { @Override protected ClientService createClientService(AppContext context) { return new MRClientService(context); } }; Configuration conf = new Configuration(); // MR is explicitly disabling SSL, even though setting as HTTPS_ONLY conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, Policy.HTTPS_ONLY.name()); Job job = app.submit(conf); String hostPort = NetUtils.getHostPortString(((MRClientService) app.getClientService()) .getWebApp().getListenerAddress()); // http:// should be accessible URL httpUrl = new URL("http://" + hostPort); HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection(); InputStream in = conn.getInputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream(); IOUtils.copyBytes(in, out, 1024); Assert.assertTrue(out.toString().contains("MapReduce Application")); // https:// is not accessible. URL httpsUrl = new URL("https://" + hostPort); try { HttpURLConnection httpsConn = (HttpURLConnection) httpsUrl.openConnection(); httpsConn.getInputStream(); Assert.fail("https:// is not accessible, expected to fail"); } catch (Exception e) { Assert.assertTrue(e instanceof SSLException); } app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); }
@Test public void testMRWebAppRedirection() throws Exception { String[] schemePrefix = { WebAppUtils.HTTP_PREFIX, WebAppUtils.HTTPS_PREFIX }; for (String scheme : schemePrefix) { MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true) { @Override protected ClientService createClientService(AppContext context) { return new MRClientService(context); } }; Configuration conf = new Configuration(); conf.set(YarnConfiguration.PROXY_ADDRESS, "9.9.9.9"); conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, scheme .equals(WebAppUtils.HTTPS_PREFIX) ? Policy.HTTPS_ONLY.name() : Policy.HTTP_ONLY.name()); webProxyBase = "/proxy/" + app.getAppID(); conf.set("hadoop.http.filter.initializers", TestAMFilterInitializer.class.getName()); Job job = app.submit(conf); String hostPort = NetUtils.getHostPortString(((MRClientService) app.getClientService()) .getWebApp().getListenerAddress()); URL httpUrl = new URL("http://" + hostPort + "/mapreduce"); HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection(); conn.setInstanceFollowRedirects(false); conn.connect(); String expectedURL = scheme + conf.get(YarnConfiguration.PROXY_ADDRESS) + ProxyUriUtils.getPath(app.getAppID(), "/mapreduce"); Assert.assertEquals(expectedURL, conn.getHeaderField(HttpHeaders.LOCATION)); Assert.assertEquals(HttpStatus.SC_MOVED_TEMPORARILY, conn.getResponseCode()); app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); } }
private Token createNewContainerToken(ContainerId contId, String containerManagerAddr) { long currentTime = System.currentTimeMillis(); return MRApp.newContainerToken(NodeId.newInstance("127.0.0.1", 1234), "password".getBytes(), new ContainerTokenIdentifier( contId, containerManagerAddr, "user", Resource.newInstance(1024, 1, 1), currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0)); }
private void finishNextNTasks(DrainDispatcher rmDispatcher, MockNM node, MRApp mrApp, Iterator<Task> it, int nextN) throws Exception { Task task; for (int i=0; i<nextN; i++) { task = it.next(); finishTask(rmDispatcher, node, mrApp, task); } }
private void finishTask(DrainDispatcher rmDispatcher, MockNM node, MRApp mrApp, Task task) throws Exception { TaskAttempt attempt = task.getAttempts().values().iterator().next(); List<ContainerStatus> contStatus = new ArrayList<ContainerStatus>(1); contStatus.add(ContainerStatus.newInstance(attempt.getAssignedContainerID(), ContainerState.COMPLETE, "", 0)); Map<ApplicationId,List<ContainerStatus>> statusUpdate = new HashMap<ApplicationId,List<ContainerStatus>>(1); statusUpdate.put(mrApp.getAppID(), contStatus); node.nodeHeartbeat(statusUpdate, true); rmDispatcher.await(); mrApp.getContext().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); mrApp.waitForState(task, TaskState.SUCCEEDED); }
@Test public void testLogsViewSingle() throws IOException { LOG.info("HsLogsPage with params for single log and data limits"); MockAppContext ctx = new MockAppContext(0, 1, 1, 1); Map<String, String> params = new HashMap<String, String>(); final Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); params.put("start", "-2048"); params.put("end", "-1024"); params.put(CONTAINER_LOG_TYPE, "syslog"); params.put(CONTAINER_ID, MRApp.newContainerId(1, 1, 333, 1) .toString()); params.put(NM_NODENAME, NodeId.newInstance(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); params.put(ENTITY_STRING, "container_10_0001_01_000001"); params.put(APP_OWNER, "owner"); Injector injector = WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx, params, new AbstractModule() { @Override protected void configure() { bind(Configuration.class).toInstance(conf); } }); PrintWriter spyPw = WebAppTests.getPrintWriter(injector); verify(spyPw).write( "Logs not available for container_10_0001_01_000001." + " Aggregation may not be complete, " + "Check back later or try the nodemanager at " + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT); }
/** * Verify that all the events are flushed on stopping the HistoryHandler * @throws Exception */ @Test public void testEventsFlushOnStop() throws Exception { Configuration conf = new Configuration(); MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this .getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); app.waitForState(job, JobState.SUCCEEDED); // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); /* * Use HistoryContext to read logged events and verify the number of * completed maps */ HistoryContext context = new JobHistory(); ((JobHistory) context).init(conf); Job parsedJob = context.getJob(jobId); Assert.assertEquals("CompletedMaps not correct", 1, parsedJob .getCompletedMaps()); Map<TaskId, Task> tasks = parsedJob.getTasks(); Assert.assertEquals("No of tasks not correct", 1, tasks.size()); verifyTask(tasks.values().iterator().next()); Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP); Assert.assertEquals("No of maps not correct", 1, maps.size()); Assert.assertEquals("Job state not currect", JobState.SUCCEEDED, parsedJob.getState()); }
@Test public void testJobHistoryEventHandlerIsFirstServiceToStop() { MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this .getClass().getName(), true); Configuration conf = new Configuration(); app.init(conf); Service[] services = app.getServices().toArray(new Service[0]); // Verifying that it is the last to be added is same as verifying that it is // the first to be stopped. CompositeService related tests already validate // this. Assert.assertEquals("JobHistoryEventHandler", services[services.length - 1].getName()); }
@Test public void testAssignedQueue() throws Exception { Configuration conf = new Configuration(); MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true, "assignedQueue"); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); app.waitForState(job, JobState.SUCCEEDED); //make sure all events are flushed app.waitForState(Service.STATE.STOPPED); /* * Use HistoryContext to read logged events and verify the number of * completed maps */ HistoryContext context = new JobHistory(); // test start and stop states ((JobHistory)context).init(conf); ((JobHistory)context).start(); Assert.assertTrue( context.getStartTime()>0); Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED); // get job before stopping JobHistory Job parsedJob = context.getJob(jobId); // stop JobHistory ((JobHistory)context).stop(); Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED); Assert.assertEquals("QueueName not correct", "assignedQueue", parsedJob.getQueueName()); }
private void verifyAttempt(TaskAttempt attempt) { Assert.assertEquals("TaskAttempt state not currect", TaskAttemptState.SUCCEEDED, attempt.getState()); Assert.assertNotNull(attempt.getAssignedContainerID()); //Verify the wrong ctor is not being used. Remove after mrv1 is removed. ContainerId fakeCid = MRApp.newContainerId(-1, -1, -1, -1); Assert.assertFalse(attempt.getAssignedContainerID().equals(fakeCid)); //Verify complete contianerManagerAddress Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT, attempt.getAssignedContainerMgrAddress()); }