Java Class org.apache.hadoop.io.SequenceFile.CompressionType Example Source Code

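The snippets below, collected from several open-source projects, show the enum org.apache.hadoop.io.SequenceFile.CompressionType in use. CompressionType has three values: NONE (records are not compressed), RECORD (each record value is compressed individually), and BLOCK (keys and values are buffered and compressed together in blocks). Before the real-world examples, here is a minimal self-contained sketch; the output path and the DefaultCodec choice are illustrative assumptions, not taken from any of the projects below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class CompressionTypeDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // BLOCK compression buffers records and compresses them together;
    // swap in CompressionType.RECORD or CompressionType.NONE as needed.
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/tmp/compression-type-demo.seq")), // illustrative path
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
    try {
      writer.append(new IntWritable(1), new Text("one"));
    } finally {
      writer.close();
    }
  }
}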
Project: hadoop-oss    File: TestCodec.java
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {

  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records  + 
          " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
    clazz.getSimpleName() + "-" + type + "-" + records);

  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
  reader.close();
}
Project: hadoop    File: TestSequenceFile.java
public void testRecursiveSeqFileCreate() throws IOException {
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data","."),
      "recursiveCreateDir") , "file");
  boolean createParent = false;

  try {
    SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
        RandomDatum.class, 512, (short) 1, 4096, createParent,
        CompressionType.NONE, null, new Metadata());
    fail("Expected an IOException due to missing parent");
  } catch (IOException ioe) {
    // Expected
  }

  createParent = true;
  SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
      RandomDatum.class, 512, (short) 1, 4096, createParent,
      CompressionType.NONE, null, new Metadata());
  // should succeed; the test fails if an exception is thrown
}
Project: hadoop-oss    File: TestSequenceFile.java
@SuppressWarnings("deprecation")
private void writeTest(FileSystem fs, int count, int seed, Path file, 
                              CompressionType compressionType, CompressionCodec codec)
  throws IOException {
  fs.delete(file, true);
  LOG.info("creating " + count + " records with " + compressionType +
           " compression");
  SequenceFile.Writer writer = 
    SequenceFile.createWriter(fs, conf, file, 
                              RandomDatum.class, RandomDatum.class, compressionType, codec);
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  for (int i = 0; i < count; i++) {
    generator.next();
    RandomDatum key = generator.getKey();
    RandomDatum value = generator.getValue();

    writer.append(key, value);
  }
  writer.close();
}
Project: hadoop-oss    File: TestSequenceFile.java
@SuppressWarnings("deprecation")
private void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
                                      CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
  throws IOException {
  fs.delete(file, true);
  LOG.info("creating " + count + " records with metadata and with " + compressionType +
           " compression");
  SequenceFile.Writer writer = 
    SequenceFile.createWriter(fs, conf, file, 
                              RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  for (int i = 0; i < count; i++) {
    generator.next();
    RandomDatum key = generator.getKey();
    RandomDatum value = generator.getValue();

    writer.append(key, value);
  }
  writer.close();
}
Project: hadoop-oss    File: TestSequenceFile.java
@SuppressWarnings("deprecation")
@Test
public void testRecursiveSeqFileCreate() throws IOException {
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data","."),
      "recursiveCreateDir") , "file");
  boolean createParent = false;

  try {
    SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
        RandomDatum.class, 512, (short) 1, 4096, createParent,
        CompressionType.NONE, null, new Metadata());
    fail("Expected an IOException due to missing parent");
  } catch (IOException ioe) {
    // Expected
  }

  createParent = true;
  SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
      RandomDatum.class, 512, (short) 1, 4096, createParent,
      CompressionType.NONE, null, new Metadata());
  // should succeed; the test fails if an exception is thrown
}
Project: flume-release-1.7.0    File: HDFSSequenceFile.java
@Override
public void open(String filePath, CompressionCodec codeC,
    CompressionType compType) throws IOException {
  Configuration conf = new Configuration();
  Path dstPath = new Path(filePath);
  FileSystem hdfs = dstPath.getFileSystem(conf);
  open(dstPath, codeC, compType, conf, hdfs);
}
Project: flume-release-1.7.0    File: HDFSSequenceFile.java
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
        throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem)hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
  if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }
  writer = SequenceFile.createWriter(conf, outStream,
      serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);

  registerCurrentStream(outStream, hdfs, dstPath);
}
Project: flume-release-1.7.0    File: TestBucketWriter.java
@Test
public void testEventCountingRoller() throws IOException, InterruptedException {
  int maxEvents = 100;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  for (int i = 0; i < 1000; i++) {
    bucketWriter.append(e);
  }

  logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
  logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
Project: flume-release-1.7.0    File: TestBucketWriter.java
@Test
public void testSizeRoller() throws IOException, InterruptedException {
  int maxBytes = 300;
  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  for (int i = 0; i < 1000; i++) {
    bucketWriter.append(e);
  }

  logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
  logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
  logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());

  Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
  Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
  Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
Project: hadoop    File: MapFile.java
/** Create the named map for keys of the named class. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass, Class valClass,
              CompressionType compress) throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass),
       valueClass(valClass), compression(compress));
}
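The @deprecated note above points to the Option-based replacement constructor. Here is a minimal hedged sketch of that style; the path, key/value types, and BLOCK choice are illustrative assumptions, and the option helpers mirror those used in the TestCodec.createMapFile snippet later on this page.

// Assumed imports: java.io.IOException, org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.fs.Path, org.apache.hadoop.io.MapFile, org.apache.hadoop.io.Text,
// org.apache.hadoop.io.SequenceFile.CompressionType
private static void writeDemoMapFile(Configuration conf) throws IOException {
  // MapFile keys must be appended in sorted order.
  MapFile.Writer writer = new MapFile.Writer(conf, new Path("/tmp/demo.map"), // illustrative path
      MapFile.Writer.keyClass(Text.class),
      MapFile.Writer.valueClass(Text.class),
      MapFile.Writer.compression(CompressionType.BLOCK));
  writer.append(new Text("001"), new Text("value-one"));
  writer.append(new Text("002"), new Text("value-two"));
  writer.close();
}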
Project: flume-release-1.7.0    File: TestBucketWriter.java
@Test
public void testInUsePrefix() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds; large enough that a time-based roll never triggers during the test
  final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  HDFSTextSerializer formatter = new HDFSTextSerializer();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);

  Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX));
}
Project: flume-release-1.7.0    File: TestBucketWriter.java
@Test
public void testCallbackOnClose() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds; large enough that a time-based roll never triggers during the test
  final String SUFFIX = "WELCOME_TO_THE_EREBOR";
  final AtomicBoolean callbackCalled = new AtomicBoolean(false);

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
      new HDFSEventSink.WriterCallback() {
        @Override
        public void run(String filePath) {
          callbackCalled.set(true);
        }
      }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);
  bucketWriter.close(true);

  Assert.assertTrue(callbackCalled.get());
}
Project: flume-release-1.7.0    File: HDFSTestSeqWriter.java
@Override
public void open(String filePath, CompressionCodec codeC, CompressionType compType)
    throws IOException {
  super.open(filePath, codeC, compType);
  if (closed) {
    opened = true;
  }
}
Project: Transwarp-Sample-Code    File: HDFSSequenceFile.java
@Override
public void open(String filePath, CompressionCodec codeC,
    CompressionType compType) throws IOException {
  Configuration conf = new Configuration();
  Path dstPath = new Path(filePath);
  FileSystem hdfs = dstPath.getFileSystem(conf);
  open(dstPath, codeC, compType, conf, hdfs);
}
Project: Transwarp-Sample-Code    File: HDFSSequenceFile.java
protected void open(Path dstPath, CompressionCodec codeC,
    CompressionType compType, Configuration conf, FileSystem hdfs)
        throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem)hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
          "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
  if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
          (dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }
  writer = SequenceFile.createWriter(conf, outStream,
      serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);

  registerCurrentStream(outStream, hdfs, dstPath);
}
Project: hadoop    File: TestSequenceFile.java
private void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
                                      CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
  throws IOException {
  fs.delete(file, true);
  LOG.info("creating " + count + " records with metadata and with " + compressionType +
           " compression");
  SequenceFile.Writer writer = 
    SequenceFile.createWriter(fs, conf, file, 
                              RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  for (int i = 0; i < count; i++) {
    generator.next();
    RandomDatum key = generator.getKey();
    RandomDatum value = generator.getValue();

    writer.append(key, value);
  }
  writer.close();
}
Project: hadoop    File: DistributedFSCheck.java
private void createInputFile(String rootName) throws IOException {
  cleanup();  // clean up if previous run failed

  Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
  SequenceFile.Writer writer =
    SequenceFile.createWriter(fs, fsConfig, inputFile, 
                              Text.class, LongWritable.class, CompressionType.NONE);

  try {
    nrFiles = 0;
    listSubtree(new Path(rootName), writer);
  } finally {
    writer.close();
  }
  LOG.info("Created map input files.");
}
Project: hadoop    File: NNBench.java
/**
 * Create control files before a test run.
 * Number of files created is equal to the number of maps specified
 * 
 * @throws IOException on error
 */
private static void createControlFiles() throws IOException {
  FileSystem tempFS = FileSystem.get(config);
  LOG.info("Creating " + numberOfMaps + " control files");

  for (int i = 0; i < numberOfMaps; i++) {
    String strFileName = "NNBench_Controlfile_" + i;
    Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
            strFileName);

    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, 
              LongWritable.class, CompressionType.NONE);
      writer.append(new Text(strFileName), new LongWritable(0L));
    } finally {
      if (writer != null) {
        writer.close();
      }
    }
  }
}
Project: hadoop    File: TestClientDistributedCacheManager.java
@SuppressWarnings("deprecation")
void createTempFile(Path p, Configuration conf) throws IOException {
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(fs, conf, p,
                                       Text.class, Text.class,
                                       CompressionType.NONE);
    writer.append(new Text("text"), new Text("moretext"));
  } catch (Exception e) {
    throw new IOException(e); // preserve the original exception as the cause
  } finally {
    if (writer != null) {
      writer.close();
    }
    writer = null;
  }
  LOG.info("created: " + p);
}
Project: hadoop-oss    File: BloomMapFile.java
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
    Class<? extends WritableComparable> keyClass,
    Class<? extends Writable> valClass, CompressionType compress,
    CompressionCodec codec, Progressable progress) throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
       compression(compress, codec), progressable(progress));
}
Project: hadoop-oss    File: BloomMapFile.java
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
    Class<? extends WritableComparable> keyClass,
    Class valClass, CompressionType compress,
    Progressable progress) throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
       compression(compress), progressable(progress));
}
Project: hadoop-oss    File: BloomMapFile.java
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
    Class<? extends WritableComparable> keyClass,
    Class valClass, CompressionType compress)
    throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
       compression(compress));
}
Project: hadoop-oss    File: BloomMapFile.java
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
    WritableComparator comparator, Class valClass,
    CompressionType compress, CompressionCodec codec, Progressable progress)
    throws IOException {
  this(conf, new Path(dirName), comparator(comparator), 
       valueClass(valClass), compression(compress, codec), 
       progressable(progress));
}
Project: hadoop-oss    File: BloomMapFile.java
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
    WritableComparator comparator, Class valClass,
    CompressionType compress, Progressable progress) throws IOException {
  this(conf, new Path(dirName), comparator(comparator), 
       valueClass(valClass), compression(compress),
       progressable(progress));
}
Project: hadoop-oss    File: BloomMapFile.java
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
    WritableComparator comparator, Class valClass, CompressionType compress)
    throws IOException {
  this(conf, new Path(dirName), comparator(comparator), 
       valueClass(valClass), compression(compress));
}
Project: hadoop-oss    File: MapFile.java
/** Create the named map for keys of the named class. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass, Class valClass,
              CompressionType compress, 
              Progressable progress) throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
       compression(compress), progressable(progress));
}
Project: hadoop    File: TestSequenceFile.java
/**
 * Test that makes sure createWriter succeeds on a file that was 
 * already created
 * @throws IOException
 */
public void testCreateWriterOnExistingFile() throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data","."),
      "createWriterOnExistingFile") , "file");

  fs.create(name);
  SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
      RandomDatum.class, 512, (short) 1, 4096, false,
      CompressionType.NONE, null, new Metadata());
}
Project: hadoop-oss    File: MapFile.java
/** Create the named map for keys of the named class. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass, Class valClass,
              CompressionType compress) throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass),
       valueClass(valClass), compression(compress));
}
Project: hadoop-oss    File: MapFile.java
/** Create the named map using the named key comparator. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress) throws IOException {
  this(conf, new Path(dirName), comparator(comparator),
       valueClass(valClass), compression(compress));
}
Project: hadoop-oss    File: MapFile.java
/** Create the named map using the named key comparator. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress,
              Progressable progress) throws IOException {
  this(conf, new Path(dirName), comparator(comparator),
       valueClass(valClass), compression(compress),
       progressable(progress));
}
Project: hadoop-oss    File: MapFile.java
/** Create the named map using the named key comparator. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf, new Path(dirName), comparator(comparator),
       valueClass(valClass), compression(compress, codec),
       progressable(progress));
}
Project: hadoop-oss    File: ArrayFile.java
/** Create the named file for values of the named class. */
public Writer(Configuration conf, FileSystem fs,
              String file, Class<? extends Writable> valClass,
              CompressionType compress, Progressable progress)
  throws IOException {
  super(conf, new Path(file), 
        keyClass(LongWritable.class), 
        valueClass(valClass), 
        compression(compress), 
        progressable(progress));
}
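Since ArrayFile.Writer fixes the key class to LongWritable, each appended value is stored under an implicit long index assigned in append order. A hedged usage sketch follows; the file name is an illustrative assumption.

// Assumed imports: java.io.IOException, org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.fs.FileSystem, org.apache.hadoop.io.ArrayFile, org.apache.hadoop.io.Text,
// org.apache.hadoop.io.SequenceFile.CompressionType
private static void arrayFileDemo(Configuration conf, FileSystem fs) throws IOException {
  ArrayFile.Writer writer = new ArrayFile.Writer(conf, fs, "/tmp/demo.array",
      Text.class, CompressionType.RECORD, null); // null Progressable: no progress reporting
  writer.append(new Text("first"));  // stored at index 0
  writer.append(new Text("second")); // stored at index 1
  writer.close();

  ArrayFile.Reader reader = new ArrayFile.Reader(fs, "/tmp/demo.array", conf);
  Text value = new Text();
  reader.get(1, value); // retrieves "second" by its implicit long index
  reader.close();
}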
Project: hadoop-oss    File: TestCodec.java
private static void createMapFile(Configuration conf, FileSystem fs, Path path, 
    CompressionCodec codec, CompressionType type, int records) throws IOException {
  MapFile.Writer writer = 
      new MapFile.Writer(conf, path,
          MapFile.Writer.keyClass(Text.class),
          MapFile.Writer.valueClass(Text.class),
          MapFile.Writer.compression(type, codec));
  Text key = new Text();
  for (int j = 0; j < records; j++) {
      key.set(String.format("%03d", j));
      writer.append(key, key);
  }
  writer.close();
}
Project: hadoop-oss    File: TestSetFile.java
@Test
public void testSetFile() throws Exception {
  FileSystem fs = FileSystem.getLocal(conf);
  try {
    RandomDatum[] data = generate(10000);
    writeTest(fs, data, FILE, CompressionType.NONE);
    readTest(fs, data, FILE);

    writeTest(fs, data, FILE, CompressionType.BLOCK);
    readTest(fs, data, FILE);
  } finally {
    fs.close();
  }
}
Project: hadoop-oss    File: TestSetFile.java
private static void writeTest(FileSystem fs, RandomDatum[] data,
                              String file, CompressionType compress)
  throws IOException {
  MapFile.delete(fs, file);
  LOG.info("creating with " + data.length + " records");
  SetFile.Writer writer =
    new SetFile.Writer(conf, fs, file,
                       WritableComparator.get(RandomDatum.class),
                       compress);
  for (int i = 0; i < data.length; i++)
    writer.append(data[i]);
  writer.close();
}
Project: hadoop-oss    File: TestSequenceFileAppend.java
@Test(timeout = 30000)
public void testAppendRecordCompression() throws Exception {
  GenericTestUtils.assumeInNativeProfile();

  Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
  fs.delete(file, true);

  Option compressOption = Writer.compression(CompressionType.RECORD,
      new GzipCodec());
  Writer writer = SequenceFile.createWriter(conf,
      SequenceFile.Writer.file(file),
      SequenceFile.Writer.keyClass(Long.class),
      SequenceFile.Writer.valueClass(String.class), compressOption);

  writer.append(1L, "one");
  writer.append(2L, "two");
  writer.close();

  verify2Values(file);

  writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
      SequenceFile.Writer.keyClass(Long.class),
      SequenceFile.Writer.valueClass(String.class),
      SequenceFile.Writer.appendIfExists(true), compressOption);

  writer.append(3L, "three");
  writer.append(4L, "four");
  writer.close();

  verifyAll4Values(file);

  fs.deleteOnExit(file);
}
Project: hadoop-oss    File: TestSequenceFile.java
/**
 * Test that makes sure createWriter succeeds on a file that was 
 * already created
 * @throws IOException
 */
@SuppressWarnings("deprecation")
@Test
public void testCreateWriterOnExistingFile() throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path name = new Path(new Path(System.getProperty("test.build.data","."),
      "createWriterOnExistingFile") , "file");

  fs.create(name);
  SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
      RandomDatum.class, 512, (short) 1, 4096, false,
      CompressionType.NONE, null, new Metadata());
}
Project: flume-release-1.7.0    File: BucketWriter.java
BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
    Context context, String filePath, String fileName, String inUsePrefix,
    String inUseSuffix, String fileSuffix, CompressionCodec codeC,
    CompressionType compType, HDFSWriter writer,
    ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
    SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
    String onCloseCallbackPath, long callTimeout,
    ExecutorService callTimeoutPool, long retryInterval,
    int maxCloseTries) {
  this.rollInterval = rollInterval;
  this.rollSize = rollSize;
  this.rollCount = rollCount;
  this.batchSize = batchSize;
  this.filePath = filePath;
  this.fileName = fileName;
  this.inUsePrefix = inUsePrefix;
  this.inUseSuffix = inUseSuffix;
  this.fileSuffix = fileSuffix;
  this.codeC = codeC;
  this.compType = compType;
  this.writer = writer;
  this.timedRollerPool = timedRollerPool;
  this.proxyUser = proxyUser;
  this.sinkCounter = sinkCounter;
  this.idleTimeout = idleTimeout;
  this.onCloseCallback = onCloseCallback;
  this.onCloseCallbackPath = onCloseCallbackPath;
  this.callTimeout = callTimeout;
  this.callTimeoutPool = callTimeoutPool;
  fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());

  this.retryInterval = retryInterval;
  this.maxRenameTries = maxCloseTries;
  isOpen = false;
  isUnderReplicated = false;
  this.writer.configure(context);
}
Project: hadoop    File: MapFile.java
/** Create the named map using the named key comparator. 
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress, CompressionCodec codec,
              Progressable progress) throws IOException {
  this(conf, new Path(dirName), comparator(comparator),
       valueClass(valClass), compression(compress, codec),
       progressable(progress));
}
Project: flume-release-1.7.0    File: TestBucketWriter.java
@Test
public void testFileSuffixNotGiven() throws IOException, InterruptedException {
  final int ROLL_INTERVAL = 1000; // seconds; large enough that a time-based roll never triggers during the test
  final String suffix = null;

  MockHDFSWriter hdfsWriter = new MockHDFSWriter();
  BucketWriter bucketWriter = new BucketWriter(
      ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
      SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
      Executors.newSingleThreadExecutor(), 0, 0);

  // Override the clock used by the writer so the expected timestamp is deterministic
  final long testTime = System.currentTimeMillis();
  Clock testClock = new Clock() {
    public long currentTimeMillis() {
      return testTime;
    }
  };
  bucketWriter.setClock(testClock);

  Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
  bucketWriter.append(e);

  Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
      Long.toString(testTime + 1) + ".tmp"));
}