void runOldMapper( final JobConf job, final MRTaskReporter reporter, final MRInputLegacy input, final KeyValueWriter output ) throws IOException, InterruptedException { // Initialize input in-line since it sets parameters which may be used by the processor. // Done only for MRInput. // TODO use new method in MRInput to get required info //input.initialize(job, master); InputSplit inputSplit = input.getOldInputSplit(); updateJobWithSplit(job, inputSplit); RecordReader in = new OldRecordReader(input); OutputCollector collector = new OldOutputCollector(output); MapRunnable runner = (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job); runner.run(in, collector, (Reporter)reporter); // Set progress to 1.0f if there was no exception, reporter.setProgress(1.0f); // start the sort phase only if there are reducers this.statusUpdate(); }
@Override public void setMapRunnerClass( Class c ) { super.setMapRunnerClass( (Class<? extends MapRunnable>) c ); }