@Test
public void testCount() throws Exception {
  createTextInputFile();

  JobClient.runJob(createJobConf());

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(output, new OutputLogFilter()));
  Assert.assertEquals(1, outputFiles.length);

  InputStream is = getFileSystem().open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  Assert.assertEquals("a\t2", reader.readLine());
  Assert.assertEquals("b\t1", reader.readLine());
  Assert.assertNull(reader.readLine());
  reader.close();
}
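// The test above depends on two helpers that are not part of this extract.
// What follows is a minimal sketch of what they might look like, assuming a
// word-count style job; the `input` path, the helper bodies, and the sample
// data are illustrative assumptions, not the originals.
private void createTextInputFile() throws IOException {
  // hypothetical sample data consistent with the assertions: "a" twice, "b" once
  OutputStream os = getFileSystem().create(new Path(input, "text.txt"));
  Writer wr = new OutputStreamWriter(os);
  wr.write("b a\na\n");
  wr.close();
}

private JobConf createJobConf() {
  JobConf conf = new JobConf();
  conf.setJobName("counting words");
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(LongWritable.class);
  // TokenCountMapper and LongSumReducer are the stock word-count classes in
  // org.apache.hadoop.mapred.lib; whether the original test used them is an
  // assumption.
  conf.setMapperClass(TokenCountMapper.class);
  conf.setReducerClass(LongSumReducer.class);
  FileInputFormat.setInputPaths(conf, input);
  FileOutputFormat.setOutputPath(conf, output);
  return conf;
}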
public static void main(String[] args) throws IOException {
  //
  // setup job
  //
  JobConf job = new JobConf();
  new GenericOptionsParser(job, args);
  job.setJobName(RecordPairsImproved.class.getSimpleName());

  String suffix = job.get(FuzzyJoinDriver.DATA_SUFFIX_INPUT_PROPERTY, "");
  if (suffix.isEmpty()) {
    //
    // self-join
    //
    job.setMapperClass(MapBroadcastSelfJoin.class);
    job.setMapOutputKeyClass(IntPairWritable.class);
  } else {
    //
    // R-S join
    //
    job.setMapperClass(MapBroadcastJoin.class);
    job.setPartitionerClass(IntTriplePartitionerFirstSecond.class);
    job.setOutputValueGroupingComparator(IntTripleComparatorFirstSecond.class);
    job.setMapOutputKeyClass(IntTripleWritable.class);
  }
  job.setReducerClass(Reduce.class);
  job.setMapOutputValueClass(Text.class);

  //
  // set input & output
  //
  String dataDir = job.get(FuzzyJoinDriver.DATA_DIR_PROPERTY);
  if (dataDir == null) {
    throw new UnsupportedOperationException(
        "ERROR: fuzzyjoin.data.dir not set");
  }
  int dataCopy = job.getInt(FuzzyJoinDriver.DATA_COPY_PROPERTY, 1);
  String dataCopyFormatted = String.format("-%03d", dataCopy - 1);
  if (suffix.isEmpty()) {
    FileInputFormat.addInputPath(job, new Path(dataDir + "/records"
        + dataCopyFormatted));
  } else {
    for (String s : suffix.split(FuzzyJoinDriver.SEPSARATOR_REGEX)) {
      FileInputFormat.addInputPath(job, new Path(dataDir + "/records." + s
          + dataCopyFormatted));
    }
  }
  Path outputPath = new Path(dataDir + "/recordpairs" + dataCopyFormatted);
  FileOutputFormat.setOutputPath(job, outputPath);
  FileSystem.get(job).delete(outputPath, true);

  //
  // set distributed cache
  //
  Path ridpairsPath = new Path(dataDir + "/ridpairs" + dataCopyFormatted);
  FileSystem fs = FileSystem.get(job);
  for (Path ridpairsFile : FileUtil.stat2Paths(fs.listStatus(ridpairsPath,
      new OutputLogFilter()))) {
    DistributedCache.addCacheFile(ridpairsFile.toUri(), job);
  }
  job.set(FuzzyJoinDriver.DATA_JOININDEX_PROPERTY, ridpairsPath.toString()
      + "/part-00000");

  //
  // run
  //
  FuzzyJoinDriver.run(job);
}
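// Hedged usage sketch: the job is configured entirely through properties, so
// it can be driven from the command line via GenericOptionsParser -D options.
// Only fuzzyjoin.data.dir is confirmed by the error message above; the jar
// name and path in this example are assumptions.
//
//   hadoop jar fuzzyjoin.jar RecordPairsImproved \
//       -D fuzzyjoin.data.dir=/user/alice/dblp
//
// Supplying the input-suffix property as well would select the R-S join
// branch instead of the self-join branch.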
private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
    Path program, Path inputPath, Path outputPath,
    int numMaps, int numReduces, String[] expectedResults)
    throws IOException {
  Path wordExec = new Path("/testing/bin/application");
  JobConf job = mr.createJobConf();
  job.setNumMapTasks(numMaps);
  job.setNumReduceTasks(numReduces);

  FileSystem fs = dfs.getFileSystem();
  fs.delete(wordExec.getParent(), true);
  fs.copyFromLocalFile(program, wordExec);
  Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
  Submitter.setIsJavaRecordReader(job, true);
  Submitter.setIsJavaRecordWriter(job, true);
  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  RunningJob rJob = null;
  if (numReduces == 0) {
    // map-only jobs are submitted asynchronously, so poll for completion
    rJob = Submitter.jobSubmit(job);
    while (!rJob.isComplete()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        throw new RuntimeException(ie);
      }
    }
  } else {
    rJob = Submitter.runJob(job);
  }
  assertTrue("pipes job failed", rJob.isSuccessful());

  // the pipes application is expected to have incremented at least one
  // counter in the WORDCOUNT group
  Counters counters = rJob.getCounters();
  Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
  int numCounters = 0;
  for (Counter c : wordCountCounters) {
    System.out.println(c);
    ++numCounters;
  }
  assertTrue("No counters found!", (numCounters > 0));

  // collect one output file per reducer and compare against expectations
  List<String> results = new ArrayList<String>();
  for (Path p : FileUtil.stat2Paths(dfs.getFileSystem().listStatus(
      outputPath, new OutputLogFilter()))) {
    results.add(TestMiniMRWithDFS.readOutput(p, job));
  }
  assertEquals("number of reduces is wrong",
      expectedResults.length, results.size());
  for (int i = 0; i < results.size(); i++) {
    assertEquals("pipes program " + program + " output " + i + " wrong",
        expectedResults[i], results.get(i));
  }
}
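// A minimal sketch of how runProgram might be driven from a test case,
// assuming single-node mini clusters and a prebuilt pipes binary; the binary
// path, cluster sizes, and expected output below are illustrative
// assumptions rather than values from the original test.
public void testPipesProgram() throws IOException {
  MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
  MiniMRCluster mr = new MiniMRCluster(1,
      dfs.getFileSystem().getUri().toString(), 1);
  try {
    Path program = new Path("build/native/examples/pipes-wordcount"); // hypothetical
    Path in = new Path("/testing/in");
    Path out = new Path("/testing/out");
    // ... write some text under `in` on the DFS here ...
    runProgram(mr, dfs, program, in, out, 2, 1,
        new String[] { "a\t2\nb\t1\n" }); // hypothetical expected output
  } finally {
    mr.shutdown();
    dfs.shutdown();
  }
}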
public void configure(String keySpec, int expect) throws Exception {
  Path testdir = new Path("build/test/test.mapred.spill");
  Path inDir = new Path(testdir, "in");
  Path outDir = new Path(testdir, "out");
  FileSystem fs = getFileSystem();
  fs.delete(testdir, true);
  conf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(LongWritable.class);

  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(2);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
  conf.setKeyFieldComparatorOptions(keySpec);
  conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
  conf.set("map.output.key.field.separator", " ");
  conf.setMapperClass(InverseMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  if (!fs.mkdirs(testdir)) {
    throw new IOException("Mkdirs failed to create " + testdir.toString());
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }

  // set up two lines of input data in a single file
  Path inFile = new Path(inDir, "part0");
  FileOutputStream fos = new FileOutputStream(inFile.toString());
  fos.write((line1 + "\n").getBytes());
  fos.write((line2 + "\n").getBytes());
  fos.close();

  JobClient jc = new JobClient(conf);
  RunningJob r_job = jc.submitJob(conf);
  while (!r_job.isComplete()) {
    Thread.sleep(1000);
  }
  if (!r_job.isSuccessful()) {
    fail("Oops! The job broke due to an unexpected error");
  }

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(outDir, new OutputLogFilter()));
  if (outputFiles.length > 0) {
    InputStream is = getFileSystem().open(outputFiles[0]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    String line = reader.readLine();
    // make sure we get what we expect as the first line, and also that we
    // have two lines (both lines must end up in the same reducer, since the
    // partitioner takes the same key spec for all lines)
    if (expect == 1) {
      assertTrue(line.startsWith(line1));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line2));
    }
    line = reader.readLine();
    if (expect == 1) {
      assertTrue(line.startsWith(line2));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line1));
    }
    reader.close();
  }
}
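// Hedged usage sketch: keySpec is a Unix sort(1)-style key specification as
// understood by KeyFieldBasedComparator, and `expect` selects which of the
// two input lines should sort first. The specs below are illustrative; the
// correct `expect` values depend on the contents of line1 and line2.
public void testComparatorOptions() throws Exception {
  configure("-k1,1", 1);   // sort on the first field, ascending
  configure("-k2,2nr", 2); // sort on the second field, numeric, reversed
}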
private void validateOutput(RunningJob runningJob, boolean validateCount)
    throws Exception {
  LOG.info(runningJob.getCounters().toString());
  assertTrue(runningJob.isSuccessful());

  if (validateCount) {
    // validate counters (expected value first, per JUnit convention)
    String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
    Counters counters = runningJob.getCounters();
    assertEquals(MAPPER_BAD_RECORDS.size(),
        counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").getCounter());
    int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
    assertEquals(mapRecs,
        counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").getCounter());
    assertEquals(mapRecs,
        counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").getCounter());
    int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
    assertEquals(REDUCER_BAD_RECORDS.size(),
        counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").getCounter());
    assertEquals(REDUCER_BAD_RECORDS.size(),
        counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").getCounter());
    assertEquals(redRecs,
        counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").getCounter());
    assertEquals(redRecs,
        counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").getCounter());
    assertEquals(redRecs,
        counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").getCounter());
  }

  List<String> badRecs = new ArrayList<String>();
  badRecs.addAll(MAPPER_BAD_RECORDS);
  badRecs.addAll(REDUCER_BAD_RECORDS);
  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(), new OutputLogFilter()));
  if (outputFiles.length > 0) {
    InputStream is = getFileSystem().open(outputFiles[0]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    String line = reader.readLine();
    int counter = 0;
    while (line != null) {
      counter++;
      StringTokenizer tokeniz = new StringTokenizer(line, "\t");
      String value = tokeniz.nextToken();
      // every surviving record carries a "hey" marker; none of the records
      // marked bad should have made it to the output
      int index = value.indexOf("hey");
      assertTrue(index > -1);
      if (index > -1) {
        String heyStr = value.substring(index);
        assertTrue(!badRecs.contains(heyStr));
      }
      line = reader.readLine();
    }
    reader.close();
    if (validateCount) {
      assertEquals(INPUTSIZE - badRecs.size(), counter);
    }
  }
}
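// The skip-related counters checked above only exist when skip mode is
// enabled on the job. A minimal sketch of that setup, assuming the
// org.apache.hadoop.mapred.SkipBadRecords API; the thresholds are
// illustrative, not taken from the original test.
private JobConf createSkippingConf() {
  JobConf conf = new JobConf();
  SkipBadRecords.setAttemptsToStartSkipping(conf, 0);           // skip from the first attempt
  SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);  // tolerate any number of bad records
  SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);  // tolerate any number of bad groups
  return conf;
}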