/**
 * Asks the NameNode to open {@code src} for append and wraps the reply in a
 * {@link DFSOutputStream}.
 *
 * @param src path of the file to append to
 * @param buffersize buffer size forwarded to the output stream
 * @param flag create flags; checked with
 *     {@link CreateFlag#validateForAppend} before the RPC is issued
 * @param progress progress callback forwarded to the output stream
 * @param favoredNodes favored nodes forwarded to the output stream
 * @return the stream created by {@code DFSOutputStream.newStreamForAppend}
 * @throws IOException if the RPC fails; a {@link RemoteException} is
 *     unwrapped into the exception classes listed in the catch clause
 */
private DFSOutputStream callAppend(String src, int buffersize,
    EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
    throws IOException {
  CreateFlag.validateForAppend(flag);
  try {
    LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
        new EnumSetWritable<>(flag, CreateFlag.class));
    HdfsFileStatus status = blkWithStatus.getFileStatus();
    if (status == null) {
      // The file status is an optional part of the append reply. When it is
      // missing, fetch it explicitly rather than handing a null status to
      // the output stream.
      DFSClient.LOG.debug("NameNode is on an older version, request file "
          + "info with additional RPC call for file: " + src);
      status = getFileInfo(src);
    }
    return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
        progress, blkWithStatus.getLastBlock(), status,
        dfsClientConf.createChecksum(), favoredNodes);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        SafeModeException.class,
        DSQuotaExceededException.class,
        UnsupportedOperationException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
  }
}
/**
 * Handles an append RPC: decodes the request, delegates to {@code server}
 * and encodes the result.
 *
 * @param controller RPC controller (not used by this method)
 * @param req the append request; when it carries no flag,
 *     {@link CreateFlag#APPEND} alone is used
 * @return a response holding the last block and the file status, each set
 *     only when the server returned a non-null value
 * @throws ServiceException wrapping any {@link IOException} from the server
 */
@Override
public AppendResponseProto append(RpcController controller,
    AppendRequestProto req) throws ServiceException {
  try {
    final EnumSetWritable<CreateFlag> createFlags;
    if (req.hasFlag()) {
      createFlags = PBHelper.convertCreateFlag(req.getFlag());
    } else {
      createFlags = new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
    }

    final LastBlockWithStatus appended =
        server.append(req.getSrc(), req.getClientName(), createFlags);

    final AppendResponseProto.Builder response =
        AppendResponseProto.newBuilder();
    // Both response fields are optional; leave them unset for null values.
    if (appended.getLastBlock() != null) {
      response.setBlock(PBHelper.convert(appended.getLastBlock()));
    }
    if (appended.getFileStatus() != null) {
      response.setStat(PBHelper.convert(appended.getFileStatus()));
    }
    return response.build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Sends an append request through {@code rpcProxy} and decodes the reply.
 *
 * @param src path of the file to append to
 * @param clientName name of the client issuing the request
 * @param flag create flags, converted with {@code PBHelper}
 * @return the last block and file status from the reply; either part is
 *     {@code null} when the reply does not carry it
 * @throws IOException the remote exception extracted from a
 *     {@link ServiceException} raised by the proxy
 */
@Override
public LastBlockWithStatus append(String src, String clientName,
    EnumSetWritable<CreateFlag> flag) throws AccessControlException,
    DSQuotaExceededException, FileNotFoundException, SafeModeException,
    UnresolvedLinkException, IOException {
  final AppendRequestProto.Builder requestBuilder =
      AppendRequestProto.newBuilder();
  requestBuilder.setSrc(src);
  requestBuilder.setClientName(clientName);
  requestBuilder.setFlag(PBHelper.convertCreateFlag(flag));
  final AppendRequestProto request = requestBuilder.build();

  try {
    final AppendResponseProto response = rpcProxy.append(null, request);

    LocatedBlock lastBlock = null;
    if (response.hasBlock()) {
      lastBlock = PBHelper.convert(response.getBlock());
    }

    HdfsFileStatus fileStatus = null;
    if (response.hasStat()) {
      fileStatus = PBHelper.convert(response.getStat());
    }

    return new LastBlockWithStatus(lastBlock, fileStatus);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Sends an append request through {@code rpcProxy} and decodes the reply.
 *
 * @param src path of the file to append to
 * @param clientName name of the client issuing the request
 * @param flag create flags, converted with {@code PBHelperClient}
 * @return the last block and file status from the reply; either part is
 *     {@code null} when the reply does not carry it
 * @throws IOException the remote exception extracted from a
 *     {@link ServiceException} raised by the proxy
 */
@Override
public LastBlockWithStatus append(String src, String clientName,
    EnumSetWritable<CreateFlag> flag) throws IOException {
  final AppendRequestProto.Builder requestBuilder =
      AppendRequestProto.newBuilder();
  requestBuilder.setSrc(src);
  requestBuilder.setClientName(clientName);
  requestBuilder.setFlag(PBHelperClient.convertCreateFlag(flag));
  final AppendRequestProto request = requestBuilder.build();

  try {
    final AppendResponseProto response = rpcProxy.append(null, request);

    LocatedBlock lastBlock = null;
    if (response.hasBlock()) {
      lastBlock = PBHelperClient.convertLocatedBlockProto(response.getBlock());
    }

    HdfsFileStatus fileStatus = null;
    if (response.hasStat()) {
      fileStatus = PBHelperClient.convert(response.getStat());
    }

    return new LastBlockWithStatus(lastBlock, fileStatus);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Handles an append RPC: decodes the request, delegates to {@code server}
 * and encodes the result.
 *
 * @param controller RPC controller (not used by this method)
 * @param req the append request; when it carries no flag,
 *     {@link CreateFlag#APPEND} alone is used
 * @return a response holding the last block and the file status, each set
 *     only when the server returned a non-null value
 * @throws ServiceException wrapping any {@link IOException} from the server
 */
@Override
public AppendResponseProto append(RpcController controller,
    AppendRequestProto req) throws ServiceException {
  try {
    final EnumSetWritable<CreateFlag> createFlags;
    if (req.hasFlag()) {
      createFlags = PBHelperClient.convertCreateFlag(req.getFlag());
    } else {
      createFlags = new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
    }

    final LastBlockWithStatus appended =
        server.append(req.getSrc(), req.getClientName(), createFlags);

    final AppendResponseProto.Builder response =
        AppendResponseProto.newBuilder();
    // Both response fields are optional; leave them unset for null values.
    if (appended.getLastBlock() != null) {
      response.setBlock(
          PBHelperClient.convertLocatedBlock(appended.getLastBlock()));
    }
    if (appended.getFileStatus() != null) {
      response.setStat(PBHelperClient.convert(appended.getFileStatus()));
    }
    return response.build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
/**
 * Appends to an existing file in the namespace by delegating to
 * {@code appendFileInt}.
 *
 * @param src path of the file to append to
 * @param holder passed through to {@code appendFileInt}
 * @param clientMachine passed through to {@code appendFileInt}
 * @param flag create flags; only the presence of
 *     {@link CreateFlag#NEW_BLOCK} is read here
 * @param logRetryCache passed through to {@code appendFileInt}
 * @return the result of {@code appendFileInt}
 * @throws IOException from {@code appendFileInt}; an
 *     {@link AccessControlException} is recorded as a failed "append" audit
 *     event and then rethrown
 */
LastBlockWithStatus appendFile(String src, String holder,
    String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
    throws IOException {
  try {
    final boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK);
    return appendFileInt(src, holder, clientMachine, newBlock, logRetryCache);
  } catch (AccessControlException ace) {
    logAuditEvent(false, "append", src);
    throw ace;
  }
}
@Override // ClientProtocol public LastBlockWithStatus append(String src, String clientName, EnumSetWritable<CreateFlag> flag) throws IOException { checkNNStartup(); String clientMachine = getClientMachine(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.append: file " +src+" for "+clientName+" at "+clientMachine); } CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (LastBlockWithStatus) cacheEntry.getPayload(); } LastBlockWithStatus info = null; boolean success = false; try { info = namesystem.appendFile(src, clientName, clientMachine, flag.get(), cacheEntry != null); success = true; } finally { RetryCache.setState(cacheEntry, success, info); } metrics.incrFilesAppended(); return info; }
/** * Test for rename1 */ @Test public void testAppend() throws Exception { String src = "/testNamenodeRetryCache/testAppend/src"; resetCall(); // Create a file with partial block DFSTestUtil.createFile(filesystem, new Path(src), 128, (short)1, 0L); // Retried append requests succeed newCall(); LastBlockWithStatus b = nnRpc.append(src, "holder", new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); Assert.assertEquals(b, nnRpc.append(src, "holder", new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)))); Assert.assertEquals(b, nnRpc.append(src, "holder", new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)))); // non-retried call fails newCall(); try { nnRpc.append(src, "holder", new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); Assert.fail("testAppend - expected exception is not thrown"); } catch (Exception e) { // Expected } }
/**
 * Asks the NameNode to open {@code src} for append and wraps the reply in a
 * {@link DFSOutputStream}.
 *
 * @param src path of the file to append to
 * @param flag create flags; checked with
 *     {@link CreateFlag#validateForAppend} before the RPC is issued
 * @param progress progress callback forwarded to the output stream
 * @param favoredNodes favored nodes forwarded to the output stream
 * @return the stream created by {@code DFSOutputStream.newStreamForAppend}
 * @throws IOException if an RPC fails; a {@link RemoteException} is
 *     unwrapped into the exception classes listed in the catch clause
 */
private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
    Progressable progress, String[] favoredNodes) throws IOException {
  CreateFlag.validateForAppend(flag);
  try {
    final LastBlockWithStatus appendReply = namenode.append(src, clientName,
        new EnumSetWritable<>(flag, CreateFlag.class));

    HdfsFileStatus fileStatus = appendReply.getFileStatus();
    if (fileStatus == null) {
      // No status in the reply: look it up with a separate call.
      DFSClient.LOG.debug(
          "NameNode is on an older version, request file "
              + "info with additional RPC call for file: "
              + src);
      fileStatus = getFileInfo(src);
    }

    return DFSOutputStream.newStreamForAppend(
        this, src, flag, progress,
        appendReply.getLastBlock(), fileStatus,
        dfsClientConf.createChecksum(null), favoredNodes);
  } catch (RemoteException remote) {
    throw remote.unwrapRemoteException(
        AccessControlException.class,
        FileNotFoundException.class,
        SafeModeException.class,
        DSQuotaExceededException.class,
        QuotaByStorageTypeExceededException.class,
        UnsupportedOperationException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
  }
}
@Override // ClientProtocol public LastBlockWithStatus append(String src, String clientName, EnumSetWritable<CreateFlag> flag) throws IOException { checkNNStartup(); String clientMachine = getClientMachine(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.append: file " +src+" for "+clientName+" at "+clientMachine); } namesystem.checkOperation(OperationCategory.WRITE); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (LastBlockWithStatus) cacheEntry.getPayload(); } LastBlockWithStatus info = null; boolean success = false; try { info = namesystem.appendFile(src, clientName, clientMachine, flag.get(), cacheEntry != null); success = true; } finally { RetryCache.setState(cacheEntry, success, info); } metrics.incrFilesAppended(); return info; }
private LastBlockWithStatus appendFileInt(final String srcArg, String holder, String clientMachine, boolean newBlock, boolean logRetryCache) throws IOException { String src = srcArg; NameNode.stateChangeLog.debug( "DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}", src, holder, clientMachine); boolean skipSync = false; if (!supportAppends) { throw new UnsupportedOperationException( "Append is not enabled on this NameNode. Use the " + DFS_SUPPORT_APPEND_KEY + " configuration option to enable it."); } LocatedBlock lb = null; HdfsFileStatus stat = null; FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot append to file" + src); src = dir.resolvePath(pc, src, pathComponents); final INodesInPath iip = dir.getINodesInPath4Write(src); lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock, logRetryCache); stat = FSDirStatAndListingOp.getFileInfo(dir, src, false, FSDirectory.isReservedRawName(srcArg), true); } catch (StandbyException se) { skipSync = true; throw se; } finally { writeUnlock(); // There might be transactions logged while trying to recover the lease. // They need to be sync'ed even when an exception was thrown. if (!skipSync) { getEditLog().logSync(); } } if (lb != null) { NameNode.stateChangeLog.debug( "DIR* NameSystem.appendFile: file {} for {} at {} block {} block" + " size {}", src, holder, clientMachine, lb.getBlock(), lb.getBlock().getNumBytes()); } logAuditEvent(true, "append", srcArg); return new LastBlockWithStatus(lb, stat); }
/** * Append to an existing file in the namespace. */ LastBlockWithStatus appendFile(String srcArg, String holder, String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache) throws IOException { boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK); if (newBlock) { requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK); } NameNode.stateChangeLog.debug( "DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}", srcArg, holder, clientMachine); try { boolean skipSync = false; LastBlockWithStatus lbs = null; final FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot append to file" + srcArg); lbs = FSDirAppendOp.appendFile(this, srcArg, pc, holder, clientMachine, newBlock, logRetryCache); } catch (StandbyException se) { skipSync = true; throw se; } finally { writeUnlock(); // There might be transactions logged while trying to recover the lease // They need to be sync'ed even when an exception was thrown. if (!skipSync) { getEditLog().logSync(); } } logAuditEvent(true, "append", srcArg); return lbs; } catch (AccessControlException e) { logAuditEvent(false, "append", srcArg); throw e; } }