@Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) req.setPrevious(PBHelper.convert(previous)); if (excludeNodes != null) req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } try { return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
@Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src).setClientName(clientName); if (previous != null) { req.setPrevious(PBHelper.convert(previous)); } if (excludeNodes != null) { req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); } try { return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
public LocatedBlock addBlock(String src, String clientName) throws IOException { num_calls++; if (num_calls > num_calls_allowed) { throw new IOException("addBlock called more times than " + RETRY_CONFIG + " allows."); } else { throw new RemoteException(NotReplicatedYetException.class.getName(), ADD_BLOCK_EXCEPTION); } }
public LocatedBlock addBlock(final String src, final String clientName, final ExtendedBlock previous, final DatanodeInfo[] excludeNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { ClientActionHandler handler = new ClientActionHandler() { @Override public Object doAction(ClientProtocol namenode) throws RemoteException, IOException { return namenode.addBlock(src, clientName, previous, excludeNodes); } }; return (LocatedBlock) doClientActionWithRetry(handler, "addBlock"); }
public LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludedNode) throws IOException { num_calls++; if (num_calls > num_calls_allowed) { throw new IOException("addBlock called more times than " + RETRY_CONFIG + " allows."); } else { throw new RemoteException(NotReplicatedYetException.class.getName(), ADD_BLOCK_EXCEPTION); } }
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 400; while (true) { long localstart = Time.monotonicNow(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.info("Exception while adding a block", e); long elapsed = Time.monotonicNow() - localstart; if (elapsed > 5000) { DFSClient.LOG.info("Waiting for replication for " + (elapsed / 1000) + " seconds"); } try { DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { DFSClient.LOG.warn("Caught exception ", ie); } } } else { throw e; } } } } }
/** * Verify that client will correctly give up after the specified number * of times trying to add a block */ @SuppressWarnings({ "serial", "unchecked" }) @Test public void testNotYetReplicatedErrors() throws IOException { final String exceptionMsg = "Nope, not replicated yet..."; final int maxRetries = 1; // Allow one retry (total of two calls) conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries); NamenodeProtocols mockNN = mock(NamenodeProtocols.class); Answer<Object> answer = new ThrowsException(new IOException()) { int retryCount = 0; @Override public Object answer(InvocationOnMock invocation) throws Throwable { retryCount++; System.out.println("addBlock has been called " + retryCount + " times"); if(retryCount > maxRetries + 1) // First call was not a retry throw new IOException("Retried too many times: " + retryCount); else throw new RemoteException(NotReplicatedYetException.class.getName(), exceptionMsg); } }; when(mockNN.addBlock(anyString(), anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), anyLong(), any(String[].class))).thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], 1010, 0, null, (byte) 0)).when(mockNN).getFileInfo(anyString()); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], 1010, 0, null, (byte) 0)) .when(mockNN) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); os.write(20); // write one random byte try { os.close(); } catch (Exception e) { assertTrue("Retries are not being stopped correctly: " + e.getMessage(), e.getMessage().equals(exceptionMsg)); } }
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes) throws IOException { final DfsClientConf conf = dfsClient.getConf(); int retries = conf.getNumBlockWriteLocateFollowingRetry(); long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); long localstart = Time.monotonicNow(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName() .equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; LOG.info("Exception while adding a block", e); long elapsed = Time.monotonicNow() - localstart; if (elapsed > 5000) { LOG.info("Waiting for replication for " + (elapsed / 1000) + " seconds"); } try { LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { LOG.warn("Caught exception", ie); } } } else { throw e; } } } }
/** * Verify that client will correctly give up after the specified number * of times trying to add a block */ @SuppressWarnings({ "serial", "unchecked" }) @Test public void testNotYetReplicatedErrors() throws IOException { final String exceptionMsg = "Nope, not replicated yet..."; final int maxRetries = 1; // Allow one retry (total of two calls) conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries); NamenodeProtocols mockNN = mock(NamenodeProtocols.class); Answer<Object> answer = new ThrowsException(new IOException()) { int retryCount = 0; @Override public Object answer(InvocationOnMock invocation) throws Throwable { retryCount++; System.out.println("addBlock has been called " + retryCount + " times"); if(retryCount > maxRetries + 1) // First call was not a retry throw new IOException("Retried too many times: " + retryCount); else throw new RemoteException(NotReplicatedYetException.class.getName(), exceptionMsg); } }; when(mockNN.addBlock(anyString(), anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), anyLong(), any(String[].class))).thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], 1010, 0, null, (byte) 0, null)).when(mockNN).getFileInfo(anyString()); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], 1010, 0, null, (byte) 0, null)) .when(mockNN) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject()); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); os.write(20); // write one random byte try { os.close(); } catch (Exception e) { assertTrue("Retries are not being stopped correctly: " + e.getMessage(), e.getMessage().equals(exceptionMsg)); } }
@Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { RouteInfo routeInfo = router.route(src); return routeInfo.upstream.addBlock(routeInfo.realPath, clientName, previous, excludeNodes, fileId, favoredNodes); }
private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes) throws IOException { int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 400; while (true) { long localstart = Time.now(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.info("Exception while adding a block", e); if (Time.now() - localstart > 5000) { DFSClient.LOG.info("Waiting for replication for " + (Time.now() - localstart) / 1000 + " seconds"); } try { DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { DFSClient.LOG.warn("Caught exception ", ie); } } } else { throw e; } } } } }
private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes) throws IOException, UnresolvedLinkException { int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 400; while (true) { long localstart = Time.now(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.info("Exception while adding a block", e); if (Time.now() - localstart > 5000) { DFSClient.LOG.info("Waiting for replication for " + (Time.now() - localstart) / 1000 + " seconds"); } try { DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { } } } else { throw e; } } } } }
/** * Verify that client will correctly give up after the specified number * of times trying to add a block */ @SuppressWarnings({ "serial", "unchecked" }) @Test public void testNotYetReplicatedErrors() throws IOException { final String exceptionMsg = "Nope, not replicated yet..."; final int maxRetries = 1; // Allow one retry (total of two calls) conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries); NamenodeProtocols mockNN = mock(NamenodeProtocols.class); Answer<Object> answer = new ThrowsException(new IOException()) { int retryCount = 0; @Override public Object answer(InvocationOnMock invocation) throws Throwable { retryCount++; System.out.println("addBlock has been called " + retryCount + " times"); if(retryCount > maxRetries + 1) // First call was not a retry throw new IOException("Retried too many times: " + retryCount); else throw new RemoteException(NotReplicatedYetException.class.getName(), exceptionMsg); } }; when(mockNN.addBlock(anyString(), anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), anyLong(), any(String[].class))).thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], 1010, 0)).when(mockNN).getFileInfo(anyString()); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], 1010, 0)) .when(mockNN) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), anyShort(), anyLong()); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); os.write(20); // write one random byte try { os.close(); } catch (Exception e) { assertTrue("Retries are not being stopped correctly: " + e.getMessage(), e.getMessage().equals(exceptionMsg)); } }
private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes) throws IOException, UnresolvedLinkException { int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 1000; //HOP default value was 400 while (true) { long localstart = Time.now(); while (true) { try { return dfsClient .addBlock(src, dfsClient.clientName, block, excludedNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.debug("Exception while adding a block", e); if (Time.now() - localstart > 5000) { DFSClient.LOG.debug("Waiting for replication for " + (Time.now() - localstart) / 1000 + " seconds"); } try { Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { } } } else { throw e; } } } } }
/** * Verify that client will correctly give up after the specified number * of times trying to add a block */ @SuppressWarnings("serial") @Test public void testNotYetReplicatedErrors() throws IOException { final String exceptionMsg = "Nope, not replicated yet..."; final int maxRetries = 1; // Allow one retry (total of two calls) conf.setInt( DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries); NamenodeProtocols mockNN = mock(NamenodeProtocols.class); Answer<Object> answer = new ThrowsException(new IOException()) { int retryCount = 0; @Override public Object answer(InvocationOnMock invocation) throws Throwable { retryCount++; System.out.println("addBlock has been called " + retryCount + " times"); if (retryCount > maxRetries + 1) // First call was not a retry { throw new IOException("Retried too many times: " + retryCount); } else { throw new RemoteException(NotReplicatedYetException.class.getName(), exceptionMsg); } } }; when(mockNN.addBlock(anyString(), anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class))).thenAnswer(answer); final DFSClient client = new DFSClient(null, mockNN, conf, null); OutputStream os = client.create("testfile", true); os.write(20); // write one random byte try { os.close(); } catch (Exception e) { assertTrue("Retries are not being stopped correctly: " + e.getMessage(), e.getMessage().equals(exceptionMsg)); } }
private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes ) throws IOException { int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5); long sleeptime = 400; while (true) { long localstart = System.currentTimeMillis(); while (true) { try { if (serverSupportsHdfs630) { return namenode.addBlock(src, clientName, excludedNodes); } else { return namenode.addBlock(src, clientName); } } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (e.getMessage().startsWith( "java.io.IOException: java.lang.NoSuchMethodException: " + "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock(" + "java.lang.String, java.lang.String, " + "[Lorg.apache.hadoop.hdfs.protocol.DatanodeInfo;)")) { // We're talking to a server that doesn't implement HDFS-630. // Mark that and try again serverSupportsHdfs630 = false; continue; } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; LOG.info(StringUtils.stringifyException(e)); if (System.currentTimeMillis() - localstart > 5000) { LOG.info("Waiting for replication for " + (System.currentTimeMillis() - localstart) / 1000 + " seconds"); } try { LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { } } } else { throw e; } } } } }
private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes) throws IOException { int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 400; while (true) { long localstart = Time.now(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.info("Exception while adding a block", e); if (Time.now() - localstart > 5000) { DFSClient.LOG.info("Waiting for replication for " + (Time.now() - localstart) / 1000 + " seconds"); } try { DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { } } } else { throw e; } } } } }
private LocatedBlock[] locateFollowingBlock(long start, DatanodeInfo[] excludedNodes) throws IOException, UnresolvedLinkException { int retries = conf.getInt( "dfs.client.block.write.locateFollowingBlock.retries", 5); long sleeptime = 400; while (true) { long localstart = System.currentTimeMillis(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, null, excludedNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException( FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, UnresolvedPathException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (NotReplicatedYetException.class.getName().equals( e.getClassName())) { if (retries == 0) { throw e; } else { --retries; DFSClient.LOG.info(StringUtils.stringifyException(e)); if (System.currentTimeMillis() - localstart > 5000) { DFSClient.LOG.info("Waiting for replication for " + (System.currentTimeMillis() - localstart) / 1000 + " seconds"); } try { DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { } } } else { throw e; } } } } }