/**
 * Writes a MapFile with the given codec and compression type, then checks
 * that the keys "002" and "004" can be looked up in it.
 *
 * @param clazz   codec class to instantiate via its no-arg constructor
 * @param type    compression type used when writing the MapFile
 * @param records number of records passed to createMapFile
 * @throws Exception if the codec cannot be instantiated or I/O fails
 */
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records + 
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);

  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  try {
    Text key1 = new Text("002");
    assertNotNull(reader.get(key1, new Text()));
    Text key2 = new Text("004");
    assertNotNull(reader.get(key2, new Text()));
  } finally {
    // Close the reader even when an assertion fails so its file handles are released.
    reader.close();
  }
}
public void testRecursiveSeqFileCreate() throws IOException { FileSystem fs = FileSystem.getLocal(conf); Path name = new Path(new Path(System.getProperty("test.build.data","."), "recursiveCreateDir") , "file"); boolean createParent = false; try { SequenceFile.createWriter(fs, conf, name, RandomDatum.class, RandomDatum.class, 512, (short) 1, 4096, createParent, CompressionType.NONE, null, new Metadata()); fail("Expected an IOException due to missing parent"); } catch (IOException ioe) { // Expected } createParent = true; SequenceFile.createWriter(fs, conf, name, RandomDatum.class, RandomDatum.class, 512, (short) 1, 4096, createParent, CompressionType.NONE, null, new Metadata()); // should succeed, fails if exception thrown }
/**
 * Replaces {@code file} with a SequenceFile of {@code count} RandomDatum
 * key/value pairs produced by a generator seeded with {@code seed}.
 *
 * @param fs              file system holding the file
 * @param count           number of records to append
 * @param seed            seed for the RandomDatum generator
 * @param file            destination path (deleted first)
 * @param compressionType SequenceFile compression type
 * @param codec           compression codec
 * @throws IOException on I/O failure
 */
@SuppressWarnings("deprecation")
private void writeTest(FileSystem fs, int count, int seed, Path file, 
                              CompressionType compressionType, CompressionCodec codec)
  throws IOException {
  fs.delete(file, true);
  LOG.info("creating " + count + " records with " + compressionType +
           " compression");
  SequenceFile.Writer writer = 
    SequenceFile.createWriter(fs, conf, file, 
                              RandomDatum.class, RandomDatum.class, compressionType, codec);
  try {
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    for (int i = 0; i < count; i++) {
      generator.next();
      RandomDatum key = generator.getKey();
      RandomDatum value = generator.getValue();
      writer.append(key, value);
    }
  } finally {
    // Close even if append fails so the underlying stream is not leaked.
    writer.close();
  }
}
/**
 * Replaces {@code file} with a SequenceFile carrying {@code metadata} and
 * containing {@code count} RandomDatum key/value pairs produced by a generator
 * seeded with {@code seed}.
 *
 * @param fs              file system holding the file
 * @param count           number of records to append
 * @param seed            seed for the RandomDatum generator
 * @param file            destination path (deleted first)
 * @param compressionType SequenceFile compression type
 * @param codec           compression codec
 * @param metadata        metadata written into the file header
 * @throws IOException on I/O failure
 */
@SuppressWarnings("deprecation")
private void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
                                      CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
  throws IOException {
  fs.delete(file, true);
  LOG.info("creating " + count + " records with metadata and with " + compressionType +
           " compression");
  SequenceFile.Writer writer = 
    SequenceFile.createWriter(fs, conf, file, 
                              RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
  try {
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    for (int i = 0; i < count; i++) {
      generator.next();
      RandomDatum key = generator.getKey();
      RandomDatum value = generator.getValue();
      writer.append(key, value);
    }
  } finally {
    // Close even if append fails so the underlying stream is not leaked.
    writer.close();
  }
}
@SuppressWarnings("deprecation") @Test public void testRecursiveSeqFileCreate() throws IOException { FileSystem fs = FileSystem.getLocal(conf); Path name = new Path(new Path(System.getProperty("test.build.data","."), "recursiveCreateDir") , "file"); boolean createParent = false; try { SequenceFile.createWriter(fs, conf, name, RandomDatum.class, RandomDatum.class, 512, (short) 1, 4096, createParent, CompressionType.NONE, null, new Metadata()); fail("Expected an IOException due to missing parent"); } catch (IOException ioe) { // Expected } createParent = true; SequenceFile.createWriter(fs, conf, name, RandomDatum.class, RandomDatum.class, 512, (short) 1, 4096, createParent, CompressionType.NONE, null, new Metadata()); // should succeed, fails if exception thrown }
/**
 * Opens {@code filePath} for writing. A new default Configuration is built,
 * the file system owning the path is resolved from it, and the work is
 * delegated to the five-argument open.
 *
 * @param filePath destination path as a string
 * @param codeC    compression codec
 * @param compType compression type
 * @throws IOException if the file system cannot be resolved or the file cannot be opened
 */
@Override
public void open(String filePath, CompressionCodec codeC,
    CompressionType compType) throws IOException {
  Configuration freshConf = new Configuration();
  Path target = new Path(filePath);
  open(target, codeC, compType, freshConf, target.getFileSystem(freshConf));
}
/**
 * Opens the output stream for {@code dstPath} and wraps it in a SequenceFile
 * writer using the serializer's key/value classes.
 *
 * <p>If {@code useRawLocalFileSystem} is set and {@code hdfs} is a
 * LocalFileSystem, its raw file system is used instead; otherwise a warning is
 * logged. If "hdfs.append.support" is true and {@code dstPath} is an existing
 * file, it is opened for append; otherwise it is created.
 *
 * @param dstPath  destination path
 * @param codeC    compression codec
 * @param compType compression type
 * @param conf     configuration used for the append flag and the writer
 * @param hdfs     file system to write to
 * @throws IOException if the stream or the writer cannot be created. If writer
 *         creation fails, the just-opened stream is closed before the exception propagates.
 */
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
        throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem)hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }

  if (conf.getBoolean("hdfs.append.support", false) && hdfs.isFile(dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }

  boolean writerCreated = false;
  try {
    writer = SequenceFile.createWriter(conf, outStream,
        serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
    writerCreated = true;
  } finally {
    if (!writerCreated) {
      // registerCurrentStream has not run yet, so close the stream here
      // rather than leaking it; keep the original failure as the thrown exception.
      try {
        outStream.close();
      } catch (IOException closeError) {
        logger.warn("Failed to close output stream for " + dstPath
            + " after writer creation failed", closeError);
      }
    }
  }

  registerCurrentStream(outStream, hdfs, dstPath);
}
/**
 * Appends 1000 three-byte events with maxEvents = 100 as the third constructor
 * argument. Expects all events and bytes to be written and 10 files to be opened.
 */
@Test
public void testEventCountingRoller() throws IOException, InterruptedException {
  int maxEvents = 100;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  // Created here (not inline) so the test can shut it down and not leak its thread.
  java.util.concurrent.ExecutorService callTimeoutPool =
      Executors.newSingleThreadExecutor();
  try {
    BucketWriter bucketWriter = new BucketWriter(
        0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null,
        30000, callTimeoutPool, 0, 0);

    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
    for (int i = 0; i < 1000; i++) {
      bucketWriter.append(e);
    }

    logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
    logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
    logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

    Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
    Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
    Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
  } finally {
    callTimeoutPool.shutdown();
  }
}
/**
 * Appends 1000 three-byte events with maxBytes = 300 as the second constructor
 * argument. Expects all events and bytes to be written and 10 files to be opened.
 */
@Test
public void testSizeRoller() throws IOException, InterruptedException {
  int maxBytes = 300;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  // Created here (not inline) so the test can shut it down and not leak its thread.
  java.util.concurrent.ExecutorService callTimeoutPool =
      Executors.newSingleThreadExecutor();
  try {
    BucketWriter bucketWriter = new BucketWriter(
        0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null,
        30000, callTimeoutPool, 0, 0);

    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
    for (int i = 0; i < 1000; i++) {
      bucketWriter.append(e);
    }

    logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
    logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
    logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

    Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
    Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
    Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
  } finally {
    callTimeoutPool.shutdown();
  }
}
/**
 * Creates the named map for keys of {@code keyClass} and values of
 * {@code valClass}, compressed with {@code compress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class valClass,
              CompressionType compress) throws IOException {
  this(conf,
       new Path(dirName),
       keyClass(keyClass),
       valueClass(valClass),
       compression(compress));
}
@Test public void testInUsePrefix() throws IOException, InterruptedException { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC"; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextSerializer formatter = new HDFSTextSerializer(); BucketWriter bucketWriter = new BucketWriter( ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX)); }
@Test public void testCallbackOnClose() throws IOException, InterruptedException { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String SUFFIX = "WELCOME_TO_THE_EREBOR"; final AtomicBoolean callbackCalled = new AtomicBoolean(false); MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter( ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, new HDFSEventSink.WriterCallback() { @Override public void run(String filePath) { callbackCalled.set(true); } }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); bucketWriter.close(true); Assert.assertTrue(callbackCalled.get()); }
/**
 * Delegates to the superclass open, then sets {@code opened} to true when
 * {@code closed} is already set.
 */
@Override
public void open(String filePath, CompressionCodec codeC,
    CompressionType compType) throws IOException {
  super.open(filePath, codeC, compType);
  if (!closed) {
    return;
  }
  opened = true;
}
/**
 * Opens the output stream for {@code dstPath} and wraps it in a SequenceFile
 * writer using the serializer's key/value classes.
 *
 * <p>If {@code useRawLocalFileSystem} is set and {@code hdfs} is a
 * LocalFileSystem, its raw file system is used instead; otherwise a warning is
 * logged. If "hdfs.append.support" is true and {@code dstPath} is an existing
 * file, it is opened for append; otherwise it is created.
 *
 * @param dstPath  destination path
 * @param codeC    compression codec
 * @param compType compression type
 * @param conf     configuration used for the append flag and the writer
 * @param hdfs     file system to write to
 * @throws IOException if the stream or the writer cannot be created. If writer
 *         creation fails, the just-opened stream is closed before the exception propagates.
 */
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
        throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem)hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }

  if (conf.getBoolean("hdfs.append.support", false) && hdfs.isFile(dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }

  boolean writerCreated = false;
  try {
    writer = SequenceFile.createWriter(conf, outStream,
        serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
    writerCreated = true;
  } finally {
    if (!writerCreated) {
      // registerCurrentStream has not run yet, so close the stream here
      // rather than leaking it; keep the original failure as the thrown exception.
      try {
        outStream.close();
      } catch (IOException closeError) {
        logger.warn("Failed to close output stream for " + dstPath
            + " after writer creation failed", closeError);
      }
    }
  }

  registerCurrentStream(outStream, hdfs, dstPath);
}
/**
 * Replaces {@code file} with a SequenceFile carrying {@code metadata} and
 * containing {@code count} RandomDatum key/value pairs produced by a generator
 * seeded with {@code seed}.
 *
 * @param fs              file system holding the file
 * @param count           number of records to append
 * @param seed            seed for the RandomDatum generator
 * @param file            destination path (deleted first)
 * @param compressionType SequenceFile compression type
 * @param codec           compression codec
 * @param metadata        metadata written into the file header
 * @throws IOException on I/O failure
 */
private void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
                                      CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
  throws IOException {
  fs.delete(file, true);
  LOG.info("creating " + count + " records with metadata and with " + compressionType +
           " compression");
  SequenceFile.Writer writer = 
    SequenceFile.createWriter(fs, conf, file, 
                              RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
  try {
    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
    for (int i = 0; i < count; i++) {
      generator.next();
      RandomDatum key = generator.getKey();
      RandomDatum value = generator.getValue();
      writer.append(key, value);
    }
  } finally {
    // Close even if append fails so the underlying stream is not leaked.
    writer.close();
  }
}
private void createInputFile(String rootName) throws IOException { cleanup(); // clean up if previous run failed Path inputFile = new Path(MAP_INPUT_DIR, "in_file"); SequenceFile.Writer writer = SequenceFile.createWriter(fs, fsConfig, inputFile, Text.class, LongWritable.class, CompressionType.NONE); try { nrFiles = 0; listSubtree(new Path(rootName), writer); } finally { writer.close(); } LOG.info("Created map input files."); }
/**
 * Creates the control files needed before a test run: one file per map,
 * numberOfMaps in total. Each file NNBench_Controlfile_&lt;i&gt; lives under
 * baseDir/CONTROL_DIR_NAME and holds a single record: (its own name, 0).
 *
 * @throws IOException on error
 */
private static void createControlFiles() throws IOException {
  FileSystem tempFS = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");

  for (int mapIndex = 0; mapIndex < numberOfMaps; mapIndex++) {
    String controlName = "NNBench_Controlfile_" + mapIndex;
    Path controlPath =
        new Path(new Path(baseDir, CONTROL_DIR_NAME), controlName);

    SequenceFile.Writer controlWriter = null;
    try {
      controlWriter = SequenceFile.createWriter(tempFS, config, controlPath,
          Text.class, LongWritable.class, CompressionType.NONE);
      controlWriter.append(new Text(controlName), new LongWritable(0L));
    } finally {
      if (controlWriter != null) {
        controlWriter.close();
      }
    }
  }
}
/**
 * Writes a one-record Text/Text SequenceFile ("text" -&gt; "moretext") at
 * {@code p}.
 *
 * @param p    destination path
 * @param conf configuration for the writer
 * @throws IOException if writing fails. Any failure is wrapped, and the
 *         original exception is kept as the cause.
 */
@SuppressWarnings("deprecation")
void createTempFile(Path p, Configuration conf) throws IOException {
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(fs, conf, p,
                                       Text.class, Text.class,
                                       CompressionType.NONE);
    writer.append(new Text("text"), new Text("moretext"));
  } catch (Exception e) {
    // Preserve the cause so the original stack trace is not lost.
    throw new IOException(e.getLocalizedMessage(), e);
  } finally {
    if (writer != null) {
      writer.close();
    }
  }
  LOG.info("created: " + p);
}
/**
 * Creates the named map for keys of {@code keyClass} and values of
 * {@code valClass}, compressed with {@code compress} and {@code codec}, and
 * reporting to {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class<? extends Writable> valClass,
              CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       keyClass(keyClass),
       valueClass(valClass),
       compression(compress, codec),
       progressable(progress));
}
/**
 * Creates the named map for keys of {@code keyClass} and values of
 * {@code valClass}, compressed with {@code compress} and reporting to
 * {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class valClass,
              CompressionType compress,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       keyClass(keyClass),
       valueClass(valClass),
       compression(compress),
       progressable(progress));
}
/**
 * Creates the named map for keys of {@code keyClass} and values of
 * {@code valClass}, compressed with {@code compress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class valClass,
              CompressionType compress) throws IOException {
  this(conf,
       new Path(dirName),
       keyClass(keyClass),
       valueClass(valClass),
       compression(compress));
}
/**
 * Creates the named map using the key comparator {@code comparator}, for
 * values of {@code valClass}, compressed with {@code compress} and
 * {@code codec}, and reporting to {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator,
              Class valClass,
              CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       comparator(comparator),
       valueClass(valClass),
       compression(compress, codec),
       progressable(progress));
}
/**
 * Creates the named map using the key comparator {@code comparator}, for
 * values of {@code valClass}, compressed with {@code compress} and reporting
 * to {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator,
              Class valClass,
              CompressionType compress,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       comparator(comparator),
       valueClass(valClass),
       compression(compress),
       progressable(progress));
}
/**
 * Creates the named map using the key comparator {@code comparator}, for
 * values of {@code valClass}, compressed with {@code compress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator,
              Class valClass,
              CompressionType compress) throws IOException {
  this(conf,
       new Path(dirName),
       comparator(comparator),
       valueClass(valClass),
       compression(compress));
}
/**
 * Creates the named map for keys of {@code keyClass} and values of
 * {@code valClass}, compressed with {@code compress} and reporting to
 * {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class valClass,
              CompressionType compress,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       keyClass(keyClass),
       valueClass(valClass),
       compression(compress),
       progressable(progress));
}
/**
 * Test that makes sure createWriter succeeds on a file that was
 * already created.
 *
 * @throws IOException if creating the file or the writer fails
 */
public void testCreateWriterOnExistingFile() throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data","."),
      "createWriterOnExistingFile") , "file");

  // Create the file up front and close the stream immediately so it is not leaked.
  fs.create(name).close();
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, name,
      RandomDatum.class, RandomDatum.class, 512, (short) 1, 4096, false,
      CompressionType.NONE, null, new Metadata());
  writer.close();
}
/**
 * Creates the named map using the key comparator {@code comparator}, for
 * values of {@code valClass}, compressed with {@code compress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator,
              Class valClass,
              SequenceFile.CompressionType compress) throws IOException {
  this(conf,
       new Path(dirName),
       comparator(comparator),
       valueClass(valClass),
       compression(compress));
}
/**
 * Creates the named map using the key comparator {@code comparator}, for
 * values of {@code valClass}, compressed with {@code compress} and reporting
 * to {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress,
              Progressable progress) throws IOException {
  this(conf, new Path(dirName), comparator(comparator),
       valueClass(valClass), compression(compress),
       progressable(progress));
}
/**
 * Creates the named map using the key comparator {@code comparator}, for
 * values of {@code valClass}, compressed with {@code compress} and
 * {@code codec}, and reporting to {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 *
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator,
              Class valClass,
              SequenceFile.CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf,
       new Path(dirName),
       comparator(comparator),
       valueClass(valClass),
       compression(compress, codec),
       progressable(progress));
}
/**
 * Creates the named file for values of {@code valClass}. Keys are always
 * {@link LongWritable}. The data is compressed with {@code compress}, and
 * progress is reported to {@code progress}.
 *
 * <p>The {@code fs} argument is not used by this constructor.
 */
public Writer(Configuration conf, FileSystem fs, String file,
              Class<? extends Writable> valClass,
              CompressionType compress,
              Progressable progress) throws IOException {
  super(conf,
        new Path(file),
        keyClass(LongWritable.class),
        valueClass(valClass),
        compression(compress),
        progressable(progress));
}
/**
 * Writes a Text/Text MapFile at {@code path} with {@code records} entries. Each
 * entry's key and value are both the zero-padded index ("%03d").
 *
 * @param conf    configuration for the writer
 * @param fs      file system (not used by this method)
 * @param path    MapFile directory to create
 * @param codec   compression codec
 * @param type    compression type
 * @param records number of entries to append
 * @throws IOException on I/O failure
 */
private static void createMapFile(Configuration conf, FileSystem fs, Path path, 
    CompressionCodec codec, CompressionType type, int records) throws IOException {
  MapFile.Writer writer = 
      new MapFile.Writer(conf, path,
          MapFile.Writer.keyClass(Text.class),
          MapFile.Writer.valueClass(Text.class),
          MapFile.Writer.compression(type, codec));
  try {
    Text key = new Text();
    for (int j = 0; j < records; j++) {
      key.set(String.format("%03d", j));
      writer.append(key, key);
    }
  } finally {
    // Close even if append fails so the MapFile's streams are not leaked.
    writer.close();
  }
}
/**
 * Writes 10000 generated records to FILE and reads them back. This is done
 * first without compression and then with BLOCK compression. The local file
 * system is closed at the end.
 */
@Test
public void testSetFile() throws Exception {
  FileSystem localFs = FileSystem.getLocal(conf);
  try {
    RandomDatum[] data = generate(10000);
    CompressionType[] modes = {CompressionType.NONE, CompressionType.BLOCK};
    for (CompressionType mode : modes) {
      writeTest(localFs, data, FILE, mode);
      readTest(localFs, data, FILE);
    }
  } finally {
    localFs.close();
  }
}
/**
 * Replaces {@code file} with a SetFile containing {@code data}, ordered by the
 * RandomDatum comparator.
 *
 * @param fs       file system holding the file
 * @param data     records to append, in the given order
 * @param file     SetFile directory name (deleted first)
 * @param compress compression type
 * @throws IOException on I/O failure
 */
private static void writeTest(FileSystem fs, RandomDatum[] data,
                              String file, CompressionType compress)
  throws IOException {
  MapFile.delete(fs, file);
  LOG.info("creating with " + data.length + " records");
  SetFile.Writer writer =
    new SetFile.Writer(conf, fs, file,
                       WritableComparator.get(RandomDatum.class),
                       compress);
  try {
    for (RandomDatum datum : data) {
      writer.append(datum);
    }
  } finally {
    // Close even if append fails so the SetFile's streams are not leaked.
    writer.close();
  }
}
/**
 * Writes two records to a RECORD-compressed (Gzip) SequenceFile, then reopens
 * it with appendIfExists(true) and adds two more. The contents are verified
 * after each step.
 */
@Test(timeout = 30000)
public void testAppendRecordCompression() throws Exception {
  GenericTestUtils.assumeInNativeProfile();

  // Distinct name so this test does not share a file with the block-compression test.
  Path file = new Path(ROOT_PATH, "testseqappendrecordcompr.seq");
  fs.delete(file, true);
  try {
    Option compressOption = Writer.compression(CompressionType.RECORD,
        new GzipCodec());
    Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(Long.class),
        SequenceFile.Writer.valueClass(String.class), compressOption);
    try {
      writer.append(1L, "one");
      writer.append(2L, "two");
    } finally {
      writer.close();
    }

    verify2Values(file);

    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(Long.class),
        SequenceFile.Writer.valueClass(String.class),
        SequenceFile.Writer.appendIfExists(true), compressOption);
    try {
      writer.append(3L, "three");
      writer.append(4L, "four");
    } finally {
      writer.close();
    }

    verifyAll4Values(file);
  } finally {
    // Register cleanup even when a write or verification fails.
    fs.deleteOnExit(file);
  }
}
/**
 * Test that makes sure createWriter succeeds on a file that was
 * already created.
 *
 * @throws IOException if creating the file or the writer fails
 */
@SuppressWarnings("deprecation")
@Test
public void testCreateWriterOnExistingFile() throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data","."),
      "createWriterOnExistingFile") , "file");

  // Create the file up front and close the stream immediately so it is not leaked.
  fs.create(name).close();
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, name,
      RandomDatum.class, RandomDatum.class, 512, (short) 1, 4096, false,
      CompressionType.NONE, null, new Metadata());
  writer.close();
}
/**
 * Stores the supplied settings and collaborators in fields.
 *
 * <p>Notes:
 * <ul>
 *   <li>{@code maxCloseTries} is stored in the {@code maxRenameTries} field.</li>
 *   <li>{@code context} is not stored. It is only passed to
 *       {@code writer.configure(context)}, which is called last, after every
 *       field has been assigned.</li>
 *   <li>The file extension counter is seeded from {@code clock.currentTimeMillis()}.</li>
 * </ul>
 */
BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
    Context context, String filePath, String fileName, String inUsePrefix,
    String inUseSuffix, String fileSuffix, CompressionCodec codeC,
    CompressionType compType, HDFSWriter writer,
    ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
    SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
    String onCloseCallbackPath, long callTimeout,
    ExecutorService callTimeoutPool, long retryInterval,
    int maxCloseTries) {
  // Roll thresholds and batch size.
  this.rollInterval = rollInterval;
  this.rollSize = rollSize;
  this.rollCount = rollCount;
  this.batchSize = batchSize;

  // File naming.
  this.filePath = filePath;
  this.fileName = fileName;
  this.inUsePrefix = inUsePrefix;
  this.inUseSuffix = inUseSuffix;
  this.fileSuffix = fileSuffix;

  // Output format and collaborators.
  this.codeC = codeC;
  this.compType = compType;
  this.writer = writer;
  this.timedRollerPool = timedRollerPool;
  this.proxyUser = proxyUser;
  this.sinkCounter = sinkCounter;

  // Idle timeout and close callback.
  this.idleTimeout = idleTimeout;
  this.onCloseCallback = onCloseCallback;
  this.onCloseCallbackPath = onCloseCallbackPath;

  // Call timeouts and retries.
  this.callTimeout = callTimeout;
  this.callTimeoutPool = callTimeoutPool;
  this.retryInterval = retryInterval;
  this.maxRenameTries = maxCloseTries;

  // Initial state.
  this.fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
  this.isOpen = false;
  this.isUnderReplicated = false;

  this.writer.configure(context);
}
@Test public void testFileSuffixNotGiven() throws IOException, InterruptedException { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String suffix = null; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); BucketWriter bucketWriter = new BucketWriter( ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); // Need to override system time use for test so we know what to expect final long testTime = System.currentTimeMillis(); Clock testClock = new Clock() { public long currentTimeMillis() { return testTime; } }; bucketWriter.setClock(testClock); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith( Long.toString(testTime + 1) + ".tmp")); }