/**
 * Returns a memory map of this replica, creating it if it does not exist yet.
 *
 * Two flavors of ClientMmap may be handed out. When checksums are being
 * verified and SKIP_CHECKSUMS was not requested, the mmap must only ever
 * expose pre-checksummed data, so the anchor count on the shared memory slot
 * is incremented; that tells the DataNode not to munlock the block until the
 * ClientMmap is closed. Otherwise the mmap may expose data that has not been
 * checksummed, and no anchoring is done.
 *
 * @param opts read options; SKIP_CHECKSUMS permits the unanchored flavor.
 *
 * @return the ClientMmap, or null if one could not be obtained.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  // Short-circuit: opts is only consulted when checksums are verified.
  final boolean anchor =
      verifyChecksum && !opts.contains(ReadOption.SKIP_CHECKSUMS);
  if (anchor && !createNoChecksumContext()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("can't get an mmap for " + block + " of " + filename +
          " since SKIP_CHECKSUMS was not given, " +
          "we aren't skipping checksums, and the block is not mlocked.");
    }
    return null;
  }
  ClientMmap mmap = null;
  try {
    mmap = replica.getOrCreateClientMmap(anchor);
    return mmap;
  } finally {
    // Release the no-checksum context acquired above whenever no mmap is
    // handed back, including when getOrCreateClientMmap throws.
    if (anchor && mmap == null) {
      releaseNoChecksumContext();
    }
  }
}
/**
 * Returns this replica's memory map, creating it on first use.
 *
 * A caller that needs verified data (checksum verification on, and no
 * SKIP_CHECKSUMS option) gets an "anchored" ClientMmap: the anchor count on
 * the shared memory slot is incremented so the DataNode will not munlock the
 * block until the ClientMmap is closed. All other callers get an unanchored
 * ClientMmap that may read data that has not been checksummed.
 *
 * @param opts read options, for example SKIP_CHECKSUMS.
 *
 * @return the ClientMmap, or null on failure.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  boolean anchor = verifyChecksum && !opts.contains(ReadOption.SKIP_CHECKSUMS);
  if (anchor && !createNoChecksumContext()) {
    LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
        + "given, we aren't skipping checksums, and the block is not "
        + "mlocked.", block, filename);
    return null;
  }
  boolean handedOut = false;
  try {
    ClientMmap result = replica.getOrCreateClientMmap(anchor);
    handedOut = (result != null);
    return result;
  } finally {
    // If nothing is returned to the caller (null result or exception),
    // give back the no-checksum context taken above.
    if (anchor && !handedOut) {
      releaseNoChecksumContext();
    }
  }
}
/**
 * Hands back a buffer that this stream previously returned to the caller.
 *
 * The EMPTY_BUFFER sentinel is ignored. Otherwise the buffer's registration
 * is removed. A buffer backed by a ClientMmap has that mmap closed quietly,
 * and a buffer obtained from a ByteBufferPool is put back into the pool.
 *
 * @param buffer the buffer being released.
 * @throws IllegalArgumentException if this stream has no record of the buffer.
 */
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
  if (buffer == EMPTY_BUFFER) {
    return;
  }
  final Object owner = getExtendedReadBuffers().remove(buffer);
  if (owner == null) {
    throw new IllegalArgumentException("tried to release a buffer " +
        "that was not created by this stream, " + buffer);
  }
  if (owner instanceof ClientMmap) {
    IOUtils.closeQuietly((ClientMmap) owner);
  } else if (owner instanceof ByteBufferPool) {
    ((ByteBufferPool) owner).putBuffer(buffer);
  }
}
/**
 * Releases a buffer that was handed out by this stream.
 *
 * Passing EMPTY_BUFFER is a no-op. For any other buffer, the entry recorded
 * in extendedReadBuffers is removed and the buffer is returned to where it
 * came from: a ClientMmap is closed quietly, and a ByteBufferPool receives
 * the buffer back.
 *
 * @param buffer the buffer to release.
 * @throws IllegalArgumentException if the buffer was not created by this
 *         stream.
 */
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
  if (buffer != EMPTY_BUFFER) {
    Object source = extendedReadBuffers.remove(buffer);
    if (source == null) {
      throw new IllegalArgumentException("tried to release a buffer " +
          "that was not created by this stream, " + buffer);
    }
    if (source instanceof ClientMmap) {
      IOUtils.closeQuietly((ClientMmap) source);
    } else if (source instanceof ByteBufferPool) {
      ((ByteBufferPool) source).putBuffer(buffer);
    }
  }
}
/**
 * This implementation never supplies a memory map.
 *
 * @param opts ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
private synchronized ByteBuffer tryReadZeroCopy(int maxLength, EnumSet<ReadOption> opts) throws IOException { // Copy 'pos' and 'blockEnd' to local variables to make it easier for the // JVM to optimize this function. final long curPos = pos; final long curEnd = blockEnd; final long blockStartInFile = currentLocatedBlock.getStartOffset(); final long blockPos = curPos - blockStartInFile; // Shorten this read if the end of the block is nearby. long length63; if ((curPos + maxLength) <= (curEnd + 1)) { length63 = maxLength; } else { length63 = 1 + curEnd - curPos; if (length63 <= 0) { DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}" + " of {}; {} bytes left in block. blockPos={}; curPos={};" + "curEnd={}", curPos, src, length63, blockPos, curPos, curEnd); return null; } DFSClient.LOG.debug("Reducing read length from {} to {} to avoid going " + "more than one byte past the end of the block. blockPos={}; " +" curPos={}; curEnd={}", maxLength, length63, blockPos, curPos, curEnd); } // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer. int length; if (blockPos + length63 <= Integer.MAX_VALUE) { length = (int)length63; } else { long length31 = Integer.MAX_VALUE - blockPos; if (length31 <= 0) { // Java ByteBuffers can't be longer than 2 GB, because they use // 4-byte signed integers to represent capacity, etc. // So we can't mmap the parts of the block higher than the 2 GB offset. // FIXME: we could work around this with multiple memory maps. // See HDFS-5101. DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {} " + " of {}; 31-bit MappedByteBuffer limit exceeded. blockPos={}, " + "curEnd={}", curPos, src, blockPos, curEnd); return null; } length = (int)length31; DFSClient.LOG.debug("Reducing read length from {} to {} to avoid 31-bit " + "limit. 
blockPos={}; curPos={}; curEnd={}", maxLength, length, blockPos, curPos, curEnd); } final ClientMmap clientMmap = blockReader.getClientMmap(opts); if (clientMmap == null) { DFSClient.LOG.debug("unable to perform a zero-copy read from offset {} of" + " {}; BlockReader#getClientMmap returned null.", curPos, src); return null; } boolean success = false; ByteBuffer buffer; try { seek(curPos + length); buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer(); buffer.position((int)blockPos); buffer.limit((int)(blockPos + length)); getExtendedReadBuffers().put(buffer, clientMmap); synchronized (infoLock) { readStatistics.addZeroCopyBytes(length); } DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the " + "zero-copy read path. blockEnd = {}", length, curPos, blockEnd); success = true; } finally { if (!success) { IOUtils.closeQuietly(clientMmap); } } return buffer; }
@Override public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { // For now, pluggable ReplicaAccessors do not support zero-copy. return null; }
/**
 * Obtains a ClientMmap for this BlockReader.
 *
 * @param opts The read options to apply (for example, SKIP_CHECKSUMS).
 * @return The ClientMmap, or null if this reader cannot provide one,
 *         either because mmap is not supported or because obtaining it
 *         failed.
 */
ClientMmap getClientMmap(EnumSet<ReadOption> opts);