void writeFile(Path file, FSDataOutputStream stm, int size) throws IOException { long blocksBefore = stm.getPos() / BLOCK_SIZE; TestFileCreation.writeFile(stm, BLOCK_SIZE); // need to make sure the full block is completely flushed to the DataNodes // (see FSOutputSummer#flush) stm.flush(); int blocksAfter = 0; // wait until the block is allocated by DataStreamer BlockLocation[] locatedBlocks; while(blocksAfter <= blocksBefore) { locatedBlocks = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations( file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS); blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length; } }
@Test public void ensureInvalidBlockTokensAreRejected() throws IOException, URISyntaxException { cluster.transitionToActive(0); FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA); assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH)); DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs); DFSClient spyDfsClient = Mockito.spy(dfsClient); Mockito.doAnswer( new Answer<LocatedBlocks>() { @Override public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable { LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod(); for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { Token<BlockTokenIdentifier> token = lb.getBlockToken(); BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier(); // This will make the token invalid, since the password // won't match anymore id.setExpiryDate(Time.now() + 10); Token<BlockTokenIdentifier> newToken = new Token<BlockTokenIdentifier>(id.getBytes(), token.getPassword(), token.getKind(), token.getService()); lb.setBlockToken(newToken); } return locatedBlocks; } }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong()); DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient); try { assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH)); fail("Shouldn't have been able to read a file with invalid block tokens"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains("Could not obtain block", ioe); } }
void writeFile(Path file, FSDataOutputStream stm, int size) throws IOException { long blocksBefore = stm.getPos() / BLOCK_SIZE; TestFileCreation.writeFile(stm, BLOCK_SIZE); int blocksAfter = 0; // wait until the block is allocated by DataStreamer BlockLocation[] locatedBlocks; while(blocksAfter <= blocksBefore) { locatedBlocks = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations( file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS); blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length; } }
void writeFile(Path file, FSDataOutputStream stm, int size) throws IOException { long blocksBefore = stm.getPos() / BLOCK_SIZE; TestFileCreation.writeFile(stm, BLOCK_SIZE); int blocksAfter = 0; // wait until the block is allocated by DataStreamer BlockLocation[] locatedBlocks; while (blocksAfter <= blocksBefore) { locatedBlocks = DFSClientAdapter.getDFSClient(hdfs) .getBlockLocations(file.toString(), 0L, BLOCK_SIZE * NUM_BLOCKS); blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length; } }
/** Test NN crash and client crash/stuck immediately after block allocation */ @Test(timeout = 100000) public void testOpenFileWhenNNAndClientCrashAfterAddBlock() throws Exception { cluster.getConfiguration(0).set( DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "1.0f"); String testData = "testData"; // to make sure we write the full block before creating dummy block at NN. cluster.getConfiguration(0).setInt("io.bytes.per.checksum", testData.length()); cluster.restartNameNode(0); try { cluster.waitActive(); cluster.transitionToActive(0); cluster.transitionToStandby(1); DistributedFileSystem dfs = cluster.getFileSystem(0); String pathString = "/tmp1.txt"; Path filePath = new Path(pathString); FSDataOutputStream create = dfs.create(filePath, FsPermission.getDefault(), true, 1024, (short) 3, testData.length(), null); create.write(testData.getBytes()); create.hflush(); long fileId = ((DFSOutputStream)create. getWrappedStream()).getFileId(); FileStatus fileStatus = dfs.getFileStatus(filePath); DFSClient client = DFSClientAdapter.getClient(dfs); // add one dummy block at NN, but not write to DataNode ExtendedBlock previousBlock = DFSClientAdapter.getPreviousBlock(client, fileId); DFSClientAdapter.getNamenode(client).addBlock( pathString, client.getClientName(), new ExtendedBlock(previousBlock), new DatanodeInfo[0], DFSClientAdapter.getFileId((DFSOutputStream) create .getWrappedStream()), null); cluster.restartNameNode(0, true); cluster.restartDataNode(0); cluster.transitionToActive(0); // let the block reports be processed. Thread.sleep(2000); FSDataInputStream is = dfs.open(filePath); is.close(); dfs.recoverLease(filePath);// initiate recovery assertTrue("Recovery also should be success", dfs.recoverLease(filePath)); } finally { cluster.shutdown(); } }
/** * The following test first creates a file. * It verifies the block information from a datanode. * Then, it updates the block with new information and verifies again. * @param useDnHostname whether DNs should connect to other DNs by hostname */ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception { MiniDFSCluster cluster = null; conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname); if (useDnHostname) { // Since the mini cluster only listens on the loopback we have to // ensure the hostname used to access DNs maps to the loopback. We // do this by telling the DN to advertise localhost as its hostname // instead of the default hostname. conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost"); } try { cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3) .checkDataNodeHostConfig(true) .build(); cluster.waitActive(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); String filestr = "/foo"; Path filepath = new Path(filestr); DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L); assertTrue(dfs.exists(filepath)); //get block info LocatedBlock locatedblock = getLastLocatedBlock( DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr); DatanodeInfo[] datanodeinfo = locatedblock.getLocations(); assertTrue(datanodeinfo.length > 0); //connect to a data node DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy( datanode, datanodeinfo[0], conf, useDnHostname); // Stop the block scanners. datanode.getBlockScanner().removeAllVolumeScanners(); //verify BlockMetaDataInfo ExtendedBlock b = locatedblock.getBlock(); InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass()); checkMetaInfo(b, datanode); long recoveryId = b.getGenerationStamp() + 1; idp.initReplicaRecovery( new RecoveringBlock(b, locatedblock.getLocations(), recoveryId)); //verify updateBlock ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1); idp.updateReplicaUnderRecovery(b, recoveryId, b.getBlockId(), newblock.getNumBytes()); checkMetaInfo(newblock, datanode); // Verify correct null response trying to init recovery for a missing block ExtendedBlock badBlock = new ExtendedBlock("fake-pool", b.getBlockId(), 0, 0); assertNull(idp.initReplicaRecovery( new RecoveringBlock(badBlock, locatedblock.getLocations(), recoveryId))); } finally { if (cluster != null) {cluster.shutdown();} } }
/** * Test for * {@link FsDatasetImpl#updateReplicaUnderRecovery(ExtendedBlock, long, long)} * */ @Test public void testUpdateReplicaUnderRecovery() throws IOException { MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); String bpid = cluster.getNamesystem().getBlockPoolId(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); String filestr = "/foo"; Path filepath = new Path(filestr); DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L); //get block info final LocatedBlock locatedblock = getLastLocatedBlock( DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr); final DatanodeInfo[] datanodeinfo = locatedblock.getLocations(); Assert.assertTrue(datanodeinfo.length > 0); //get DataNode and FSDataset objects final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); Assert.assertTrue(datanode != null); //initReplicaRecovery final ExtendedBlock b = locatedblock.getBlock(); final long recoveryid = b.getGenerationStamp() + 1; final long newlength = b.getNumBytes() - 1; final FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode); final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery( new RecoveringBlock(b, null, recoveryid)); //check replica final ReplicaInfo replica = FsDatasetTestUtil.fetchReplicaInfo( fsdataset, bpid, b.getBlockId()); Assert.assertEquals(ReplicaState.RUR, replica.getState()); //check meta data before update FsDatasetImpl.checkReplicaFiles(replica); //case "THIS IS NOT SUPPOSED TO HAPPEN" //with (block length) != (stored replica's on disk length). { //create a block with same id and gs but different length. final ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp()); try { //update should fail fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, tmp.getBlockId(), newlength); Assert.fail(); } catch(IOException ioe) { System.out.println("GOOD: getting " + ioe); } } //update final String storageID = fsdataset.updateReplicaUnderRecovery( new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, rri.getBlockId(), newlength); assertTrue(storageID != null); } finally { if (cluster != null) cluster.shutdown(); } }
@Test public void testTransferRbw() throws Exception { final HdfsConfiguration conf = new HdfsConfiguration(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf ).numDataNodes(REPLICATION).build(); try { cluster.waitActive(); final DistributedFileSystem fs = cluster.getFileSystem(); //create a file, write some data and leave it open. final Path p = new Path("/foo"); final int size = (1 << 16) + RAN.nextInt(1 << 16); LOG.info("size = " + size); final FSDataOutputStream out = fs.create(p, REPLICATION); final byte[] bytes = new byte[1024]; for(int remaining = size; remaining > 0; ) { RAN.nextBytes(bytes); final int len = bytes.length < remaining? bytes.length: remaining; out.write(bytes, 0, len); out.hflush(); remaining -= len; } //get the RBW final ReplicaBeingWritten oldrbw; final DataNode newnode; final DatanodeInfo newnodeinfo; final String bpid = cluster.getNamesystem().getBlockPoolId(); { final DataNode oldnode = cluster.getDataNodes().get(0); oldrbw = getRbw(oldnode, bpid); LOG.info("oldrbw = " + oldrbw); //add a datanode cluster.startDataNodes(conf, 1, true, null, null); newnode = cluster.getDataNodes().get(REPLICATION); final DatanodeInfo oldnodeinfo; { final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc( ).getDatanodeReport(DatanodeReportType.LIVE); Assert.assertEquals(2, datatnodeinfos.length); int i = 0; for(DatanodeRegistration dnReg = newnode.getDNRegistrationForBP(bpid); i < datatnodeinfos.length && !datatnodeinfos[i].equals(dnReg); i++); Assert.assertTrue(i < datatnodeinfos.length); newnodeinfo = datatnodeinfos[i]; oldnodeinfo = datatnodeinfos[1 - i]; } //transfer RBW final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(), oldrbw.getBytesAcked(), oldrbw.getGenerationStamp()); final BlockOpResponseProto s = DFSTestUtil.transferRbw( b, DFSClientAdapter.getDFSClient(fs), oldnodeinfo, newnodeinfo); Assert.assertEquals(Status.SUCCESS, s.getStatus()); } //check new rbw final ReplicaBeingWritten newrbw = getRbw(newnode, bpid); LOG.info("newrbw = " + newrbw); Assert.assertEquals(oldrbw.getBlockId(), newrbw.getBlockId()); Assert.assertEquals(oldrbw.getGenerationStamp(), newrbw.getGenerationStamp()); Assert.assertEquals(oldrbw.getVisibleLength(), newrbw.getVisibleLength()); LOG.info("DONE"); } finally { cluster.shutdown(); } }
/** * Test for * {@link FsDatasetImpl#updateReplicaUnderRecovery(ExtendedBlock, long, long)} * */ @Test public void testUpdateReplicaUnderRecovery() throws IOException { MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); String bpid = cluster.getNamesystem().getBlockPoolId(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); String filestr = "/foo"; Path filepath = new Path(filestr); DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L); //get block info final LocatedBlock locatedblock = getLastLocatedBlock( DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr); final DatanodeInfo[] datanodeinfo = locatedblock.getLocations(); Assert.assertTrue(datanodeinfo.length > 0); //get DataNode and FSDataset objects final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); Assert.assertTrue(datanode != null); //initReplicaRecovery final ExtendedBlock b = locatedblock.getBlock(); final long recoveryid = b.getGenerationStamp() + 1; final long newlength = b.getNumBytes() - 1; final FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode); final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery( new RecoveringBlock(b, null, recoveryid)); //check replica final Replica replica = cluster.getFsDatasetTestUtils(datanode).fetchReplica(b); Assert.assertEquals(ReplicaState.RUR, replica.getState()); //check meta data before update cluster.getFsDatasetTestUtils(datanode).checkStoredReplica(replica); //case "THIS IS NOT SUPPOSED TO HAPPEN" //with (block length) != (stored replica's on disk length). { //create a block with same id and gs but different length. final ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp()); try { //update should fail fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, tmp.getBlockId(), newlength); Assert.fail(); } catch(IOException ioe) { System.out.println("GOOD: getting " + ioe); } } //update final String storageID = fsdataset.updateReplicaUnderRecovery( new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, rri.getBlockId(), newlength); assertTrue(storageID != null); } finally { if (cluster != null) cluster.shutdown(); } }
/** * The following test first creates a file. * It verifies the block information from a datanode. * Then, it updates the block with new information and verifies again. * @param useDnHostname whether DNs should connect to other DNs by hostname */ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception { MiniDFSCluster cluster = null; conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname); if (useDnHostname) { // Since the mini cluster only listens on the loopback we have to // ensure the hostname used to access DNs maps to the loopback. We // do this by telling the DN to advertise localhost as its hostname // instead of the default hostname. conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost"); } try { cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3) .checkDataNodeHostConfig(true) .build(); cluster.waitActive(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); String filestr = "/foo"; Path filepath = new Path(filestr); DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L); assertTrue(dfs.exists(filepath)); //get block info LocatedBlock locatedblock = getLastLocatedBlock( DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr); DatanodeInfo[] datanodeinfo = locatedblock.getLocations(); assertTrue(datanodeinfo.length > 0); //connect to a data node DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy( datanode, datanodeinfo[0], conf, useDnHostname); // Stop the block scanners. datanode.getBlockScanner().removeAllVolumeScanners(); //verify BlockMetaDataInfo ExtendedBlock b = locatedblock.getBlock(); InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass()); checkMetaInfo(b, datanode); long recoveryId = b.getGenerationStamp() + 1; idp.initReplicaRecovery( new RecoveringBlock(b, locatedblock.getLocations(), recoveryId)); //verify updateBlock ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1); idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes()); checkMetaInfo(newblock, datanode); // Verify correct null response trying to init recovery for a missing block ExtendedBlock badBlock = new ExtendedBlock("fake-pool", b.getBlockId(), 0, 0); assertNull(idp.initReplicaRecovery( new RecoveringBlock(badBlock, locatedblock.getLocations(), recoveryId))); } finally { if (cluster != null) {cluster.shutdown();} } }
/** * Test for * {@link FsDatasetImpl#updateReplicaUnderRecovery(ExtendedBlock, long, long)} * */ @Test public void testUpdateReplicaUnderRecovery() throws IOException { MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); String bpid = cluster.getNamesystem().getBlockPoolId(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); String filestr = "/foo"; Path filepath = new Path(filestr); DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L); //get block info final LocatedBlock locatedblock = getLastLocatedBlock( DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr); final DatanodeInfo[] datanodeinfo = locatedblock.getLocations(); Assert.assertTrue(datanodeinfo.length > 0); //get DataNode and FSDataset objects final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); Assert.assertTrue(datanode != null); //initReplicaRecovery final ExtendedBlock b = locatedblock.getBlock(); final long recoveryid = b.getGenerationStamp() + 1; final long newlength = b.getNumBytes() - 1; final FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode); final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery( new RecoveringBlock(b, null, recoveryid)); //check replica final ReplicaInfo replica = FsDatasetTestUtil.fetchReplicaInfo( fsdataset, bpid, b.getBlockId()); Assert.assertEquals(ReplicaState.RUR, replica.getState()); //check meta data before update FsDatasetImpl.checkReplicaFiles(replica); //case "THIS IS NOT SUPPOSED TO HAPPEN" //with (block length) != (stored replica's on disk length). { //create a block with same id and gs but different length. final ExtendedBlock tmp = new ExtendedBlock(b.getBlockPoolId(), rri .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp()); try { //update should fail fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength); Assert.fail(); } catch(IOException ioe) { System.out.println("GOOD: getting " + ioe); } } //update final String storageID = fsdataset.updateReplicaUnderRecovery( new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength); assertTrue(storageID != null); } finally { if (cluster != null) cluster.shutdown(); } }
/** * The following test first creates a file. * It verifies the block information from a datanode. * Then, it updates the block with new information and verifies again. * @param useDnHostname whether DNs should connect to other DNs by hostname */ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception { MiniDFSCluster cluster = null; conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname); if (useDnHostname) { // Since the mini cluster only listens on the loopback we have to // ensure the hostname used to access DNs maps to the loopback. We // do this by telling the DN to advertise localhost as its hostname // instead of the default hostname. conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost"); } try { cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3) .checkDataNodeHostConfig(true) .build(); cluster.waitActive(); //create a file DistributedFileSystem dfs = cluster.getFileSystem(); String filestr = "/foo"; Path filepath = new Path(filestr); DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L); assertTrue(dfs.exists(filepath)); //get block info LocatedBlock locatedblock = getLastLocatedBlock( DFSClientAdapter.getDFSClient(dfs).getNamenode(), filestr); DatanodeInfo[] datanodeinfo = locatedblock.getLocations(); assertTrue(datanodeinfo.length > 0); //connect to a data node DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy( datanode, datanodeinfo[0], conf, useDnHostname); //stop block scanner, so we could compare lastScanTime DataNodeTestUtils.shutdownBlockScanner(datanode); //verify BlockMetaDataInfo ExtendedBlock b = locatedblock.getBlock(); InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass()); checkMetaInfo(b, datanode); long recoveryId = b.getGenerationStamp() + 1; idp.initReplicaRecovery( new RecoveringBlock(b, locatedblock.getLocations(), recoveryId)); //verify updateBlock ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1); idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes()); checkMetaInfo(newblock, datanode); // Verify correct null response trying to init recovery for a missing block ExtendedBlock badBlock = new ExtendedBlock("fake-pool", b.getBlockId(), 0, 0); assertNull(idp.initReplicaRecovery( new RecoveringBlock(badBlock, locatedblock.getLocations(), recoveryId))); } finally { if (cluster != null) {cluster.shutdown();} } }
@Test public void testTransferRbw() throws Exception { final HdfsConfiguration conf = new HdfsConfiguration(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf ).numDataNodes(REPLICATION).build(); try { cluster.waitActive(); final DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem(); //create a file, write some data and leave it open. final Path p = new Path("/foo"); final int size = (1 << 16) + RAN.nextInt(1 << 16); LOG.info("size = " + size); final FSDataOutputStream out = fs.create(p, REPLICATION); final byte[] bytes = new byte[1024]; for(int remaining = size; remaining > 0; ) { RAN.nextBytes(bytes); final int len = bytes.length < remaining? bytes.length: remaining; out.write(bytes, 0, len); out.hflush(); remaining -= len; } //get the RBW final ReplicaBeingWritten oldrbw; final DataNode newnode; final DatanodeInfo newnodeinfo; final String bpid = cluster.getNamesystem().getBlockPoolId(); { final DataNode oldnode = cluster.getDataNodes().get(0); oldrbw = getRbw(oldnode, bpid); LOG.info("oldrbw = " + oldrbw); //add a datanode cluster.startDataNodes(conf, 1, true, null, null); newnode = cluster.getDataNodes().get(REPLICATION); final DatanodeInfo oldnodeinfo; { final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc( ).getDatanodeReport(DatanodeReportType.LIVE); Assert.assertEquals(2, datatnodeinfos.length); int i = 0; for(DatanodeRegistration dnReg = newnode.getDNRegistrationForBP(bpid); i < datatnodeinfos.length && !datatnodeinfos[i].equals(dnReg); i++); Assert.assertTrue(i < datatnodeinfos.length); newnodeinfo = datatnodeinfos[i]; oldnodeinfo = datatnodeinfos[1 - i]; } //transfer RBW final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(), oldrbw.getBytesAcked(), oldrbw.getGenerationStamp()); final BlockOpResponseProto s = DFSTestUtil.transferRbw( b, DFSClientAdapter.getDFSClient(fs), oldnodeinfo, newnodeinfo); Assert.assertEquals(Status.SUCCESS, s.getStatus()); } //check new rbw final ReplicaBeingWritten newrbw = getRbw(newnode, bpid); LOG.info("newrbw = " + newrbw); Assert.assertEquals(oldrbw.getBlockId(), newrbw.getBlockId()); Assert.assertEquals(oldrbw.getGenerationStamp(), newrbw.getGenerationStamp()); Assert.assertEquals(oldrbw.getVisibleLength(), newrbw.getVisibleLength()); LOG.info("DONE"); } finally { cluster.shutdown(); } }