private void waitTokenExpires(FSDataOutputStream out) throws IOException { Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out); while (!SecurityTestUtil.isBlockTokenExpired(token)) { try { Thread.sleep(10); } catch (InterruptedException ignored) { } } }
/** * testing that APPEND operation can handle token expiration when * re-establishing pipeline is needed */ @Test public void testAppend() throws Exception { MiniDFSCluster cluster = null; int numDataNodes = 2; Configuration conf = getConf(numDataNodes); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); assertEquals(numDataNodes, cluster.getDataNodes().size()); final NameNode nn = cluster.getNameNode(); final BlockManager bm = nn.getNamesystem().getBlockManager(); final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); // set a short token lifetime (1 second) SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); Path fileToAppend = new Path(FILE_TO_APPEND); FileSystem fs = cluster.getFileSystem(); // write a one-byte file FSDataOutputStream stm = writeFile(fs, fileToAppend, (short) numDataNodes, BLOCK_SIZE); stm.write(rawData, 0, 1); stm.close(); // open the file again for append stm = fs.append(fileToAppend); int mid = rawData.length - 1; stm.write(rawData, 1, mid - 1); stm.hflush(); /* * wait till token used in stm expires */ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm); while (!SecurityTestUtil.isBlockTokenExpired(token)) { try { Thread.sleep(10); } catch (InterruptedException ignored) { } } // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // append the rest of the file stm.write(rawData, mid, rawData.length - mid); stm.close(); // check if append is successful FSDataInputStream in5 = fs.open(fileToAppend); assertTrue(checkFile1(in5)); } finally { if (cluster != null) { cluster.shutdown(); } } }
/** * testing that WRITE operation can handle token expiration when * re-establishing pipeline is needed */ @Test public void testWrite() throws Exception { MiniDFSCluster cluster = null; int numDataNodes = 2; Configuration conf = getConf(numDataNodes); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); assertEquals(numDataNodes, cluster.getDataNodes().size()); final NameNode nn = cluster.getNameNode(); final BlockManager bm = nn.getNamesystem().getBlockManager(); final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); // set a short token lifetime (1 second) SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); Path fileToWrite = new Path(FILE_TO_WRITE); FileSystem fs = cluster.getFileSystem(); FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes, BLOCK_SIZE); // write a partial block int mid = rawData.length - 1; stm.write(rawData, 0, mid); stm.hflush(); /* * wait till token used in stm expires */ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm); while (!SecurityTestUtil.isBlockTokenExpired(token)) { try { Thread.sleep(10); } catch (InterruptedException ignored) { } } // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // write the rest of the file stm.write(rawData, mid, rawData.length - mid); stm.close(); // check if write is successful FSDataInputStream in4 = fs.open(fileToWrite); assertTrue(checkFile1(in4)); } finally { if (cluster != null) { cluster.shutdown(); } } }
/** * testing that APPEND operation can handle token expiration when * re-establishing pipeline is needed */ @Test public void testAppend() throws Exception { MiniDFSCluster cluster = null; int numDataNodes = 2; Configuration conf = getConf(numDataNodes); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); assertEquals(numDataNodes, cluster.getDataNodes().size()); final NameNode nn = cluster.getNameNode(); final BlockManager bm = nn.getNamesystem().getBlockManager(); final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); // set a short token lifetime (1 second) SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); Path fileToAppend = new Path(FILE_TO_APPEND); FileSystem fs = cluster.getFileSystem(); byte[] expected = generateBytes(FILE_SIZE); // write a one-byte file FSDataOutputStream stm = writeFile(fs, fileToAppend, (short) numDataNodes, BLOCK_SIZE); stm.write(expected, 0, 1); stm.close(); // open the file again for append stm = fs.append(fileToAppend); int mid = expected.length - 1; stm.write(expected, 1, mid - 1); stm.hflush(); /* * wait till token used in stm expires */ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm); while (!SecurityTestUtil.isBlockTokenExpired(token)) { try { Thread.sleep(10); } catch (InterruptedException ignored) { } } // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // append the rest of the file stm.write(expected, mid, expected.length - mid); stm.close(); // check if append is successful FSDataInputStream in5 = fs.open(fileToAppend); assertTrue(checkFile1(in5, expected)); } finally { if (cluster != null) { cluster.shutdown(); } } }
/** * testing that WRITE operation can handle token expiration when * re-establishing pipeline is needed */ @Test public void testWrite() throws Exception { MiniDFSCluster cluster = null; int numDataNodes = 2; Configuration conf = getConf(numDataNodes); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); assertEquals(numDataNodes, cluster.getDataNodes().size()); final NameNode nn = cluster.getNameNode(); final BlockManager bm = nn.getNamesystem().getBlockManager(); final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); // set a short token lifetime (1 second) SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); Path fileToWrite = new Path(FILE_TO_WRITE); FileSystem fs = cluster.getFileSystem(); byte[] expected = generateBytes(FILE_SIZE); FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes, BLOCK_SIZE); // write a partial block int mid = expected.length - 1; stm.write(expected, 0, mid); stm.hflush(); /* * wait till token used in stm expires */ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm); while (!SecurityTestUtil.isBlockTokenExpired(token)) { try { Thread.sleep(10); } catch (InterruptedException ignored) { } } // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // write the rest of the file stm.write(expected, mid, expected.length - mid); stm.close(); // check if write is successful FSDataInputStream in4 = fs.open(fileToWrite); assertTrue(checkFile1(in4, expected)); } finally { if (cluster != null) { cluster.shutdown(); } } }
protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException { return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken()); }