public void testBlocksScheduledCounter() throws IOException { MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null); cluster.waitActive(); FileSystem fs = cluster.getFileSystem(); //open a file an write a few bytes: FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter")); for (int i=0; i<1024; i++) { out.write(i); } // flush to make sure a block is allocated. ((DFSOutputStream)(out.getWrappedStream())).sync(); ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>(); cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList); DatanodeDescriptor dn = dnList.get(0); assertEquals(1, dn.getBlocksScheduled()); // close the file and the counter should go to zero. out.close(); assertEquals(0, dn.getBlocksScheduled()); }
private void assertNumCurrentReplicas(short rep) throws Exception { DFSClient.DFSOutputStream hdfs_out = (DFSClient.DFSOutputStream) stm .getWrappedStream(); int actualRepl = hdfs_out.getNumCurrentReplicas(); assertTrue(file1 + " should be replicated to " + rep + " datanodes, not " + actualRepl + ".", actualRepl == rep); }
private void runDNRestartCorruptType(CorruptionType corrupt) throws Exception { cluster = new MiniDFSCluster(conf, 3, true, null); FileSystem fs1 = cluster.getFileSystem(); try { short rep = 3; // replication assertTrue(BLOCK_SIZE%4 == 0); file1 = new Path("/dnDeath.dat"); // write 1/2 block & close stm = fs1.create(file1, true, 1024, rep, 4096); AppendTestUtil.write(stm, 0, 1024); stm.sync(); loseLeases(fs1); DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream(); dfso.abortForTests(); // close the primary DN DataNodeProperties badDN = cluster.stopDataNode(0); // Truncate the block on the primary DN corruptDataNode(0, corrupt); // Start the DN back up cluster.restartDataNode(badDN); // Recover the lease FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf()); recoverFile(fs2); assertFileSize(fs2, 1024); checkFile(fs2, 1024); } finally { // explicitly do not shut down fs1, since it's been frozen up by // killing the DataStreamer and not allowing recovery cluster.shutdown(); } }
/** This optional operation is not yet supported. */ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { DFSOutputStream op = (DFSOutputStream)dfs.append(getPathName(f), bufferSize, progress); return new FSDataOutputStream(op, statistics, op.getInitialLen()); }
public void testFullClusterPowerLoss() throws Exception { cluster = new MiniDFSCluster(conf, 2, true, null); FileSystem fs1 = cluster.getFileSystem(); try { short rep = 2; // replication assertTrue(BLOCK_SIZE%4 == 0); file1 = new Path("/dnDeath.dat"); // write 1/2 block & close stm = fs1.create(file1, true, 1024, rep, 4096); AppendTestUtil.write(stm, 0, 1024); stm.sync(); loseLeases(fs1); DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream(); dfso.abortForTests(); // close the DNs DataNodeProperties badDN = cluster.stopDataNode(0); DataNodeProperties badDN2 = cluster.stopDataNode(0); // what was 1 is now 0 assertNotNull(badDN); assertNotNull(badDN2); // Truncate one of them as if its journal got corrupted corruptDataNode(0, CorruptionType.TRUNCATE_BLOCK_HALF); // Start the DN back up cluster.restartDataNode(badDN); cluster.restartDataNode(badDN2); // Wait for a heartbeat to make sure we get the initial block // report of the replicasBeingWritten cluster.waitForDNHeartbeat(0, 10000); cluster.waitForDNHeartbeat(1, 10000); // Recover the lease FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf()); recoverFile(fs2); assertFileSize(fs2, 512); checkFile(fs2, 512); } finally { // explicitly do not shut down fs1, since it's been frozen up by // killing the DataStreamer and not allowing recovery cluster.shutdown(); } }