/**
 * Prepares this instance for a task attempt.
 *
 * <p>Stores the reporter, attempt id and job; looks up the MAP_OUTPUT_BYTES and
 * MAP_OUTPUT_RECORDS counters; instantiates the sorter named by
 * {@code "map.sort.class"}; (re)sizes the partition index array; resolves the
 * output key comparator and the map output key/value classes; opens key and
 * value serializers on {@code bb}; and finally calls {@code reset()}.
 *
 * @param job      job configuration supplying sort and serialization settings
 * @param reporter source of the output byte/record counters
 * @param taskId   attempt id recorded on this instance
 * @throws ClassNotFoundException as declared by this method's contract
 * @throws IOException if opening a key or value serializer fails
 */
public void initialize(JobConf job, TaskReporter reporter, TaskAttemptID taskId)
    throws ClassNotFoundException, IOException {
  this.reporter = reporter;
  this.taskId = taskId;
  mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
  mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
  this.job = job;

  // Pluggable sort algorithm; QuickSort unless "map.sort.class" overrides it.
  Class<? extends IndexedSorter> sorterClass =
      job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class);
  sorter = ReflectionUtils.newInstance(sorterClass, job);

  // Two int slots per partition; keep the current array if it already matches.
  partitions = job.getNumReduceTasks();
  final int requiredSlots = partitions * 2;
  boolean mustReallocate = partitionInd == null || partitionInd.length != requiredSlots;
  if (mustReallocate) {
    partitionInd = new int[requiredSlots];
  }

  comparator = job.getOutputKeyComparator();
  keyClass = (Class) job.getMapOutputKeyClass();
  valClass = (Class) job.getMapOutputValueClass();

  // Key and value serializers are both opened on the same stream, bb.
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  reset();
}
/**
 * Sorts this span by handing it to {@code sorter}, after storing
 * {@code comparator} on the span and allocating fresh {@code ki}/{@code kj}
 * scratch arrays of {@code keymax} bytes each.
 *
 * <p>Spans with fewer than two entries are not passed to the sorter. Start
 * and end of the sort are logged at INFO.
 *
 * @param sorter     algorithm applied to this span
 * @param comparator comparator stored on this span before sorting
 * @return a new {@code SpanIterator} over this span
 */
public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
  this.comparator = comparator;
  ki = new byte[keymax];
  kj = new byte[keymax];

  final int spanLength = length();
  LOG.info("begin sorting Span" + index + " (" + spanLength + ")");
  // Zero or one entry needs no reordering.
  if (spanLength >= 2) {
    sorter.sort(this, 0, spanLength, nullProgressable);
  }
  LOG.info("done sorting Span" + index);

  return new SpanIterator(this);
}
/**
 * Sorts this span with {@code sorter}, reporting progress through
 * {@code progressable}, and logs the destination vertex, span index, span
 * length and elapsed wall-clock milliseconds.
 *
 * <p>Spans with fewer than two entries skip the sorter but are still logged.
 *
 * @param sorter algorithm applied to this span
 * @return a new {@code SpanIterator} over this span
 */
public SpanIterator sort(IndexedSorter sorter) {
  final long startMillis = System.currentTimeMillis();

  final boolean hasSeveralEntries = length() > 1;
  if (hasSeveralEntries) {
    sorter.sort(this, 0, length(), progressable);
  }

  LOG.info(outputContext.getDestinationVertexName() + ": "
      + "done sorting span=" + index
      + ", length=" + length() + ", "
      + "time=" + (System.currentTimeMillis() - startMillis));

  return new SpanIterator((SortSpan) this);
}
public ExternalSorter(TezOutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { this.outputContext = outputContext; this.conf = conf; this.partitions = numOutputs; rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); int assignedMb = (int) (initialMemoryAvailable >> 20); if (assignedMb <= 0) { if (initialMemoryAvailable > 0) { // Rounded down to 0MB - may be > 0 && < 1MB this.availableMemoryMb = 1; LOG.warn("initialAvailableMemory: " + initialMemoryAvailable + " is too low. Rounding to 1 MB"); } else { throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable); } } else { this.availableMemoryMb = assignedMb; } // sorter sorter = ReflectionUtils.newInstance(this.conf.getClass( TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, QuickSort.class, IndexedSorter.class), this.conf); comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf); // k/v serialization keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf); valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf); serializationFactory = new SerializationFactory(this.conf); keySerializer = serializationFactory.getSerializer(keyClass); valSerializer = serializationFactory.getSerializer(valClass); // counters mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES); mapOutputRecordCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS); outputBytesWithOverheadCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); fileOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); spilledRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS); additionalSpillBytesWritten = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); additionalSpillBytesRead = 
outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); numAdditionalSpills = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); // compression if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) { Class<? extends CompressionCodec> codecClass = ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, this.conf); } else { codec = null; } this.ifileReadAhead = this.conf.getBoolean( TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT); if (this.ifileReadAhead) { this.ifileReadAheadLength = conf.getInt( TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT); } else { this.ifileReadAheadLength = 0; } this.ifileBufferSize = conf.getInt("io.file.buffer.size", TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT); // Task outputs mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext); LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS) + "]"); this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions); this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf); this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext); }
/**
 * Captures the span to sort, the sorter to apply to it, and the raw
 * comparator this task holds for that sort.
 *
 * @param sortable   span this task operates on
 * @param sorter     sort algorithm this task uses
 * @param comparator comparator stored on this task
 */
public SortTask(SortSpan sortable, IndexedSorter sorter, RawComparator comparator) {
  this.comparator = comparator;
  this.sorter = sorter;
  this.sortable = sortable;
}
/**
 * Captures the span to sort and the sorter to apply to it.
 *
 * @param sortable span this task operates on
 * @param sorter   sort algorithm this task uses
 */
public SortTask(SortSpan sortable, IndexedSorter sorter) {
  this.sorter = sorter;
  this.sortable = sortable;
}