/**
 * Feeds the records from {@code kvIter} through a freshly instantiated
 * combiner, one key at a time, with the combiner's output going to
 * {@code combineCollector}.
 *
 * <p>The comparator obtained from
 * {@code JobConf#getCombinerKeyGroupingComparator()} is handed to the
 * {@code CombineValuesIterator}; presumably that iterator uses it to decide
 * where one key group ends and the next begins (confirm against
 * CombineValuesIterator). Both the iterator and the combiner receive
 * {@code Reporter.NULL}.
 *
 * @param kvIter    raw key/value records to run through the combiner
 * @param inCounter counter passed to the values iterator (presumably counts
 *                  combiner input records; confirm against the iterator)
 * @throws IOException propagated from the values iterator or the combiner
 */
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  final JobConf conf = jobConf;
  final Reducer combinerInstance = ReflectionUtils.newInstance(combinerClass, conf);
  final Class<K> mapOutputKeyType = (Class<K>) conf.getMapOutputKeyClass();
  final Class<V> mapOutputValueType = (Class<V>) conf.getMapOutputValueClass();
  final RawComparator<K> groupingComparator =
      (RawComparator<K>) conf.getCombinerKeyGroupingComparator();
  try {
    final CombineValuesIterator groupedValues = new CombineValuesIterator(
        kvIter, groupingComparator, mapOutputKeyType, mapOutputValueType,
        conf, Reporter.NULL, inCounter);
    // One reduce() call per key; nextKey() advances the iterator to the next key.
    for (; groupedValues.more(); groupedValues.nextKey()) {
      combinerInstance.reduce(
          groupedValues.getKey(), groupedValues, combineCollector, Reporter.NULL);
    }
  } finally {
    // Close the combiner even when iteration or reduce() fails.
    combinerInstance.close();
  }
}
/**
 * Runs the configured combiner over the records supplied by {@code kvIter},
 * calling {@code reduce} once per key and sending the combiner's output to
 * {@code combineCollector}.
 *
 * <p>The comparator handed to the {@code CombineValuesIterator} is the job's
 * combiner key grouping comparator rather than the plain output key (sort)
 * comparator, so that a combiner-specific grouping configured on the job is
 * honoured here. NOTE(review): {@code JobConf#getCombinerKeyGroupingComparator()}
 * is expected to fall back to the output key comparator when no combiner
 * grouping comparator is set, which keeps existing jobs unchanged; confirm
 * against the Hadoop version in use.
 *
 * @param kvIter    raw key/value records to run through the combiner
 * @param inCounter counter passed to the values iterator (presumably counts
 *                  combiner input records; confirm against the iterator)
 * @throws IOException propagated from the values iterator or the combiner
 */
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  // Use the combiner's own grouping comparator instead of the sort comparator.
  RawComparator<K> comparator =
      (RawComparator<K>) job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
                      Reporter.NULL);
      values.nextKey();
    }
  } finally {
    // Close the combiner even when iteration or reduce() fails.
    combiner.close();
  }
}