Example source code for the Java class org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream

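DFSClient.DFSDataInputStream is the HDFS-specific FSDataInputStream handed back by DistributedFileSystem.open(); the snippets below, collected from several Hadoop forks, cast to it to reach block-level accessors such as getCurrentBlock(), getCurrentDatanode(), getAllBlocks() and getVisibleLength(). As a minimal orientation sketch (assuming a 0.20/1.x-era Hadoop client on the classpath and an HDFS cluster configured as the default filesystem; the file path is illustrative, and the exact package layout and method set vary slightly between the forks shown):

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class DFSDataInputStreamIntro {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes the default filesystem points at HDFS, so FileSystem.get()
    // returns a DistributedFileSystem and open() yields a DFSDataInputStream.
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/example.txt"); // illustrative path

    DFSDataInputStream in = (DFSDataInputStream) fs.open(path);
    try {
      in.readByte(); // touch the first block so the "current" block/datanode are set
      Block first = in.getCurrentBlock();
      DatanodeInfo dn = in.getCurrentDatanode();
      List<LocatedBlock> blocks = in.getAllBlocks();
      System.out.println("first block=" + first
          + " served by=" + (dn == null ? "n/a" : dn.getName())
          + " block count=" + blocks.size()
          + " visible length=" + in.getVisibleLength());
    } finally {
      in.close();
    }
  }
}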
Project: hadoop-EAR    File: TestDFSClientMetrics.java
public void testRead() throws Exception{
    for(int i = 0; i < TEST_FILE_NUM; ++i) {
        String file = "/tmp" + i +".txt";
        DFSTestUtil.createFile(fs, new Path(file), FILE_LEN, (short)5, 1L);

        DFSDataInputStream in = (DFSDataInputStream)fs.open(new Path(file));
        int numOfRead = 0;
        while(in.read() > 0){ 
            numOfRead ++;
        }
        assertEquals(FILE_LEN * (i+1), 
                metrics.readSize.getCurrentIntervalValue());
        assertEquals(numOfRead * (i+1), 
                metrics.readOps.getCurrentIntervalValue());         
    }
}
Project: hadoop-EAR    File: TestIoprio.java
public void runPreadTest(Configuration conf) throws Exception {
  DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem(conf);
  String fileName = "/test";
  Path p = new Path(fileName);
  for (int pri = 0; pri < 8; pri++) {
    createFile(p, pri);

    ioprioClass = ioprioData = 0;
    DFSDataInputStream in = (DFSDataInputStream) fs.open(p);

    byte[] buffer = new byte[BLOCK_SIZE * 2];
    ReadOptions options = new ReadOptions();
    options.setIoprio(NativeIO.IOPRIO_CLASS_BE, pri);
    in.read(BLOCK_SIZE / 2, buffer, 0, BLOCK_SIZE / 2, options);

    if (NativeIO.isAvailable()) {
      assertTrue(NativeIO.isIoprioPossible());
      assertEquals(NativeIO.IOPRIO_CLASS_BE, ioprioClass);
      assertEquals(pri, ioprioData);
    }
  }
}
Project: hadoop-EAR    File: ParallelStreamReader.java
public void collectSrcBlocksChecksum(ChecksumStore ckmStore)
    throws IOException {
  if (ckmStore == null) {
    return;
  }
  LOG.info("Store the checksums of source blocks into checksumStore");
  for (int i = 0; i < streams.length; i++) {
    if (streams[i] != null &&
        streams[i] instanceof DFSDataInputStream && 
        !(streams[i] instanceof RaidUtils.ZeroInputStream)) {
      DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
      Long newVal = checksums[i].getValue(); 
      ckmStore.putIfAbsentChecksum(stream.getCurrentBlock(), newVal);
    }
  }
}
Project: cumulus    File: TestReadWhileWriting.java
static void checkFile(Path p, int expectedsize, final Configuration conf
    ) throws IOException, InterruptedException {
  //open the file with another user account
  final String username = UserGroupInformation.getCurrentUser().getShortUserName()
      + "_" + ++userCount;

  UserGroupInformation ugi = UserGroupInformation.createUserForTesting(username, 
                               new String[] {"supergroup"});

  final FileSystem fs = DFSTestUtil.getFileSystemAs(ugi, conf);

  final DFSDataInputStream in = (DFSDataInputStream)fs.open(p);

  //Check visible length
  Assert.assertTrue(in.getVisibleLength() >= expectedsize);

  //Able to read?
  for(int i = 0; i < expectedsize; i++) {
    Assert.assertEquals((byte)i, (byte)in.read());  
  }

  in.close();
}
Project: RDFS    File: TestDFSClientMetrics.java
public void testRead() throws Exception{
    for(int i = 0; i < TEST_FILE_NUM; ++i) {
        String file = "/tmp" + i +".txt";
        DFSTestUtil.createFile(fs, new Path(file), FILE_LEN, (short)5, 1L);

        DFSDataInputStream in = (DFSDataInputStream)fs.open(new Path(file));
        int numOfRead = 0;
        while(in.read() > 0){ 
            numOfRead ++;
        }
        assertEquals(FILE_LEN * (i+1), 
                metrics.readSize.getCurrentIntervalValue());
        assertEquals(numOfRead * (i+1), 
                metrics.readOps.getCurrentIntervalValue());         
    }
}
Project: hadoop-EAR    File: RaidUtils.java
public static int readTillEnd(InputStream in, byte[] buf, boolean eofOK, 
    long endOffset, int toRead)
  throws IOException {
  int numRead = 0;
  while (numRead < toRead) {
    int readLen = toRead - numRead;
    if (in instanceof DFSDataInputStream) {
      int available = (int)(endOffset - ((DFSDataInputStream)in).getPos());
      if (available < readLen) {
        readLen = available;
      }
    }
    int nread = readLen > 0? in.read(buf, numRead, readLen): 0;
    if (nread < 0) {
      if (eofOK) {
        // EOF hit, fill with zeros
        Arrays.fill(buf, numRead, toRead, (byte)0);
        break;
      } else {
        // EOF hit, throw.
        throw new IOException("Premature EOF");
      }
    } else if (nread == 0) {
      // reach endOffset, fill with zero;
      Arrays.fill(buf, numRead, toRead, (byte)0);
      break;
    } else {
      numRead += nread;
    }
  }

  // return 0 if we read a ZeroInputStream
  if (in instanceof ZeroInputStream) {
    return 0;
  }
  return numRead;
}
Project: hops    File: ParallelStreamReader.java
/**
 * Reads data from multiple streams in parallel and puts the data in a queue.
 *
 * @param streams
 *     The input streams to read from.
 * @param bufSize
 *     The amount of data to read from each stream in each go.
 * @param numThreads
 *     Number of threads to use for parallelism.
 * @param boundedBufferCapacity
 *     Capacity of the bounded queue that the read results are placed in.
 */
public ParallelStreamReader(Progressable reporter, InputStream[] streams,
    int bufSize, int numThreads, int boundedBufferCapacity,
    long maxBytesPerStream) throws IOException {
  this.reporter = reporter;
  this.streams = new InputStream[streams.length];
  this.endOffsets = new long[streams.length];
  for (int i = 0; i < streams.length; i++) {
    this.streams[i] = streams[i];
    if (this.streams[i] instanceof DFSDataInputStream) {
      DFSDataInputStream stream = (DFSDataInputStream) this.streams[i];
      // in directory raiding, the block size for each input stream 
      // might be different, so we need to determine the endOffset of
      // each stream by their own block size.
      List<LocatedBlock> blocks = stream.getAllBlocks();
      if (blocks.size() == 0) {
        this.endOffsets[i] = Long.MAX_VALUE;
      } else {
        this.endOffsets[i] = stream.getPos() + blocks.get(0).getBlockSize();
      }
    } else {
      this.endOffsets[i] = Long.MAX_VALUE;
    }
    streams[i] = null; // Take over ownership of streams.
  }
  this.bufSize = bufSize;
  this.boundedBuffer =
      new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
  if (numThreads > streams.length) {
    this.numThreads = streams.length;
  } else {
    this.numThreads = numThreads;
  }
  this.remainingBytesPerStream = maxBytesPerStream;
  this.slots = new Semaphore(this.numThreads);
  this.readPool = Executors.newFixedThreadPool(this.numThreads);
  this.mainThread = new MainThread();
}
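This constructor only wires things up: for every DFSDataInputStream it records an endOffset one block past the current position (Long.MAX_VALUE for anything else), takes ownership of the streams by nulling the caller's array slots, and sizes the worker pool. A hedged construction sketch (the package name, buffer sizes and thread counts are illustrative; starting the reader and draining ReadResult objects from the bounded queue is handled by the rest of the class, which is not shown here):

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.raid.ParallelStreamReader; // package name is an assumption
import org.apache.hadoop.util.Progressable;

public class ParallelStreamReaderConstruction {
  static ParallelStreamReader build(InputStream[] sources) throws IOException {
    Progressable reporter = new Progressable() {
      public void progress() { } // no-op progress reporting for the sketch
    };
    int bufSize = 64 * 1024;                 // bytes read from each stream per round
    int numThreads = 4;                      // capped at sources.length by the constructor
    int boundedBufferCapacity = 16;          // depth of the ReadResult queue
    long maxBytesPerStream = Long.MAX_VALUE; // no per-stream cap

    // After this call the reader owns the streams (the caller's array slots
    // are nulled out), so the caller should not close or reuse them.
    return new ParallelStreamReader(reporter, sources, bufSize,
        numThreads, boundedBufferCapacity, maxBytesPerStream);
  }
}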
Project: hops    File: RaidUtils.java
public static int readTillEnd(InputStream in, byte[] buf, boolean eofOK,
    long endOffset, int toRead) throws IOException {
  int numRead = 0;
  while (numRead < toRead) {
    int readLen = toRead - numRead;
    if (in instanceof DFSDataInputStream) {
      int available = (int) (endOffset - ((DFSDataInputStream) in).getPos());
      if (available < readLen) {
        readLen = available;
      }
    }
    int nread = readLen > 0 ? in.read(buf, numRead, readLen) : 0;
    if (nread < 0) {
      if (eofOK) {
        // EOF hit, fill with zeros
        Arrays.fill(buf, numRead, toRead, (byte) 0);
        break;
      } else {
        // EOF hit, throw.
        throw new IOException("Premature EOF");
      }
    } else if (nread == 0) {
      // reach endOffset, fill with zero;
      Arrays.fill(buf, numRead, toRead, (byte) 0);
      break;
    } else {
      numRead += nread;
    }
  }

  // return 0 if we read a ZeroInputStream
  if (in instanceof ZeroInputStream) {
    return 0;
  }
  return numRead;
}
Project: hops    File: TestShortCircuitLocalRead.java
/**
 * Verifies that reading a file with the direct read(ByteBuffer) api gives the
 * expected set of bytes.
 */
static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
    int readOffset) throws IOException {
  DFSDataInputStream stm = (DFSDataInputStream) fs.open(name);

  ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);

  IOUtils.skipFully(stm, readOffset);

  actual.limit(3);

  //Read a small number of bytes first.
  int nread = stm.read(actual);
  actual.limit(nread + 2);
  nread += stm.read(actual);

  // Read across chunk boundary
  actual.limit(Math.min(actual.capacity(), nread + 517));
  nread += stm.read(actual);
  checkData(arrayFromByteBuffer(actual), readOffset, expected, nread,
      "A few bytes");
  //Now read rest of it
  actual.limit(actual.capacity());
  while (actual.hasRemaining()) {
    int nbytes = stm.read(actual);

    if (nbytes < 0) {
      throw new EOFException("End of file reached before reading fully.");
    }
    nread += nbytes;
  }
  checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
  stm.close();
}
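The test exercises the direct read(ByteBuffer) path in small steps that deliberately cross chunk boundaries. Stripped of the assertions, the core pattern is the hedged sketch below (the helper name and the length argument are illustrative; read(ByteBuffer) may return fewer bytes than remain, so it is called in a loop):

import java.io.EOFException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;

public class DirectReadExample {
  /** Read len bytes from the start of path into a direct ByteBuffer. */
  static ByteBuffer readDirect(FileSystem fs, Path path, int len) throws Exception {
    DFSDataInputStream in = (DFSDataInputStream) fs.open(path);
    try {
      ByteBuffer buf = ByteBuffer.allocateDirect(len);
      while (buf.hasRemaining()) {
        int n = in.read(buf); // direct / short-circuit read when available
        if (n < 0) {
          throw new EOFException("End of file reached before reading fully.");
        }
      }
      buf.flip();
      return buf;
    } finally {
      in.close();
    }
  }
}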
Project: cumulus    File: DistributedFileSystem.java
@SuppressWarnings("deprecation")
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  statistics.incrementReadOps(1);
  return new DFSClient.DFSDataInputStream(
        dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
Project: RDFS    File: RaidUtils.java
public static int readTillEnd(InputStream in, byte[] buf, boolean eofOK, 
    long endOffset, int toRead)
  throws IOException {
  int numRead = 0;
  while (numRead < toRead) {
    int readLen = toRead - numRead;
    if (in instanceof DFSDataInputStream) {
      int available = (int)(endOffset - ((DFSDataInputStream)in).getPos());
      if (available < readLen) {
        readLen = available;
      }
    }
    int nread = readLen > 0? in.read(buf, numRead, readLen): 0;
    if (nread < 0) {
      if (eofOK) {
        // EOF hit, fill with zeros
        Arrays.fill(buf, numRead, toRead, (byte)0);
        break;
      } else {
        // EOF hit, throw.
        throw new IOException("Premature EOF");
      }
    } else if (nread == 0) {
      // reach endOffset, fill with zero;
      Arrays.fill(buf, numRead, toRead, (byte)0);
      break;
    } else {
      numRead += nread;
    }
  }

  // return 0 if we read a ZeroInputStream
  if (in instanceof ZeroInputStream) {
    return 0;
  }
  return numRead;
}
Project: hadoop-EAR    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: hadoop-EAR    File: TestDistributedFileSystem.java
public void testUnfavoredNodes() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean("dfs.client.block.location.renewal.enabled", false);
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);

  try {
    FileSystem fs = cluster.getFileSystem();
    DistributedFileSystem dfs = DFSUtil.convertToDFS(fs);
    TestCase.assertNotNull(dfs);

    Path path = new Path("/testUnfavoredNodes");
    FSDataOutputStream stm = fs
        .create(path, true, 4096, (short) 2, (long) 2048);
    stm.write(new byte[4096]);
    stm.close();

    FSDataInputStream is = fs.open(path);
    DFSDataInputStream dis = (DFSDataInputStream) is;
    TestCase.assertNotNull(dis);

    is.read(new byte[1024]);
    DatanodeInfo currentDn1 = dis.getCurrentDatanode();
    dis.setUnfavoredNodes(Arrays.asList(new DatanodeInfo[] { currentDn1 }));

    is.read(new byte[512]);
    DatanodeInfo currentDn2 = dis.getCurrentDatanode();
    TestCase.assertTrue(!currentDn2.equals(currentDn1));
    dis.setUnfavoredNodes(Arrays.asList(new DatanodeInfo[] { currentDn1, currentDn2 }));

    is.read(new byte[512]);
    TestCase.assertEquals(currentDn1, dis.getCurrentDatanode());

    is.read(new byte[1024]);

    TestCase.assertEquals(dis.getAllBlocks().get(1).getLocations()[0],
        dis.getCurrentDatanode());
  }
  finally {
    if (cluster != null) {cluster.shutdown();}
  }    
}
Project: hadoop-EAR    File: ParallelStreamReader.java
/**
 * Reads data from multiple streams in parallel and puts the data in a queue.
 * @param streams The input streams to read from.
 * @param bufSize The amount of data to read from each stream in each go.
 * @param numThreads Number of threads to use for parallelism.
 * @param boundedBufferCapacity Capacity of the bounded queue that the read results are placed in.
 */

public ParallelStreamReader(
    Progressable reporter,
    InputStream[] streams,
    int bufSize,
    int numThreads,
    int boundedBufferCapacity,
    long maxBytesPerStream, 
    boolean computeChecksum,
    OutputStream[] outs) throws IOException {
  this.reporter = reporter;
  this.computeChecksum = computeChecksum;
  this.streams = new InputStream[streams.length];
  this.endOffsets = new long[streams.length];
  if (computeChecksum) { 
    this.checksums = new CRC32[streams.length];
  }
  this.outs = outs;
  for (int i = 0; i < streams.length; i++) {
    this.streams[i] = streams[i];
    if (this.streams[i] instanceof DFSDataInputStream) {
      DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
      // in directory raiding, the block size for each input stream 
      // might be different, so we need to determine the endOffset of
      // each stream by their own block size.
      List<LocatedBlock> blocks = stream.getAllBlocks();
      if (blocks.size() == 0) {
        this.endOffsets[i] = Long.MAX_VALUE;
        if (computeChecksum) {
          this.checksums[i] = null;
        }
      } else {
        long blockSize = blocks.get(0).getBlockSize();
        this.endOffsets[i] = stream.getPos() + blockSize;
        if (computeChecksum) {
          this.checksums[i] = new CRC32();
        }
      }
    } else {
      this.endOffsets[i] = Long.MAX_VALUE;
      if (computeChecksum) {
        this.checksums[i] = null;
      }
    }
    streams[i] = null; // Take over ownership of streams.
  }
  this.bufSize = bufSize;
  this.boundedBuffer =
    new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
  if (numThreads > streams.length) {
    this.numThreads = streams.length;
  } else {
    this.numThreads = numThreads;
  }
  this.remainingBytesPerStream = maxBytesPerStream;
  this.slots = new Semaphore(this.numThreads);
  ThreadFactory ParallelStreamReaderFactory = new ThreadFactoryBuilder()
    .setNameFormat("ParallelStreamReader-read-pool-%d")
    .build();
  this.readPool = Executors.newFixedThreadPool(this.numThreads, ParallelStreamReaderFactory);
  this.mainThread = new MainThread();
  mainThread.setName("ParallelStreamReader-main");
}
Project: hadoop-on-lustre    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: hadoop-on-lustre    File: DFSTestUtil.java
public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
    throws IOException {
  return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
Project: cumulus    File: DistributedFileSystem.java
/**
 * We need to find the blocks that didn't match.  Likely only one 
 * is corrupt but we will report both to the namenode.  In the future,
 * we can consider figuring out exactly which block is corrupt.
 */
public boolean reportChecksumFailure(Path f, 
  FSDataInputStream in, long inPos, 
  FSDataInputStream sums, long sumsPos) {

  if(!(in instanceof DFSDataInputStream && sums instanceof DFSDataInputStream))
    throw new IllegalArgumentException("Input streams must be types " +
                                       "of DFSDataInputStream");

  LocatedBlock lblocks[] = new LocatedBlock[2];

  // Find block in data stream.
  DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
  Block dataBlock = dfsIn.getCurrentBlock();
  if (dataBlock == null) {
    LOG.error("Error: Current block in data stream is null! ");
    return false;
  }
  DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()}; 
  lblocks[0] = new LocatedBlock(dataBlock, dataNode);
  LOG.info("Found checksum error in data stream at block="
      + dataBlock + " on datanode="
      + dataNode[0].getName());

  // Find block in checksum stream
  DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
  Block sumsBlock = dfsSums.getCurrentBlock();
  if (sumsBlock == null) {
    LOG.error("Error: Current block in checksum stream is null! ");
    return false;
  }
  DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()}; 
  lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
  LOG.info("Found checksum error in checksum stream at block="
      + sumsBlock + " on datanode="
      + sumsNode[0].getName());

  // Ask client to delete blocks.
  dfs.reportChecksumFailure(f.toString(), lblocks);

  return true;
}
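reportChecksumFailure pairs the current block of the data stream with the current block of the checksum stream and asks the namenode, via DFSClient, to treat both as corrupt, since only one of the two is actually known to be bad. A heavily hedged caller sketch (purely illustrative: it assumes the application keeps its own checksum file alongside the data file on the same DistributedFileSystem and has already located the mismatching positions; in Hadoop itself this call is normally made by checksum-aware filesystem wrappers rather than by hand):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class ReportChecksumFailureSketch {
  /**
   * Illustrative only: a mismatch was detected while comparing a data file
   * against an application-managed checksum file; report both candidate
   * blocks so the namenode can schedule re-replication.
   */
  static boolean report(DistributedFileSystem dfs, Path data, Path sums,
      long dataPos, long sumsPos) throws IOException {
    FSDataInputStream dataIn = dfs.open(data); // DFSDataInputStream under the hood
    FSDataInputStream sumsIn = dfs.open(sums);
    try {
      dataIn.seek(dataPos); // position both streams on the suspect blocks
      sumsIn.seek(sumsPos);
      dataIn.read();        // touch each block so getCurrentBlock() is non-null
      sumsIn.read();
      return dfs.reportChecksumFailure(data, dataIn, dataPos, sumsIn, sumsPos);
    } finally {
      dataIn.close();
      sumsIn.close();
    }
  }
}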
Project: cumulus    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: cumulus    File: DFSTestUtil.java
public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
    throws IOException {
  return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
Project: RDFS    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: RDFS    File: ParallelStreamReader.java
/**
 * Reads data from multiple streams in parallel and puts the data in a queue.
 * @param streams The input streams to read from.
 * @param bufSize The amount of data to read from each stream in each go.
 * @param numThreads Number of threads to use for parallelism.
 * @param boundedBufferCapacity Capacity of the bounded queue that the read results are placed in.
 */

public ParallelStreamReader(
    Progressable reporter,
    InputStream[] streams,
    int bufSize,
    int numThreads,
    int boundedBufferCapacity,
    long maxBytesPerStream) throws IOException {
  this.reporter = reporter;
  this.streams = new InputStream[streams.length];
  this.endOffsets = new long[streams.length];
  for (int i = 0; i < streams.length; i++) {
    this.streams[i] = streams[i];
    if (this.streams[i] instanceof DFSDataInputStream) {
      DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
      // in directory raiding, the block size for each input stream 
      // might be different, so we need to determine the endOffset of
      // each stream by their own block size.
      List<LocatedBlock> blocks = stream.getAllBlocks();
      if (blocks.size() == 0) {
        this.endOffsets[i] = Long.MAX_VALUE;
      } else {
        this.endOffsets[i] = stream.getPos() + blocks.get(0).getBlockSize();
      }
    } else {
      this.endOffsets[i] = Long.MAX_VALUE;
    }
    streams[i] = null; // Take over ownership of streams.
  }
  this.bufSize = bufSize;
  this.boundedBuffer =
    new ArrayBlockingQueue<ReadResult>(boundedBufferCapacity);
  if (numThreads > streams.length) {
    this.numThreads = streams.length;
  } else {
    this.numThreads = numThreads;
  }
  this.remainingBytesPerStream = maxBytesPerStream;
  this.slots = new Semaphore(this.numThreads);
  this.readPool = Executors.newFixedThreadPool(this.numThreads);
  this.mainThread = new MainThread();
}
Project: hadoop-0.20    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: hortonworks-extension    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: hortonworks-extension    File: DFSTestUtil.java
public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
    throws IOException {
  return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
Project: hortonworks-extension    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}
Project: hortonworks-extension    File: DFSTestUtil.java
public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
    throws IOException {
  return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
}
Project: hadoop-gpu    File: DFSTestUtil.java
public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
  DFSDataInputStream in = 
    (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
  in.readByte();
  return in.getCurrentBlock();
}