/**
 * Builds a dispatcher from the given connector, node filters and thread
 * settings.
 *
 * @param nnc connector stored on this dispatcher; its
 *          {@code fallbackToSimpleAuth} is handed to the SASL client
 * @param includedNodes stored as-is in {@code includedNodes}
 * @param excludedNodes stored as-is in {@code excludedNodes}
 * @param movedWinWidth passed to the {@code MovedBlocks} constructor
 * @param moverThreads size of the fixed thread pool used as the move executor
 * @param dispatcherThreads size of the fixed thread pool used as the dispatch
 *          executor; when 0 no pool is created and the executor is
 *          {@code null}
 * @param maxConcurrentMovesPerNode stored as-is
 * @param conf configuration used to obtain the network topology and to
 *          construct the SASL data transfer client
 */
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
    Set<String> excludedNodes, long movedWinWidth, int moverThreads,
    int dispatcherThreads, int maxConcurrentMovesPerNode,
    Configuration conf) {
  // Plain copies of the constructor arguments.
  this.nnc = nnc;
  this.includedNodes = includedNodes;
  this.excludedNodes = excludedNodes;
  this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;

  this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);
  this.cluster = NetworkTopology.getInstance(conf);

  // The mover pool is always created; the dispatcher pool only when a
  // non-zero thread count was requested.
  this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
  if (dispatcherThreads == 0) {
    this.dispatchExecutor = null;
  } else {
    this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
  }

  this.saslClient = new SaslDataTransferClient(
      conf,
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf),
      nnc.fallbackToSimpleAuth);
}
private void testEncryptedWrite(int numDns) throws IOException { MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); setEncryptionConfigKeys(conf); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build(); FileSystem fs = getFileSystem(conf); LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(SaslDataTransferServer.class)); LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(DataTransferSaslUtil.class)); try { writeTestDataToFile(fs); } finally { logs.stopCapturing(); logs1.stopCapturing(); } assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); fs.close(); if (resolverClazz == null) { // Test client and server negotiate cipher option GenericTestUtils.assertDoesNotMatch(logs.getOutput(), "Server using cipher suite"); // Check the IOStreamPair GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } finally { if (cluster != null) { cluster.shutdown(); } } }
/**
 * Builds a dispatcher from the given connector, node filters, thread
 * settings and getBlocks limits.
 *
 * @param nnc connector stored on this dispatcher; its
 *          {@code fallbackToSimpleAuth} is handed to the SASL client
 * @param includedNodes stored as-is in {@code includedNodes}
 * @param excludedNodes stored as-is in {@code excludedNodes}
 * @param movedWinWidth passed to the {@code MovedBlocks} constructor
 * @param moverThreads passed to the {@code Allocator} that backs
 *          {@code moverThreadAllocator}
 * @param dispatcherThreads size of the fixed thread pool used as the dispatch
 *          executor; when 0 no pool is created and the executor is
 *          {@code null}
 * @param maxConcurrentMovesPerNode stored as-is
 * @param getBlocksSize stored as-is
 * @param getBlocksMinBlockSize stored as-is
 * @param conf configuration used for the network topology, SASL client,
 *          I/O buffer size, the "use datanode hostname" flag and the block
 *          placement policies
 */
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
    Set<String> excludedNodes, long movedWinWidth, int moverThreads,
    int dispatcherThreads, int maxConcurrentMovesPerNode,
    long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
  // Plain copies of the constructor arguments.
  this.nnc = nnc;
  this.includedNodes = includedNodes;
  this.excludedNodes = excludedNodes;
  this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
  this.getBlocksSize = getBlocksSize;
  this.getBlocksMinBlockSize = getBlocksMinBlockSize;

  this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);
  this.cluster = NetworkTopology.getInstance(conf);

  // A dispatcher pool is only created for a non-zero thread count.
  if (dispatcherThreads == 0) {
    this.dispatchExecutor = null;
  } else {
    this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
  }
  this.moverThreadAllocator = new Allocator(moverThreads);

  this.saslClient = new SaslDataTransferClient(
      conf,
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf),
      nnc.fallbackToSimpleAuth);

  // Settings read from the configuration.
  this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
  this.connectToDnViaHostname = conf.getBoolean(
      HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
      HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);

  // Uses the topology assigned above.
  this.placementPolicies =
      new BlockPlacementPolicies(conf, null, this.cluster, null);
}
@Test public void testEncryptedRead() throws IOException { MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = getFileSystem(conf); writeTestDataToFile(fs); assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); FileChecksum checksum = fs.getFileChecksum(TEST_PATH); fs.close(); cluster.shutdown(); setEncryptionConfigKeys(conf); cluster = new MiniDFSCluster.Builder(conf) .manageDataDfsDirs(false) .manageNameDfsDirs(false) .format(false) .startupOption(StartupOption.REGULAR) .build(); fs = getFileSystem(conf); LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(SaslDataTransferServer.class)); LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(DataTransferSaslUtil.class)); try { assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); } finally { logs.stopCapturing(); logs1.stopCapturing(); } fs.close(); if (resolverClazz == null) { // Test client and server negotiate cipher option GenericTestUtils.assertDoesNotMatch(logs.getOutput(), "Server using cipher suite"); // Check the IOStreamPair GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } finally { if (cluster != null) { cluster.shutdown(); } } }
@Test public void testEncryptedReadWithRC4() throws IOException { MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = getFileSystem(conf); writeTestDataToFile(fs); assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); FileChecksum checksum = fs.getFileChecksum(TEST_PATH); fs.close(); cluster.shutdown(); setEncryptionConfigKeys(conf); // It'll use 3DES by default, but we set it to rc4 here. conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4"); cluster = new MiniDFSCluster.Builder(conf) .manageDataDfsDirs(false) .manageNameDfsDirs(false) .format(false) .startupOption(StartupOption.REGULAR) .build(); fs = getFileSystem(conf); LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(SaslDataTransferServer.class)); LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(DataTransferSaslUtil.class)); try { assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); } finally { logs.stopCapturing(); logs1.stopCapturing(); } fs.close(); if (resolverClazz == null) { // Test client and server negotiate cipher option GenericTestUtils.assertDoesNotMatch(logs.getOutput(), "Server using cipher suite"); // Check the IOStreamPair GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } finally { if (cluster != null) { cluster.shutdown(); } } }
@Test public void testEncryptedReadWithAES() throws IOException { MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding"); cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = getFileSystem(conf); writeTestDataToFile(fs); assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); FileChecksum checksum = fs.getFileChecksum(TEST_PATH); fs.close(); cluster.shutdown(); setEncryptionConfigKeys(conf); cluster = new MiniDFSCluster.Builder(conf) .manageDataDfsDirs(false) .manageNameDfsDirs(false) .format(false) .startupOption(StartupOption.REGULAR) .build(); fs = getFileSystem(conf); LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(SaslDataTransferServer.class)); LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(DataTransferSaslUtil.class)); try { assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); } finally { logs.stopCapturing(); logs1.stopCapturing(); } fs.close(); if (resolverClazz == null) { // Test client and server negotiate cipher option GenericTestUtils.assertMatches(logs.getOutput(), "Server using cipher suite"); // Check the IOStreamPair GenericTestUtils.assertMatches(logs1.getOutput(), "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } finally { if (cluster != null) { cluster.shutdown(); } } }
@Test public void testEncryptedReadWithAES() throws IOException { MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding"); cluster = new MiniDFSCluster.Builder(conf).build(); FileSystem fs = getFileSystem(conf); writeTestDataToFile(fs); assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); FileChecksum checksum = fs.getFileChecksum(TEST_PATH); fs.close(); cluster.shutdown(); setEncryptionConfigKeys(conf); cluster = new MiniDFSCluster.Builder(conf) .manageDataDfsDirs(false) .manageNameDfsDirs(false) .format(false) .startupOption(StartupOption.REGULAR) .build(); fs = getFileSystem(conf); LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(SaslDataTransferServer.class)); LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( LogFactory.getLog(DataTransferSaslUtil.class)); try { assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); } finally { logs.stopCapturing(); logs1.stopCapturing(); } fs.close(); if (resolverClazz == null) { // Test client and server negotiate cipher option GenericTestUtils.assertMatches(logs.getOutput(), "Server using cipher suite"); // Check the IOStreamPair GenericTestUtils.assertMatches(logs1.getOutput(), "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } finally { if (cluster != null) { cluster.shutdown(); } } }