/**
 * Creates a single directory {@code p2f} with a permission derived from
 * {@code permission} (or {@link FsPermission#getDirDefault()} when null),
 * after applying this file system's umask.
 *
 * <p>On Windows with native IO available, the directory is created with the
 * mode in one native call. Any {@link IOException} from that call is logged
 * at debug level and reported as {@code false}. Elsewhere the directory is
 * created with {@link File#mkdir()}. The permission is then set only if
 * creation succeeded.
 *
 * @param p the path whose permission is set on the non-native branch
 * @param p2f the local file to create as a directory
 * @param permission requested permission, or null for the directory default
 * @return true if the directory was created
 * @throws IOException if setting the permission fails on the non-native branch
 */
protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
    throws IOException {
  final FsPermission requested =
      (permission != null) ? permission : FsPermission.getDirDefault();
  final FsPermission effective = requested.applyUMask(umask);

  final boolean nativeWindows = Shell.WINDOWS && NativeIO.isAvailable();
  if (!nativeWindows) {
    final boolean created = p2f.mkdir();
    if (created) {
      setPermission(p, effective);
    }
    return created;
  }

  try {
    NativeIO.Windows.createDirectoryWithMode(p2f, effective.toShort());
    return true;
  } catch (IOException e) {
    // Creation failures on the native path are reported as "not created".
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format(
          "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
          p2f, effective.toShort()), e);
    }
    return false;
  }
}
@Override public void run() { if (canceled) return; // There's a very narrow race here that the file will close right at // this instant. But if that happens, we'll likely receive an EBADF // error below, and see that it's canceled, ignoring the error. // It's also possible that we'll end up requesting readahead on some // other FD, which may be wasted work, but won't cause a problem. try { NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, off, len, POSIX_FADV_WILLNEED); } catch (IOException ioe) { if (canceled) { // no big deal - the reader canceled the request and closed // the file. return; } LOG.warn("Failed readahead on " + identifier, ioe); } }
/**
 * Same as openForRandomRead except that it will run even if security is off.
 * This is used by unit tests.
 *
 * <p>Opens {@code f} with the given {@code mode}, then checks the owner and
 * group reported by fstat on the open descriptor against the expected
 * values. If the check (or anything else) fails, the file is closed before
 * the failure propagates.
 */
@VisibleForTesting
protected static RandomAccessFile forceSecureOpenForRandomRead(File f,
    String mode, String expectedOwner, String expectedGroup)
    throws IOException {
  final RandomAccessFile opened = new RandomAccessFile(f, mode);
  try {
    final Stat fileStat = NativeIO.POSIX.getFstat(opened.getFD());
    checkStat(f, fileStat.getOwner(), fileStat.getGroup(),
        expectedOwner, expectedGroup);
  } catch (Throwable t) {
    // Do not leak the descriptor on failure; precise rethrow keeps the
    // declared exception set to IOException plus unchecked types.
    opened.close();
    throw t;
  }
  return opened;
}
/**
 * Same as openFSDataInputStream except that it will run even if security is
 * off. This is used by unit tests.
 *
 * <p>Opens {@code file} through {@code rawFilesystem} using its absolute
 * path, then checks the owner and group reported by fstat on the stream's
 * descriptor against the expected values. On any failure the stream is
 * closed before the failure propagates.
 */
@VisibleForTesting
protected static FSDataInputStream forceSecureOpenFSDataInputStream(
    File file, String expectedOwner, String expectedGroup)
    throws IOException {
  final Path absolutePath = new Path(file.getAbsolutePath());
  final FSDataInputStream stream = rawFilesystem.open(absolutePath);
  try {
    final Stat fileStat =
        NativeIO.POSIX.getFstat(stream.getFileDescriptor());
    checkStat(file, fileStat.getOwner(), fileStat.getGroup(),
        expectedOwner, expectedGroup);
  } catch (Throwable t) {
    // Release the stream before rethrowing the original failure.
    stream.close();
    throw t;
  }
  return stream;
}
/**
 * Same as openForRead() except that it will run even if security is off.
 * This is used by unit tests.
 *
 * <p>Opens {@code f} for reading, then checks the owner and group reported
 * by fstat on the open descriptor against the expected values. On any
 * failure the stream is closed before the failure propagates.
 */
@VisibleForTesting
protected static FileInputStream forceSecureOpenForRead(File f,
    String expectedOwner, String expectedGroup) throws IOException {
  final FileInputStream input = new FileInputStream(f);
  try {
    final Stat fileStat = NativeIO.POSIX.getFstat(input.getFD());
    checkStat(f, fileStat.getOwner(), fileStat.getGroup(),
        expectedOwner, expectedGroup);
  } catch (Throwable t) {
    // Close on failure so the caller never sees a half-validated stream.
    input.close();
    throw t;
  }
  return input;
}
/**
 * Cancels any outstanding readahead request. When OS cache management is
 * enabled and the served region [start, end) is non-empty, it then advises
 * the OS (POSIX_FADV_DONTNEED) that the region is no longer needed. Finally
 * it delegates to {@code super.close()}. Any failure of the fadvise call is
 * logged as a warning and otherwise ignored.
 */
@Override
public void close() throws Exception {
  if (readaheadRequest != null) {
    readaheadRequest.cancel();
  }
  if (manageOsCache) {
    final long start = getStartOffset();
    final long regionLength = getEndOffset() - start;
    if (regionLength > 0) {
      try {
        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
            identifier, fd, start, regionLength,
            NativeIO.POSIX.POSIX_FADV_DONTNEED);
      } catch (Throwable t) {
        // Best effort: cache advice must never prevent closing.
        LOG.warn("Failed to manage OS cache for " + identifier, t);
      }
    }
  }
  super.close();
}
/**
 * Renames the in-progress edits file for {@code firstTxId} to its finalized
 * name covering [{@code firstTxId}, {@code lastTxId}].
 *
 * <p>Fails with {@link IllegalStateException} if the finalized file already
 * exists. If the rename fails, the error is reported against the destination
 * file and wrapped in an {@link IllegalStateException}. After a successful
 * rename, the tracked current in-progress file is cleared if it was the one
 * just finalized.
 */
@Override
public synchronized void finalizeLogSegment(long firstTxId, long lastTxId)
    throws IOException {
  final File inProgress = NNStorage.getInProgressEditsFile(sd, firstTxId);
  final File finalized =
      NNStorage.getFinalizedEditsFile(sd, firstTxId, lastTxId);
  LOG.info("Finalizing edits file " + inProgress + " -> " + finalized);

  Preconditions.checkState(!finalized.exists(),
      "Can't finalize edits file " + inProgress + " since finalized file "
          + "already exists");

  try {
    NativeIO.renameTo(inProgress, finalized);
  } catch (IOException e) {
    errorReporter.reportErrorOnFile(finalized);
    throw new IllegalStateException(
        "Unable to finalize edits file " + inProgress, e);
  }

  if (inProgress.equals(currentInProgress)) {
    currentInProgress = null;
  }
}
private void renameSelf(String newSuffix) throws IOException { File src = file; File dst = new File(src.getParent(), src.getName() + newSuffix); // renameTo fails on Windows if the destination file already exists. try { if (dst.exists()) { if (!dst.delete()) { throw new IOException("Couldn't delete " + dst); } } NativeIO.renameTo(src, dst); } catch (IOException e) { throw new IOException( "Couldn't rename log " + src + " to " + dst, e); } file = dst; }
/** * Bump a replica's generation stamp to a new one. * Its on-disk meta file name is renamed to be the new one too. * * @param replicaInfo a replica * @param newGS new generation stamp * @throws IOException if rename fails */ private void bumpReplicaGS(ReplicaInfo replicaInfo, long newGS) throws IOException { long oldGS = replicaInfo.getGenerationStamp(); File oldmeta = replicaInfo.getMetaFile(); replicaInfo.setGenerationStamp(newGS); File newmeta = replicaInfo.getMetaFile(); // rename meta file to new GS if (LOG.isDebugEnabled()) { LOG.debug("Renaming " + oldmeta + " to " + newmeta); } try { NativeIO.renameTo(oldmeta, newmeta); } catch (IOException e) { replicaInfo.setGenerationStamp(oldGS); // restore old GS throw new IOException("Block " + replicaInfo + " reopen failed. " + " Unable to move meta file " + oldmeta + " to " + newmeta, e); } }
/** * Load the block. * * mmap and mlock the block, and then verify its checksum. * * @param length The current length of the block. * @param blockIn The block input stream. Should be positioned at the * start. The caller must close this. * @param metaIn The meta file input stream. Should be positioned at * the start. The caller must close this. * @param blockFileName The block file name, for logging purposes. * * @return The Mappable block. */ public static MappableBlock load(long length, FileInputStream blockIn, FileInputStream metaIn, String blockFileName) throws IOException { MappableBlock mappableBlock = null; MappedByteBuffer mmap = null; FileChannel blockChannel = null; try { blockChannel = blockIn.getChannel(); if (blockChannel == null) { throw new IOException("Block InputStream has no FileChannel."); } mmap = blockChannel.map(MapMode.READ_ONLY, 0, length); NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length); verifyChecksum(length, metaIn, blockChannel, blockFileName); mappableBlock = new MappableBlock(mmap, length); } finally { IOUtils.closeQuietly(blockChannel); if (mappableBlock == null) { if (mmap != null) { NativeIO.POSIX.munmap(mmap); // unmapping also unlocks } } } return mappableBlock; }
/**
 * Builds the configuration used by the zero-copy / mmap tests. The test is
 * skipped (via {@link Assume}) unless native IO is available and the OS is
 * Unix.
 *
 * <p>Enables short-circuit reads with checksum skipping, sets the block size
 * to {@code BLOCK_SIZE}, sets the client mmap cache (size 3, 100 ms timeout),
 * places the domain socket under {@code sockDir}, and uses short heartbeat,
 * cache-report and cache-refresh intervals.
 */
public static HdfsConfiguration initZeroCopyTest() {
  Assume.assumeTrue(NativeIO.isAvailable());
  Assume.assumeTrue(SystemUtils.IS_OS_UNIX);

  final HdfsConfiguration config = new HdfsConfiguration();
  config.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
  config.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
  config.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);

  final File socketFile =
      new File(sockDir.getDir(), "TestRequestMmapAccess._PORT.sock");
  config.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
      socketFile.getAbsolutePath());

  config.setBoolean(
      DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
  config.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
  config.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
  config.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
  return config;
}
/** * Run testCacheAndUncacheBlock with some failures injected into the mlock * call. This tests the ability of the NameNode to resend commands. */ @Test(timeout=600000) public void testCacheAndUncacheBlockWithRetries() throws Exception { // We don't have to save the previous cacheManipulator // because it will be reinstalled by the @After function. NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() { private final Set<String> seenIdentifiers = new HashSet<String>(); @Override public void mlock(String identifier, ByteBuffer mmap, long length) throws IOException { if (seenIdentifiers.contains(identifier)) { // mlock succeeds the second time. LOG.info("mlocking " + identifier); return; } seenIdentifiers.add(identifier); throw new IOException("injecting IOException during mlock of " + identifier); } }); testCacheAndUncacheBlock(); }
/**
 * Opens an output stream on the local file backing {@code f}.
 *
 * <p>With no permission, a plain {@link FileOutputStream} is opened. With a
 * permission on Windows where native IO is available, the file is created
 * with that mode in one native call. Otherwise the stream is opened first
 * and the permission is applied afterwards. If applying it fails, the stream
 * is cleaned up before the failure propagates.
 *
 * @param f the path to open
 * @param append whether to append to an existing file
 * @param permission the permission to apply, or null for none
 */
private LocalFSFileOutputStream(Path f, boolean append,
    FsPermission permission) throws IOException {
  File file = pathToFile(f);
  boolean nativeWindowsWithMode =
      permission != null && Shell.WINDOWS && NativeIO.isAvailable();
  if (nativeWindowsWithMode) {
    this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
        append, permission.toShort());
  } else {
    this.fos = new FileOutputStream(file, append);
    if (permission != null) {
      try {
        setPermission(f, permission);
      } catch (Throwable t) {
        // Don't leak the freshly opened stream if chmod fails.
        IOUtils.cleanup(LOG, this.fos);
        throw t;
      }
    }
  }
}
/**
 * Creates a single directory {@code p2f}.
 *
 * <p>With no permission, this is just {@link File#mkdir()}. With a
 * permission on Windows where native IO is available, the directory is
 * created with that mode in one native call. Any {@link IOException} from
 * that call is logged at debug level and reported as {@code false}.
 * Otherwise the directory is created with {@link File#mkdir()} and the
 * permission is set only if creation succeeded.
 *
 * @param p the path whose permission is set on the non-native branch
 * @param p2f the local file to create as a directory
 * @param permission requested permission, or null for none
 * @return true if the directory was created
 * @throws IOException if setting the permission fails on the non-native branch
 */
protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
    throws IOException {
  if (permission == null) {
    return p2f.mkdir();
  }
  if (!(Shell.WINDOWS && NativeIO.isAvailable())) {
    final boolean created = p2f.mkdir();
    if (created) {
      setPermission(p, permission);
    }
    return created;
  }
  try {
    NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
    return true;
  } catch (IOException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format(
          "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
          p2f, permission.toShort()), e);
    }
    return false;
  }
}
@Override public void run() { if (canceled) return; // There's a very narrow race here that the file will close right at // this instant. But if that happens, we'll likely receive an EBADF // error below, and see that it's canceled, ignoring the error. // It's also possible that we'll end up requesting readahead on some // other FD, which may be wasted work, but won't cause a problem. try { NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, off, len, NativeIO.POSIX.POSIX_FADV_WILLNEED); } catch (IOException ioe) { if (canceled) { // no big deal - the reader canceled the request and closed // the file. return; } LOG.warn("Failed readahead on " + identifier, ioe); } }
/**
 * Cancels any outstanding readahead request. When OS cache management is
 * enabled and the served region [start, end) is non-empty, it then advises
 * the OS (POSIX_FADV_DONTNEED) that the region is no longer needed. Finally
 * it delegates to {@code super.close()}. Any failure of the fadvise call is
 * logged as a warning and otherwise ignored.
 */
@Override
public void close() throws Exception {
  if (readaheadRequest != null) {
    readaheadRequest.cancel();
  }
  if (manageOsCache) {
    final long start = getStartOffset();
    final long regionLength = getEndOffset() - start;
    if (regionLength > 0) {
      try {
        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
            identifier, fd, start, regionLength, POSIX_FADV_DONTNEED);
      } catch (Throwable t) {
        // Best effort: cache advice must never prevent closing.
        LOG.warn("Failed to manage OS cache for " + identifier, t);
      }
    }
  }
  super.close();
}
/**
 * Create the ShortCircuitShm.
 *
 * Requires native IO, a non-Windows platform and a loaded misc.Unsafe.
 * Otherwise an {@link UnsupportedOperationException} is thrown, with the
 * checks made in that order. The usable length of {@code stream} is mapped
 * shared with read/write protection, and the slot table is sized to
 * mmappedLength / BYTES_PER_SLOT entries.
 *
 * @param shmId  The ID to use.
 * @param stream The stream that we're going to use to create this
 *               shared memory segment.
 *
 *               Although this is a FileInputStream, we are going to
 *               assume that the underlying file descriptor is writable
 *               as well as readable. A RandomAccessFile would be more
 *               appropriate here, but that class has no public accessor
 *               that returns a FileDescriptor, unlike FileInputStream.
 */
public ShortCircuitShm(ShmId shmId, FileInputStream stream)
    throws IOException {
  if (!NativeIO.isAvailable()) {
    throw new UnsupportedOperationException("NativeIO is not available.");
  }
  if (Shell.WINDOWS) {
    throw new UnsupportedOperationException(
        "DfsClientShm is not yet implemented for Windows.");
  }
  if (unsafe == null) {
    throw new UnsupportedOperationException(
        "can't use DfsClientShm because we failed to load misc.Unsafe.");
  }

  this.shmId = shmId;
  this.mmappedLength = getUsableLength(stream);
  // Shared read/write mapping over the whole usable length of the segment.
  this.baseAddress = POSIX.mmap(stream.getFD(),
      POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE,
      true,
      mmappedLength);
  this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
  this.allocatedSlots = new BitSet(slots.length);

  LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, "
          + "slots.length={})",
      this.getClass().getSimpleName(), shmId, mmappedLength,
      String.format("%x", baseAddress), slots.length);
}
/**
 * Builds the configuration used by the zero-copy / mmap tests. The test is
 * skipped (via {@link Assume}) unless native IO is available and the OS is
 * Unix.
 *
 * <p>Enables short-circuit reads with checksum skipping, sets the block size
 * to {@code BLOCK_SIZE}, sets the client mmap cache (size 3, 100 ms timeout),
 * places the domain socket under {@code sockDir}, and uses short heartbeat,
 * cache-report and cache-refresh intervals.
 */
public static HdfsConfiguration initZeroCopyTest() {
  Assume.assumeTrue(NativeIO.isAvailable());
  Assume.assumeTrue(SystemUtils.IS_OS_UNIX);

  final HdfsConfiguration config = new HdfsConfiguration();
  config.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
  config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
  config.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 3);
  config.setLong(HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY, 100);

  final File socketFile =
      new File(sockDir.getDir(), "TestRequestMmapAccess._PORT.sock");
  config.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
      socketFile.getAbsolutePath());

  config.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
      true);
  config.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
  config.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
  config.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
  return config;
}
/**
 * Use a dummy cache manipulator for testing.
 *
 * <p>The installed manipulator only logs mlock requests and never locks
 * memory. It reports a memlock limit of {@link Long#MAX_VALUE} and always
 * answers {@code true} to {@code verifyCanMlock()}.
 */
public static void initCacheManipulator() {
  NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.CacheManipulator() {
    @Override
    public void mlock(String identifier, ByteBuffer mmap, long length)
        throws IOException {
      // Log the requested length. The previous message printed the identifier
      // followed by "bytes." and omitted the actual byte count.
      LOG.info("LazyPersistTestCase: faking mlock of " + identifier +
          " (" + length + " bytes).");
    }

    @Override
    public long getMemlockLimit() {
      LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
      return Long.MAX_VALUE;
    }

    @Override
    public boolean verifyCanMlock() {
      LOG.info("LazyPersistTestCase: fake return " + true);
      return true;
    }
  });
}
@After public void tearDown() throws Exception { // Verify that each test uncached whatever it cached. This cleanup is // required so that file descriptors are not leaked across tests. DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd); if (fs != null) { fs.close(); fs = null; } if (cluster != null) { cluster.shutdown(); cluster = null; } // Restore the original CacheManipulator NativeIO.POSIX.setCacheManipulator(prevCacheManipulator); }