/**
 * Verifies the output of a join job against the source inputs.
 *
 * <p>Asserts that the job's output directory contains exactly one non-empty
 * output file, then reads that file as a sequence file of
 * {@code IntWritable} key/value pairs and asserts that every value equals
 * {@code countProduct(key, src, conf)}.
 *
 * @param job the job whose output path and configuration are inspected
 * @param src the source paths passed through to {@code countProduct}
 * @throws IOException if the output cannot be listed or read
 */
private static void checkOuterConsistency(Job job, Path[] src) throws IOException {
  Path outf = FileOutputFormat.getOutputPath(job);
  FileStatus[] outlist = cluster.getFileSystem().listStatus(outf,
      new Utils.OutputFileUtils.OutputFilesFilter());
  assertEquals("number of part files is more than 1. It is" + outlist.length,
      1, outlist.length);
  assertTrue("output file with zero length" + outlist[0].getLen(),
      0 < outlist[0].getLen());
  SequenceFile.Reader r = new SequenceFile.Reader(cluster.getFileSystem(),
      outlist[0].getPath(), job.getConfiguration());
  try {
    IntWritable k = new IntWritable();
    IntWritable v = new IntWritable();
    while (r.next(k, v)) {
      assertEquals("counts does not match", v.get(),
          countProduct(k, src, job.getConfiguration()));
    }
  } finally {
    // Close the reader even when an assertion fails or next() throws,
    // so a failing test does not leak the underlying stream.
    r.close();
  }
}
/**
 * Lists the data files found under the table path.
 *
 * <p>The lookup uses a fresh {@code Configuration} whose
 * {@code fs.default.name} is set to {@code file:///}; only entries accepted
 * by {@code OutputFilesFilter} are returned.
 *
 * @return a list of Path objects for each data file
 * @throws IOException if the file system cannot be obtained or listed
 */
protected List<Path> getDataFilePaths() throws IOException {
  Configuration fsConf = new Configuration();
  fsConf.set("fs.default.name", "file:///");
  FileSystem fileSystem = FileSystem.get(fsConf);

  FileStatus[] dataFiles = fileSystem.listStatus(getTablePath(),
      new Utils.OutputFileUtils.OutputFilesFilter());

  List<Path> result = new ArrayList<Path>();
  for (int i = 0; i < dataFiles.length; i++) {
    result.add(dataFiles[i].getPath());
  }
  return result;
}
/**
 * Lists the data files found under the table path.
 *
 * <p>When {@code BaseSqoopTestCase.isOnPhysicalCluster()} is false, the
 * configuration's {@code CommonArgs.FS_DEFAULT_NAME} is overridden with
 * {@code CommonArgs.LOCAL_FS} before the file system is obtained; otherwise
 * the default configuration is used unchanged. Only entries accepted by
 * {@code OutputFilesFilter} are returned.
 *
 * @return a list of Path objects for each data file
 * @throws IOException if the file system cannot be obtained or listed
 */
protected List<Path> getDataFilePaths() throws IOException {
  Configuration fsConf = new Configuration();
  boolean onPhysicalCluster = BaseSqoopTestCase.isOnPhysicalCluster();
  if (!onPhysicalCluster) {
    fsConf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
  }
  FileSystem fileSystem = FileSystem.get(fsConf);

  FileStatus[] dataFiles = fileSystem.listStatus(getTablePath(),
      new Utils.OutputFileUtils.OutputFilesFilter());

  List<Path> result = new ArrayList<Path>();
  for (int i = 0; i < dataFiles.length; i++) {
    result.add(dataFiles[i].getPath());
  }
  return result;
}
/**
 * Checks the first output file under {@code TEST_ROOT_DIR + "/out"}.
 *
 * <p>Each line is split on tab characters into a key and a value; the value
 * is parsed as an integer error count and asserted to be zero. Only the
 * first file accepted by {@code OutputFilesFilter} is examined, and the
 * method returns without asserting anything when no such file exists.
 *
 * @throws IOException if the output directory cannot be listed or the file
 *         cannot be read
 */
private void validateOutput() throws IOException {
  Path[] outputFiles = FileUtil.stat2Paths(
      localFs.listStatus(new Path(TEST_ROOT_DIR + "/out"),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  if (outputFiles.length == 0) {
    return;
  }

  InputStream is = localFs.open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  try {
    String line = reader.readLine();
    while (line != null) {
      StringTokenizer tokeniz = new StringTokenizer(line, "\t");
      String key = tokeniz.nextToken();
      String value = tokeniz.nextToken();
      LOG.info("Output: key: "+ key + " value: "+ value);
      int errors = Integer.parseInt(value);
      assertTrue(errors == 0);
      line = reader.readLine();
    }
  } finally {
    // Close the reader (and the stream it wraps) even when an assertion
    // or a parse error aborts the loop.
    reader.close();
  }
}
/**
 * Reads the text content of every output file in a directory.
 *
 * <p>Files in {@code outDir} accepted by {@code OutputFilesFilter} are read
 * line by line, in the order returned by the file system listing, and each
 * line is appended to the result followed by {@code "\n"}.
 *
 * @param outDir the directory whose output files are read
 * @param conf the configuration used to resolve the file system of
 *        {@code outDir}
 * @return the concatenated lines of all output files
 * @throws IOException if the directory cannot be listed or a file cannot be
 *         read
 */
public static String readOutput(Path outDir, Configuration conf) throws IOException {
  FileSystem fs = outDir.getFileSystem(conf);
  // The buffer never leaves this method, so the unsynchronized builder suffices.
  StringBuilder result = new StringBuilder();
  Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
      new Utils.OutputFileUtils.OutputFilesFilter()));
  for (Path outputFile : fileList) {
    LOG.info("Path" + ": "+ outputFile);
    BufferedReader file =
        new BufferedReader(new InputStreamReader(fs.open(outputFile)));
    try {
      String line = file.readLine();
      while (line != null) {
        result.append(line);
        result.append("\n");
        line = file.readLine();
      }
    } finally {
      // Release the stream even if reading fails part-way through a file.
      file.close();
    }
  }
  return result.toString();
}
static DataStatistics publishPlainDataStatistics(Configuration conf, Path inputDir) throws IOException { FileSystem fs = inputDir.getFileSystem(conf); // obtain input data file statuses long dataSize = 0; long fileCount = 0; RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true); PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter(); while (iter.hasNext()) { LocatedFileStatus lStatus = iter.next(); if (filter.accept(lStatus.getPath())) { dataSize += lStatus.getLen(); ++fileCount; } } // publish the plain data statistics LOG.info("Total size of input data : " + StringUtils.humanReadableInt(dataSize)); LOG.info("Total number of input data files : " + fileCount); return new DataStatistics(dataSize, fileCount, false); }