public void testFailingJobInitalization() throws Exception { Properties schedulerProps = new Properties(); schedulerProps.put( "mapred.capacity-scheduler.queue.default.capacity", "100"); Properties clusterProps = new Properties(); clusterProps .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(1)); clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String .valueOf(1)); clusterProps.put("mapred.jobtracker.maxtasks.per.job", String .valueOf(1)); // cluster capacity 1 maps, 1 reduces startCluster(1, clusterProps, schedulerProps); ControlledMapReduceJobRunner jobRunner = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 3, 3); jobRunner.start(); JobID myJobID = jobRunner.getJobID(); JobInProgress myJob = getJobTracker().getJob(myJobID); while(!myJob.isComplete()) { Thread.sleep(1000); } assertTrue("The submitted job successfully completed", myJob.status.getRunState() == JobStatus.FAILED); CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker().getTaskScheduler(); JobQueuesManager mgr = scheduler.jobQueuesManager; assertEquals("Failed job present in Waiting queue", 0, mgr.getWaitingJobCount("default")); assertFalse("Failed job present in Waiting queue", mgr.getWaitingJobs("default").contains(myJob)); }
/**
 * Starts a job with 7 maps and 6 reduces. Then controls the finishing of
 * tasks. Signals finishing tasks in batches and then verifies their
 * completion.
 *
 * <p>
 *
 * Each task tracker is configured with 2 map and 2 reduce slots. The test
 * waits for 4 tasks of a kind to be running at a time, which presumably is
 * the total slot count of the cluster started by {@code startCluster} (TODO
 * confirm the number of task trackers).
 *
 * @throws Exception
 */
public void testControlledMapReduceJob() throws Exception {
  Properties props = new Properties();
  props.setProperty("mapred.tasktracker.map.tasks.maximum", "2");
  props.setProperty("mapred.tasktracker.reduce.tasks.maximum", "2");
  startCluster(true, props);
  LOG.info("Started the cluster");

  ControlledMapReduceJobRunner jobRunner = ControlledMapReduceJobRunner
      .getControlledMapReduceJobRunner(createJobConf(), 7, 6);
  jobRunner.start();
  ControlledMapReduceJob controlledJob = jobRunner.getJob();
  JobInProgress jip =
      getMRCluster().getJobTrackerRunner().getJobTracker().getJob(
          jobRunner.getJobID());

  // Maps: 3 + 4 = 7.
  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, true, 4);
  LOG.info("Finishing 3 maps");
  controlledJob.finishNTasks(true, 3);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, true, 3);

  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, true, 4);
  LOG.info("Finishing 4 more maps");
  controlledJob.finishNTasks(true, 4);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, true, 7);

  // Reduces: 2 + 4 = 6.
  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, false, 4);
  LOG.info("Finishing 2 reduces");
  controlledJob.finishNTasks(false, 2);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, false, 2);

  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, false, 4);
  LOG.info("Finishing 4 more reduces");
  controlledJob.finishNTasks(false, 4);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, false, 6);

  jobRunner.join();
}
/** * Test single queue. * * <p> * * Submit a job with more M/R tasks than total capacity. Full queue capacity * should be utilized and remaining M/R tasks should wait for slots to be * available. * * @throws Exception */ @Test public void testJobTrackerRestartWithCS() throws Exception { try { Properties schedulerProps = new Properties(); schedulerProps.put( "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100"); Properties clusterProps = new Properties(); clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2)); clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String.valueOf(0)); // cluster capacity 2 maps, 0 reduces startCluster(1, clusterProps, schedulerProps); ControlledMapReduceJobRunner jobRunner = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 4, 0); jobRunner.start(); ControlledMapReduceJob controlledJob = jobRunner.getJob(); JobID myJobID = jobRunner.getJobID(); JobInProgress myJob = getJobTracker().getJob(myJobID); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 2); LOG.info("Trying to finish 2 maps"); controlledJob.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2); assertTrue("Number of maps finished", myJob.finishedMaps() == 2); JobClient jobClient = new JobClient(getMrCluster().createJobConf()); getMrCluster().stopJobTracker(); getMrCluster().getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true); getMrCluster().startJobTracker(); UtilsForTests.waitForJobTracker(jobClient); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 1); controlledJob.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2); } catch (Exception e) { e.printStackTrace(); } finally { tearDown(); } }
/** * Test single queue. * * <p> * * Submit a job with more M/R tasks than total capacity. Full queue capacity * should be utilized and remaining M/R tasks should wait for slots to be * available. * * @throws Exception */ public void testJobTrackerRestartWithCS() throws Exception { try { Properties schedulerProps = new Properties(); schedulerProps.put( "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100"); Properties clusterProps = new Properties(); clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2)); clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String.valueOf(0)); // cluster capacity 2 maps, 0 reduces startCluster(1, clusterProps, schedulerProps); ControlledMapReduceJobRunner jobRunner = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 4, 0); jobRunner.start(); ControlledMapReduceJob controlledJob = jobRunner.getJob(); JobID myJobID = jobRunner.getJobID(); JobInProgress myJob = getJobTracker().getJob(myJobID); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 2); LOG.info("Trying to finish 2 maps"); controlledJob.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2); assertTrue("Number of maps finished", myJob.finishedMaps() == 2); JobClient jobClient = new JobClient(getMrCluster().createJobConf()); getMrCluster().stopJobTracker(); getMrCluster().getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true); getMrCluster().startJobTracker(); UtilsForTests.waitForJobTracker(jobClient); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 1); controlledJob.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2); } catch (Exception e) { e.printStackTrace(); } finally { tearDown(); } }
/**
 * Test single queue.
 *
 * <p>
 *
 * Submit a job with more M/R tasks than total capacity. Full queue capacity
 * should be utilized and remaining M/R tasks should wait for slots to be
 * available.
 *
 * <p>
 *
 * The cluster has 4 task trackers with 3 map and 3 reduce slots each (12 map
 * and 12 reduce slots) and the job has 16 maps and 16 reduces. Tasks are
 * finished in batches; after each batch the test waits for the freed slots to
 * be refilled with the job's pending tasks.
 *
 * @throws Exception
 */
public void testSingleQueue() throws Exception {
  Properties schedulerProps = new Properties();
  schedulerProps.put(
      "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
  Properties clusterProps = new Properties();
  clusterProps
      .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(3));
  clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
      .valueOf(3));
  // cluster capacity 12 maps, 12 reduces
  startCluster(4, clusterProps, schedulerProps);

  ControlledMapReduceJobRunner jobRunner =
      ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
          getJobConf(), 16, 16);
  jobRunner.start();
  ControlledMapReduceJob controlledJob = jobRunner.getJob();
  JobID myJobID = jobRunner.getJobID();
  JobInProgress myJob = getJobTracker().getJob(myJobID);

  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
  // Wait till the cluster reaches steady state. This confirms that the rest
  // of the tasks are not running and waiting for slots to be freed.
  waitTillAllSlotsAreOccupied(true);

  // Maps: 2 + 2 + 12 = 16.
  LOG.info("Trying to finish 2 maps");
  controlledJob.finishNTasks(true, 2);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
  assertTrue("Number of maps finished", myJob.finishedMaps() == 2);
  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
  waitTillAllSlotsAreOccupied(true);

  LOG.info("Trying to finish 2 more maps");
  controlledJob.finishNTasks(true, 2);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 4);
  assertTrue("Number of maps finished", myJob.finishedMaps() == 4);
  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
  waitTillAllSlotsAreOccupied(true);

  LOG.info("Trying to finish the last 12 maps");
  controlledJob.finishNTasks(true, 12);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 16);
  assertTrue("Number of maps finished", myJob.finishedMaps() == 16);
  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 0);
  // The result of haveAllTasksFinished was previously discarded, so this
  // check could never fail; assert it instead.
  assertTrue("All maps should have finished",
      ControlledMapReduceJob.haveAllTasksFinished(myJob, true));

  // Reduces: 4 + 12 = 16.
  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 12);
  waitTillAllSlotsAreOccupied(false);
  LOG.info("Trying to finish 4 reduces");
  controlledJob.finishNTasks(false, 4);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, false, 4);
  assertTrue("Number of reduces finished", myJob.finishedReduces() == 4);
  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 12);
  waitTillAllSlotsAreOccupied(false);

  LOG.info("Trying to finish the last 12 reduces");
  controlledJob.finishNTasks(false, 12);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, false, 16);
  assertTrue("Number of reduces finished", myJob.finishedReduces() == 16);
  ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 0);
  assertTrue("All reduces should have finished",
      ControlledMapReduceJob.haveAllTasksFinished(myJob, false));

  jobRunner.join();
}
/** * Submit a job with more M/R tasks than total queue capacity and then submit * another job. First job utilizes all the slots. When the second job is * submitted, the tasks of the second job wait for slots to be available. As * the tasks of the first jobs finish and there are no more tasks pending, the * tasks of the second job start running on the freed up slots. * * @throws Exception */ private void singleQMultipleJobs1() throws Exception { ControlledMapReduceJobRunner jobRunner1 = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 16, 0); ControlledMapReduceJobRunner jobRunner2 = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 12, 0); jobRunner1.start(); ControlledMapReduceJob controlledJob1 = jobRunner1.getJob(); JobID jobID1 = jobRunner1.getJobID(); JobInProgress jip1 = getJobTracker().getJob(jobID1); ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12); // Confirm that the rest of the tasks are not running and waiting for slots // to be freed. waitTillAllSlotsAreOccupied(true); // Now start the second job. jobRunner2.start(); JobID jobID2 = jobRunner2.getJobID(); ControlledMapReduceJob controlledJob2 = jobRunner2.getJob(); JobInProgress jip2 = getJobTracker().getJob(jobID2); LOG.info("Trying to finish 2 map"); controlledJob1.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 2); assertTrue("Number of maps finished", jip1.finishedMaps() == 2); ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12); waitTillAllSlotsAreOccupied(true); LOG.info("Trying to finish 2 more maps"); controlledJob1.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 4); assertTrue("Number of maps finished", jip1.finishedMaps() == 4); ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12); waitTillAllSlotsAreOccupied(true); // All tasks of Job1 started running/finished. 
Now job2 should start LOG.info("Trying to finish 2 more maps"); controlledJob1.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 6); assertTrue("Number of maps finished", jip1.finishedMaps() == 6); ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 10); ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 2); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 10); ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 2); LOG.info("Trying to finish 10 more maps and hence job1"); controlledJob1.finishNTasks(true, 10); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 16); assertTrue("Number of maps finished", jip1.finishedMaps() == 16); ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 12); controlledJob1.finishJob(); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 0); ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 12); // Finish job2 also controlledJob2.finishJob(); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip2, true, 12); ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 0); jobRunner1.join(); jobRunner2.join(); }
/** * Submit a job with less M/R tasks than total capacity and another job with * more M/R tasks than the remaining capacity. First job should utilize the * required slots and other job should utilize the available slots and its * remaining tasks wait for slots to become free. * * @throws Exception */ private void singleQMultipleJobs2() throws Exception { ControlledMapReduceJobRunner jobRunner1 = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 8, 0); ControlledMapReduceJobRunner jobRunner2 = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), 12, 0); jobRunner1.start(); ControlledMapReduceJob controlledJob1 = jobRunner1.getJob(); JobID jobID1 = jobRunner1.getJobID(); JobInProgress jip1 = getJobTracker().getJob(jobID1); ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 8); ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 8); // Now start the second job. jobRunner2.start(); JobID jobID2 = jobRunner2.getJobID(); ControlledMapReduceJob controlledJob2 = jobRunner2.getJob(); JobInProgress jip2 = getJobTracker().getJob(jobID2); ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 4); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 8); // The rest of the tasks of job2 should wait. 
ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 4); LOG.info("Trying to finish 2 maps of job1"); controlledJob1.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 2); assertTrue("Number of maps finished", jip1.finishedMaps() == 2); ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 6); ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 6); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 6); ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 6); LOG.info("Trying to finish 6 more maps of job1"); controlledJob1.finishNTasks(true, 6); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 8); assertTrue("Number of maps finished", jip1.finishedMaps() == 8); ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 12); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 0); ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 12); // Finish job2 also controlledJob2.finishJob(); ControlledMapReduceJob.waitTillNTotalTasksFinish(jip2, true, 12); ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 0); jobRunner1.join(); jobRunner2.join(); }
/** * Test to verify running of tasks in a queue going over its capacity. In * queue default, user U1 starts a job J1, having more M/R tasks than the * total slots. M/R tasks of job J1 should start running on all the nodes (100 * % utilization). * * @throws Exception */ private void multipleQsWithOneQBeyondCapacity(String[] queues) throws Exception { JobConf conf = getJobConf(); conf.setQueueName(queues[0]); conf.setUser("U1"); ControlledMapReduceJobRunner jobRunner = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(conf, 15, 0); jobRunner.start(); ControlledMapReduceJob controlledJob = jobRunner.getJob(); JobID myJobID = jobRunner.getJobID(); JobInProgress myJob = getJobTracker().getJob(myJobID); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10); // Confirm that the rest of the tasks are not running and waiting for slots // to be freed. waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10); LOG.info("Trying to finish 3 maps"); controlledJob.finishNTasks(true, 3); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 3); assertTrue("Number of maps finished", myJob.finishedMaps() == 3); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10); LOG.info("Trying to finish 2 more maps"); controlledJob.finishNTasks(true, 2); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 5); assertTrue("Number of maps finished", myJob.finishedMaps() == 5); ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10); waitTillAllSlotsAreOccupied(true); ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10); // Finish job controlledJob.finishJob(); ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 15); ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 0); jobRunner.join(); }
/** * Test to verify queue capacities across multiple queues. In this test, jobs * are submitted to different queues - all below the queue's capacity and * verifies that all the jobs are running. This will test code paths related * to job initialization, considering multiple queues for scheduling jobs etc. * * <p> * * One user per queue. Four jobs are submitted to the four queues such that * they exactly fill up the queues. No queue should be beyond capacity. All * jobs should be running. * * @throws Exception */ private void multipleQueuesWithinCapacities(String[] queues) throws Exception { String[] users = new String[] { "U1", "U2", "U3", "U4" }; ControlledMapReduceJobRunner[] jobRunners = new ControlledMapReduceJobRunner[4]; ControlledMapReduceJob[] controlledJobs = new ControlledMapReduceJob[4]; JobInProgress[] jips = new JobInProgress[4]; // Initialize all the jobs // Start all the jobs in parallel JobConf conf = getJobConf(); int numTasks = 1; for (int i = 0; i < 4; i++) { conf.setQueueName(queues[i]); conf.setUser(users[i]); jobRunners[i] = ControlledMapReduceJobRunner.getControlledMapReduceJobRunner( getJobConf(), numTasks, numTasks); jobRunners[i].start(); controlledJobs[i] = jobRunners[i].getJob(); JobID jobID = jobRunners[i].getJobID(); jips[i] = getJobTracker().getJob(jobID); // Wait till all the jobs start running all of their tasks ControlledMapReduceJob.waitTillNTasksStartRunning(jips[i], true, numTasks); ControlledMapReduceJob.waitTillNTasksStartRunning(jips[i], false, numTasks); numTasks += 1; } // Ensure steady state behavior waitTillAllSlotsAreOccupied(true); waitTillAllSlotsAreOccupied(false); numTasks = 1; for (int i = 0; i < 4; i++) { ControlledMapReduceJob.assertNumTasksRunning(jips[i], true, numTasks); ControlledMapReduceJob.assertNumTasksRunning(jips[i], false, numTasks); numTasks += 1; } // Finish the jobs and join them numTasks = 1; for (int i = 0; i < 4; i++) { controlledJobs[i].finishJob(); ControlledMapReduceJob 
.waitTillNTotalTasksFinish(jips[i], true, numTasks); ControlledMapReduceJob.assertNumTasksRunning(jips[i], true, 0); ControlledMapReduceJob.waitTillNTotalTasksFinish(jips[i], false, numTasks); ControlledMapReduceJob.assertNumTasksRunning(jips[i], false, 0); jobRunners[i].join(); numTasks += 1; } }
/**
 * Starts a job with 7 maps and 6 reduces. Then controls the finishing of
 * tasks. Signals finishing tasks in batches and then verifies their
 * completion.
 *
 * <p>
 *
 * Each task tracker is configured with 2 map slots ({@code TT_MAP_SLOTS}) and
 * 2 reduce slots ({@code TT_REDUCE_SLOTS}). The test waits for 4 tasks of a
 * kind to be running at a time, which presumably is the total slot count of
 * the cluster started by {@code startCluster} (TODO confirm the number of
 * task trackers).
 *
 * @throws Exception
 */
public void testControlledMapReduceJob() throws Exception {
  Properties props = new Properties();
  props.setProperty(TTConfig.TT_MAP_SLOTS, "2");
  props.setProperty(TTConfig.TT_REDUCE_SLOTS, "2");
  startCluster(true, props);
  LOG.info("Started the cluster");

  ControlledMapReduceJobRunner jobRunner = ControlledMapReduceJobRunner
      .getControlledMapReduceJobRunner(createJobConf(), 7, 6);
  jobRunner.start();
  ControlledMapReduceJob controlledJob = jobRunner.getJob();
  JobInProgress jip =
      getMRCluster().getJobTrackerRunner().getJobTracker().getJob(
          jobRunner.getJobID());

  // Maps: 3 + 4 = 7.
  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, true, 4);
  LOG.info("Finishing 3 maps");
  controlledJob.finishNTasks(true, 3);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, true, 3);

  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, true, 4);
  LOG.info("Finishing 4 more maps");
  controlledJob.finishNTasks(true, 4);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, true, 7);

  // Reduces: 2 + 4 = 6.
  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, false, 4);
  LOG.info("Finishing 2 reduces");
  controlledJob.finishNTasks(false, 2);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, false, 2);

  ControlledMapReduceJob.waitTillNTasksStartRunning(jip, false, 4);
  LOG.info("Finishing 4 more reduces");
  controlledJob.finishNTasks(false, 4);
  ControlledMapReduceJob.waitTillNTotalTasksFinish(jip, false, 6);

  jobRunner.join();
}