/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  // Write an IFile for every partition so downstream merge logic sees a
  // complete spill; only the target partition actually gets the record.
  for (int i = 0; i < partitions; ++i) {
    // create spill file
    IFile.Writer<K, V> writer = null;
    try {
      // Create a new codec, don't care!
      writer = getFileWriter(i);
      if (i == partition) {
        writer.append(key, value);
      }
      writer.close();
      mapOutputByteCounter.increment(writer.getRawLength());
      fileOutputByteCounter.increment(writer.getCompressedLength());
      writer = null;
    } catch (IOException e) {
      if (null != writer) {
        writer.close();
      }
      throw e;
    }
  }
}
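// Context (illustrative sketch, not part of this patch): in upstream Hadoop's
// MapTask.MapOutputBuffer.collect(), oversized records are routed to
// spillSingleRecord() when serialization overflows the circular buffer. The
// names below (MapBufferTooSmallException, keySerializer, valSerializer,
// mapOutputRecordCounter) are from upstream; buffer bookkeeping is elided.
try {
  keySerializer.serialize(key);    // may overflow the in-memory buffer
  valSerializer.serialize(value);
} catch (MapBufferTooSmallException e) {
  // The record can never fit in memory: spill it straight to disk.
  LOG.info("Record too large for in-memory buffer: " + e.getMessage());
  spillSingleRecord(key, value, partition);
  mapOutputRecordCounter.increment(1);
  return;
}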
public long writePartition(RawKeyValueIterator kvIter, int partition)
    throws IOException {
  Writer<K, V> writer = getFileWriter(partition);
  try {
    if (combinerRunner == null) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      try {
        combinerRunner.combine(kvIter, combineCollector);
      } catch (Throwable t) {
        // Surface combiner failures as IOExceptions, wrapping if needed.
        throw ((t instanceof IOException) ? (IOException) t
                                          : new IOException(t));
      }
    }
  } finally {
    writer.close();
    if (combineCollector != null) {
      combineCollector.setWriter(null);
    }
  }
  return writer.getCompressedLength();
}
@Test
public void testCustomCollect() throws Throwable {
  //mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  @SuppressWarnings("unchecked")
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();
  conf.set(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, "2");

  coc = new CombineOutputCollector<String, Integer>(outCounter,
      mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 1);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 2);
  verify(mockTaskReporter, times(1)).progress();
}
@Test
public void testDefaultCollect() throws Throwable {
  //mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  @SuppressWarnings("unchecked")
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();

  coc = new CombineOutputCollector<String, Integer>(outCounter,
      mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  for (int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(1)).progress();

  for (int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(2)).progress();
}
public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
               Progressable progressable, Configuration conf)
    throws IOException {
  // Report progress once every `progressBar` appended records.
  long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS, 10000);
  long recordCtr = 0;
  while (records.next()) {
    writer.append(records.getKey(), records.getValue());
    if (((recordCtr++) % progressBar) == 0) {
      progressable.progress();
    }
  }
}
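// The progress interval above is tunable per job. A minimal sketch, assuming
// the standard JobContext constant (which resolves to
// "mapreduce.task.merge.progress.records" in current Hadoop):
Configuration conf = new Configuration();
conf.setLong(JobContext.RECORDS_BEFORE_PROGRESS, 1000L);  // default is 10000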
@Test
public void testCustomCollect() throws Throwable {
  //mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  Counters.Counter outCounter = new Counters.Counter();
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();
  conf.set("mapred.combine.recordsBeforeProgress", "2");

  coc = new CombineOutputCollector<String, Integer>(outCounter,
      mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 1);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 2);
  verify(mockTaskReporter, times(1)).progress();
}
@Test
public void testDefaultCollect() throws Throwable {
  //mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  Counters.Counter outCounter = new Counters.Counter();
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();

  coc = new CombineOutputCollector<String, Integer>(outCounter,
      mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  for (int i = 0; i < Task.DEFAULT_MR_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(1)).progress();

  for (int i = 0; i < Task.DEFAULT_MR_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(2)).progress();
}
public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
               Progressable progressable, Configuration conf)
    throws IOException {
  long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
      10000);
  long recordCtr = 0;
  while (records.next()) {
    writer.append(records.getKey(), records.getValue());
    if (((recordCtr++) % progressBar) == 0) {
      progressable.progress();
    }
  }
}
public Writer<K, V> getFileWriter(int partition) throws IOException {
  int mapid = getTaskID().getTaskID().getId();
  int spillIndex = spillIndices[partition]++;
  Path path = new Path(
      SharedFsPlugins.getTempPath(job, getTaskID().getJobID()),
      String.format(SharedFsPlugins.MAP_OUTPUT, partition, mapid, spillIndex));
  return new Writer<K, V>(job, lustrefs.create(path), keyClass, valClass,
      codec, spilledRecordsCounter, true);
}
private Writer<K, V> createSpillFile() throws IOException {
  Path tmp = new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_"
      + (spillNumber++) + ".out");
  LOG.info("Created file: " + tmp);
  file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), -1, conf);
  FSDataOutputStream out = fs.create(file);
  out = CryptoUtils.wrapIfNecessary(conf, out);
  return new Writer<K, V>(conf, out, null, null, null, null, true);
}
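// Note: CryptoUtils.wrapIfNecessary() leaves the stream untouched unless
// intermediate-data encryption is enabled for the job. A hedged sketch of
// the opt-in, assuming the standard MRJobConfig key
// ("mapreduce.job.encrypted-intermediate-data"):
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);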
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId();
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize =
      createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs =
      unconditionalReserve(dummyMapId, mergeOutputSize, false);

  Writer<K, V> writer =
      new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments
      + " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter =
      Merger.merge(jobConf, rfs,
          (Class<K>) jobConf.getMapOutputKeyClass(),
          (Class<V>) jobConf.getMapOutputValueClass(),
          inMemorySegments, inMemorySegments.size(),
          new Path(reduceId.toString()),
          (RawComparator<K>) jobConf.getOutputKeyComparator(),
          reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId + " Memory-to-Memory merge of the "
      + noInMemorySegments + " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
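// The memory-to-memory merge path above is disabled by default. A hedged
// configuration sketch for enabling it, assuming the MRJobConfig keys
// ("mapreduce.reduce.merge.memtomem.enabled" / ".threshold"):
jobConf.setBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, true);
// Optionally tune how many in-memory map outputs trigger a merge.
jobConf.setInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, 1000);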