public void add(Replica replica) {
  try {
    // zig-zag to reduce size of legacy blocks
    cos.writeSInt64NoTag(replica.getBlockId());
    cos.writeRawVarint64(replica.getBytesOnDisk());
    cos.writeRawVarint64(replica.getGenerationStamp());
    ReplicaState state = replica.getState();
    // although state is not a 64-bit value, using a long varint to
    // allow for future use of the upper bits
    cos.writeRawVarint64(state.getValue());
    if (state == ReplicaState.FINALIZED) {
      numFinalized++;
    }
    numBlocks++;
  } catch (IOException ioe) {
    // shouldn't happen, ByteString.Output doesn't throw IOE
    throw new IllegalStateException(ioe);
  }
}
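For context, here is a minimal, self-contained sketch (not part of the HDFS source; the class and method names are hypothetical) of the zig-zag transform that writeSInt64NoTag applies. It shows why block IDs, which may be negative, are zig-zag encoded while on-disk length and generation stamp are written as raw varints: zig-zag maps small-magnitude negative and positive longs to small unsigned values, so the varint stays short, whereas a negative long written as a raw varint always needs the maximum 10 bytes.

// Hypothetical illustration of protobuf's sint64 zig-zag encoding.
public class ZigZagSketch {
  // standard zig-zag transform: interleaves negatives and positives
  static long zigZag(long n) {
    return (n << 1) ^ (n >> 63);
  }

  // number of bytes a varint needs for the given value, treated as unsigned
  static int varintSize(long v) {
    int size = 1;
    while ((v & ~0x7FL) != 0) {
      size++;
      v >>>= 7;
    }
    return size;
  }

  public static void main(String[] args) {
    long blockId = -42L;                               // block IDs may be negative
    System.out.println(varintSize(blockId));           // 10 bytes as a raw varint
    System.out.println(varintSize(zigZag(blockId)));   // 1 byte after zig-zag
  }
}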
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
    throws IOException {
  synchronized (this) {
    final Replica replica = volumeMap.get(block.getBlockPoolId(),
        block.getBlockId());
    if (replica == null) {
      throw new ReplicaNotFoundException(block);
    }
    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "Replica generation stamp < block generation stamp, block="
          + block + ", replica=" + replica);
    } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
      block.setGenerationStamp(replica.getGenerationStamp());
    }
  }

  File datafile = getBlockFile(block);
  File metafile = FsDatasetUtil.getMetaFile(datafile,
      block.getGenerationStamp());
  BlockLocalPathInfo info = new BlockLocalPathInfo(block,
      datafile.getAbsolutePath(), metafile.getAbsolutePath());
  return info;
}
@Test
public void testFuzz() throws InterruptedException {
  Replica[] replicas = new Replica[100000];
  Random rand = new Random(0);
  for (int i = 0; i < replicas.length; i++) {
    Block b = new Block(rand.nextLong(), i, i << 4);
    // randomly pick one of the three replica states
    switch (rand.nextInt(3)) {
      case 0:
        replicas[i] = new FinalizedReplica(b, null, null);
        break;
      case 1:
        replicas[i] = new ReplicaBeingWritten(b, null, null, null);
        break;
      case 2:
        replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null);
        break;
    }
  }
  checkReport(replicas);
}
private void checkReplicas(Map<Long, Replica> expectedReplicas,
    BlockListAsLongs decodedBlocks) {
  assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks());

  Map<Long, Replica> reportReplicas = new HashMap<>(expectedReplicas);
  for (BlockReportReplica replica : decodedBlocks) {
    assertNotNull(replica);
    Replica expected = reportReplicas.remove(replica.getBlockId());
    assertNotNull(expected);
    assertEquals("wrong bytes",
        expected.getNumBytes(), replica.getNumBytes());
    assertEquals("wrong genstamp",
        expected.getGenerationStamp(), replica.getGenerationStamp());
    assertEquals("wrong replica state",
        expected.getState(), replica.getState());
  }
  assertTrue(reportReplicas.isEmpty());
}
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
  // Reload replicas from the disk.
  ReplicaMap replicaMap = new ReplicaMap(dataset);
  try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
    for (FsVolumeSpi vol : refs) {
      FsVolumeImpl volume = (FsVolumeImpl) vol;
      volume.getVolumeMap(bpid, replicaMap, dataset.ramDiskReplicaTracker);
    }
  }

  // Cast ReplicaInfo to Replica, because ReplicaInfo assumes a file-based
  // FsVolumeSpi implementation.
  List<Replica> ret = new ArrayList<>();
  if (replicaMap.replicas(bpid) != null) {
    ret.addAll(replicaMap.replicas(bpid));
  }
  return ret.iterator();
}
/**
 * Prepare an instance to encode the collection of replicas into an
 * efficient ByteString.
 * @param replicas - replicas to encode
 * @return BlockListAsLongs
 */
public static BlockListAsLongs encode(
    final Collection<? extends Replica> replicas) {
  BlockListAsLongs.Builder builder = builder();
  for (Replica replica : replicas) {
    builder.add(replica);
  }
  return builder.build();
}
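A hedged usage sketch of the encode/iterate round trip, mirroring the constructors used in testFuzz above. The class name and main method are for illustration only, and the import paths are assumed from the Hadoop HDFS source tree.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;

public class EncodeSketch {
  public static void main(String[] args) {
    // two finalized replicas built from (blockId, numBytes, genStamp),
    // with null volume and directory as in testFuzz
    List<Replica> replicas = Arrays.asList(
        new FinalizedReplica(new Block(1L, 16L, 100L), null, null),
        new FinalizedReplica(new Block(2L, 32L, 101L), null, null));

    // encode once, then iterate the decoded view
    BlockListAsLongs blocks = BlockListAsLongs.encode(replicas);
    System.out.println(blocks.getNumberOfBlocks());   // 2
    for (BlockReportReplica r : blocks) {
      System.out.println(r.getBlockId() + " " + r.getState());
    }
  }
}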
@Override
public ByteString getBlocksBuffer() {
  Builder builder = builder();
  for (Replica replica : this) {
    builder.add(replica);
  }
  return builder.build().getBlocksBuffer();
}
@Override // FsDatasetSpi
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
    throws IOException {
  final Replica replica = getReplicaInfo(block.getBlockPoolId(),
      block.getBlockId());
  if (replica.getGenerationStamp() < block.getGenerationStamp()) {
    throw new IOException(
        "replica.getGenerationStamp() < block.getGenerationStamp(), block="
        + block + ", replica=" + replica);
  }
  return replica.getVisibleLength();
}
private BlockListAsLongs checkReport(Replica... replicas) {
  Map<Long, Replica> expectedReplicas = new HashMap<>();
  for (Replica replica : replicas) {
    expectedReplicas.put(replica.getBlockId(), replica);
  }
  expectedReplicas = Collections.unmodifiableMap(expectedReplicas);

  // encode the blocks and extract the buffers
  BlockListAsLongs blocks =
      BlockListAsLongs.encode(expectedReplicas.values());
  List<ByteString> buffers = blocks.getBlocksBuffers();

  // convert to old-style list of longs
  List<Long> longs = new ArrayList<Long>();
  for (long value : blocks.getBlockListAsLongs()) {
    longs.add(value);
  }

  // decode the buffers and verify their contents
  BlockListAsLongs decodedBlocks =
      BlockListAsLongs.decodeBuffers(expectedReplicas.size(), buffers);
  checkReplicas(expectedReplicas, decodedBlocks);

  // decode the longs and verify their contents
  BlockListAsLongs decodedList = BlockListAsLongs.decodeLongs(longs);
  checkReplicas(expectedReplicas, decodedList);
  return blocks;
}
/**
 * Creates and closes a file of a certain length.
 * Calls append() so that the next write() operation adds to the end of it.
 * After the write() invocation, calls hflush() to make sure the data has
 * made it through the pipeline, then checks the state of the last block's
 * replica. It is expected to be in the RBW state.
 *
 * @throws IOException in case of an error
 */
@Test
public void pipeline_01() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + METHOD_NAME);
  }
  Path filePath = new Path("/" + METHOD_NAME + ".dat");

  DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Invoking append but doing nothing otherwise...");
  }
  FSDataOutputStream ofs = fs.append(filePath);
  ofs.writeBytes("Some more stuff to write");
  ((DFSOutputStream) ofs.getWrappedStream()).hflush();

  List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
      filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();

  String bpid = cluster.getNamesystem().getBlockPoolId();
  for (DataNode dn : cluster.getDataNodes()) {
    Replica r = DataNodeTestUtils.fetchReplicaInfo(dn, bpid,
        lb.get(0).getBlock().getBlockId());

    assertTrue("Replica on DN " + dn + " shouldn't be null", r != null);
    assertEquals("Should be RBW replica on " + dn
        + " after sequence of calls append()/write()/hflush()",
        HdfsServerConstants.ReplicaState.RBW, r.getState());
  }
  ofs.close();
}
@Override
public Replica createFinalizedReplica(ExtendedBlock block)
    throws IOException {
  try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
    return createFinalizedReplica(volumes.get(0), block);
  }
}
@Override
public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
    throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
      vol.getCurrentDir().getParentFile());
  dataset.volumeMap.add(block.getBlockPoolId(), info);
  info.getBlockFile().createNewFile();
  info.getMetaFile().createNewFile();
  return info;
}
@Override
public Replica createReplicaInPipeline(ExtendedBlock block)
    throws IOException {
  try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
    return createReplicaInPipeline(volumes.get(0), block);
  }
}
@Override
public Replica createReplicaInPipeline(
    FsVolumeSpi volume, ExtendedBlock block) throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  ReplicaInPipeline rip = new ReplicaInPipeline(
      block.getBlockId(), block.getGenerationStamp(), volume,
      vol.createTmpFile(
          block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
      0);
  dataset.volumeMap.add(block.getBlockPoolId(), rip);
  return rip;
}
@Override
public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
    throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  final String bpid = eb.getBlockPoolId();
  final Block block = eb.getLocalBlock();
  ReplicaBeingWritten rbw = new ReplicaBeingWritten(
      eb.getLocalBlock(), volume,
      vol.createRbwFile(bpid, block).getParentFile(), null);
  rbw.getBlockFile().createNewFile();
  rbw.getMetaFile().createNewFile();
  dataset.volumeMap.add(bpid, rbw);
  return rbw;
}
@Override
public Replica createReplicaWaitingToBeRecovered(ExtendedBlock eb)
    throws IOException {
  try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
    return createReplicaWaitingToBeRecovered(volumes.get(0), eb);
  }
}
@Override
public Replica createReplicaWaitingToBeRecovered(
    FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
  FsVolumeImpl vol = (FsVolumeImpl) volume;
  final String bpid = eb.getBlockPoolId();
  final Block block = eb.getLocalBlock();
  ReplicaWaitingToBeRecovered rwbr =
      new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
          vol.createRbwFile(bpid, block).getParentFile());
  dataset.volumeMap.add(bpid, rwbr);
  return rwbr;
}
@Override
public Replica createReplicaUnderRecovery(
    ExtendedBlock block, long recoveryId) throws IOException {
  try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
    FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
    ReplicaUnderRecovery rur = new ReplicaUnderRecovery(
        new FinalizedReplica(block.getLocalBlock(), volume,
            volume.getCurrentDir().getParentFile()),
        recoveryId);
    dataset.volumeMap.add(block.getBlockPoolId(), rur);
    return rur;
  }
}
/**
 * Creates and closes a file of a certain length.
 * Calls append() so that the next write() operation adds to the end of it.
 * After the write() invocation, calls hflush() to make sure the data has
 * made it through the pipeline, then checks the state of the last block's
 * replica. It is expected to be in the RBW state.
 *
 * @throws IOException in case of an error
 */
@Test
public void pipeline_01() throws IOException {
  final String METHOD_NAME = GenericTestUtils.getMethodName();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + METHOD_NAME);
  }
  Path filePath = new Path("/" + METHOD_NAME + ".dat");

  DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Invoking append but doing nothing otherwise...");
  }
  FSDataOutputStream ofs = fs.append(filePath);
  ofs.writeBytes("Some more stuff to write");
  ((DFSOutputStream) ofs.getWrappedStream()).hflush();

  List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
      filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();

  for (DataNode dn : cluster.getDataNodes()) {
    Replica r =
        cluster.getFsDatasetTestUtils(dn).fetchReplica(lb.get(0).getBlock());

    assertTrue("Replica on DN " + dn + " shouldn't be null", r != null);
    assertEquals("Should be RBW replica on " + dn
        + " after sequence of calls append()/write()/hflush()",
        HdfsServerConstants.ReplicaState.RBW, r.getState());
  }
  ofs.close();
}