/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;
  // Job 1 has a single map whose only split is on rack2.node2 (i.e. tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      });
  JobInfo info1 = scheduler.infos.get(job1);
  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 0);
  // Assign tasks on nodes 1-3 and check that j2 gets them (j1 is waiting
  // for its local node, tt4).
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");
  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");
  // Check that delay scheduling info is properly set
  assertEquals(info1.lastMapLocalityLevel, LocalityLevel.NODE);
  assertEquals(info1.timeWaitedForLocalMap, 0);
  assertEquals(info1.skippedAtLastHeartbeat, false);
}
/** * We submit two jobs at interval of 200 such that job2 has 2x the priority * of the job1, then wait for 100 ms, and check that all slots are assigned * to job 1 even though job 2 has higher priority and fair scheduler would * have allocated atleast a few slots to job 2 */ public void testFifoJobScheduler() throws Exception { // Set up pools file PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"pool_a\">"); out.println("<minMaps>2</minMaps>"); out.println("<minReduces>2</minReduces>"); // enable fifo out.println("<fifo>true</fifo>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a"); JobInfo info1 = scheduler.infos.get(job1); // Advance time 200ms and submit job 2 advanceTime(200); JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a"); JobInfo info2 = scheduler.infos.get(job2); job2.setPriority(JobPriority.HIGH); // Advance time 100ms advanceTime(100); // Assign tasks and check that all slots are given to job1 checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); }
/**
 * This test configures a pool pool_a and redirects the default pool to it.
 */
public void testPoolRedirect() throws Exception {
  // Set up pools file.
  // pool_a has 0 totalInitedTasks; default does not have that restriction.
  // The redirect from default -> pool_a should enforce 0 total inited tasks.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>");
  out.println("<redirect>pool_a</redirect>");
  out.println("</pool>");
  out.println("<pool name=\"pool_a\">");
  out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  // Submit a job (without initializing it synchronously).
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  Thread.sleep(1000L); // Let the JobInitializer finish its work
  // Should have gone to pool_a, not default
  assertEquals(info1.poolName, "pool_a");
}
/**
 * This test submits jobs in two pools, pool_a and pool_b. None of the
 * jobs in pool_a have maps, but this should not affect their reduce
 * share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Set up pools file: pool_a has twice the weight of pool_b.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"pool_a\">");
  out.println("<weight>2.0</weight>");
  out.println("</pool>");
  out.println("<pool name=\"pool_b\">");
  out.println("<weight>1.0</weight>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  // Submit two map-less jobs to pool_a and one normal job to pool_b.
  // (All three are submitted back-to-back; time is only advanced after.)
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "pool_a");
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "pool_a");
  JobInfo info2 = scheduler.infos.get(job2);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "pool_b");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  // Map-less jobs get zero map weight but keep their full reduce weight.
  assertEquals(0, info1.mapWeight, 0.01);
  assertEquals(1.0, info1.reduceWeight, 0.01);
  assertEquals(0, info2.mapWeight, 0.01);
  assertEquals(1.0, info2.reduceWeight, 0.01);
  assertEquals(1.0, info3.mapWeight, 0.01);
  assertEquals(1.0, info3.reduceWeight, 0.01);
  // Fair shares: job3 gets all 4 map slots; reduces split evenly (4/3 each
  // ~ 1.33) since all three jobs have equal reduce weight.
  assertEquals(0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info2.reduceFairShare, ALLOW_ERROR);
  assertEquals(4, info3.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info3.reduceFairShare, ALLOW_ERROR);
}
/** * Verify the FIFO pool weight adjust */ public void testPoolFifoWeight() throws Exception { // Set up pools file PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"pool_a\">"); out.println("<fifo>true</fifo>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a"); advanceTime(1L); JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a"); advanceTime(2L); JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a"); advanceTime(3L); JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "pool_a"); scheduler.update(); JobInfo info1 = scheduler.infos.get(job1); JobInfo info2 = scheduler.infos.get(job2); JobInfo info3 = scheduler.infos.get(job3); JobInfo info4 = scheduler.infos.get(job4); final double ALLOWED_ERROR = 0.00001; assertEquals(8.0 / 15, info1.mapWeight, ALLOWED_ERROR); assertEquals(4.0 / 15, info2.mapWeight, ALLOWED_ERROR); assertEquals(2.0 / 15, info3.mapWeight, ALLOWED_ERROR); assertEquals(1.0 / 15, info4.mapWeight, ALLOWED_ERROR); }
/** * Verify the min slots of FIFO pools */ public void testPoolFifoMin() throws Exception { // Set up pools file PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"pool_a\">"); out.println("<fifo>true</fifo>"); out.println("<minMaps>12</minMaps>"); out.println("<minReduces>12</minReduces>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); JobInProgress job1 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); advanceTime(1L); JobInProgress job2 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); advanceTime(2L); JobInProgress job3 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); advanceTime(3L); JobInProgress job4 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); scheduler.update(); JobInfo info1 = scheduler.infos.get(job1); JobInfo info2 = scheduler.infos.get(job2); JobInfo info3 = scheduler.infos.get(job3); JobInfo info4 = scheduler.infos.get(job4); assertEquals(5, info1.minMaps); assertEquals(5, info2.minMaps); assertEquals(2, info3.minMaps); assertEquals(0, info4.minMaps); assertEquals(5, info1.minReduces); assertEquals(5, info2.minReduces); assertEquals(2, info3.minReduces); assertEquals(0, info4.minReduces); }
/** * Verify the max slots of FIFO pools */ public void testPoolFifoMax() throws Exception { // Set up pools file PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"pool_a\">"); out.println("<fifo>true</fifo>"); out.println("<maxMaps>12</maxMaps>"); out.println("<maxReduces>12</maxReduces>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); JobInProgress job1 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); advanceTime(1L); JobInProgress job2 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); advanceTime(2L); JobInProgress job3 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); advanceTime(3L); JobInProgress job4 = submitJob(JobStatus.RUNNING, 5, 5, "pool_a"); scheduler.update(); JobInfo info1 = scheduler.infos.get(job1); JobInfo info2 = scheduler.infos.get(job2); JobInfo info3 = scheduler.infos.get(job3); JobInfo info4 = scheduler.infos.get(job4); assertEquals(5, info1.maxMaps); assertEquals(5, info2.maxMaps); assertEquals(2, info3.maxMaps); assertEquals(0, info4.maxMaps); assertEquals(5, info1.maxReduces); assertEquals(5, info2.maxReduces); assertEquals(2, info3.maxReduces); assertEquals(0, info4.maxReduces); }
/**
 * This test exercises delay scheduling at the node level. We submit a job
 * with data on rack2.node2 and check that it doesn't get assigned on earlier
 * nodes. A second job with no locality info should get assigned instead.
 *
 * TaskTracker names in this test map to nodes as follows:
 * - tt1 = rack1.node1
 * - tt2 = rack1.node2
 * - tt3 = rack2.node1
 * - tt4 = rack2.node2
 */
public void testDelaySchedulingAtNodeLevel() throws IOException {
  setUpCluster(2, 2, true);
  scheduler.assignMultiple = true;
  // Job 1 has a single map whose only split is on rack2.node2 (i.e. tt4).
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
      new String[][] {
        {"rack2.node2"}
      }, true);
  JobInfo info1 = scheduler.infos.get(job1);
  // Advance time before submitting another job j2, to make j1 be ahead
  // of j2 in the queue deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 0);
  // Assign tasks on nodes 1-3 and check that j2 gets them (j1 is waiting
  // for its local node, tt4).
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_m_000001_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
      "attempt_test_0002_m_000003_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
      "attempt_test_0002_m_000005_0 on tt3");
  // Assign a task on node 4 now and check that j1 gets it. The other slot
  // on the node should be given to j2 because j1 will be out of tasks.
  checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
      "attempt_test_0002_m_000006_0 on tt4");
  // Check that delay scheduling info is properly set
  assertEquals(info1.lastMapLocalityLevel, LocalityLevel.NODE);
  assertEquals(info1.timeWaitedForLocalMap, 0);
  assertEquals(info1.skippedAtLastHeartbeat, false);
}
/**
 * This test submits jobs in two pools, poolA and poolB. None of the
 * jobs in poolA have maps, but this should not affect their reduce
 * share.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Set up pools file: poolA has twice the weight of poolB.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"poolA\">");
  out.println("<weight>2.0</weight>");
  out.println("</pool>");
  out.println("<pool name=\"poolB\">");
  out.println("<weight>1.0</weight>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  // Submit two map-less jobs to poolA and one normal job to poolB.
  // (All three are submitted back-to-back; time is only advanced after.)
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  // Map-less jobs get zero map weight but keep their full reduce weight.
  assertEquals(0, info1.mapWeight, 0.01);
  assertEquals(1.0, info1.reduceWeight, 0.01);
  assertEquals(0, info2.mapWeight, 0.01);
  assertEquals(1.0, info2.reduceWeight, 0.01);
  assertEquals(1.0, info3.mapWeight, 0.01);
  assertEquals(1.0, info3.reduceWeight, 0.01);
  // Fair shares: job3 gets all 4 map slots; reduces split evenly (4/3 each
  // ~ 1.33) since all three jobs have equal reduce weight.
  assertEquals(0, info1.mapFairShare, 0.01);
  assertEquals(1.33, info1.reduceFairShare, 0.01);
  assertEquals(0, info2.mapFairShare, 0.01);
  assertEquals(1.33, info2.reduceFairShare, 0.01);
  assertEquals(4, info3.mapFairShare, 0.01);
  assertEquals(1.33, info3.reduceFairShare, 0.01);
}
/**
 * Returns true iff the scheduler is tracking this job, has marked it
 * runnable, and the job itself reports the RUNNING state.
 */
private boolean isRunnable() {
  JobInfo info = scheduler.getJobInfo(job);
  if (info == null || !info.runnable) {
    return false;
  }
  return job.getStatus().getRunState() == JobStatus.RUNNING;
}
/**
 * Registers the given job's schedulable (map-side or reduce-side,
 * depending on this object's task type) with the scheduling list.
 */
public void addJob(JobInProgress job) {
  JobInfo info = scheduler.getJobInfo(job);
  if (taskType == TaskType.MAP) {
    jobScheds.add(info.mapSchedulable);
  } else {
    jobScheds.add(info.reduceSchedulable);
  }
}
/**
 * This test contains two jobs with fewer required tasks than there are slots.
 * We check that all tasks are assigned, but job 1 gets them first because it
 * was submitted earlier.
 */
public void testSmallJobs() throws IOException {
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);
  // Check scheduler variables: nothing running yet, demand equals the
  // submitted task counts, and job 1 owns the whole (small) fair share.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(1.0, info1.reduceSchedulable.getFairShare());
  verifyMetrics();
  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);
  // Check scheduler variables: both jobs fit, so each job's fair share
  // equals its own demand.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(1.0, info1.reduceSchedulable.getFairShare());
  assertEquals(0, info2.mapSchedulable.getRunningTasks());
  assertEquals(0, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  assertEquals(1.0, info2.mapSchedulable.getFairShare());
  assertEquals(2.0, info2.reduceSchedulable.getFairShare());
  verifyMetrics();
  // Assign tasks and check that jobs alternate in filling slots
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  // All demand is satisfied; further heartbeats yield no tasks.
  assertNull(scheduler.assignTasks(tracker("tt2")));
  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2, info1.mapSchedulable.getRunningTasks());
  assertEquals(1, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(1, info2.mapSchedulable.getRunningTasks());
  assertEquals(2, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  verifyMetrics();
}
/**
 * This test is identical to testSmallJobs but sets assignMultiple to
 * true so that multiple tasks can be assigned per heartbeat.
 */
public void testSmallJobsWithAssignMultiple() throws IOException {
  setUpCluster(1, 2, true);
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);
  // Check scheduler variables: nothing running yet, demand equals the
  // submitted task counts, and job 1 owns the whole (small) fair share.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(1.0, info1.reduceSchedulable.getFairShare());
  verifyMetrics();
  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);
  // Check scheduler variables: both jobs fit, so each job's fair share
  // equals its own demand.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(1.0, info1.reduceSchedulable.getFairShare());
  assertEquals(0, info2.mapSchedulable.getRunningTasks());
  assertEquals(0, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  assertEquals(1.0, info2.mapSchedulable.getFairShare());
  assertEquals(2.0, info2.reduceSchedulable.getFairShare());
  verifyMetrics();
  // Assign tasks and check that jobs alternate in filling slots; with
  // assignMultiple, one heartbeat can fill several slots at once.
  checkAssignment("tt1",
      "attempt_test_0001_m_000000_0 on tt1",
      "attempt_test_0001_r_000000_0 on tt1",
      "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2",
      "attempt_test_0001_m_000001_0 on tt2",
      "attempt_test_0002_r_000001_0 on tt2");
  // All demand is satisfied; further heartbeats yield no tasks.
  assertNull(scheduler.assignTasks(tracker("tt2")));
  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2, info1.mapSchedulable.getRunningTasks());
  assertEquals(1, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2, info1.mapSchedulable.getDemand());
  assertEquals(1, info1.reduceSchedulable.getDemand());
  assertEquals(1, info2.mapSchedulable.getRunningTasks());
  assertEquals(2, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(2, info2.reduceSchedulable.getDemand());
  verifyMetrics();
}
/**
 * We submit two jobs such that one has 2x the priority of the other to
 * a cluster of 3 nodes, wait for 100 ms, and check the weights/shares:
 * the high-priority job gets 4 tasks while the normal-priority job gets 2.
 */
public void testJobsWithPriorities() throws IOException {
  setUpCluster(1, 3, false);
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  job2.setPriority(JobPriority.HIGH);
  scheduler.update();
  // Check scheduler variables: job2's HIGH priority doubles its weight,
  // so the 6 slots of each type split 2 (job1) / 4 (job2).
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(10, info1.mapSchedulable.getDemand());
  assertEquals(10, info1.reduceSchedulable.getDemand());
  assertEquals(2.0, info1.mapSchedulable.getFairShare(), 0.1);
  assertEquals(2.0, info1.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(0, info2.mapSchedulable.getRunningTasks());
  assertEquals(0, info2.reduceSchedulable.getRunningTasks());
  assertEquals(10, info2.mapSchedulable.getDemand());
  assertEquals(10, info2.reduceSchedulable.getDemand());
  assertEquals(4.0, info2.mapSchedulable.getFairShare(), 0.1);
  assertEquals(4.0, info2.reduceSchedulable.getFairShare(), 0.1);
  // Advance time
  advanceTime(100);
  // Assign tasks and check that j2 gets 2x more tasks than j1. In addition,
  // whenever the jobs' runningTasks/weight ratios are tied, j1 should get
  // the new task first because it started first; thus the tasks of each
  // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
  // (A leftover debug println was removed here.)
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
  checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
}
/**
 * This test starts by submitting three large jobs:
 * - job1 in the default pool, at time 0
 * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
 * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 300
 *
 * We then assign tasks to all slots. The maps should be assigned in the
 * order job2, job3, job3, job1 because jobs 3 and 2 have guaranteed slots
 * (2 and 1 respectively). Job2 comes before job3 when they are both at 0
 * slots because it has an earlier start time. In a similar manner,
 * reduces should be assigned as job2, job3, job2, job1.
 */
public void testLargeJobsWithPools() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 1 map, 2 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>1</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("</pool>");
  // Give pool B a minimum of 2 maps, 1 reduce
  out.println("<pool name=\"poolB\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>1</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool defaultPool = scheduler.getPoolManager().getPool("default");
  Pool poolA = scheduler.getPoolManager().getPool("poolA");
  Pool poolB = scheduler.getPoolManager().getPool("poolB");
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  // Check scheduler variables: job1 alone holds all 4 slots of each type.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(10, info1.mapSchedulable.getDemand());
  assertEquals(10, info1.reduceSchedulable.getDemand());
  assertEquals(4.0, info1.mapSchedulable.getFairShare());
  assertEquals(4.0, info1.reduceSchedulable.getFairShare());
  // Advance time 200ms and submit jobs 2 and 3
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(100);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);
  // Check that minimum and fair shares have been allocated
  assertEquals(0, defaultPool.getMapSchedulable().getMinShare());
  assertEquals(0, defaultPool.getReduceSchedulable().getMinShare());
  assertEquals(1.0, info1.mapSchedulable.getFairShare());
  assertEquals(1.0, info1.reduceSchedulable.getFairShare());
  assertEquals(1, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(1.0, info2.mapSchedulable.getFairShare());
  assertEquals(2.0, info2.reduceSchedulable.getFairShare());
  assertEquals(2, poolB.getMapSchedulable().getMinShare());
  assertEquals(1, poolB.getReduceSchedulable().getMinShare());
  assertEquals(2.0, info3.mapSchedulable.getFairShare());
  assertEquals(1.0, info3.reduceSchedulable.getFairShare());
  // Advance time 100ms
  advanceTime(100);
  // Assign tasks and check that slots are first given to needy jobs
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
}
/**
 * This test starts by submitting three large jobs:
 * - job1 in the default pool, at time 0
 * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
 * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
 *
 * After this, we start assigning tasks. The first two tasks of each type
 * should be assigned to job2 and job3 since they are in a pool with an
 * allocation guarantee, but the next two slots should be assigned to job 1
 * because the pool will no longer be needy.
 */
public void testLargeJobsWithExcessCapacity() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 2 maps, 2 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool poolA = scheduler.getPoolManager().getPool("poolA");
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  // Check scheduler variables: job1 alone holds all 4 slots of each type.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(10, info1.mapSchedulable.getDemand());
  assertEquals(10, info1.reduceSchedulable.getDemand());
  assertEquals(4.0, info1.mapSchedulable.getFairShare());
  assertEquals(4.0, info1.reduceSchedulable.getFairShare());
  // Advance time 200ms and submit job 2
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  // Check that minimum and fair shares have been allocated
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(2.0, info1.reduceSchedulable.getFairShare());
  assertEquals(2.0, info2.mapSchedulable.getFairShare());
  assertEquals(2.0, info2.reduceSchedulable.getFairShare());
  // Advance time 100ms and submit job 3
  advanceTime(100);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info3 = scheduler.infos.get(job3);
  // Check that minimum and fair shares have been allocated; jobs 2 and 3
  // now split poolA's share of 2 between them.
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(2.0, info1.reduceSchedulable.getFairShare());
  assertEquals(1.0, info2.mapSchedulable.getFairShare());
  assertEquals(1.0, info2.reduceSchedulable.getFairShare());
  assertEquals(1.0, info3.mapSchedulable.getFairShare());
  assertEquals(1.0, info3.reduceSchedulable.getFairShare());
  // Advance time
  advanceTime(100);
  // Assign tasks and check that slots are first given to needy jobs, but
  // that job 1 gets two tasks after due to having a larger share.
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
}
/**
 * A copy of testLargeJobsWithExcessCapacity that enables assigning multiple
 * tasks per heartbeat. Results should match testLargeJobsWithExcessCapacity.
 */
public void testLargeJobsWithExcessCapacityAndAssignMultiple()
    throws Exception {
  setUpCluster(1, 2, true);
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 2 maps, 2 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool poolA = scheduler.getPoolManager().getPool("poolA");
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  // Check scheduler variables: job1 alone holds all 4 slots of each type.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(10, info1.mapSchedulable.getDemand());
  assertEquals(10, info1.reduceSchedulable.getDemand());
  assertEquals(4.0, info1.mapSchedulable.getFairShare());
  assertEquals(4.0, info1.reduceSchedulable.getFairShare());
  // Advance time 200ms and submit job 2
  advanceTime(200);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  // Check that minimum and fair shares have been allocated
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(2.0, info1.reduceSchedulable.getFairShare());
  assertEquals(2.0, info2.mapSchedulable.getFairShare());
  assertEquals(2.0, info2.reduceSchedulable.getFairShare());
  // Advance time 100ms and submit job 3
  advanceTime(100);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info3 = scheduler.infos.get(job3);
  // Check that minimum and fair shares have been allocated; jobs 2 and 3
  // now split poolA's share of 2 between them.
  assertEquals(2, poolA.getMapSchedulable().getMinShare());
  assertEquals(2, poolA.getReduceSchedulable().getMinShare());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(2.0, info1.reduceSchedulable.getFairShare());
  assertEquals(1.0, info2.mapSchedulable.getFairShare());
  assertEquals(1.0, info2.reduceSchedulable.getFairShare());
  assertEquals(1.0, info3.mapSchedulable.getFairShare());
  assertEquals(1.0, info3.reduceSchedulable.getFairShare());
  // Advance time
  advanceTime(100);
  // Assign tasks and check that slots are first given to needy jobs, but
  // that job 1 gets two tasks after due to having a larger share.
  checkAssignment("tt1",
      "attempt_test_0002_m_000000_0 on tt1",
      "attempt_test_0002_r_000000_0 on tt1",
      "attempt_test_0003_m_000000_0 on tt1",
      "attempt_test_0003_r_000000_0 on tt1");
  checkAssignment("tt2",
      "attempt_test_0001_m_000000_0 on tt2",
      "attempt_test_0001_r_000000_0 on tt2",
      "attempt_test_0001_m_000001_0 on tt2",
      "attempt_test_0001_r_000001_0 on tt2");
}
/**
 * This test starts by submitting two jobs at time 0:
 * - job1 in the default pool
 * - job2, with 1 map and 1 reduce, in poolA, which has an alloc of 4
 *   maps and 4 reduces
 *
 * When we assign the slots, job2 should only get 1 of each type of task.
 */
public void testSmallJobInLargePool() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 4 maps, 4 reduces
  out.println("<pool name=\"poolA\">");
  out.println("<minMaps>4</minMaps>");
  out.println("<minReduces>4</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  // Check scheduler variables: job2's share is capped at its demand of 1,
  // so job1 receives the remaining 3 slots of each type.
  assertEquals(0, info1.mapSchedulable.getRunningTasks());
  assertEquals(0, info1.reduceSchedulable.getRunningTasks());
  assertEquals(10, info1.mapSchedulable.getDemand());
  assertEquals(10, info1.reduceSchedulable.getDemand());
  assertEquals(3.0, info1.mapSchedulable.getFairShare());
  assertEquals(3.0, info1.reduceSchedulable.getFairShare());
  assertEquals(0, info2.mapSchedulable.getRunningTasks());
  assertEquals(0, info2.reduceSchedulable.getRunningTasks());
  assertEquals(1, info2.mapSchedulable.getDemand());
  assertEquals(1, info2.reduceSchedulable.getDemand());
  assertEquals(1.0, info2.mapSchedulable.getFairShare());
  assertEquals(1.0, info2.reduceSchedulable.getFairShare());
  // Assign tasks and check that slots are first given to needy jobs
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
}
/**
 * This test starts by submitting four jobs in the default pool. However, the
 * maxRunningJobs limit for this pool has been set to two. We should see only
 * the first two jobs get scheduled, each with half the total slots.
 */
public void testPoolMaxJobs() throws Exception {
  // Set up pools file limiting the default pool to 2 running jobs.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxRunningJobs>2</maxRunningJobs>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info4 = scheduler.infos.get(job4);

  // Check scheduler variables: only the first two jobs count as runnable,
  // so they split the slots (2.0 each) while jobs 3 and 4 get zero share.
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(2.0, info1.reduceSchedulable.getFairShare());
  assertEquals(2.0, info2.mapSchedulable.getFairShare());
  assertEquals(2.0, info2.reduceSchedulable.getFairShare());
  assertEquals(0.0, info3.mapSchedulable.getFairShare());
  assertEquals(0.0, info3.reduceSchedulable.getFairShare());
  assertEquals(0.0, info4.mapSchedulable.getFairShare());
  assertEquals(0.0, info4.reduceSchedulable.getFairShare());

  // Assign tasks and check that only jobs 1 and 2 get them
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
}
/**
 * This test starts by submitting two jobs by user "user1" to the default
 * pool, and two jobs by "user2". We set user1's job limit to 1. We should
 * see one job from user1 and two from user2.
 */
public void testUserMaxJobs() throws Exception {
  // Set up pools file capping user1 at one running job.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<user name=\"user1\">");
  out.println("<maxRunningJobs>1</maxRunningJobs>");
  out.println("</user>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  job1.getJobConf().set("user.name", "user1");
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  job2.getJobConf().set("user.name", "user1");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  job3.getJobConf().set("user.name", "user2");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  job4.getJobConf().set("user.name", "user2");
  JobInfo info4 = scheduler.infos.get(job4);

  // Check scheduler variables: job2 exceeds user1's limit, so it gets a
  // zero share; the three runnable jobs split the slots (~1.33 each).
  assertEquals(1.33, info1.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, info1.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(0.0, info2.mapSchedulable.getFairShare());
  assertEquals(0.0, info2.reduceSchedulable.getFairShare());
  assertEquals(1.33, info3.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, info3.reduceSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, info4.mapSchedulable.getFairShare(), 0.1);
  assertEquals(1.33, info4.reduceSchedulable.getFairShare(), 0.1);

  // Assign tasks and check that slots are given only to jobs 1, 3 and 4
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
}
/**
 * This test submits jobs in three pools: poolA, which has a weight
 * of 2.0; poolB, which has a weight of 0.5; and the default pool, which
 * should have a weight of 1.0. It then checks that the map and reduce
 * fair shares are given out accordingly. We then submit a second job to
 * pool B and check that each gets half of the pool (weight of 0.25).
 */
public void testPoolWeights() throws Exception {
  // Set up pools file with explicit weights for poolA and poolB.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"poolA\">");
  out.println("<weight>2.0</weight>");
  out.println("</pool>");
  out.println("<pool name=\"poolB\">");
  out.println("<weight>0.5</weight>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);

  // Shares follow the 1.0 : 2.0 : 0.5 weight ratio over the slots
  // (total weight 3.5): default 1.14, poolA 2.28, poolB 0.57.
  assertEquals(1.14, info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.14, info1.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, info2.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, info2.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.57, info3.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.57, info3.reduceSchedulable.getFairShare(), 0.01);

  // A second job in poolB splits poolB's share in half (0.28 each);
  // the other pools' shares are unchanged.
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info4 = scheduler.infos.get(job4);
  advanceTime(10);
  assertEquals(1.14, info1.mapSchedulable.getFairShare(), 0.01);
  assertEquals(1.14, info1.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, info2.mapSchedulable.getFairShare(), 0.01);
  assertEquals(2.28, info2.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, info3.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, info3.reduceSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, info4.mapSchedulable.getFairShare(), 0.01);
  assertEquals(0.28, info4.reduceSchedulable.getFairShare(), 0.01);
  verifyMetrics();
}
/** * This test submits jobs in two pools, poolA and poolB. None of the * jobs in poolA have maps, but this should not affect their reduce * share. */ public void testPoolWeightsWhenNoMaps() throws Exception { // Set up pools file PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"poolA\">"); out.println("<weight>2.0</weight>"); out.println("</pool>"); out.println("<pool name=\"poolB\">"); out.println("<weight>1.0</weight>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); // Submit jobs, advancing time in-between to make sure that they are // all submitted at distinct times. JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA"); JobInfo info1 = scheduler.infos.get(job1); JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA"); JobInfo info2 = scheduler.infos.get(job2); JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB"); JobInfo info3 = scheduler.infos.get(job3); advanceTime(10); /* assertEquals(0, info1.mapWeight, 0.01); assertEquals(1.0, info1.reduceWeight, 0.01); assertEquals(0, info2.mapWeight, 0.01); assertEquals(1.0, info2.reduceWeight, 0.01); assertEquals(1.0, info3.mapWeight, 0.01); assertEquals(1.0, info3.reduceWeight, 0.01); */ assertEquals(0, info1.mapSchedulable.getFairShare(), 0.01); assertEquals(1.33, info1.reduceSchedulable.getFairShare(), 0.01); assertEquals(0, info2.mapSchedulable.getFairShare(), 0.01); assertEquals(1.33, info2.reduceSchedulable.getFairShare(), 0.01); assertEquals(4, info3.mapSchedulable.getFairShare(), 0.01); assertEquals(1.33, info3.reduceSchedulable.getFairShare(), 0.01); }
/**
 * This test submits a job that takes all slots of a pool that has both a
 * min share of 2 slots with a minshare preemption timeout of 5s, and then
 * a second job in the default pool under a fair share preemption timeout
 * of 5s. Once the default pool has been starved of its fair share
 * (2 slots of each type) past the timeout, we test that preemption does
 * not kill more than 2 tasks of each type.
 */
public void testFairSharePreemptionWithShortTimeout() throws Exception {
  // Enable preemption in scheduler
  scheduler.preemptionEnabled = true;
  // Set up pools file: 5s fair-share timeout cluster-wide, and pool1 with
  // a min share of 2/2 and a 5s min-share timeout.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>");
  out.println("<pool name=\"pool1\">");
  out.println("<minMaps>2</minMaps>");
  out.println("<minReduces>2</minReduces>");
  out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  Pool pool1 = scheduler.getPoolManager().getPool("pool1");
  Pool defaultPool = scheduler.getPoolManager().getPool("default");

  // Submit job 1 and assign all slots to it. Sleep a bit before assigning
  // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1");
  JobInfo info1 = scheduler.infos.get(job1);
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
  advanceTime(10000);
  // job1 alone holds all 4 slots of each type and owns the full share.
  assertEquals(4, info1.mapSchedulable.getRunningTasks());
  assertEquals(4, info1.reduceSchedulable.getRunningTasks());
  assertEquals(4.0, info1.mapSchedulable.getFairShare());
  assertEquals(4.0, info1.reduceSchedulable.getFairShare());
  // Ten seconds later, submit job 2.
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default");

  // Advance time by 6 seconds without updating the scheduler.
  // This simulates the time gap between update and task preemption.
  clock.advance(6000);
  // job1 still runs 4+4 tasks but its fair share has dropped to 2.0 now
  // that the default pool has demand.
  assertEquals(4, info1.mapSchedulable.getRunningTasks());
  assertEquals(4, info1.reduceSchedulable.getRunningTasks());
  assertEquals(2.0, info1.mapSchedulable.getFairShare());
  assertEquals(2.0, info1.reduceSchedulable.getFairShare());
  // pool1 is at/above its share, so nothing should be preempted for it;
  // the default pool is owed exactly 2 tasks of each type, no more.
  assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(), clock.getTime()));
  assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(), clock.getTime()));
  assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(), clock.getTime()));
  assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(), clock.getTime()));

  // Test that the tasks actually get preempted and we can assign new ones
  scheduler.preemptTasksIfNecessary();
  scheduler.update();
  assertEquals(2, job1.runningMaps());
  assertEquals(2, job1.runningReduces());
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  // All slots are now in use again, so no further assignments are possible.
  assertNull(scheduler.assignTasks(tracker("tt1")));
  assertNull(scheduler.assignTasks(tracker("tt2")));
}
/**
 * This test contains two jobs with fewer required tasks than there are slots.
 * We check that all tasks are assigned, but job 1 gets them first because it
 * was submitted earlier.
 */
public void testSmallJobs() throws IOException {
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  JobInfo info1 = scheduler.infos.get(job1);

  // Check scheduler variables: nothing is running yet and, being the only
  // job, j1's fair shares equal its demands (2 maps, 1 reduce).
  assertEquals(0, info1.runningMaps);
  assertEquals(0, info1.runningReduces);
  assertEquals(2, info1.neededMaps);
  assertEquals(1, info1.neededReduces);
  assertEquals(0, info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0, info1.reduceFairShare, ALLOW_ERROR);

  // Advance time before submitting another job j2, to make j1 run before j2
  // deterministically.
  advanceTime(100);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables; the fair shares should now have been allocated
  // (still capped by each job's demand), but j1 should have (2 slots)*(100 ms)
  // map deficit and (1 slot) * (100 ms) reduce deficit accumulated while j2
  // did not yet exist.
  assertEquals(0, info1.runningMaps);
  assertEquals(0, info1.runningReduces);
  assertEquals(2, info1.neededMaps);
  assertEquals(1, info1.neededReduces);
  assertEquals(200, info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(100, info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(2.0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0, info2.runningMaps);
  assertEquals(0, info2.runningReduces);
  assertEquals(1, info2.neededMaps);
  assertEquals(2, info2.neededReduces);
  assertEquals(0, info2.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, info2.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0, info2.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that all slots are filled with j1, then j2
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  // Both jobs' demands are satisfied, so tt2's remaining slots stay empty.
  assertNull(scheduler.assignTasks(tracker("tt2")));

  // Check that the scheduler has started counting the tasks as running
  // as soon as it launched them.
  assertEquals(2, info1.runningMaps);
  assertEquals(1, info1.runningReduces);
  assertEquals(0, info1.neededMaps);
  assertEquals(0, info1.neededReduces);
  assertEquals(1, info2.runningMaps);
  assertEquals(2, info2.runningReduces);
  assertEquals(0, info2.neededMaps);
  assertEquals(0, info2.neededReduces);
}
/** * We submit two jobs such that one has 2x the priority of the other, wait * for 100 ms, and check that the weights/deficits are okay and that the * tasks all go to the high-priority job. */ public void testJobsWithPriorities() throws IOException { JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); JobInfo info1 = scheduler.infos.get(job1); JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10); JobInfo info2 = scheduler.infos.get(job2); job2.setPriority(JobPriority.HIGH); scheduler.update(); // Check scheduler variables assertEquals(0, info1.runningMaps); assertEquals(0, info1.runningReduces); assertEquals(10, info1.neededMaps); assertEquals(10, info1.neededReduces); assertEquals(0, info1.mapDeficit, ALLOW_DEFICIT_ERROR); assertEquals(0, info1.reduceDeficit, ALLOW_DEFICIT_ERROR); assertEquals(1.33, info1.mapFairShare, ALLOW_ERROR); assertEquals(1.33, info1.reduceFairShare, ALLOW_ERROR); assertEquals(0, info2.runningMaps); assertEquals(0, info2.runningReduces); assertEquals(10, info2.neededMaps); assertEquals(10, info2.neededReduces); assertEquals(0, info2.mapDeficit, ALLOW_DEFICIT_ERROR); assertEquals(0, info2.reduceDeficit, ALLOW_DEFICIT_ERROR); assertEquals(2.66, info2.mapFairShare, ALLOW_ERROR); assertEquals(2.66, info2.reduceFairShare, ALLOW_ERROR); // Advance time and check deficits advanceTime(100); assertEquals(133, info1.mapDeficit, 1.0); assertEquals(133, info1.reduceDeficit, 1.0); assertEquals(266, info2.mapDeficit, 1.0); assertEquals(266, info2.reduceDeficit, 1.0); // Assign tasks and check that all slots are filled with j1, then j2 checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2"); checkAssignment("tt2", 
"attempt_test_0002_r_000002_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2"); }
/**
 * This test starts by submitting two jobs at time 0:
 * - job1 in the default pool
 * - job2, with 1 map and 1 reduce, in pool_a, which has an alloc of 4
 *   maps and 4 reduces
 *
 * When we assign the slots, job2 should only get 1 of each type of task,
 * and its fair share is capped at 1.0 by its demand (see the assertions
 * below). Every remaining slot goes to job1.
 */
public void testSmallJobInLargePool() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  // Give pool A a minimum of 4 maps, 4 reduces
  out.println("<pool name=\"pool_a\">");
  out.println("<minMaps>4</minMaps>");
  out.println("<minReduces>4</minReduces>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "pool_a");
  JobInfo info2 = scheduler.infos.get(job2);

  // Check scheduler variables: neither job is running yet; job2's tiny
  // demand caps its share at 1.0, leaving 3.0 of each type for job1.
  assertEquals(0, info1.runningMaps);
  assertEquals(0, info1.runningReduces);
  assertEquals(10, info1.neededMaps);
  assertEquals(10, info1.neededReduces);
  assertEquals(0, info1.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, info1.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(3.0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(3.0, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0, info2.runningMaps);
  assertEquals(0, info2.runningReduces);
  assertEquals(1, info2.neededMaps);
  assertEquals(1, info2.neededReduces);
  assertEquals(0, info2.mapDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(0, info2.reduceDeficit, ALLOW_DEFICIT_ERROR);
  assertEquals(1.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(1.0, info2.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots are first given to needy jobs
  checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
}
/**
 * This test starts by submitting four jobs in the default pool. However, the
 * maxRunningJobs limit for this pool has been set to two. We should see only
 * the first two jobs get scheduled, each with half the total slots.
 */
public void testPoolMaxJobs() throws Exception {
  // Set up pools file limiting the default pool to 2 running jobs.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<pool name=\"default\">");
  out.println("<maxRunningJobs>2</maxRunningJobs>");
  out.println("</pool>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times. Jobs are submitted in PREP state so
  // that initialization is driven by the scheduler, not the test.
  JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJobNoInitialization(JobStatus.PREP, 10, 10);
  JobInfo info4 = scheduler.infos.get(job4);
  Thread.sleep(1000L); // Let the JobInitializer finish its work

  // Only two of the jobs should be initialized.
  assertTrue(((FakeJobInProgress)job1).isInitialized());
  assertTrue(((FakeJobInProgress)job2).isInitialized());
  assertFalse(((FakeJobInProgress)job3).isInitialized());
  assertFalse(((FakeJobInProgress)job4).isInitialized());

  // Check scheduler variables: the two runnable jobs split the slots
  // (2.0 each) while the uninitialized jobs get a zero fair share.
  assertEquals(2.0, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(2.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(2.0, info2.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0, info3.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0, info3.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0, info4.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0, info4.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots go first to jobs 1 and 2
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
}
/** * This test configures a pool pool_a, tries the submit a job * before and after blacklisting of pool_a. */ public void testpool_blacklisted() throws Exception { // Set up pools file PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"default\">"); out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>"); out.println("</pool>"); out.println("<pool name=\"pool_a\">"); out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); // Submit a job to the not blacklisted pool_a JobInProgress job1 = submitJobNoInitialization(JobStatus.PREP, 10, 10, "pool_a"); JobInfo info1 = scheduler.infos.get(job1); advanceTime(10); Thread.sleep(1000L); // Let JobInitializaer to finish the work out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); out.println("<pool name=\"default\">"); out.println("<maxTotalInitedTasks>100</maxTotalInitedTasks>"); out.println("</pool>"); out.println("<pool name=\"pool_a\">"); out.println("<maxTotalInitedTasks>0</maxTotalInitedTasks>"); out.println("<blacklisted/>"); out.println("</pool>"); out.println("</allocations>"); out.close(); scheduler.getPoolManager().reloadAllocs(); // Submit a job to the newly blacklisted pool_a JobInProgress job2 = submitJobNoInitialization(JobStatus.PREP, 10, 10, "pool_a"); JobInfo info2 = scheduler.infos.get(job2); advanceTime(10); Thread.sleep(1000L); // Let JobInitializaer to finish the work // pool_a is not blacklisted, so goes to pool_a assertEquals(info1.poolName, "pool_a"); // pool_a is blacklisted, so goes to default assertEquals(info2.poolName, "default"); }
/**
 * This test starts by submitting two jobs by user "user1" to the default
 * pool, and two jobs by "user2". We set user1's job limit to 1. We should
 * see one job from user1 and two from user2.
 */
public void testUserMaxJobs() throws Exception {
  // Set up pools file capping user1 at one running job.
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<user name=\"user1\">");
  out.println("<maxRunningJobs>1</maxRunningJobs>");
  out.println("</user>");
  out.println("</allocations>");
  out.close();
  scheduler.getPoolManager().reloadAllocs();

  // Submit jobs, advancing time in-between to make sure that they are
  // all submitted at distinct times.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  job1.getJobConf().set("user.name", "user1");
  JobInfo info1 = scheduler.infos.get(job1);
  advanceTime(10);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  job2.getJobConf().set("user.name", "user1");
  JobInfo info2 = scheduler.infos.get(job2);
  advanceTime(10);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  job3.getJobConf().set("user.name", "user2");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);
  JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  job4.getJobConf().set("user.name", "user2");
  JobInfo info4 = scheduler.infos.get(job4);

  // Check scheduler variables: job2 exceeds user1's limit and gets a zero
  // share; the three runnable jobs split the slots (~1.33 each).
  assertEquals(1.33, info1.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info1.reduceFairShare, ALLOW_ERROR);
  assertEquals(0.0, info2.mapFairShare, ALLOW_ERROR);
  assertEquals(0.0, info2.reduceFairShare, ALLOW_ERROR);
  assertEquals(1.33, info3.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info3.reduceFairShare, ALLOW_ERROR);
  assertEquals(1.33, info4.mapFairShare, ALLOW_ERROR);
  assertEquals(1.33, info4.reduceFairShare, ALLOW_ERROR);

  // Assign tasks and check that slots go first to jobs 1 and 3
  checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
  checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  advanceTime(100);
  checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
  checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
  checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
}