/** * Run a test with a misconfigured number of mappers. * Expect failure. */ @Test public void testInvalidMultiMapParallelism() throws Exception { Job job = Job.getInstance(); Path inputPath = createMultiMapsInput(); Path outputPath = getOutputPath(); Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } job.setMapperClass(StressMapper.class); job.setReducerClass(CountingReducer.class); job.setNumReduceTasks(1); LocalJobRunner.setLocalMaxRunningMaps(job, -6); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean success = job.waitForCompletion(true); assertFalse("Job succeeded somehow", success); }
@Test public void testClusterWithLocalClientProvider() throws Exception { Configuration conf = new Configuration(); try { conf.set(MRConfig.FRAMEWORK_NAME, "incorrect"); new Cluster(conf); fail("Cluster should not be initialized with incorrect framework name"); } catch (IOException e) { } conf.set(MRConfig.FRAMEWORK_NAME, "local"); Cluster cluster = new Cluster(conf); assertTrue(cluster.getClient() instanceof LocalJobRunner); cluster.close(); }
/** * Run a test with a misconfigured number of mappers. * Expect failure. */ @Test public void testInvalidMultiMapParallelism() throws Exception { Job job = new Job(); Path inputPath = createMultiMapsInput(); Path outputPath = getOutputPath(); Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } job.setMapperClass(StressMapper.class); job.setReducerClass(CountingReducer.class); job.setNumReduceTasks(1); LocalJobRunner.setLocalMaxRunningMaps(job, -6); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean success = job.waitForCompletion(true); assertFalse("Job succeeded somehow", success); }
@Test public void testClusterWithLocalClientProvider() throws Exception { Configuration conf = new Configuration(); conf.set(MRConfig.FRAMEWORK_NAME, "local"); Cluster cluster = new Cluster(conf); assertTrue(cluster.getClient() instanceof LocalJobRunner); cluster.close(); }
/** * Run a test which creates a SequenceMapper / IdentityReducer * job over a set of generated number files. */ private void doMultiReducerTest(int numMaps, int numReduces, int parallelMaps, int parallelReduces) throws Exception { Path in = getNumberDirPath(); Path out = getOutputPath(); // Clear data from any previous tests. Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(out)) { fs.delete(out, true); } if (fs.exists(in)) { fs.delete(in, true); } for (int i = 0; i < numMaps; i++) { makeNumberFile(i, 100); } Job job = Job.getInstance(); job.setNumReduceTasks(numReduces); job.setMapperClass(SequenceMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out); LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps); LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces); boolean result = job.waitForCompletion(true); assertTrue("Job failed!!", result); verifyNumberJob(numMaps); }
/** * Run a test with several mappers in parallel, operating at different * speeds. Verify that the correct amount of output is created. */ @Test public void testMultiMaps() throws Exception { Job job = new Job(); Path inputPath = createMultiMapsInput(); Path outputPath = getOutputPath(); Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } job.setMapperClass(StressMapper.class); job.setReducerClass(CountingReducer.class); job.setNumReduceTasks(1); LocalJobRunner.setLocalMaxRunningMaps(job, 6); job.getConfiguration().set("io.sort.record.pct", "0.50"); job.getConfiguration().set("io.sort.mb", "25"); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); verifyOutput(outputPath); }
/** * Run a test with several mappers in parallel, operating at different * speeds. Verify that the correct amount of output is created. */ @Test public void testMultiMaps() throws Exception { Path inputPath = createMultiMapsInput(); Path outputPath = getOutputPath(); Configuration conf = new Configuration(); conf.setBoolean("mapred.localrunner.sequential", false); conf.setBoolean("mapred.localrunner.debug", true); conf.setInt(LocalJobRunner.LOCAL_RUNNER_SLOTS, 6); conf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-DtestProperty=testValue"); Job job = new Job(conf); job.setMapperClass(StressMapper.class); job.setReducerClass(CountingReducer.class); job.setNumReduceTasks(1); job.getConfiguration().set("io.sort.record.pct", "0.50"); job.getConfiguration().set("io.sort.mb", "25"); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } job.waitForCompletion(true); TaskCompletionEvent[] taskCompletionEvents = job.getTaskCompletionEvents(0); assertTrue(taskCompletionEvents.length == 6); for(int i = 0; i < 6; i++) { assertTrue(taskCompletionEvents[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED); } verifyOutput(outputPath); }
/** * Run a test with several mappers in parallel, operating at different * speeds. Verify that the correct amount of output is created. */ @Test public void testMultiMaps() throws Exception { Path inputPath = createMultiMapsInput(); Path outputPath = getOutputPath(); Configuration conf = new Configuration(); conf.setBoolean("mapred.localrunner.sequential", false); conf.setBoolean("mapred.localrunner.debug", true); conf.setInt(LocalJobRunner.LOCAL_RUNNER_SLOTS, 6); conf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-DtestProperty=testValue"); Job job = new Job(conf); job.setMapperClass(StressMapper.class); job.setReducerClass(CountingReducer.class); job.setNumReduceTasks(1); job.getConfiguration().set("io.sort.record.pct", "0.50"); job.getConfiguration().set("io.sort.mb", "25"); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } job.waitForCompletion(true); verifyOutput(outputPath); }
/** * Close the <code>Cluster</code>. */ public synchronized void close() throws IOException { if (!(client instanceof LocalJobRunner)) { RPC.stopProxy(client); } }