public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize, String compress, int minBlkSize) throws IOException { Configuration conf = new Configuration(); CompressionCodec codec = null; if ("lzo".equals(compress)) { codec = Compression.Algorithm.LZO.getCodec(); } else if ("gz".equals(compress)) { codec = Compression.Algorithm.GZ.getCodec(); } else if (!"none".equals(compress)) throw new IOException("Codec not supported."); this.fsdos = fs.create(path, true, osBufferSize); if (!"none".equals(compress)) { writer = SequenceFile.createWriter(conf, fsdos, BytesWritable.class, BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec); } else { writer = SequenceFile.createWriter(conf, fsdos, BytesWritable.class, BytesWritable.class, SequenceFile.CompressionType.NONE, null); } }
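The constructor above only wires up the SequenceFile.Writer; the append and close paths of the class are not shown. A minimal sketch of what they could look like, assuming the writer and fsdos fields created above and raw byte[] keys and values (the method names here are illustrative, not necessarily the original class's API):

public void append(byte[] key, byte[] value) throws IOException {
  // SequenceFile.Writer.append takes Writables, so wrap the raw bytes
  writer.append(new BytesWritable(key), new BytesWritable(value));
}

public void close() throws IOException {
  writer.close(); // flush any buffered/compressed blocks first
  fsdos.close();  // then close the underlying FSDataOutputStream
}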
/**
 * Collect the list of <sourceRelativePath, sourceFileStatus> entries to be copied and write them to the sequence
 * file. In essence, any file or directory that needs to be copied or sync-ed is written as an entry to the sequence
 * file, with the possible exception of the source root: when either the -update (sync) or -overwrite switch is
 * specified, and if the source root is a directory, then the source root entry is not written to the sequence file,
 * because only the contents of the source directory need to be copied in this case. See
 * {@link com.hotels.bdp.circustrain.s3mapreducecp.util.ConfigurationUtil#getRelativePath} for how the relative path
 * is computed. See the computeSourceRootPath method for how the root path of the source is computed.
 *
 * @param fileListWriter writer for the sequence file that holds the copy listing
 * @param options options passed to S3MapReduceCp
 * @throws IOException if the listing cannot be built or written
 */
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options) throws IOException {
  List<Path> globbedPaths = new ArrayList<>(options.getSources().size());
  for (Path sourcePath : options.getSources()) {
    FileSystem fs = sourcePath.getFileSystem(getConf());
    FileStatus sourceFileStatus = fs.getFileStatus(sourcePath);
    if (sourceFileStatus.isFile()) {
      LOG.debug("Adding path {}", sourceFileStatus.getPath());
      globbedPaths.add(sourceFileStatus.getPath());
    } else {
      FileStatus[] inputs = fs.globStatus(sourcePath);
      if (inputs != null && inputs.length > 0) {
        for (FileStatus onePath : inputs) {
          LOG.debug("Adding path {}", onePath.getPath());
          globbedPaths.add(onePath.getPath());
        }
      } else {
        throw new InvalidInputException("Source path " + sourcePath + " doesn't exist");
      }
    }
  }
  doBuildListing(fileListWriter, options, globbedPaths);
}
private void traverseNonEmptyDirectory(
    SequenceFile.Writer fileListWriter,
    FileStatus sourceStatus,
    Path sourcePathRoot,
    S3MapReduceCpOptions options)
  throws IOException {
  FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
  Stack<FileStatus> pathStack = new Stack<>();
  pathStack.push(sourceStatus);
  while (!pathStack.isEmpty()) {
    for (FileStatus child : getChildren(sourceFS, pathStack.pop())) {
      if (child.isFile()) {
        LOG.debug("Recording source-path: {} for copy.", child.getPath());
        CopyListingFileStatus childCopyListingStatus = new CopyListingFileStatus(child);
        writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
      }
      if (isDirectoryAndNotEmpty(sourceFS, child)) {
        LOG.debug("Traversing non-empty source dir: {}", child.getPath());
        pathStack.push(child);
      }
    }
  }
}
private void writeToFileListing( SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot, S3MapReduceCpOptions options) throws IOException { LOG.debug("REL PATH: {}, FULL PATH: {}", PathUtil.getRelativePath(sourcePathRoot, fileStatus.getPath()), fileStatus.getPath()); FileStatus status = fileStatus; if (!shouldCopy(fileStatus.getPath(), options)) { return; } fileListWriter.append(new Text(PathUtil.getRelativePath(sourcePathRoot, fileStatus.getPath())), status); fileListWriter.sync(); if (!fileStatus.isDirectory()) { totalBytesToCopy += fileStatus.getLen(); } totalPaths++; }
private void writeToFileListing(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot, DistCpOptions options) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath()); } FileStatus status = fileStatus; if (!shouldCopy(fileStatus.getPath(), options)) { return; } fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath())), status); fileListWriter.sync(); if (!fileStatus.isDirectory()) { totalBytesToCopy += fileStatus.getLen(); } totalPaths++; }
@Test(timeout = 10000) public void skipFlagFiles() throws Exception { FileSystem fs = cluster.getFileSystem(); Path source = new Path("/tmp/in4"); URI target = URI.create("s3://bucket/tmp/out4/"); createFile(fs, new Path(source, "1/_SUCCESS")); createFile(fs, new Path(source, "1/file")); createFile(fs, new Path(source, "2")); Path listingFile = new Path("/tmp/list4"); listing.buildListing(listingFile, options(source, target)); assertThat(listing.getNumberOfPaths(), is(2L)); try (SequenceFile.Reader reader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listingFile))) { CopyListingFileStatus fileStatus = new CopyListingFileStatus(); Text relativePath = new Text(); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/1/file")); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/2")); assertThat(reader.next(relativePath, fileStatus), is(false)); } }
@Test public void failOnCloseError() throws IOException { File inFile = File.createTempFile("TestCopyListingIn", null); inFile.deleteOnExit(); File outFile = File.createTempFile("TestCopyListingOut", null); outFile.deleteOnExit(); Path source = new Path(inFile.toURI()); Exception expectedEx = new IOException("boom"); SequenceFile.Writer writer = mock(SequenceFile.Writer.class); doThrow(expectedEx).when(writer).close(); SimpleCopyListing listing = new SimpleCopyListing(CONFIG, CREDENTIALS); Exception actualEx = null; try { listing.doBuildListing(writer, options(source, outFile.toURI())); } catch (Exception e) { actualEx = e; } Assert.assertNotNull("close writer didn't fail", actualEx); Assert.assertEquals(expectedEx, actualEx); }
protected void open(Path dstPath, CompressionCodec codeC, CompressionType compType, Configuration conf, FileSystem hdfs) throws IOException { if (useRawLocalFileSystem) { if (hdfs instanceof LocalFileSystem) { hdfs = ((LocalFileSystem)hdfs).getRaw(); } else { logger.warn("useRawLocalFileSystem is set to true but file system " + "is not of type LocalFileSystem: " + hdfs.getClass().getName()); } } if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) { outStream = hdfs.append(dstPath); } else { outStream = hdfs.create(dstPath); } writer = SequenceFile.createWriter(conf, outStream, serializer.getKeyClass(), serializer.getValueClass(), compType, codeC); registerCurrentStream(outStream, hdfs, dstPath); }
@Test public void testEventCountingRoller() throws IOException, InterruptedException { int maxEvents = 100; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter( 0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < 1000; i++) { bucketWriter.append(e); } logger.info("Number of events written: {}", hdfsWriter.getEventsWritten()); logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten()); logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened()); Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten()); Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten()); Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened()); }
@Test public void testSizeRoller() throws IOException, InterruptedException { int maxBytes = 300; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter( 0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < 1000; i++) { bucketWriter.append(e); } logger.info("Number of events written: {}", hdfsWriter.getEventsWritten()); logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten()); logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened()); Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten()); Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten()); Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened()); }
private void verifyContents(Path listingPath) throws Exception { SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(), listingPath, new Configuration()); Text key = new Text(); CopyListingFileStatus value = new CopyListingFileStatus(); Map<String, String> actualValues = new HashMap<String, String>(); while (reader.next(key, value)) { if (value.isDirectory() && key.toString().equals("")) { // ignore root with empty relPath, which is an entry to be // used for preserving root attributes etc. continue; } actualValues.put(value.getPath().toString(), key.toString()); } Assert.assertEquals(expectedValues.size(), actualValues.size()); for (Map.Entry<String, String> entry : actualValues.entrySet()) { Assert.assertEquals(entry.getValue(), expectedValues.get(entry.getKey())); } }
@Test public void testInUsePrefix() throws IOException, InterruptedException { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC"; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextSerializer formatter = new HDFSTextSerializer(); BucketWriter bucketWriter = new BucketWriter( ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX)); }
@Test public void testInUseSuffix() throws IOException, InterruptedException { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH"; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextSerializer serializer = new HDFSTextSerializer(); BucketWriter bucketWriter = new BucketWriter( ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX)); }
@Test public void testCallbackOnClose() throws IOException, InterruptedException { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String SUFFIX = "WELCOME_TO_THE_EREBOR"; final AtomicBoolean callbackCalled = new AtomicBoolean(false); MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter( ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, new HDFSEventSink.WriterCallback() { @Override public void run(String filePath) { callbackCalled.set(true); } }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); bucketWriter.close(true); Assert.assertTrue(callbackCalled.get()); }
@Test public void testGzipDurability() throws Exception { Context context = new Context(); HDFSCompressedDataStream writer = new HDFSCompressedDataStream(); writer.configure(context); writer.open(fileURI, factory.getCodec(new Path(fileURI)), SequenceFile.CompressionType.BLOCK); String[] bodies = { "yarf!" }; writeBodies(writer, bodies); byte[] buf = new byte[256]; GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file)); int len = cmpIn.read(buf); String result = new String(buf, 0, len, Charsets.UTF_8); result = result.trim(); // BodyTextEventSerializer adds a newline Assert.assertEquals("input and output must match", bodies[0], result); }
static private void finalize(Configuration conf, JobConf jobconf, final Path destPath,
    String preservedAttributes) throws IOException {
  if (preservedAttributes == null) {
    return;
  }
  EnumSet<FileAttribute> preserved = FileAttribute.parse(preservedAttributes);
  if (!preserved.contains(FileAttribute.USER)
      && !preserved.contains(FileAttribute.GROUP)
      && !preserved.contains(FileAttribute.PERMISSION)) {
    return;
  }
  FileSystem dstfs = destPath.getFileSystem(conf);
  Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  try (SequenceFile.Reader in = new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
    Text dsttext = new Text();
    FilePair pair = new FilePair();
    while (in.next(dsttext, pair)) {
      Path absdst = new Path(destPath, pair.output);
      updateDestStatus(pair.input, dstfs.getFileStatus(absdst), preserved, dstfs);
    }
  }
}
/**
 * @throws IOException if path can't be read, or its key or value class can't be instantiated
 */
public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
  key = null;
  value = null;
  FileSystem fs = path.getFileSystem(conf);
  path = path.makeQualified(fs);
  reader = new SequenceFile.Reader(fs, path, conf);
  this.conf = conf;
  keyClass = (Class<K>) reader.getKeyClass();
  valueClass = (Class<V>) reader.getValueClass();
  noValue = NullWritable.class.equals(valueClass);
  this.reuseKeyValueInstances = reuseKeyValueInstances;
}
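The constructor stores the reader, the key and value classes, and the reuse/noValue flags, but the iteration itself is not shown. An illustrative sketch (not the library's actual implementation) of a helper the iterator could call to advance, using only the fields initialized above:

// Illustrative only: advance to the next pair using the fields set up in the constructor.
private boolean readNext() throws IOException {
  if (key == null || !reuseKeyValueInstances) {
    // allocate fresh instances unless the caller asked to reuse them
    key = ReflectionUtils.newInstance(keyClass, conf);
    if (!noValue) {
      value = ReflectionUtils.newInstance(valueClass, conf);
    }
  }
  if (!reader.next(key)) {
    return false; // end of the sequence file
  }
  if (!noValue) { // NullWritable values carry no data, so they are never deserialized
    reader.getCurrentValue(value);
  }
  return true;
}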
@Override public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException { final JobConf jobConf = new JobConf(jobCtxt.getConfiguration()); final JobClient client = new JobClient(jobConf); ClusterStatus stat = client.getClusterStatus(true); int numTrackers = stat.getTaskTrackers(); final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1); // Total size of distributed cache files to be generated final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1); // Get the path of the special file String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST); if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) { throw new RuntimeException("Invalid metadata: #files (" + fileCount + "), total_size (" + totalSize + "), filelisturi (" + distCacheFileList + ")"); } Path sequenceFile = new Path(distCacheFileList); FileSystem fs = sequenceFile.getFileSystem(jobConf); FileStatus srcst = fs.getFileStatus(sequenceFile); // Consider the number of TTs * mapSlotsPerTracker as number of mappers. int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2); int numSplits = numTrackers * numMapSlotsPerTracker; List<InputSplit> splits = new ArrayList<InputSplit>(numSplits); LongWritable key = new LongWritable(); BytesWritable value = new BytesWritable(); // Average size of data to be generated by each map task final long targetSize = Math.max(totalSize / numSplits, DistributedCacheEmulator.AVG_BYTES_PER_MAP); long splitStartPosition = 0L; long splitEndPosition = 0L; long acc = 0L; long bytesRemaining = srcst.getLen(); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, sequenceFile, jobConf); while (reader.next(key, value)) { // If adding this file would put this split past the target size, // cut the last split and put this file in the next split. if (acc + key.get() > targetSize && acc != 0) { long splitSize = splitEndPosition - splitStartPosition; splits.add(new FileSplit( sequenceFile, splitStartPosition, splitSize, (String[])null)); bytesRemaining -= splitSize; splitStartPosition = splitEndPosition; acc = 0L; } acc += key.get(); splitEndPosition = reader.getPosition(); } } finally { if (reader != null) { reader.close(); } } if (bytesRemaining != 0) { splits.add(new FileSplit( sequenceFile, splitStartPosition, bytesRemaining, (String[])null)); } return splits; }
public void run() { try { for(int i=start; i < end; i++) { String name = getFileName(i); Path controlFile = new Path(INPUT_DIR, "in_file_" + name); SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile, Text.class, LongWritable.class, CompressionType.NONE); String logFile = jhLogFiles[i].getPath().toString(); writer.append(new Text(logFile), new LongWritable(0)); } catch(Exception e) { throw new IOException(e); } finally { if (writer != null) writer.close(); writer = null; } } } catch(IOException ex) { LOG.error("FileCreateDaemon failed.", ex); } numFinishedThreads++; }
private static SequenceFile.Writer[] createWriters(Path testdir, Configuration conf, int srcs, Path[] src) throws IOException { for (int i = 0; i < srcs; ++i) { src[i] = new Path(testdir, Integer.toString(i + 10, 36)); } SequenceFile.Writer out[] = new SequenceFile.Writer[srcs]; for (int i = 0; i < srcs; ++i) { out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf, src[i], IntWritable.class, IntWritable.class); } return out; }
public List<InputSplit> getSplits(JobContext job) throws IOException { Configuration conf = job.getConfiguration(); Path src = new Path(conf.get(INDIRECT_INPUT_FILE, null)); FileSystem fs = src.getFileSystem(conf); List<InputSplit> splits = new ArrayList<InputSplit>(); LongWritable key = new LongWritable(); Text value = new Text(); for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, conf); sl.next(key, value);) { splits.add(new IndirectSplit(new Path(value.toString()), key.get())); } return splits; }
private static <T extends WritableComparable<?>> Path writePartitionFile( String testname, Configuration conf, T[] splits) throws IOException { final FileSystem fs = FileSystem.getLocal(conf); final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") ).makeQualified(fs); Path p = new Path(testdir, testname + "/_partition.lst"); TotalOrderPartitioner.setPartitionFile(conf, p); conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1); SequenceFile.Writer w = null; try { w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(), NullWritable.class, SequenceFile.CompressionType.NONE); for (int i = 0; i < splits.length; ++i) { w.append(splits[i], NullWritable.get()); } } finally { if (null != w) w.close(); } return p; }
private static int countProduct(IntWritable key, Path[] src, Configuration conf) throws IOException { int product = 1; for (Path p : src) { int count = 0; SequenceFile.Reader r = new SequenceFile.Reader( cluster.getFileSystem(), p, conf); IntWritable k = new IntWritable(); IntWritable v = new IntWritable(); while (r.next(k, v)) { if (k.equals(key)) { count++; } } r.close(); if (count != 0) { product *= count; } } return product; }
private static void createFiles(int length, int numFiles, Random random, Job job) throws IOException { Range[] ranges = createRanges(length, numFiles, random); for (int i = 0; i < numFiles; i++) { Path file = new Path(workDir, "test_" + i + ".seq"); // create a file with length entries @SuppressWarnings("deprecation") SequenceFile.Writer writer = SequenceFile.createWriter(localFs, job.getConfiguration(), file, IntWritable.class, BytesWritable.class); Range range = ranges[i]; try { for (int j = range.start; j < range.end; j++) { IntWritable key = new IntWritable(j); byte[] data = new byte[random.nextInt(10)]; random.nextBytes(data); BytesWritable value = new BytesWritable(data); writer.append(key, value); } } finally { writer.close(); } } }
/**
 * Create control files before a test run.
 * Number of files created is equal to the number of maps specified
 *
 * @throws IOException on error
 */
private static void createControlFiles() throws IOException {
  FileSystem tempFS = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");
  for (int i = 0; i < numberOfMaps; i++) {
    String strFileName = "NNBench_Controlfile_" + i;
    Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME), strFileName);
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, LongWritable.class,
          CompressionType.NONE);
      writer.append(new Text(strFileName), new LongWritable(0L));
    } finally {
      if (writer != null) {
        writer.close();
      }
    }
  }
}
private SequenceFile.Reader getListingFileReader(Configuration configuration) { final Path listingFilePath = getListingFilePath(configuration); try { final FileSystem fileSystem = listingFilePath.getFileSystem(configuration); if (!fileSystem.exists(listingFilePath)) throw new IllegalArgumentException("Listing file doesn't exist at: " + listingFilePath); return new SequenceFile.Reader(configuration, SequenceFile.Reader.file(listingFilePath)); } catch (IOException exception) { LOG.error("Couldn't find listing file at: " + listingFilePath, exception); throw new IllegalArgumentException("Couldn't find listing-file at: " + listingFilePath, exception); } }
/**
 * Read the cut points from the given partition file.
 * @param fs The file system
 * @param p The path to read
 * @param keyClass The map output key class
 * @param conf The job config
 * @throws IOException
 */
// matching key types enforced by passing in the map output key class
@SuppressWarnings("unchecked")
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass, Configuration conf) throws IOException {
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
  ArrayList<K> parts = new ArrayList<K>();
  K key = ReflectionUtils.newInstance(keyClass, conf);
  NullWritable value = NullWritable.get();
  try {
    while (reader.next(key, value)) {
      parts.add(key);
      key = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    reader = null;
  } finally {
    IOUtils.cleanup(LOG, reader);
  }
  return parts.toArray((K[]) Array.newInstance(keyClass, parts.size()));
}
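readPartitions only loads the sorted cut points; they are later compared against incoming map output keys to choose a partition. A hedged sketch of how the returned array is typically consumed, here as a plain binary search (TotalOrderPartitioner itself may build a trie for BinaryComparable keys; the method name below is illustrative):

// Conceptual sketch: route a key to a partition using the sorted cut points.
// Assumes java.util.Arrays and org.apache.hadoop.io.RawComparator are available.
int findPartition(K key, K[] splitPoints, RawComparator<K> comparator) {
  int pos = Arrays.binarySearch(splitPoints, key, comparator);
  if (pos >= 0) {
    return pos + 1; // keys equal to a cut point go to the next partition
  }
  return -pos - 1;  // insertion point; there are splitPoints.length + 1 partitions
}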
@SuppressWarnings("deprecation") void createTempFile(Path p, Configuration conf) throws IOException { SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, conf, p, Text.class, Text.class, CompressionType.NONE); writer.append(new Text("text"), new Text("moretext")); } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { if (writer != null) { writer.close(); } writer = null; } LOG.info("created: " + p); }
@Override
public DatasetJsonRecord getSchema(Path path) throws IOException {
  DatasetJsonRecord record = null;
  if (!fs.exists(path)) {
    LOG.error("sequencefileanalyzer file : " + path.toUri().getPath() + " does not exist on HDFS");
  } else {
    try {
      LOG.info("sequencefileanalyzer start parse schema for file path : {}", path.toUri().getPath());
      SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
      String keyName = "Key";
      String keyType = getWritableType(reader.getKeyClassName());
      String valueName = "Value";
      String valueType = getWritableType(reader.getValueClassName());
      FileStatus status = fs.getFileStatus(path);
      String storage = STORAGE_TYPE;
      String abstractPath = path.toUri().getPath();
      String codec = "sequence.codec";
      String schemaString = "{\"fields\": [{\"name\": \"" + keyName + "\", \"type\": \"" + keyType
          + "\"}, {\"name\": \"" + valueName + "\", \"type\": \"" + valueType
          + "\"}], \"name\": \"Result\", \"namespace\": \"com.tencent.lake\", \"type\": \"record\"}";
      record = new DatasetJsonRecord(schemaString, abstractPath, status.getModificationTime(), status.getOwner(),
          status.getGroup(), status.getPermission().toString(), codec, storage, "");
      reader.close();
      LOG.info("sequencefileanalyzer parse path :{},schema is {}", path.toUri().getPath(), record.toCsvString());
    } catch (Exception e) {
      LOG.error("path : {} content is not Sequence File format content", path.toUri().getPath(), e);
    }
  }
  return record;
}
@Override
public SampleDataRecord getSampleData(Path path) throws IOException {
  SampleDataRecord dataRecord = null;
  if (!fs.exists(path)) {
    LOG.error("sequence file : " + path.toUri().getPath() + " does not exist on HDFS");
  } else {
    try {
      LOG.info("sequencefileanalyzer start parse sampledata for file path : {}", path.toUri().getPath());
      SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(path));
      List<Object> sampleValues = new ArrayList<Object>();
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
      int count = 0;
      String keyName = "Key";
      String valueName = "Value";
      while (reader.next(key, value) && count < 12) {
        sampleValues.add("{\"" + keyName + "\": \"" + key + "\", \"" + valueName + "\": \"" + value + "\"}");
        count++;
      }
      reader.close();
      dataRecord = new SampleDataRecord(path.toUri().getPath(), sampleValues);
      LOG.info("sequence file path : {}, sample data is {}", path.toUri().getPath(), sampleValues);
    } catch (Exception e) {
      LOG.error("path : {} content is not Sequence File format content", path.toUri().getPath(), e);
    }
  }
  return dataRecord;
}
public TextRecordInputStream(FileStatus f) throws IOException { final Path fpath = f.getPath(); final Configuration lconf = getConf(); r = new SequenceFile.Reader(lconf, SequenceFile.Reader.file(fpath)); key = ReflectionUtils.newInstance( r.getKeyClass().asSubclass(Writable.class), lconf); val = ReflectionUtils.newInstance( r.getValueClass().asSubclass(Writable.class), lconf); inbuf = new DataInputBuffer(); outbuf = new DataOutputBuffer(); }
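The constructor above only sets up the reader and the reusable buffers; the read path is not shown. A sketch, in the spirit of how hadoop fs -text renders SequenceFile records as tab-separated lines, using only the r, key, val, inbuf and outbuf fields initialized above (it assumes the enclosing class extends java.io.InputStream and that java.nio.charset.StandardCharsets is imported; treat it as an illustration rather than the exact upstream implementation):

@Override
public int read() throws IOException {
  int ret;
  if (inbuf == null || -1 == (ret = inbuf.read())) {
    if (!r.next(key, val)) {
      return -1; // no more records in the sequence file
    }
    // Render the next record as "key<TAB>value<NEWLINE>" and refill the input buffer
    byte[] tmp = key.toString().getBytes(StandardCharsets.UTF_8);
    outbuf.write(tmp, 0, tmp.length);
    outbuf.write('\t');
    tmp = val.toString().getBytes(StandardCharsets.UTF_8);
    outbuf.write(tmp, 0, tmp.length);
    outbuf.write('\n');
    inbuf.reset(outbuf.getData(), outbuf.getLength());
    outbuf.reset();
    ret = inbuf.read();
  }
  return ret;
}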
private void readPartitionFile(FileSystem fs, Configuration conf, Path path) throws IOException { @SuppressWarnings("deprecation") SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); ImmutableBytesWritable key = new ImmutableBytesWritable(); partitions = new ArrayList<ImmutableBytesWritable>(); while (reader.next(key)) { partitions.add(new ImmutableBytesWritable(key.copyBytes())); } reader.close(); if (!Ordering.natural().isOrdered(partitions)) { throw new IOException("Partitions are not ordered!"); } }
@VisibleForTesting public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options, List<Path> globbedPaths) throws IOException { try { for (Path path : globbedPaths) { FileSystem sourceFS = path.getFileSystem(getConf()); path = makeQualified(path); FileStatus rootStatus = sourceFS.getFileStatus(path); Path sourcePathRoot = computeSourceRootPath(rootStatus, options); LOG.info("Root source path is {}", sourcePathRoot); FileStatus[] sourceFiles = sourceFS.listStatus(path); boolean explore = (sourceFiles != null && sourceFiles.length > 0); if (explore || rootStatus.isDirectory()) { for (FileStatus sourceStatus : sourceFiles) { if (sourceStatus.isFile()) { LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath()); CopyListingFileStatus sourceCopyListingStatus = new CopyListingFileStatus(sourceStatus); writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options); } if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) { LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath()); traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options); } } } } fileListWriter.close(); fileListWriter = null; } finally { IoUtil.closeSilently(LOG, fileListWriter); } }
private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException { FileSystem fs = pathToListFile.getFileSystem(getConf()); if (fs.exists(pathToListFile)) { fs.delete(pathToListFile, false); } return SequenceFile.createWriter(getConf(), SequenceFile.Writer.file(pathToListFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(CopyListingFileStatus.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); }
/**
 * Validate the final resulting path listing. Checks if there are duplicate entries. If preserving ACLs, checks that
 * the file system can support ACLs. If preserving XAttrs, checks that the file system can support XAttrs.
 *
 * @param pathToListFile path listing built by doBuildListing
 * @param options input options to S3MapReduceCp
 * @throws IOException on any issue while checking for duplicates
 * @throws DuplicateFileException if there are duplicates
 */
private void validateFinalListing(Path pathToListFile, S3MapReduceCpOptions options)
  throws DuplicateFileException, IOException {
  Configuration config = getConf();
  FileSystem fs = pathToListFile.getFileSystem(config);
  Path sortedList = sortListing(fs, config, pathToListFile);
  SequenceFile.Reader reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(sortedList));
  try {
    Text lastKey = new Text("*"); // source relative path can never hold *
    CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
    Text currentKey = new Text();
    while (reader.next(currentKey)) {
      if (currentKey.equals(lastKey)) {
        CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
        reader.getCurrentValue(currentFileStatus);
        throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " + currentFileStatus.getPath()
            + " would cause duplicates. Aborting");
      }
      reader.getCurrentValue(lastFileStatus);
      lastKey.set(currentKey);
    }
  } finally {
    IOUtils.closeStream(reader);
  }
}
/**
 * Sort the sequence file containing Text and CopyListingFileStatus as key and value respectively.
 *
 * @param fs File System
 * @param conf Configuration
 * @param sourceListing Source listing file
 * @return Path of the sorted file. Is the source file with "_sorted" appended to the name
 * @throws IOException Any exception during sort.
 */
private static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing) throws IOException {
  SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, CopyListingFileStatus.class, conf);
  Path output = new Path(sourceListing.toString() + "_sorted");
  if (fs.exists(output)) {
    fs.delete(output, false);
  }
  sorter.sort(sourceListing, output);
  return output;
}
@Test(timeout = 10000) public void buildListingForSingleFile() throws Exception { FileSystem fs = cluster.getFileSystem(); String testRootString = "/tmp/source"; Path testRoot = new Path(testRootString); if (fs.exists(testRoot)) { delete(fs, testRootString); } Path sourceFile = new Path(testRoot, "foo/bar/source.txt"); Path decoyFile = new Path(testRoot, "target/mooc"); URI targetFile = URI.create("s3://bucket/target/moo/target.txt"); createFile(fs, sourceFile.toString()); createFile(fs, decoyFile.toString()); final Path listFile = new Path(testRoot, "/tmp/fileList.seq"); listing.buildListing(listFile, options(sourceFile, targetFile)); try (SequenceFile.Reader reader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFile))) { CopyListingFileStatus fileStatus = new CopyListingFileStatus(); Text relativePath = new Text(); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/source.txt")); } }
@Test(timeout = 10000) public void buildListingForMultipleSources() throws Exception { FileSystem fs = cluster.getFileSystem(); String testRootString = "/tmp/source"; Path testRoot = new Path(testRootString); if (fs.exists(testRoot)) { delete(fs, testRootString); } Path sourceDir1 = new Path(testRoot, "foo/baz/"); Path sourceDir2 = new Path(testRoot, "foo/bang/"); Path sourceFile1 = new Path(testRoot, "foo/bar/source.txt"); URI target = URI.create("s3://bucket/target/moo/"); fs.mkdirs(sourceDir1); fs.mkdirs(sourceDir2); createFile(fs, new Path(sourceDir1, "baz_1.dat")); createFile(fs, new Path(sourceDir1, "baz_2.dat")); createFile(fs, new Path(sourceDir2, "bang_0.dat")); createFile(fs, sourceFile1.toString()); final Path listFile = new Path(testRoot, "/tmp/fileList.seq"); listing.buildListing(listFile, options(Arrays.asList(sourceFile1, sourceDir1, sourceDir2), target)); try (SequenceFile.Reader reader = new SequenceFile.Reader(CONFIG, SequenceFile.Reader.file(listFile))) { CopyListingFileStatus fileStatus = new CopyListingFileStatus(); Text relativePath = new Text(); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/source.txt")); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/baz_1.dat")); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/baz_2.dat")); assertThat(reader.next(relativePath, fileStatus), is(true)); assertThat(relativePath.toString(), is("/bang_0.dat")); } }
void writePartitionFile(Configuration conf, Path path) throws IOException { FileSystem fs = path.getFileSystem(conf); @SuppressWarnings("deprecation") SequenceFile.Writer writer = SequenceFile.createWriter( fs, conf, path, ImmutableBytesWritable.class, NullWritable.class); for (int i = 0; i < partitions.size(); i++) { writer.append(partitions.get(i), NullWritable.get()); } writer.close(); }