/**
 * Handles an uploaded checkpoint image: resolves the image file name for
 * the given transaction id, looks up the matching image files in the
 * destination storage and hands the incoming stream to
 * {@code receiveFile}.
 *
 * @param request HTTP request from which the advertised MD5 digest is parsed
 * @param imageTxId transaction id used to derive the checkpoint image name
 * @param dstStorage storage that provides the destination files
 * @param stream incoming image data
 * @param advertisedSize size announced by the sender
 * @param throttler throttler passed through to {@code receiveFile}
 * @return the hash returned by {@code receiveFile}
 * @throws IOException if the destination storage has no target files, or
 *         if receiving the file fails
 */
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
    long imageTxId, Storage dstStorage, InputStream stream,
    long advertisedSize, DataTransferThrottler throttler)
    throws IOException {
  final String imageFileName =
      NNStorage.getCheckpointImageFileName(imageTxId);
  final List<File> targets =
      dstStorage.getFiles(NameNodeDirType.IMAGE, imageFileName);
  if (targets.isEmpty()) {
    throw new IOException("No targets in destination storage!");
  }

  final MD5Hash expectedDigest = parseMD5Header(request);
  final MD5Hash receivedDigest = receiveFile(imageFileName, targets,
      dstStorage, true, advertisedSize, expectedDigest, imageFileName,
      stream, throttler);

  // Only the first target is reported in the log line.
  final File firstTarget = targets.get(0);
  LOG.info("Downloaded file " + firstTarget.getName() + " size "
      + firstTarget.length() + " bytes.");
  return receivedDigest;
}
@Test public void testThrottler() throws IOException { Configuration conf = new HdfsConfiguration(); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); long bandwidthPerSec = 1024*1024L; final long TOTAL_BYTES =6*bandwidthPerSec; long bytesToSend = TOTAL_BYTES; long start = Time.monotonicNow(); DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec); long totalBytes = 0L; long bytesSent = 1024*512L; // 0.5MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; bytesSent = 1024*768L; // 0.75MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; try { Thread.sleep(1000); } catch (InterruptedException ignored) {} throttler.throttle(bytesToSend); long end = Time.monotonicNow(); assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec); }
@Test public void testThrottler() throws IOException { Configuration conf = new HdfsConfiguration(); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); long bandwidthPerSec = 1024*1024L; final long TOTAL_BYTES =6*bandwidthPerSec; long bytesToSend = TOTAL_BYTES; long start = Time.now(); DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec); long totalBytes = 0L; long bytesSent = 1024*512L; // 0.5MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; bytesSent = 1024*768L; // 0.75MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; try { Thread.sleep(1000); } catch (InterruptedException ignored) {} throttler.throttle(bytesToSend); long end = Time.now(); assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec); }
@Test public void testThrottler() throws IOException { Configuration conf = new HdfsConfiguration(); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); long bandwidthPerSec = 1024 * 1024L; final long TOTAL_BYTES = 6 * bandwidthPerSec; long bytesToSend = TOTAL_BYTES; long start = Time.now(); DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec); long totalBytes = 0L; long bytesSent = 1024 * 512L; // 0.5MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; bytesSent = 1024 * 768L; // 0.75MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; try { Thread.sleep(1000); } catch (InterruptedException ignored) { } throttler.throttle(bytesToSend); long end = Time.now(); assertTrue(totalBytes * 1000 / (end - start) <= bandwidthPerSec); }
public void testThrottler() throws IOException { Configuration conf = new HdfsConfiguration(); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); long bandwidthPerSec = 1024*1024L; final long TOTAL_BYTES =6*bandwidthPerSec; long bytesToSend = TOTAL_BYTES; long start = Util.now(); DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec); long totalBytes = 0L; long bytesSent = 1024*512L; // 0.5MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; bytesSent = 1024*768L; // 0.75MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; try { Thread.sleep(1000); } catch (InterruptedException ignored) {} throttler.throttle(bytesToSend); long end = Util.now(); assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec); }
public void testThrottler() throws IOException { Configuration conf = new Configuration(); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); long bandwidthPerSec = 1024*1024L; final long TOTAL_BYTES =6*bandwidthPerSec; long bytesToSend = TOTAL_BYTES; long start = Util.now(); DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec); long totalBytes = 0L; long bytesSent = 1024*512L; // 0.5MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; bytesSent = 1024*768L; // 0.75MB throttler.throttle(bytesSent); bytesToSend -= bytesSent; try { Thread.sleep(1000); } catch (InterruptedException ignored) {} throttler.throttle(bytesToSend); long end = Util.now(); assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec); }
public BlockXCodingMerger(Block block, int namespaceId, DataInputStream[] childInputStreams, long offsetInBlock, long length, String[] childAddrs, String myAddr, DataTransferThrottler throttler, int mergerLevel) throws IOException{ super(); this.block = block; this.namespaceId = namespaceId; this.childInputStreams = childInputStreams; this.offsetInBlock = offsetInBlock; this.length = length; this.childAddrs = childAddrs; this.myAddr = myAddr; this.throttler = throttler; this.mergerLevel = mergerLevel; Configuration conf = new Configuration(); this.packetSize = conf.getInt("raid.blockreconstruct.packetsize", 4096); this.bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512); this.checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, bytesPerChecksum, new PureJavaCrc32()); this.checksumSize = checksum.getChecksumSize(); }
/**
 * Creates an image, sets its checkpoint directories from the
 * configuration and, when a positive image transfer rate is configured,
 * installs a throttler for image transfers.
 *
 * @param conf Configuration
 * @throws IOException declared by the constructor signature
 */
FSImage(Configuration conf) throws IOException {
  this();
  setCheckpointDirectories(
      FSImage.getCheckpointDirs(conf, null),
      FSImage.getCheckpointEditsDirs(conf, null));

  final long configuredRate = conf.getLong(
      HdfsConstants.DFS_IMAGE_TRANSFER_RATE_KEY,
      HdfsConstants.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
  if (configuredRate <= 0) {
    // Non-positive rate: no throttler is installed.
    return;
  }
  this.imageTransferThrottler = new DataTransferThrottler(configuredRate);
}
/**
 * Creates a merger that additionally remembers the parent's address and
 * the output stream towards the parent. All other arguments are passed
 * unchanged to the superclass constructor.
 *
 * @param parentAddr address of the parent
 * @param parentOut output stream to the parent
 * @throws IOException if the superclass constructor fails
 */
public InternalBlockXCodingMerger(Block block, int namespaceId,
    DataInputStream[] childInputStreams, long offsetInBlock, long length,
    String[] childAddrs, String myAddr, DataTransferThrottler throttler,
    int mergerLevel, String parentAddr, DataOutputStream parentOut)
    throws IOException {
  super(block, namespaceId, childInputStreams, offsetInBlock, length,
      childAddrs, myAddr, throttler, mergerLevel);
  this.parentOut = parentOut;
  this.parentAddr = parentAddr;
}
/**
 * Creates a merger that keeps a reference to a caller-supplied byte
 * buffer and an offset into it. The current offset in the block starts
 * at {@code offsetInBlock}. All other arguments are passed unchanged to
 * the superclass constructor.
 *
 * @param buffer caller-supplied buffer
 * @param offsetInBuffer offset into {@code buffer}
 * @throws IOException if the superclass constructor fails
 */
public BufferBlockXCodingMerger(Block block, int namespaceId,
    DataInputStream[] childInputStreams, long offsetInBlock, long length,
    String[] childAddrs, String myAddr, DataTransferThrottler throttler,
    int mergerLevel, byte[] buffer, int offsetInBuffer)
    throws IOException {
  super(block, namespaceId, childInputStreams, offsetInBlock, length,
      childAddrs, myAddr, throttler, mergerLevel);
  this.currentOffsetInBlock = offsetInBlock;
  this.offsetInBuffer = offsetInBuffer;
  this.buffer = buffer;
}
/**
 * Server-side handler for a getfile HTTP request: streams the local
 * file's contents to the given output stream. Delegates to the
 * five-argument overload without a canceler.
 *
 * @param out destination stream
 * @param localfile file being served
 * @param infile open input stream on {@code localfile}
 * @param throttler optional throttler, passed through unchanged
 * @throws IOException if the copy fails
 */
public static void copyFileToStream(OutputStream out, File localfile,
    FileInputStream infile, DataTransferThrottler throttler)
    throws IOException {
  final Canceler noCanceler = null;
  copyFileToStream(out, localfile, infile, throttler, noCanceler);
}
private static void copyFileToStream(OutputStream out, File localfile, FileInputStream infile, DataTransferThrottler throttler, Canceler canceler) throws IOException { byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE]; try { CheckpointFaultInjector.getInstance() .aboutToSendFile(localfile); if (CheckpointFaultInjector.getInstance(). shouldSendShortFile(localfile)) { // Test sending image shorter than localfile long len = localfile.length(); buf = new byte[(int)Math.min(len/2, HdfsConstants.IO_FILE_BUFFER_SIZE)]; // This will read at most half of the image // and the rest of the image will be sent over the wire infile.read(buf); } int num = 1; while (num > 0) { if (canceler != null && canceler.isCancelled()) { throw new SaveNamespaceCancelledException( canceler.getCancellationReason()); } num = infile.read(buf); if (num <= 0) { break; } if (CheckpointFaultInjector.getInstance() .shouldCorruptAByte(localfile)) { // Simulate a corrupted byte on the wire LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!"); buf[0]++; } out.write(buf, 0, num); if (throttler != null) { throttler.throttle(num, canceler); } } } catch (EofException e) { LOG.info("Connection closed by client"); out = null; // so we don't close in the finally } finally { if (out != null) { out.close(); } } }
/**
 * Sends the byte range from {@code offset} up to {@code endOffset} as a
 * sequence of packets, followed by an empty packet marking the end of the
 * block, unless the current thread is interrupted first.
 *
 * @param out stream the packets are written to when transferTo is not
 *        used; it is flushed after the final empty packet
 * @param baseStream underlying stream; packets are written to it directly
 *        when transferTo is used
 * @param throttler passed through to {@code sendPacket}
 * @return the sum, over all data packets sent, of the packet length plus
 *         {@code numberOfChunks(len) * checksumSize}
 * @throws IOException if {@code out} is null or sending fails; an
 *         IOException from the final empty packet or flush is converted
 *         with {@code ioeToSocketException}
 */
private long doSendBlock(DataOutputStream out, OutputStream baseStream,
    DataTransferThrottler throttler) throws IOException {
  if (out == null) {
    throw new IOException( "out stream is null" );
  }
  initialOffset = offset;
  long totalRead = 0;
  OutputStream streamForSendChunks = out;

  lastCacheDropOffset = initialOffset;

  if (isLongRead() && blockInFd != null) {
    // Advise that this file descriptor will be accessed sequentially.
    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
        block.getBlockName(), blockInFd, 0, 0,
        NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
  }

  // Trigger readahead of beginning of file if configured.
  manageOsCache();

  // Start time is only taken when client trace debug logging is enabled.
  final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
  try {
    int maxChunksPerPacket;
    int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
    // transferTo is used only when allowed, checksums are not verified
    // here, the base stream is a SocketOutputStream and the block input is
    // a FileInputStream.
    boolean transferTo = transferToAllowed && !verifyChecksum
        && baseStream instanceof SocketOutputStream
        && blockIn instanceof FileInputStream;
    if (transferTo) {
      FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
      blockInPosition = fileChannel.position();
      streamForSendChunks = baseStream;
      maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);

      // Smaller packet size to only hold checksum when doing transferTo
      pktBufSize += checksumSize * maxChunksPerPacket;
    } else {
      maxChunksPerPacket = Math.max(1,
          numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
      // Packet size includes both checksum and data
      pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
    }

    ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

    // Send packets until the end offset is reached or the thread is
    // interrupted.
    while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
      manageOsCache();
      long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
          transferTo, throttler);
      offset += len;
      totalRead += len + (numberOfChunks(len) * checksumSize);
      seqno++;
    }
    // If this thread was interrupted, then it did not send the full block.
    if (!Thread.currentThread().isInterrupted()) {
      try {
        // send an empty packet to mark the end of the block
        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
            throttler);
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }

      sentEntireByteRange = true;
    }
  } finally {
    // Emit the client trace line (if configured) and always close.
    if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
      final long endTime = System.nanoTime();
      ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
          initialOffset, endTime - startTime));
    }
    close();
  }
  return totalRead;
}
private static void copyFileToStream(OutputStream out, File localfile, FileInputStream infile, DataTransferThrottler throttler, Canceler canceler) throws IOException { byte buf[] = new byte[IO_FILE_BUFFER_SIZE]; try { CheckpointFaultInjector.getInstance() .aboutToSendFile(localfile); if (CheckpointFaultInjector.getInstance(). shouldSendShortFile(localfile)) { // Test sending image shorter than localfile long len = localfile.length(); buf = new byte[(int)Math.min(len/2, IO_FILE_BUFFER_SIZE)]; // This will read at most half of the image // and the rest of the image will be sent over the wire infile.read(buf); } int num = 1; while (num > 0) { if (canceler != null && canceler.isCancelled()) { throw new SaveNamespaceCancelledException( canceler.getCancellationReason()); } num = infile.read(buf); if (num <= 0) { break; } if (CheckpointFaultInjector.getInstance() .shouldCorruptAByte(localfile)) { // Simulate a corrupted byte on the wire LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!"); buf[0]++; } out.write(buf, 0, num); if (throttler != null) { throttler.throttle(num, canceler); } } } catch (EofException e) { LOG.info("Connection closed by client"); out = null; // so we don't close in the finally } finally { if (out != null) { out.close(); } } }
/**
 * Sends the byte range from {@code offset} up to {@code endOffset} as a
 * sequence of packets, followed by an empty packet marking the end of the
 * block, unless the current thread is interrupted first.
 *
 * @param out stream the packets are written to when transferTo is not
 *        used; it is flushed after the final empty packet
 * @param baseStream underlying stream; packets are written to it directly
 *        when transferTo is used
 * @param throttler passed through to {@code sendPacket}
 * @return the sum, over all data packets sent, of the packet length plus
 *         {@code numberOfChunks(len) * checksumSize}
 * @throws IOException if {@code out} is null or sending fails; an
 *         IOException from the final empty packet or flush is converted
 *         with {@code ioeToSocketException}
 */
private long doSendBlock(DataOutputStream out, OutputStream baseStream,
    DataTransferThrottler throttler) throws IOException {
  if (out == null) {
    throw new IOException( "out stream is null" );
  }
  initialOffset = offset;
  long totalRead = 0;
  OutputStream streamForSendChunks = out;

  lastCacheDropOffset = initialOffset;

  if (isLongRead() && blockInFd != null) {
    // Advise that this file descriptor will be accessed sequentially.
    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
        block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL);
  }

  // Trigger readahead of beginning of file if configured.
  manageOsCache();

  // Start time is only taken when client trace debug logging is enabled.
  final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
  try {
    int maxChunksPerPacket;
    int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
    // transferTo is used only when allowed, checksums are not verified
    // here, the base stream is a SocketOutputStream and the block input is
    // a FileInputStream.
    boolean transferTo = transferToAllowed && !verifyChecksum
        && baseStream instanceof SocketOutputStream
        && blockIn instanceof FileInputStream;
    if (transferTo) {
      FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
      blockInPosition = fileChannel.position();
      streamForSendChunks = baseStream;
      maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);

      // Smaller packet size to only hold checksum when doing transferTo
      pktBufSize += checksumSize * maxChunksPerPacket;
    } else {
      maxChunksPerPacket = Math.max(1,
          numberOfChunks(IO_FILE_BUFFER_SIZE));
      // Packet size includes both checksum and data
      pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
    }

    ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

    // Send packets until the end offset is reached or the thread is
    // interrupted.
    while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
      manageOsCache();
      long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
          transferTo, throttler);
      offset += len;
      totalRead += len + (numberOfChunks(len) * checksumSize);
      seqno++;
    }
    // If this thread was interrupted, then it did not send the full block.
    if (!Thread.currentThread().isInterrupted()) {
      try {
        // send an empty packet to mark the end of the block
        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
            throttler);
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }

      sentEntireByteRange = true;
    }
  } finally {
    // Emit the client trace line (if configured) and always close.
    if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
      final long endTime = System.nanoTime();
      ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
          initialOffset, endTime - startTime));
    }
    close();
  }
  return totalRead;
}
@Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { FileInputStream editFileIn = null; try { final ServletContext context = getServletContext(); final Configuration conf = (Configuration) getServletContext() .getAttribute(JspHelper.CURRENT_CONF); final String journalId = request.getParameter(JOURNAL_ID_PARAM); QuorumJournalManager.checkJournalId(journalId); final JNStorage storage = JournalNodeHttpServer .getJournalFromContext(context, journalId).getStorage(); // Check security if (!checkRequestorOrSendError(conf, request, response)) { return; } // Check that the namespace info is correct if (!checkStorageInfoOrSendError(storage, request, response)) { return; } long segmentTxId = ServletUtil.parseLongParam(request, SEGMENT_TXID_PARAM); FileJournalManager fjm = storage.getJournalManager(); File editFile; synchronized (fjm) { // Synchronize on the FJM so that the file doesn't get finalized // out from underneath us while we're in the process of opening // it up. EditLogFile elf = fjm.getLogFile( segmentTxId); if (elf == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, "No edit log found starting at txid " + segmentTxId); return; } editFile = elf.getFile(); ImageServlet.setVerificationHeadersForGet(response, editFile); ImageServlet.setFileNameHeaders(response, editFile); editFileIn = new FileInputStream(editFile); } DataTransferThrottler throttler = ImageServlet.getThrottler(conf); // send edits TransferFsImage.copyFileToStream(response.getOutputStream(), editFile, editFileIn, throttler); } catch (Throwable t) { String errMsg = "getedit failed. " + StringUtils.stringifyException(t); response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg); throw new IOException(errMsg); } finally { IOUtils.closeStream(editFileIn); } }
/** * A server-side method to respond to a getfile http request * Copies the contents of the local file into the output stream. */ public static void getFileServer(ServletResponse response, File localfile, FileInputStream infile, DataTransferThrottler throttler) throws IOException { byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE]; ServletOutputStream out = null; try { CheckpointFaultInjector.getInstance() .aboutToSendFile(localfile); out = response.getOutputStream(); if (CheckpointFaultInjector.getInstance(). shouldSendShortFile(localfile)) { // Test sending image shorter than localfile long len = localfile.length(); buf = new byte[(int)Math.min(len/2, HdfsConstants.IO_FILE_BUFFER_SIZE)]; // This will read at most half of the image // and the rest of the image will be sent over the wire infile.read(buf); } int num = 1; while (num > 0) { num = infile.read(buf); if (num <= 0) { break; } if (CheckpointFaultInjector.getInstance() .shouldCorruptAByte(localfile)) { // Simulate a corrupted byte on the wire LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!"); buf[0]++; } out.write(buf, 0, num); if (throttler != null) { throttler.throttle(num); } } } finally { if (out != null) { out.close(); } } }
/**
 * Receives packets until {@code receivePacket()} returns a negative
 * value, then closes the responder (if one was started) and, for
 * datanode or transfer writes, closes the block files and either
 * converts the temporary replica to RBW or finalizes the block.
 *
 * <p>A {@code PacketResponder} daemon is started only when
 * {@code isClient && !isTransfer}. If the flow ends before the responder
 * was closed, this receiver is closed, the responder is interrupted and
 * {@code cleanupBlock()} is called. In every case the responder thread,
 * if any, is joined with the configured xceiver stop timeout.
 *
 * @param mirrOut output to next datanode
 * @param mirrIn input from next datanode
 * @param replyOut output to previous datanode
 * @param mirrAddr address stored in {@code mirrorAddr}
 * @param throttlerArg throttler stored in {@code throttler}
 * @param downstreams passed to the {@code PacketResponder}
 * @throws IOException if receiving fails, if the join on the responder
 *         times out, or if the join is interrupted
 */
void receiveBlock(
    DataOutputStream mirrOut, // output to next datanode
    DataInputStream mirrIn,   // input from next datanode
    DataOutputStream replyOut,  // output to previous datanode
    String mirrAddr, DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams) throws IOException {

  syncOnClose = datanode.getDnConf().syncOnClose;
  boolean responderClosed = false;
  mirrorOut = mirrOut;
  mirrorAddr = mirrAddr;
  throttler = throttlerArg;

  try {
    if (isClient && !isTransfer) {
      responder = new Daemon(datanode.threadGroup,
          new PacketResponder(replyOut, mirrIn, downstreams));
      responder.start(); // start thread to processes responses
    }

    /*
     * Receive until the last packet.
     */
    while (receivePacket() >= 0) {}

    // wait for all outstanding packet responses. And then
    // indicate responder to gracefully shutdown.
    // Mark that responder has been closed for future processing
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      responderClosed = true;
    }

    // If this write is for a replication or transfer-RBW/Finalized,
    // then finalize block or convert temporary to RBW.
    // For client-writes, the block is finalized in the PacketResponder.
    if (isDatanode || isTransfer) {
      // close the block/crc files
      close();
      block.setNumBytes(replicaInfo.getNumBytes());

      if (stage == BlockConstructionStage.TRANSFER_RBW) {
        // for TRANSFER_RBW, convert temporary to RBW
        datanode.data.convertTemporaryToRbw(block);
      } else {
        // for isDatnode or TRANSFER_FINALIZED
        // Finalize the block.
        datanode.data.finalizeBlock(block);
      }
      datanode.metrics.incrBlocksWritten();
    }

  } catch (IOException ioe) {
    LOG.info("Exception for " + block, ioe);
    throw ioe;
  } finally {
    // NOTE(review): responderClosed is only set when a responder exists,
    // so this branch also runs after a successful receive that started no
    // responder - confirm that cleanupBlock() is intended in that case.
    if (!responderClosed) { // Abnormal termination of the flow above
      IOUtils.closeStream(this);
      if (responder != null) {
        responder.interrupt();
      }
      cleanupBlock();
    }
    if (responder != null) {
      try {
        // Wait for the responder thread, bounded by the xceiver stop
        // timeout; a thread still alive afterwards is reported as an error.
        responder.join(datanode.getDnConf().getXceiverStopTimeout());
        if (responder.isAlive()) {
          String msg = "Join on responder thread " + responder
              + " timed out";
          LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
          throw new IOException(msg);
        }
      } catch (InterruptedException e) {
        responder.interrupt();
        throw new IOException("Interrupted receiveBlock");
      }
      responder = null;
    }
  }
}
@Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { FileInputStream editFileIn = null; try { final ServletContext context = getServletContext(); final Configuration conf = (Configuration) getServletContext() .getAttribute(JspHelper.CURRENT_CONF); final String journalId = request.getParameter(JOURNAL_ID_PARAM); QuorumJournalManager.checkJournalId(journalId); final JNStorage storage = JournalNodeHttpServer .getJournalFromContext(context, journalId).getStorage(); // Check security if (!checkRequestorOrSendError(conf, request, response)) { return; } // Check that the namespace info is correct if (!checkStorageInfoOrSendError(storage, request, response)) { return; } long segmentTxId = ServletUtil.parseLongParam(request, SEGMENT_TXID_PARAM); FileJournalManager fjm = storage.getJournalManager(); File editFile; synchronized (fjm) { // Synchronize on the FJM so that the file doesn't get finalized // out from underneath us while we're in the process of opening // it up. EditLogFile elf = fjm.getLogFile( segmentTxId); if (elf == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, "No edit log found starting at txid " + segmentTxId); return; } editFile = elf.getFile(); GetImageServlet.setVerificationHeaders(response, editFile); GetImageServlet.setFileNameHeaders(response, editFile); editFileIn = new FileInputStream(editFile); } DataTransferThrottler throttler = GetImageServlet.getThrottler(conf); // send edits TransferFsImage.getFileServer(response, editFile, editFileIn, throttler); } catch (Throwable t) { String errMsg = "getedit failed. " + StringUtils.stringifyException(t); response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg); throw new IOException(errMsg); } finally { IOUtils.closeStream(editFileIn); } }