Java 类org.apache.hadoop.io.file.tfile.TFile 实例源码

项目:hadoop    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:AggregatedLogFormat.java   
/**
 * Looks up the application owner among the entries of {@code reader}.
 *
 * <p>A dedicated scanner is created for the lookup and handed to
 * {@code IOUtils.cleanup} on every exit path.
 *
 * @return the string stored under {@code APPLICATION_OWNER_KEY}, or
 *         {@code null} if no entry carries that key
 * @throws IOException propagated from scanning or reading the entries
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = null;
  try {
    ownerScanner = reader.createScanner();
    LogKey currentKey = new LogKey();
    for (; !ownerScanner.atEnd(); ownerScanner.advance()) {
      TFile.Reader.Scanner.Entry current = ownerScanner.entry();
      currentKey.readFields(current.getKeyStream());
      boolean isOwnerEntry =
          currentKey.toString().equals(APPLICATION_OWNER_KEY.toString());
      if (isOwnerEntry) {
        // The owner value is read back with readUTF().
        return current.getValueStream().readUTF();
      }
    }
    return null;
  } finally {
    IOUtils.cleanup(LOG, ownerScanner);
  }
}
项目:aliyun-oss-hadoop-fs    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:big-c    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:hadoop-plus    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Writes {@code testSize} key/value pairs, produced by {@code getKey(i)} and
 * {@code getValue()}, into a new TFile at the given path.
 *
 * <p>The writer and the underlying output stream are closed even when an
 * append fails, so a failed run does not leave an open stream behind.
 *
 * @param file destination path, created through {@code hdfs.create}
 * @param cname compression name handed to {@code TFile.Writer}
 * @throws Exception propagated from creating, writing or closing the file
 */
public void writeTFile(Path file, String cname) throws Exception
{
  FSDataOutputStream fos = hdfs.create(file);
  try {
    TFile.Writer writer =
        new TFile.Writer(fos, blockSize, cname, "jclass:" +
        BytesWritable.Comparator.class.getName(), new Configuration());
    try {
      for (int i = 0; i < testSize; i++) {
        String k = getKey(i);
        String v = getValue();
        writer.append(k.getBytes(), v.getBytes());
      }
    } finally {
      // Close the writer first, as the original did, before the stream
      // it writes to.
      writer.close();
    }
  } finally {
    fos.close();
  }
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Times writing an uncompressed TFile, then checks that the file exists and
 * logs the space it consumes.
 *
 * @throws Exception propagated from the file system or the writer
 */
@Test
public void testTFileWrite() throws Exception
{
  final Path target = Testfile.TFILE.filepath();
  final String pairCount = String.format("%,d", testSize);
  logger.info("Writing {} with {} key/value pairs", target, pairCount);

  // Only the write itself sits between startTimer() and stopTimer().
  startTimer();
  writeTFile(target, TFile.COMPRESSION_NONE);
  logger.info("Duration: {}",  stopTimer(Testfile.TFILE, "WRITE"));

  Assert.assertTrue(hdfs.exists(target));
  final ContentSummary summary = hdfs.getContentSummary(target);
  final String spaceConsumed = String.format("%,d", summary.getSpaceConsumed());
  final String fileCount = String.format("%,d", summary.getFileCount());
  logger.debug("Space consumed: {} bytes in {} files", spaceConsumed, fileCount);
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Times writing a TFile with {@code TFile.COMPRESSION_GZ}, then checks that
 * the file exists and logs the space it consumes.
 *
 * @throws Exception propagated from the file system or the writer
 */
@Test
public void testTFileWriteGZ() throws Exception
{
  final Path target = Testfile.TFILE_GZ.filepath();
  final String pairCount = String.format("%,d", testSize);
  logger.info("Writing {} with {} key/value pairs", target, pairCount);

  // Only the write itself sits between startTimer() and stopTimer().
  startTimer();
  writeTFile(target, TFile.COMPRESSION_GZ);
  logger.info("Duration: {}",  stopTimer(Testfile.TFILE_GZ, "WRITE"));

  Assert.assertTrue(hdfs.exists(target));
  final ContentSummary summary = hdfs.getContentSummary(target);
  final String spaceConsumed = String.format("%,d", summary.getSpaceConsumed());
  final String fileCount = String.format("%,d", summary.getFileCount());
  logger.debug("Space consumed: {} bytes in {} files", spaceConsumed, fileCount);
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Writes an uncompressed TFile and then times three read patterns over it:
 * a sequential scan, seeks in key order, and seeks to random keys.
 *
 * @throws Exception propagated from writing or reading the file
 */
@Test
public void testTFileRead() throws Exception
{
  final Path target = Testfile.TFILE.filepath();
  final String pairCount = String.format("%,d", testSize);
  logger.info("Reading {} with {} key/value pairs", target, pairCount);
  // The file is written before the first timer starts, so it is not measured.
  writeTFile(target, TFile.COMPRESSION_NONE);

  // Phase 1: sequential scan.
  startTimer();
  readTFileSeq(target);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE, "READ-SEQ"));

  // Phase 2: seek to every key in index order.
  startTimer();
  readTFileSeqId(target);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE, "READ-SEQ-ID"));

  // Phase 3: seek to randomly chosen keys.
  startTimer();
  readTFileRandom(target);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.TFILE, "READ-RAND"));
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Writes a TFile with {@code TFile.COMPRESSION_GZ} and then times three read
 * patterns over it: a sequential scan, seeks in key order, and seeks to
 * random keys.
 *
 * @throws Exception propagated from writing or reading the file
 */
@Test
public void testTFileReadGZ() throws Exception
{
  final Path target = Testfile.TFILE_GZ.filepath();
  final String pairCount = String.format("%,d", testSize);
  logger.info("Reading {} with {} key/value pairs", target, pairCount);
  // The file is written before the first timer starts, so it is not measured.
  writeTFile(target, TFile.COMPRESSION_GZ);

  // Phase 1: sequential scan.
  startTimer();
  readTFileSeq(target);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE_GZ, "READ-SEQ"));

  // Phase 2: seek to every key in index order.
  startTimer();
  readTFileSeqId(target);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE_GZ, "READ-SEQ-ID"));

  // Phase 3: seek to randomly chosen keys.
  startTimer();
  readTFileRandom(target);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.TFILE_GZ, "READ-RAND"));
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Performs {@code testSize} seeks to randomly chosen keys of the given TFile.
 *
 * <p>Only the seek is exercised; the entry under the cursor is not read.
 * The scanner, the reader and the input stream are closed on every exit
 * path.
 *
 * @param file the TFile to read
 * @throws IOException propagated from opening, seeking or closing
 */
private void readTFileRandom(Path file) throws IOException
{
  Random random = new Random();

  FSDataInputStream in = hdfs.open(file);
  try {
    long size = hdfs.getContentSummary(file).getLength();
    TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
    try {
      Scanner scanner = reader.createScanner();
      try {
        scanner.rewind();
        for (int i = 0; i < testSize; i++) {
          scanner.seekTo(getKey(random.nextInt(testSize)).getBytes());
        }
      } finally {
        scanner.close();
      }
    } finally {
      reader.close();
    }
  } finally {
    // The stream was opened here, so it is closed here as well.
    in.close();
  }
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Seeks to {@code getKey(i)} for every {@code i} in ascending order and reads
 * the key and value of the entry under the cursor.
 *
 * <p>The scanner, the reader and the input stream are closed on every exit
 * path.
 *
 * @param file the TFile to read
 * @throws IOException propagated from opening, seeking, reading or closing
 */
private void readTFileSeqId(Path file) throws IOException
{
  FSDataInputStream in = hdfs.open(file);
  try {
    long size = hdfs.getContentSummary(file).getLength();
    TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
    try {
      Scanner scanner = reader.createScanner();
      try {
        scanner.rewind();
        for (int i = 0; i < testSize; i++) {
          scanner.seekTo(getKey(i).getBytes());
          Entry en = scanner.entry();
          // Buffers are sized from the entry so the whole pair is copied out.
          en.get(new BytesWritable(new byte[en.getKeyLength()]), new BytesWritable(new byte[en.getValueLength()]));
        }
      } finally {
        scanner.close();
      }
    } finally {
      reader.close();
    }
  } finally {
    // The stream was opened here, so it is closed here as well.
    in.close();
  }
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Scans the given TFile from the first entry to the last, reading the key and
 * value of every entry.
 *
 * <p>The end-of-file check happens before each entry is touched, so a file
 * without entries is handled. The scanner, the reader and the input stream
 * are closed on every exit path.
 *
 * @param file the TFile to read
 * @throws IOException propagated from opening, reading or closing
 */
private void readTFileSeq(Path file) throws IOException
{
  FSDataInputStream in = hdfs.open(file);
  try {
    long size = hdfs.getContentSummary(file).getLength();
    TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
    try {
      Scanner scanner = reader.createScanner();
      try {
        scanner.rewind();
        while (!scanner.atEnd()) {
          Entry en = scanner.entry();
          // Buffers are sized from the entry so the whole pair is copied out.
          en.get(new BytesWritable(new byte[en.getKeyLength()]), new BytesWritable(new byte[en.getValueLength()]));
          scanner.advance();
        }
      } finally {
        scanner.close();
      }
    } finally {
      reader.close();
    }
  } finally {
    // The stream was opened here, so it is closed here as well.
    in.close();
  }
}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Writes an uncompressed TFile and then times three DTFile read patterns
 * over it: a sequential scan, seeks in key order, and seeks to random keys.
 *
 * @throws Exception propagated from writing or reading the file
 */
@Test
public void testDTFileRead() throws Exception
{

  Path file = Testfile.DTFILE.filepath();
  logger.info("Reading {} with {} key/value pairs", file, String.format("%,d", testSize));
  writeTFile(file, TFile.COMPRESSION_NONE);

  startTimer();
  readDTFileSeq(file);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}", stopTimer(Testfile.DTFILE, "READ-SEQ"));

  // This phase is reported as seekTo(key) / READ-SEQ-ID, so it must run the
  // seek-by-id reader rather than repeat the sequential scan.
  startTimer();
  readDTFileSeqId(file);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}", stopTimer(Testfile.DTFILE, "READ-SEQ-ID"));

  startTimer();
  readDTFileRandom(file);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}", stopTimer(Testfile.DTFILE, "READ-RAND"));

}
项目:Megh    文件:HadoopFilePerformanceTest.java   
/**
 * Writes a TFile with {@code TFile.COMPRESSION_GZ} and then times three
 * DTFile read patterns over it: a sequential scan, seeks in key order, and
 * seeks to random keys.
 *
 * @throws Exception propagated from writing or reading the file
 */
@Test
public void testDTFileReadGZ() throws Exception
{
  final Path target = Testfile.DTFILE_GZ.filepath();
  final String pairCount = String.format("%,d", testSize);
  logger.info("Reading {} with {} key/value pairs", target, pairCount);
  // The file is written before the first timer starts, so it is not measured.
  writeTFile(target, TFile.COMPRESSION_GZ);

  // Phase 1: sequential scan.
  startTimer();
  readDTFileSeq(target);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.DTFILE_GZ, "READ-SEQ"));

  // Phase 2: seek to every key in index order.
  startTimer();
  readDTFileSeqId(target);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.DTFILE_GZ, "READ-SEQ-ID"));

  // Phase 3: seek to randomly chosen keys.
  startTimer();
  readDTFileRandom(target);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.DTFILE_GZ, "READ-RAND"));
}
项目:hopsworks    文件:LogReader.java   
/**
   * Moves to the next non-reserved entry and exposes its value.
   *
   * <p>The first call inspects the entry under the scanner's current cursor;
   * every later call advances the scanner before looking. Entries whose key
   * is contained in {@code RESERVED_KEYS} are skipped.
   *
   * @param key populated with the key of each entry that is read, ending
   * with the key that belongs to the returned stream
   * @return the value stream of the next non-reserved entry, or {@code null}
   * once the scanner reports it is at the end
   * @throws IOException propagated from the scanner or from reading the key
   */
  public DataInputStream next(LogKey key) throws IOException {
    for (;;) {
      // Only the very first call may look at the cursor without moving it.
      if (this.atBeginning) {
        this.atBeginning = false;
      } else {
        this.scanner.advance();
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry current = this.scanner.entry();
      key.readFields(current.getKeyStream());
      // META keys are not handed to the caller; keep scanning past them.
      if (!RESERVED_KEYS.containsKey(key.toString())) {
        return current.getValueStream();
      }
    }
  }
项目:hops    文件:AggregatedLogFormat.java   
/**
 * Looks up the application owner among the entries of {@code reader}.
 *
 * <p>A dedicated scanner is created for the lookup and handed to
 * {@code IOUtils.cleanup} on every exit path.
 *
 * @return the string stored under {@code APPLICATION_OWNER_KEY}, or
 *         {@code null} if no entry carries that key
 * @throws IOException propagated from scanning or reading the entries
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = null;
  try {
    ownerScanner = reader.createScanner();
    LogKey currentKey = new LogKey();
    for (; !ownerScanner.atEnd(); ownerScanner.advance()) {
      TFile.Reader.Scanner.Entry current = ownerScanner.entry();
      currentKey.readFields(current.getKeyStream());
      boolean isOwnerEntry =
          currentKey.toString().equals(APPLICATION_OWNER_KEY.toString());
      if (isOwnerEntry) {
        // The owner value is read back with readUTF().
        return current.getValueStream().readUTF();
      }
    }
    return null;
  } finally {
    IOUtils.cleanup(LOG, ownerScanner);
  }
}
项目:hops    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:hops    文件:FileSystemApplicationHistoryStore.java   
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  try {
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer =
        new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
            YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
            YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
            getConfig());
  } catch (IOException e) {
    IOUtils.cleanup(LOG, fsdos);
    throw e;
  }
}
项目:hadoop-TCP    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:hardfs    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:hadoop-on-lustre2    文件:AggregatedLogFormat.java   
/**
 * Moves to the next non-reserved entry and exposes its value.
 *
 * <p>The first call inspects the entry under the scanner's current cursor;
 * every later call advances the scanner before looking. Entries whose key is
 * contained in {@code RESERVED_KEYS} are skipped.
 *
 * @param key populated with the key of each entry that is read, ending with
 *          the key that belongs to the returned stream
 * @return the value stream of the next non-reserved entry, or {@code null}
 *         once the scanner reports it is at the end
 * @throws IOException propagated from the scanner or from reading the key
 */
public DataInputStream next(LogKey key) throws IOException {
  for (;;) {
    // Only the very first call may look at the cursor without moving it.
    if (this.atBeginning) {
      this.atBeginning = false;
    } else {
      this.scanner.advance();
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry current = this.scanner.entry();
    key.readFields(current.getKeyStream());
    // META keys are not handed to the caller; keep scanning past them.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return current.getValueStream();
    }
  }
}
项目:reef    文件:TFileParser.java   
/**
 * Opens the TFile at the given path and returns a scanner that has already
 * been advanced past up to three leading entries.
 *
 * @param path location of the TFile to scan
 * @return a scanner positioned after the skipped entries, or at the end if
 *         the file holds fewer than three entries
 * @throws IOException propagated from opening the file or moving the scanner
 */
private TFile.Reader.Scanner getScanner(final Path path) throws IOException {
  LOG.log(Level.FINE, "Creating Scanner for path {0}", path);
  final TFile.Reader reader = new TFile.Reader(this.fileSystem.open(path),
      this.fileSystem.getFileStatus(path).getLen(),
      this.configuration);
  final TFile.Reader.Scanner scanner = reader.createScanner();
  // Skip VERSION, APPLICATION_ACL, and APPLICATION_OWNER.
  int skipped = 0;
  while (skipped < 3 && !scanner.atEnd()) {
    scanner.advance();
    skipped += 1;
  }
  LOG.log(Level.FINE, "Created Scanner for path {0}", path);
  return scanner;
}
项目:tez    文件:TFileRecordReader.java   
/**
 * Fills {@code key} and {@code value} from the given TFile entry.
 *
 * <p>The key becomes {@code splitPath.getName() + ":" + entryKey}. The value
 * is set to the first line of the entry's value stream (or the empty string
 * if the stream has no line); {@code currentValueReader} is left open on the
 * same stream.
 *
 * @param entry the entry under the scanner's cursor
 * @throws IOException propagated from reading the key or the value stream
 */
private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
  entry.getKey(keyBytesWritable);
  // getBytes() exposes the backing array, which can be longer than the key
  // just read; only the first getLength() bytes belong to this entry.
  String entryKey = new String(keyBytesWritable.getBytes(), 0,
      keyBytesWritable.getLength());
  //splitpath contains the machine name. Create the key as splitPath + realKey
  String keyStr = new StringBuilder()
      .append(splitPath.getName()).append(":")
      .append(entryKey)
      .toString();

  // In certain cases, values can be huge (files > 2 GB). Stream is
  // better to handle such scenarios.
  currentValueReader = new BufferedReader(
      new InputStreamReader(entry.getValueStream()));
  key.set(keyStr);
  String line = currentValueReader.readLine();
  value.set((line == null) ? "" : line);
}
项目:hadoop    文件:AggregatedLogFormat.java   
/**
 * Creates the remote log file as the given user and prepares a
 * {@code TFile.Writer} on it, then writes the version entry.
 *
 * @param conf configuration used for the file context, the compression
 *          type and the writer
 * @param remoteAppLogFile path of the log file to create or overwrite
 * @param userUgi user under which the file is created
 * @throws IOException propagated from file creation or the writer; also
 *           wraps an {@code InterruptedException} raised by {@code doAs}
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  final PrivilegedExceptionAction<FSDataOutputStream> createLogFile =
      new PrivilegedExceptionAction<FSDataOutputStream>() {
        @Override
        public FSDataOutputStream run() throws Exception {
          fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
          fc.setUMask(APP_LOG_FILE_UMASK);
          return fc.create(
              remoteAppLogFile,
              EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
              new Options.CreateOpts[] {});
        }
      };
  try {
    this.fsDataOStream = userUgi.doAs(createLogFile);
  } catch (InterruptedException interrupted) {
    throw new IOException(interrupted);
  }

  final String compressionName = conf.get(
      YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
      YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
  // Keys are not sorted, hence the null comparator argument.
  // 256KB minBlockSize: also the expected log size for each container.
  this.writer = new TFile.Writer(this.fsDataOStream, 256 * 1024,
      compressionName, null, conf);
  // The version string is the first thing written.
  writeVersion();
}
项目:hadoop    文件:AggregatedLogFormat.java   
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
项目:hadoop    文件:AggregatedLogFormat.java   
/**
 * Returns the owner of the application.
 *
 * <p>A dedicated scanner is created for the lookup and closed on every exit
 * path, so repeated calls do not accumulate open scanners.
 *
 * @return the string stored under {@code APPLICATION_OWNER_KEY}, or
 *         {@code null} if no entry carries that key
 * @throws IOException propagated from scanning, reading the entries, or
 *           closing the scanner
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  try {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    // Runs for the early return, the not-found case and any exception.
    ownerScanner.close();
  }
}
项目:hadoop    文件:FileSystemApplicationHistoryStore.java   
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
        getConfig());
  reset();
}
项目:hadoop    文件:FileSystemApplicationHistoryStore.java   
/**
 * Reads the key and the complete value of the entry under the scanner's
 * cursor, then advances the scanner by one entry.
 *
 * @return an {@code Entry} holding the decoded key and the raw value bytes
 * @throws IOException propagated from the scanner or the streams; an
 *           {@code EOFException} is raised if the value stream ends before
 *           {@code getValueLength()} bytes were read
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  // read(byte[]) may stop after a partial read and leave the tail of the
  // array zero-filled; readFully fills the whole array or throws.
  dis.readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
项目:hadoop    文件:FileSystemApplicationHistoryStore.java   
/**
 * Opens the history file for writing and wraps it in a {@code TFile.Writer}.
 *
 * <p>An existing file is appended to; otherwise a new file is created. If
 * setting the permission or building the writer fails, the output stream
 * opened here is closed before the failure propagates.
 *
 * @param historyFile path of the history file
 * @throws IOException propagated from the file system or the writer
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  boolean initialized = false;
  try {
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer =
        new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
          YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
          getConfig());
    initialized = true;
  } finally {
    if (!initialized) {
      try {
        fsdos.close();
      } catch (IOException ignored) {
        // Best-effort close: the failure that aborted construction is the
        // one the caller needs to see, so it must not be replaced.
      }
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:AggregatedLogFormat.java   
/**
 * Creates the remote log file as the given user and prepares a
 * {@code TFile.Writer} on it, then writes the version entry.
 *
 * @param conf configuration used for the file context, the compression
 *          type and the writer
 * @param remoteAppLogFile path of the log file to create or overwrite
 * @param userUgi user under which the file is created
 * @throws IOException propagated from file creation or the writer; also
 *           wraps an {@code InterruptedException} raised by {@code doAs}
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  final PrivilegedExceptionAction<FSDataOutputStream> createLogFile =
      new PrivilegedExceptionAction<FSDataOutputStream>() {
        @Override
        public FSDataOutputStream run() throws Exception {
          fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
          fc.setUMask(APP_LOG_FILE_UMASK);
          return fc.create(
              remoteAppLogFile,
              EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
              new Options.CreateOpts[] {});
        }
      };
  try {
    this.fsDataOStream = userUgi.doAs(createLogFile);
  } catch (InterruptedException interrupted) {
    throw new IOException(interrupted);
  }

  final String compressionName = conf.get(
      YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
      YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
  // Keys are not sorted, hence the null comparator argument.
  // 256KB minBlockSize: also the expected log size for each container.
  this.writer = new TFile.Writer(this.fsDataOStream, 256 * 1024,
      compressionName, null, conf);
  // The version string is the first thing written.
  writeVersion();
}
项目:aliyun-oss-hadoop-fs    文件:AggregatedLogFormat.java   
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
项目:aliyun-oss-hadoop-fs    文件:FileSystemApplicationHistoryStore.java   
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
        getConfig());
  reset();
}
项目:aliyun-oss-hadoop-fs    文件:FileSystemApplicationHistoryStore.java   
/**
 * Reads the key and the complete value of the entry under the scanner's
 * cursor, then advances the scanner by one entry.
 *
 * @return an {@code Entry} holding the decoded key and the raw value bytes
 * @throws IOException propagated from the scanner or the streams; an
 *           {@code EOFException} is raised if the value stream ends before
 *           {@code getValueLength()} bytes were read
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  // read(byte[]) may stop after a partial read and leave the tail of the
  // array zero-filled; readFully fills the whole array or throws.
  dis.readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
项目:aliyun-oss-hadoop-fs    文件:FileSystemApplicationHistoryStore.java   
/**
 * Opens the history file for writing and wraps it in a {@code TFile.Writer}.
 *
 * <p>An existing file is appended to; otherwise a new file is created. If
 * setting the permission or building the writer fails, the output stream
 * opened here is closed before the failure propagates.
 *
 * @param historyFile path of the history file
 * @throws IOException propagated from the file system or the writer
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  boolean initialized = false;
  try {
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer =
        new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
          YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
          getConfig());
    initialized = true;
  } finally {
    if (!initialized) {
      try {
        fsdos.close();
      } catch (IOException ignored) {
        // Best-effort close: the failure that aborted construction is the
        // one the caller needs to see, so it must not be replaced.
      }
    }
  }
}
项目:big-c    文件:AggregatedLogFormat.java   
/**
 * Creates the remote log file as the given user and prepares a
 * {@code TFile.Writer} on it, then writes the version entry.
 *
 * <p>The file context is obtained from the configuration alone.
 *
 * @param conf configuration used for the file context, the compression
 *          type and the writer
 * @param remoteAppLogFile path of the log file to create or overwrite
 * @param userUgi user under which the file is created
 * @throws IOException propagated from file creation or the writer; also
 *           wraps an {@code InterruptedException} raised by {@code doAs}
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  final PrivilegedExceptionAction<FSDataOutputStream> createLogFile =
      new PrivilegedExceptionAction<FSDataOutputStream>() {
        @Override
        public FSDataOutputStream run() throws Exception {
          fc = FileContext.getFileContext(conf);
          fc.setUMask(APP_LOG_FILE_UMASK);
          return fc.create(
              remoteAppLogFile,
              EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
              new Options.CreateOpts[] {});
        }
      };
  try {
    this.fsDataOStream = userUgi.doAs(createLogFile);
  } catch (InterruptedException interrupted) {
    throw new IOException(interrupted);
  }

  final String compressionName = conf.get(
      YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
      YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
  // Keys are not sorted, hence the null comparator argument.
  // 256KB minBlockSize: also the expected log size for each container.
  this.writer = new TFile.Writer(this.fsDataOStream, 256 * 1024,
      compressionName, null, conf);
  // The version string is the first thing written.
  writeVersion();
}
项目:big-c    文件:AggregatedLogFormat.java   
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext = FileContext.getFileContext(conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
项目:big-c    文件:AggregatedLogFormat.java   
/**
 * Returns the owner of the application.
 *
 * <p>A dedicated scanner is created for the lookup and closed on every exit
 * path, so repeated calls do not accumulate open scanners.
 *
 * @return the string stored under {@code APPLICATION_OWNER_KEY}, or
 *         {@code null} if no entry carries that key
 * @throws IOException propagated from scanning, reading the entries, or
 *           closing the scanner
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  try {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    // Runs for the early return, the not-found case and any exception.
    ownerScanner.close();
  }
}
项目:big-c    文件:FileSystemApplicationHistoryStore.java   
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
        getConfig());
  reset();
}