public IPFailoverProxyProvider(Configuration conf, URI uri, Class<T> xface) {
  Preconditions.checkArgument(
      xface.isAssignableFrom(NamenodeProtocols.class),
      "Interface class %s is not a valid NameNode protocol!", xface);
  this.xface = xface;
  this.nameNodeUri = uri;

  this.conf = new Configuration(conf);
  int maxRetries = this.conf.getInt(
      DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY,
      DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_DEFAULT);
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      maxRetries);

  int maxRetriesOnSocketTimeouts = this.conf.getInt(
      DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      maxRetriesOnSocketTimeouts);
}
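The constructor above copies the caller's Configuration and maps the HDFS failover retry settings onto the generic IPC connect-retry keys. A minimal caller sketch, assuming the DFSConfigKeys names shown above; the URI and retry values here are illustrative, not from the original source:

Configuration conf = new HdfsConfiguration();
// Cap failover connection retries before constructing the provider
// (illustrative values).
conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY, 2);
conf.setInt(
    DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 3);
IPFailoverProxyProvider<NamenodeProtocols> provider =
    new IPFailoverProxyProvider<>(conf,
        URI.create("hdfs://nn.example.com:8020"), NamenodeProtocols.class);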
@Test
public void testOpenFilesWithRename() throws Exception {
  Path path = new Path("/test");
  doWriteAndAbort(fs, path);

  // check for zero sized blocks
  Path fileWithEmptyBlock = new Path("/test/test/test4");
  fs.create(fileWithEmptyBlock);
  NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc();
  String clientName = fs.getClient().getClientName();
  // create one empty block
  nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null,
      INodeId.GRANDFATHER_INODE_ID, null);
  fs.createSnapshot(path, "s2");

  fs.rename(new Path("/test/test"), new Path("/test/test-renamed"));
  fs.delete(new Path("/test/test-renamed"), true);

  NameNode nameNode = cluster.getNameNode();
  NameNodeAdapter.enterSafeMode(nameNode, false);
  NameNodeAdapter.saveNamespace(nameNode);
  NameNodeAdapter.leaveSafeMode(nameNode);
  cluster.restartNameNode(true);
}
@Test
public void testAddBlockRetryShouldReturnBlockWithLocations() throws Exception {
  final String src = "/testAddBlockRetryShouldReturnBlockWithLocations";
  NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc();
  // create file
  nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName",
      new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
      (short) 3, 1024, null);
  // start first addBlock()
  LOG.info("Starting first addBlock for " + src);
  LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
      INodeId.GRANDFATHER_INODE_ID, null);
  assertTrue("Block locations should be present",
      lb1.getLocations().length > 0);

  cluster.restartNameNode();
  nameNodeRpc = cluster.getNameNodeRpc();
  LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null,
      INodeId.GRANDFATHER_INODE_ID, null);
  assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
  assertTrue("Wrong locations with retry", lb2.getLocations().length > 0);
}
/**
 * Access each block's replica on the second DataNode until one access fails,
 * which indicates the corresponding volume has been removed.
 * @param path file whose blocks are accessed
 * @param size length of the file in bytes
 * @throws IOException
 */
private void triggerFailure(String path, long size) throws IOException {
  NamenodeProtocols nn = cluster.getNameNodeRpc();
  List<LocatedBlock> locatedBlocks =
      nn.getBlockLocations(path, 0, size).getLocatedBlocks();

  for (LocatedBlock lb : locatedBlocks) {
    DatanodeInfo dinfo = lb.getLocations()[1];
    ExtendedBlock b = lb.getBlock();
    try {
      accessBlock(dinfo, lb);
    } catch (IOException e) {
      System.out.println("Failure triggered, on block: " + b.getBlockId() +
          "; corresponding volume should be removed by now");
      break;
    }
  }
}
/**
 * Count the total number of datanode locations over all blocks of a file,
 * recording the per-block location counts in the map.
 * @param map map from block id to its accumulated location count
 * @param path file whose block locations are counted
 * @param size length of the file in bytes
 * @return total number of block locations across all blocks of the file
 * @throws IOException
 */
private int countNNBlocks(Map<String, BlockLocs> map, String path, long size)
    throws IOException {
  int total = 0;
  NamenodeProtocols nn = cluster.getNameNodeRpc();
  List<LocatedBlock> locatedBlocks =
      nn.getBlockLocations(path, 0, size).getLocatedBlocks();

  for (LocatedBlock lb : locatedBlocks) {
    String blockId = "" + lb.getBlock().getBlockId();
    DatanodeInfo[] dn_locs = lb.getLocations();
    BlockLocs bl = map.get(blockId);
    if (bl == null) {
      bl = new BlockLocs();
    }
    total += dn_locs.length;
    bl.num_locs += dn_locs.length;
    map.put(blockId, bl);
  }
  return total;
}
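The BlockLocs holder used above is not shown in this snippet. A minimal sketch of what it presumably looks like (a hypothetical reconstruction, not the original class):

// Hypothetical reconstruction of the per-block location counter used above.
private static class BlockLocs {
  int num_locs = 0; // number of datanode locations observed for this block
}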
/**
 * Regression test for HDFS-3626. Creates a file using a non-canonical path
 * (i.e. with extra slashes between components) and makes sure that the NN
 * rejects it.
 */
@Test
public void testMkdirRpcNonCanonicalPath() throws IOException {
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
  try {
    NamenodeProtocols nnrpc = cluster.getNameNodeRpc();

    for (String pathStr : NON_CANONICAL_PATHS) {
      try {
        nnrpc.mkdirs(pathStr, new FsPermission((short)0755), true);
        fail("Did not fail when called with a non-canonicalized path: " +
            pathStr);
      } catch (InvalidPathException ipe) {
        // expected
      }
    }
  } finally {
    cluster.shutdown();
  }
}
public IPFailoverProxyProvider(Configuration conf, URI uri, Class<T> xface) {
  Preconditions.checkArgument(
      xface.isAssignableFrom(NamenodeProtocols.class),
      "Interface class %s is not a valid NameNode protocol!", xface);
  this.xface = xface;
  this.nameNodeUri = uri;

  this.conf = new Configuration(conf);
  int maxRetries = this.conf.getInt(
      HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
      HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      maxRetries);

  int maxRetriesOnSocketTimeouts = this.conf.getInt(
      HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      maxRetriesOnSocketTimeouts);
}
@Test
public void testHedgingWhenOneIsSlow() throws Exception {
  final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
  Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
    @Override
    public long[] answer(InvocationOnMock invocation) throws Throwable {
      Thread.sleep(1000);
      return new long[]{1};
    }
  });
  final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
  Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));

  RequestHedgingProxyProvider<NamenodeProtocols> provider =
      new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
          createFactory(goodMock, badMock));
  long[] stats = provider.getProxy().proxy.getStats();
  Assert.assertTrue(stats.length == 1);
  Assert.assertEquals(1, stats[0]);
  Mockito.verify(badMock).getStats();
  Mockito.verify(goodMock).getStats();
}
@Test
public void testHedgingWhenBothFail() throws Exception {
  NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
  Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
  NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
  Mockito.when(worseMock.getStats()).thenThrow(
      new IOException("Worse mock !!"));

  RequestHedgingProxyProvider<NamenodeProtocols> provider =
      new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
          createFactory(badMock, worseMock));
  try {
    provider.getProxy().proxy.getStats();
    Assert.fail("Should fail since both namenodes throw IOException !!");
  } catch (Exception e) {
    Assert.assertTrue(e instanceof MultiException);
  }
  Mockito.verify(badMock).getStats();
  Mockito.verify(worseMock).getStats();
}
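Both hedging tests rely on a createFactory helper that is not shown here. A possible sketch, assuming a proxy-factory interface (HAProxyFactory) whose createProxy method the provider calls once per configured namenode; the interface name and method signatures below are assumptions, not taken from the snippets:

// Hypothetical sketch: hand out the supplied mocks in order, one per
// createProxy call, so each "namenode" resolves to a different mock.
private HAProxyFactory<NamenodeProtocols> createFactory(
    final NamenodeProtocols... protos) {
  final Iterator<NamenodeProtocols> iterator =
      Arrays.asList(protos).iterator();
  return new HAProxyFactory<NamenodeProtocols>() {
    @Override
    public NamenodeProtocols createProxy(Configuration conf,
        InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
        UserGroupInformation ugi, boolean withRetries,
        AtomicBoolean fallbackToSimpleAuth) throws IOException {
      return iterator.next(); // next mock in the order supplied
    }
    @Override
    public NamenodeProtocols createProxy(Configuration conf,
        InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
        UserGroupInformation ugi, boolean withRetries) throws IOException {
      return iterator.next();
    }
  };
}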
@Test
public void testOpenFilesWithRename() throws Exception {
  Path path = new Path("/test");
  doWriteAndAbort(fs, path);

  // check for zero sized blocks
  Path fileWithEmptyBlock = new Path("/test/test/test4");
  fs.create(fileWithEmptyBlock);
  NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc();
  String clientName = fs.getClient().getClientName();
  // create one empty block
  nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null,
      HdfsConstants.GRANDFATHER_INODE_ID, null);
  fs.createSnapshot(path, "s2");

  fs.rename(new Path("/test/test"), new Path("/test/test-renamed"));
  fs.delete(new Path("/test/test-renamed"), true);

  NameNode nameNode = cluster.getNameNode();
  NameNodeAdapter.enterSafeMode(nameNode, false);
  NameNodeAdapter.saveNamespace(nameNode);
  NameNodeAdapter.leaveSafeMode(nameNode);
  cluster.restartNameNode(true);
}
@Test
public void testAddBlockRetryShouldReturnBlockWithLocations() throws Exception {
  final String src = "/testAddBlockRetryShouldReturnBlockWithLocations";
  NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc();
  // create file
  nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName",
      new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
      (short) 3, 1024, null);
  // start first addBlock()
  LOG.info("Starting first addBlock for " + src);
  LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
      HdfsConstants.GRANDFATHER_INODE_ID, null);
  assertTrue("Block locations should be present",
      lb1.getLocations().length > 0);

  cluster.restartNameNode();
  nameNodeRpc = cluster.getNameNodeRpc();
  LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null,
      HdfsConstants.GRANDFATHER_INODE_ID, null);
  assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
  assertTrue("Wrong locations with retry", lb2.getLocations().length > 0);
}
@Test
public void testDFSClientConfigurationLocateFollowingBlockInitialDelay()
    throws Exception {
  // If HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY
  // is not configured, verify that DFSClient uses the default value of 400.
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  try {
    cluster.waitActive();
    NamenodeProtocols nn = cluster.getNameNodeRpc();
    DFSClient client = new DFSClient(null, nn, conf, null);
    assertEquals(400,
        client.getConf().getBlockWriteLocateFollowingInitialDelayMs());

    // Change HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY
    // and verify that DFSClient uses the configured value of 1000.
    conf.setInt(
        HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
        1000);
    client = new DFSClient(null, nn, conf, null);
    assertEquals(1000,
        client.getConf().getBlockWriteLocateFollowingInitialDelayMs());
  } finally {
    cluster.shutdown();
  }
}
/** Creates the failover proxy provider instance. */
@VisibleForTesting
public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
    Configuration conf,
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
    Class<T> xface, URI nameNodeUri) throws IOException {
  Preconditions.checkArgument(
      xface.isAssignableFrom(NamenodeProtocols.class),
      "Interface %s is not a NameNode protocol", xface);
  try {
    Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
        .getConstructor(Configuration.class, URI.class, Class.class);
    FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
        xface);
    return provider;
  } catch (Exception e) {
    String message = "Couldn't create proxy provider " +
        failoverProxyProviderClass;
    if (LOG.isDebugEnabled()) {
      LOG.debug(message, e);
    }
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw new IOException(message, e);
    }
  }
}
private static NamenodeProtocols getRPCServer(NameNode namenode)
    throws IOException {
  final NamenodeProtocols np = namenode.getRpcServer();
  if (np == null) {
    throw new RetriableException("Namenode is in startup mode");
  }
  return np;
}
private Response post(
    final UserGroupInformation ugi,
    final DelegationParam delegation,
    final UserParam username,
    final DoAsParam doAsUser,
    final String fullpath,
    final PostOpParam op,
    final ConcatSourcesParam concatSrcs,
    final BufferSizeParam bufferSize,
    final ExcludeDatanodesParam excludeDatanodes,
    final NewLengthParam newLength
    ) throws IOException, URISyntaxException {
  final NameNode namenode = (NameNode)context.getAttribute("name.node");
  final NamenodeProtocols np = getRPCServer(namenode);

  switch(op.getValue()) {
  case APPEND:
  {
    final URI uri = redirectURI(namenode, ugi, delegation, username,
        doAsUser, fullpath, op.getValue(), -1L, -1L,
        excludeDatanodes.getValue(), bufferSize);
    return Response.temporaryRedirect(uri)
        .type(MediaType.APPLICATION_OCTET_STREAM).build();
  }
  case CONCAT:
  {
    np.concat(fullpath, concatSrcs.getAbsolutePaths());
    return Response.ok().build();
  }
  case TRUNCATE:
  {
    // We treat each rest request as a separate client.
    final boolean b = np.truncate(fullpath, newLength.getValue(),
        "DFSClient_" + DFSUtil.getSecureRandom().nextLong());
    final String js = JsonUtil.toJsonString("boolean", b);
    return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
  }
  default:
    throw new UnsupportedOperationException(op + " is not supported");
  }
}
private static DirectoryListing getDirectoryListing(final NamenodeProtocols np,
    final String p, byte[] startAfter) throws IOException {
  final DirectoryListing listing = np.getListing(p, startAfter, false);
  if (listing == null) { // the directory does not exist
    throw new FileNotFoundException("File " + p + " does not exist.");
  }
  return listing;
}
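Since getListing returns at most one batch of entries, a caller pages through a large directory by feeding the last returned name back in as the next startAfter. A minimal sketch of that loop, assuming an np handle like the one above (the directory path is illustrative):

// Hypothetical pagination loop over a large directory.
byte[] startAfter = HdfsFileStatus.EMPTY_NAME; // start at the beginning
DirectoryListing page;
do {
  page = getDirectoryListing(np, "/some/dir", startAfter);
  for (HdfsFileStatus status : page.getPartialListing()) {
    // process each entry ...
  }
  startAfter = page.getLastName(); // resume after the last returned entry
} while (page.hasMore());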
private Response delete(
    final UserGroupInformation ugi,
    final DelegationParam delegation,
    final UserParam username,
    final DoAsParam doAsUser,
    final String fullpath,
    final DeleteOpParam op,
    final RecursiveParam recursive,
    final SnapshotNameParam snapshotName
    ) throws IOException {
  final NameNode namenode = (NameNode)context.getAttribute("name.node");
  final NamenodeProtocols np = getRPCServer(namenode);

  switch(op.getValue()) {
  case DELETE:
  {
    final boolean b = np.delete(fullpath, recursive.getValue());
    final String js = JsonUtil.toJsonString("boolean", b);
    return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
  }
  case DELETESNAPSHOT:
  {
    np.deleteSnapshot(fullpath, snapshotName.getValue());
    return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
  }
  default:
    throw new UnsupportedOperationException(op + " is not supported");
  }
}
static String getDelegationToken(final NamenodeProtocols nn,
    HttpServletRequest request, Configuration conf,
    final UserGroupInformation ugi) throws IOException, InterruptedException {
  Token<DelegationTokenIdentifier> token = ugi
      .doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
        @Override
        public Token<DelegationTokenIdentifier> run() throws IOException {
          return nn.getDelegationToken(new Text(ugi.getUserName()));
        }
      });
  return token == null ? null : token.encodeToUrlString();
}
/**
 * Make sure a RetriableException is thrown when rpcServer is null in
 * NamenodeWebHdfsMethods.
 */
@Test
public void testRaceWhileNNStartup() throws Exception {
  MiniDFSCluster cluster = null;
  final Configuration conf = WebHdfsTestUtil.createConf();
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
    cluster.waitActive();
    final NameNode namenode = cluster.getNameNode();
    final NamenodeProtocols rpcServer = namenode.getRpcServer();
    Whitebox.setInternalState(namenode, "rpcServer", null);

    final Path foo = new Path("/foo");
    final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsFileSystem.SCHEME);
    try {
      webHdfs.mkdirs(foo);
      fail("Expected RetriableException");
    } catch (RetriableException e) {
      GenericTestUtils.assertExceptionContains("Namenode is in startup mode",
          e);
    }
    Whitebox.setInternalState(namenode, "rpcServer", rpcServer);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
@Test
public void checkAnnotations() {
  Method[] methods = NamenodeProtocols.class.getMethods();
  for (Method m : methods) {
    Assert.assertTrue(
        "Idempotent or AtMostOnce annotation is not present " + m,
        m.isAnnotationPresent(Idempotent.class) ||
        m.isAnnotationPresent(AtMostOnce.class));
  }
}
private void testInvalidSymlinkTarget(NamenodeProtocols nnRpc,
    String invalidTarget, String link) throws IOException {
  try {
    FsPermission perm = FsPermission.createImmutable((short)0755);
    nnRpc.createSymlink(invalidTarget, link, perm, false);
    fail("Symbolic link creation of target " + invalidTarget +
        " should fail");
  } catch (InvalidPathException expected) {
    // Expected
  }
}
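A hypothetical call site for the helper above; the target and link values are illustrative, not from the original test suite:

NamenodeProtocols nnRpc = cluster.getNameNodeRpc();
// A non-canonical target (extra slash) should be rejected with
// InvalidPathException, which the helper treats as success.
testInvalidSymlinkTarget(nnRpc, "//doubleSlash/target", "/linkToDoubleSlash");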