WriteManager(IdMappingServiceProvider iug, final NfsConfiguration config, boolean aixCompatMode) { this.iug = iug; this.config = config; this.aixCompatMode = aixCompatMode; streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY, NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT); LOG.info("Stream timeout is " + streamTimeout + "ms."); if (streamTimeout < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) { LOG.info("Reset stream timeout to minimum value " + NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + "ms."); streamTimeout = NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT; } maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT); LOG.info("Maximum open streams is "+ maxStreams); this.fileContextCache = new OpenFileCtxCache(config, streamTimeout); }
public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus( HdfsFileStatus fs, IdMappingServiceProvider iug) { /** * Some 32bit Linux client has problem with 64bit fileId: it seems the 32bit * client takes only the lower 32bit of the fileId and treats it as signed * int. When the 32th bit is 1, the client considers it invalid. */ NfsFileType fileType = fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG; fileType = fs.isSymlink() ? NfsFileType.NFSLNK : fileType; int nlink = (fileType == NfsFileType.NFSDIR) ? fs.getChildrenNum() + 2 : 1; long size = (fileType == NfsFileType.NFSDIR) ? getDirSize(fs .getChildrenNum()) : fs.getLen(); return new Nfs3FileAttributes(fileType, nlink, fs.getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()), iug.getGidAllowingUnknown(fs.getGroup()), size, 0 /* fsid */, fs.getFileId(), fs.getModificationTime(), fs.getAccessTime(), new Nfs3FileAttributes.Specdata3()); }
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, boolean aixCompatMode, NfsConfiguration config) { this.fos = fos; this.latestAttr = latestAttr; this.aixCompatMode = aixCompatMode; // We use the ReverseComparatorOnMin as the comparator of the map. In this // way, we first dump the data with larger offset. In the meanwhile, we // retrieve the last element to write back to HDFS. pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>( OffsetRange.ReverseComparatorOnMin); pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>(); updateLastAccessTime(); activeState = true; asyncStatus = false; asyncWriteBackStartOffset = 0; dumpOut = null; raf = null; nonSequentialWriteInMemory = new AtomicLong(0); this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath != null; nextOffset = new AtomicLong(); nextOffset.set(latestAttr.getSize()); try { assert(nextOffset.get() == this.fos.getPos()); } catch (IOException e) {} dumpThread = null; this.client = client; this.iug = iug; this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); }
/** * If the file is in cache, update the size based on the cached data size */ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle, IdMappingServiceProvider iug) throws IOException { String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle); Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if (attr != null) { OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); attr.setUsed(openFileCtx.getNextOffset()); } } return attr; }
public static WccData createWccData(final WccAttr preOpAttr, DFSClient dfsClient, final String fileIdPath, final IdMappingServiceProvider iug) throws IOException { Nfs3FileAttributes postOpDirAttr = getFileAttr(dfsClient, fileIdPath, iug); return new WccData(preOpAttr, postOpDirAttr); }
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, boolean aixCompatMode, NfsConfiguration config) { this.fos = fos; this.latestAttr = latestAttr; this.aixCompatMode = aixCompatMode; // We use the ReverseComparatorOnMin as the comparator of the map. In this // way, we first dump the data with larger offset. In the meanwhile, we // retrieve the last element to write back to HDFS. pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>( OffsetRange.ReverseComparatorOnMin); pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>(); updateLastAccessTime(); activeState = true; asyncStatus = false; asyncWriteBackStartOffset = 0; dumpOut = null; raf = null; nonSequentialWriteInMemory = new AtomicLong(0); this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath != null; nextOffset = new AtomicLong(); nextOffset.set(latestAttr.getSize()); assert(nextOffset.get() == this.fos.getPos()); dumpThread = null; this.client = client; this.iug = iug; this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); }
public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus( HdfsFileStatus fs, IdMappingServiceProvider iug) { /** * Some 32bit Linux client has problem with 64bit fileId: it seems the 32bit * client takes only the lower 32bit of the fileId and treats it as signed * int. When the 32th bit is 1, the client considers it invalid. */ NfsFileType fileType = fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG; fileType = fs.isSymlink() ? NfsFileType.NFSLNK : fileType; return new Nfs3FileAttributes(fileType, fs.getChildrenNum(), fs .getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()), iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */, fs.getFileId(), fs.getModificationTime(), fs.getAccessTime()); }
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug) { this(fos, latestAttr, dumpFilePath, client, iug, false, new NfsConfiguration()); }
private void receivedNewWriteInternal(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdMappingServiceProvider iug) { WriteStableHow stableHow = request.getStableHow(); WccAttr preOpAttr = latestAttr.getWccAttr(); int count = request.getCount(); WriteCtx writeCtx = addWritesToCache(request, channel, xid); if (writeCtx == null) { // offset < nextOffset processOverWrite(dfsClient, request, channel, xid, iug); } else { // The write is added to pendingWrites. // Check and start writing back if necessary boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx); if (!startWriting) { // offset > nextOffset. check if we need to dump data waitForDump(); // In test, noticed some Linux client sends a batch (e.g., 1MB) // of reordered writes and won't send more writes until it gets // responses of the previous batch. So here send response immediately // for unstable non-sequential write if (stableHow != WriteStableHow.UNSTABLE) { LOG.info("Have to change stable write to unstable write: " + request.getStableHow()); stableHow = WriteStableHow.UNSTABLE; } if (LOG.isDebugEnabled()) { LOG.debug("UNSTABLE write request, send response for offset: " + writeCtx.getOffset()); } WccData fileWcc = new WccData(preOpAttr, latestAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); RpcProgramNfs3.metrics.addWrite(Nfs3Utils .getElapsedTime(writeCtx.startTime)); Nfs3Utils .writeChannel(channel, response.serialize(new XDR(), xid, new VerifierNone()), xid); writeCtx.setReplied(true); } } }
public static Nfs3FileAttributes getFileAttr(DFSClient client, String fileIdPath, IdMappingServiceProvider iug) throws IOException { HdfsFileStatus fs = getFileStatus(client, fileIdPath); return fs == null ? null : getNfs3FileAttrFromFileStatus(fs, iug); }
public SysSecurityHandler(CredentialsSys credentialsSys, IdMappingServiceProvider iug) { this.mCredentialsSys = credentialsSys; this.iug = iug; }