private void combineAndSpill(
        RawKeyValueIterator kvIter, Counters.Counter inCounter) throws IOException {
    JobConf job = jobConf;
    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
    RawComparator<K> comparator =
            (RawComparator<K>) job.getCombinerKeyGroupingComparator();
    try {
        CombineValuesIterator values = new CombineValuesIterator(
                kvIter, comparator, keyClass, valClass, job, Reporter.NULL, inCounter);
        while (values.more()) {
            combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL);
            values.nextKey();
        }
    } finally {
        combiner.close();
    }
}
/**
 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner, JobConf conf) {
    if (hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if (hadoopCombiner == null) {
        throw new NullPointerException("Combiner may not be null.");
    }
    if (conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }
    this.reducer = hadoopReducer;
    this.combiner = hadoopCombiner;
    this.jobConf = conf;
}
/**
 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
 * @param conf The JobConf that is used to configure both Hadoop Reducers.
 */
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
    if(hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if(hadoopCombiner == null) {
        throw new NullPointerException("Combiner may not be null.");
    }
    if(conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }
    this.reducer = hadoopReducer;
    this.combiner = hadoopCombiner;
    this.jobConf = conf;
}
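For context, a minimal sketch of how such a combinable wrapper might be driven from the Flink DataSet API, assuming the three-argument constructor shown above, Hadoop's mapred LongSumReducer standing in as both reducer and combiner, and the usual flink-hadoop-compatibility package location for the wrapper class; everything outside the snippets above is illustrative, not the authoritative API:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; // assumed package
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;

public class HadoopReduceCombineSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy input: (word, count) pairs using Hadoop Writable types.
        DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
                new Tuple2<>(new Text("a"), new LongWritable(1)),
                new Tuple2<>(new Text("b"), new LongWritable(1)),
                new Tuple2<>(new Text("a"), new LongWritable(1)));

        // Wrap a Hadoop LongSumReducer as both the reducer and the combiner;
        // the JobConf is forwarded to both via configure() in open().
        DataSet<Tuple2<Text, LongWritable>> summed = counts
                .groupBy(0)
                .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                        new LongSumReducer<Text>(),
                        new LongSumReducer<Text>(),
                        new JobConf()));

        summed.print();
    }
}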
@SuppressWarnings("unchecked") @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.reducer.configure(jobConf); this.reporter = new HadoopDummyReporter(); this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); }
public void configure(JobConf job) {
    super.configure(job);
    Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
    if(c != null) {
        postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
        LOG.info("PostHook="+c.getName());
    }

    c = job.getClass("stream.reduce.prehook", null, Reducer.class);
    if(c != null) {
        preReducer = (Reducer)ReflectionUtils.newInstance(c, job);
        oc = new InmemBufferingOutputCollector();
        LOG.info("PreHook="+c.getName());
    }
    this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
private void combineAndSpill(
        RawKeyValueIterator kvIter, Counters.Counter inCounter) throws IOException {
    JobConf job = jobConf;
    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
    RawComparator<K> comparator =
            (RawComparator<K>) job.getOutputKeyComparator();
    try {
        CombineValuesIterator values = new CombineValuesIterator(
                kvIter, comparator, keyClass, valClass, job, Reporter.NULL, inCounter);
        while (values.more()) {
            combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL);
            values.nextKey();
        }
    } finally {
        combiner.close();
    }
}
private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
    Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);

    Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);

    OutputCollector collector = new OutputCollector() {
        @Override
        public void collect(Object key, Object value) throws IOException {
            writer.append(key, value);
        }
    };

    CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);

    while (values.moveToNext()) {
        combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
    }
}
private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
    Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);

    Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);

    OutputCollector collector = new OutputCollector() {
        @Override
        public void collect(Object key, Object value) throws IOException {
            writer.append(key, value);
            combineOutputRecordsCounter.increment(1);
        }
    };

    CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);

    while (values.moveToNext()) {
        combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
    }
}
@SuppressWarnings("unchecked") @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.reducer.configure(jobConf); this.combiner.configure(jobConf); this.reporter = new HadoopDummyReporter(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer); this.combineCollector = new HadoopOutputCollector<>(); this.reduceCollector = new HadoopOutputCollector<>(); }
@SuppressWarnings("unchecked") @Override public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() { Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass); final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass); return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo); }
@SuppressWarnings("unchecked") private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass = (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject(); reducer = InstantiationUtil.instantiate(reducerClass); Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>> combinerClass = (Class<Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>>) in.readObject(); combiner = InstantiationUtil.instantiate(combinerClass); jobConf = new JobConf(); jobConf.readFields(in); }
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
    if (hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if (conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }
    this.reducer = hadoopReducer;
    this.jobConf = conf;
}
@SuppressWarnings("unchecked") @Override public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() { Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo); }
@SuppressWarnings("unchecked") private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> reducerClass = (Class<Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject(); reducer = InstantiationUtil.instantiate(reducerClass); jobConf = new JobConf(); jobConf.readFields(in); }
@SuppressWarnings("unchecked") @Override public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass); final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass); return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo); }
@SuppressWarnings("unchecked") private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); reducer = InstantiationUtil.instantiate(reducerClass); Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject(); combiner = InstantiationUtil.instantiate(combinerClass); jobConf = new JobConf(); jobConf.readFields(in); }
/**
 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
 *
 * @param hadoopReducer The Hadoop Reducer to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Reducer.
 */
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
    if(hadoopReducer == null) {
        throw new NullPointerException("Reducer may not be null.");
    }
    if(conf == null) {
        throw new NullPointerException("JobConf may not be null.");
    }
    this.reducer = hadoopReducer;
    this.jobConf = conf;
}
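Correspondingly, a brief sketch of wiring the non-combinable wrapper, under the same assumptions as the earlier HadoopReduceCombineFunction sketch (Hadoop's LongSumReducer, the imports and the toy counts DataSet shown there); only the reduceGroup call changes, since no combiner is passed:

// Minimal sketch, same assumptions and imports as the earlier example.
public static DataSet<Tuple2<Text, LongWritable>> sumPerKey(
        DataSet<Tuple2<Text, LongWritable>> counts) {
    return counts
            .groupBy(0)
            .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
                    new LongSumReducer<Text>(), // reducer only; no combine phase is applied
                    new JobConf()));
}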
@SuppressWarnings("unchecked") @Override public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); }
@SuppressWarnings("unchecked") private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); reducer = InstantiationUtil.instantiate(reducerClass); jobConf = new JobConf(); jobConf.readFields(in); }
@Before
public void setUp() {
    mapper1 = new IdentityMapper<Text, Text>();
    reducer1 = new IdentityReducer<Text, Text>();
    mapper2 = new IdentityMapper<Text, Text>();
    reducer2 = new IdentityReducer<Text, Text>();

    driver = new PipelineMapReduceDriver<Text, Text, Text, Text>();
    driver.addMapReduce(new Pair<Mapper, Reducer>(mapper1, reducer1));
    driver.addMapReduce(new Pair<Mapper, Reducer>(mapper2, reducer2));
}
@SuppressWarnings("unchecked") ReducerContext(org.apache.hadoop.mapreduce.Reducer reducer, JobConf conf) throws IOException, InterruptedException, ClassNotFoundException { ((org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer) reducer).super(new MRContextUtil() .createReduceContext(conf, new TaskAttemptID(), rawKeyValueIterator, null, null, null, null, null, null, Class.forName("org.apache.hadoop.io.NullWritable"), Class.forName("org.apache.hadoop.io.NullWritable"))); }
@Override
public void close() throws HyracksDataException {
    // -- - close - --
    try {
        if (!jobConf.getUseNewMapper()) {
            ((org.apache.hadoop.mapred.Reducer) reducer).close();
        }
    } catch (IOException e) {
        throw new HyracksDataException(e);
    }
}
@SuppressWarnings("unchecked") @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.reducer.configure(jobConf); this.combiner.configure(jobConf); this.reporter = new HadoopDummyReporter(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass); this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>(); this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); }
@SuppressWarnings("unchecked") @Override public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass); final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass); return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); }
@SuppressWarnings("unchecked") @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.reducer.configure(jobConf); this.reporter = new HadoopDummyReporter(); this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass); }