@Override
protected void doShuffle(MapHost host, IFileInputStream iFin,
                         long compressedLength, long decompressedLength,
                         ShuffleClientMetrics metrics,
                         Reporter reporter) throws IOException {
  InputStream input = iFin;

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
        getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0) {
      throw new IOException("Unexpected extra bytes from input stream for " +
          getMapId());
    }
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  doShuffle(host, new IFileInputStream(input, compressedLength, conf),
      compressedLength, decompressedLength, metrics, reporter);
}
private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput,
                             InputStream input,
                             int decompressedLength,
                             int compressedLength) throws IOException {
  IFileInputStream checksumIn = new IFileInputStream(input, compressedLength);
  input = checksumIn;

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  // Copy map-output into an in-memory buffer
  byte[] shuffleData = mapOutput.getMemory();
  try {
    IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
    metrics.inputBytes(shuffleData.length);
    reporter.progress();
    LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
        mapOutput.getMapId());
  } catch (IOException ioe) {
    // Close the streams
    IOUtils.cleanup(LOG, input);
    // Re-throw
    throw ioe;
  }
}
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn =
      new IFileInputStream(input, compressedLength, conf);
  input = checksumIn;

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
        getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0) {
      throw new IOException("Unexpected extra bytes from input stream for " +
          getMapId());
    }
  } catch (IOException ioe) {
    // Close the streams
    IOUtils.cleanup(LOG, input);
    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
@Override
protected void doShuffle(MapHost host, IFileInputStream input,
                         long compressedLength, long decompressedLength,
                         ShuffleClientMetrics metrics,
                         Reporter reporter) throws IOException {
  // Copy data to local-disk
  long bytesLeft = compressedLength;
  try {
    final int BYTES_TO_READ = 64 * 1024;
    byte[] buf = new byte[BYTES_TO_READ];
    while (bytesLeft > 0) {
      int n = input.readWithChecksum(buf, 0,
          (int) Math.min(bytesLeft, BYTES_TO_READ));
      if (n < 0) {
        throw new IOException("read past end of stream reading " +
            getMapId());
      }
      disk.write(buf, 0, n);
      bytesLeft -= n;
      metrics.inputBytes(n);
      reporter.progress();
    }

    LOG.info("Read " + (compressedLength - bytesLeft) +
        " bytes from map-output for " + getMapId());

    disk.close();
  } catch (IOException ioe) {
    // Close the streams
    IOUtils.cleanup(LOG, disk);
    // Re-throw
    throw ioe;
  }

  // Sanity check
  if (bytesLeft != 0) {
    throw new IOException("Incomplete map output received for " +
        getMapId() + " from " + host.getHostName() + " (" +
        bytesLeft + " bytes missing of " + compressedLength + ")");
  }
  this.compressedSize = compressedLength;
}
protected abstract void doShuffle(MapHost host,
    IFileInputStream iFileInputStream,
    long compressedLength, long decompressedLength,
    ShuffleClientMetrics metrics,
    Reporter reporter) throws IOException;
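This hook, together with the base-class shuffle method shown earlier, forms a small template-method split: the base class wraps the raw socket stream in a checksum-verifying IFileInputStream exactly once, and each subclass only decides where the verified bytes go (memory buffer or local disk). A minimal sketch of that split follows, using our own class and method names rather than the real IFileWrappedMapOutput; the three-argument IFileInputStream constructor is the one used in the snippets above.

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;

// Illustrative only: a stripped-down version of the template-method split.
abstract class WrappedOutputSketch {
  // Subclasses decide where the checksum-verified bytes go
  // (in-memory buffer, local disk, ...).
  protected abstract void doShuffle(IFileInputStream in, long compressedLength)
      throws IOException;

  // The base class performs the checksum wrapping exactly once, so no
  // subclass can forget it or apply it twice.
  public final void shuffle(InputStream raw, long compressedLength,
      Configuration conf) throws IOException {
    doShuffle(new IFileInputStream(raw, compressedLength, conf),
        compressedLength);
  }
}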
@Test
public void testCorruptedIFile() throws Exception {
  final int fetcher = 7;
  Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
  Path shuffledToDisk =
      OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
  fs = FileSystem.getLocal(job).getRaw();
  IFileWrappedMapOutput<Text,Text> odmo =
      new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
                                     fs, onDiskMapOutputPath);

  String mapData = "MAPDATA12345678901234567890";
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);

  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(bout);
  IFileOutputStream ios = new IFileOutputStream(dos);
  header.write(dos);
  int headerSize = dos.size();
  try {
    ios.write(mapData.getBytes());
  } finally {
    ios.close();
  }
  int dataSize = bout.size() - headerSize;

  // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
  MapHost host = new MapHost("TestHost", "http://test/url");
  ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
  try {
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
  } finally {
    bin.close();
  }

  // Now corrupt the IFile data.
  byte[] corrupted = bout.toByteArray();
  corrupted[headerSize + (dataSize / 2)] = 0x0;
  try {
    bin = new ByteArrayInputStream(corrupted);
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
    fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
  } catch (ChecksumException e) {
    LOG.info("The expected checksum exception was thrown.", e);
  } finally {
    bin.close();
  }

  // Ensure that the shuffled file can be read.
  IFileInputStream iFin =
      new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
  try {
    iFin.read(new byte[dataSize], 0, dataSize);
  } finally {
    iFin.close();
  }
}
@Test
public void testCorruptedIFile() throws Exception {
  final int fetcher = 7;
  Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
  Path shuffledToDisk =
      OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
  fs = FileSystem.getLocal(job).getRaw();
  MapOutputFile mof = mock(MapOutputFile.class);
  OnDiskMapOutput<Text,Text> odmo =
      new OnDiskMapOutput<Text,Text>(map1ID, id, mm, 100L, job, mof, fetcher,
                                     true, fs, onDiskMapOutputPath);

  String mapData = "MAPDATA12345678901234567890";
  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);

  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(bout);
  IFileOutputStream ios = new IFileOutputStream(dos);
  header.write(dos);
  int headerSize = dos.size();
  try {
    ios.write(mapData.getBytes());
  } finally {
    ios.close();
  }
  int dataSize = bout.size() - headerSize;

  // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
  MapHost host = new MapHost("TestHost", "http://test/url");
  ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
  try {
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
  } finally {
    bin.close();
  }

  // Now corrupt the IFile data.
  byte[] corrupted = bout.toByteArray();
  corrupted[headerSize + (dataSize / 2)] = 0x0;
  try {
    bin = new ByteArrayInputStream(corrupted);
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
    fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
  } catch (ChecksumException e) {
    LOG.info("The expected checksum exception was thrown.", e);
  } finally {
    bin.close();
  }

  // Ensure that the shuffled file can be read.
  IFileInputStream iFin =
      new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
  try {
    iFin.read(new byte[dataSize], 0, dataSize);
  } finally {
    iFin.close();
  }
}
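Both versions of the test lean on the IFile framing itself: IFileOutputStream appends a trailing checksum after the payload, and IFileInputStream verifies it as the last data bytes are consumed, surfacing corruption as a ChecksumException. The following standalone sketch shows that round trip in isolation; the class name and setup are ours (only a Hadoop client on the classpath is assumed), while the stream calls mirror the tests above.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.IFileOutputStream;

// Illustrative only: round-trip a payload through the IFile checksum
// streams to see the corruption detection the tests above rely on.
public class IFileChecksumSketch {
  public static void main(String[] args) throws Exception {
    byte[] payload = "MAPDATA12345678901234567890".getBytes();

    // IFileOutputStream appends a trailing checksum after the data.
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    IFileOutputStream ios = new IFileOutputStream(bout);
    try {
      ios.write(payload);
    } finally {
      ios.close();
    }

    byte[] onWire = bout.toByteArray();  // payload + trailing checksum
    onWire[payload.length / 2] = 0x0;    // corrupt one data byte

    // The length passed to IFileInputStream includes the trailing checksum,
    // matching how the tests compute dataSize from the written bytes.
    IFileInputStream iin = new IFileInputStream(
        new ByteArrayInputStream(onWire), onWire.length, new Configuration());
    try {
      // Consuming the data region triggers verification, which should throw.
      iin.read(new byte[payload.length], 0, payload.length);
      System.out.println("unexpected: corruption not detected");
    } catch (ChecksumException ce) {
      System.out.println("corruption detected as expected");
    } finally {
      iin.close();
    }
  }
}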
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  // Rewrite the local app path so it points at the mapper's host, then
  // read that host's map output file directly instead of over HTTP.
  String mapHostName = host.getHostName().split(":")[0];
  String app_path = conf.get(MRConfig.LOCAL_DIR);
  LOG.debug("original app_path " + app_path);
  String[] app_path_parts = app_path.split("/");
  app_path_parts[app_path_parts.length - 5] = mapHostName;
  StringBuilder builder = new StringBuilder();
  for (String s : app_path_parts) {
    builder.append(s);
    builder.append("/");
  }
  app_path = builder.toString();
  String src = app_path + "output/" + getMapId() + "/file.out";
  File f = new File(src);
  if (f.exists()) {
    LOG.debug("shuffleToLink: the src " + src + " EXISTS!");
  }

  // Seek straight to this reducer's partition within the map output file.
  RandomAccessFile raf = new RandomAccessFile(f, "r");
  input = Channels.newInputStream(raf.getChannel().position(offset));

  IFileInputStream checksumIn =
      new IFileInputStream(input, compressedLength, conf);
  input = checksumIn;

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    LOG.debug("offset: " + offset);
    LOG.debug("memory.length: " + memory.length);
    LOG.debug("compressedLength: " + compressedLength);
    LOG.debug("decompressedLength: " + decompressedLength);

    // TODO: would offset and length be OK as ints?
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
        getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces
     * the decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream in sync.
     */
    // Note: unlike the in-memory variant above, this check is left
    // disabled here; the stream and backing file are closed directly
    // in the success path instead.
    //if (input.read() >= 0) {
    //  throw new IOException("Unexpected extra bytes from input stream for "
    //      + getMapId());
    //}
    input.close();
    raf.close();
  } catch (IOException ioe) {
    // Close the streams
    IOUtils.cleanup(LOG, input);
    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}