private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    // read the parent fields and the final fields
    in.defaultReadObject();

    // the job conf knows how to deserialize itself
    jobConf = new JobConf();
    jobConf.readFields(in);

    try {
        hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
    }

    if (hadoopInputSplit instanceof Configurable) {
        ((Configurable) hadoopInputSplit).setConf(this.jobConf);
    } else if (hadoopInputSplit instanceof JobConfigurable) {
        ((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
    }
    hadoopInputSplit.readFields(in);
}
@SuppressWarnings("unchecked") @Override public void configure(Configuration config) { this.jobConf = HadoopUtil.asJobConfInstance(FlinkConfigConverter.toHadoopConfig(config)); // set the correct class loader // not necessary for Flink versions >= 0.10 but we set this anyway to be on the safe side jobConf.setClassLoader(this.getClass().getClassLoader()); this.mapredInputFormat = jobConf.getInputFormat(); if (this.mapredInputFormat instanceof JobConfigurable) { ((JobConfigurable) this.mapredInputFormat).configure(jobConf); } }
@Override
public void configure(Configuration parameters) {
    // enforce sequential configure() calls
    synchronized (CONFIGURE_MUTEX) {
        // configure MR InputFormat if necessary
        if (this.mapredInputFormat instanceof Configurable) {
            ((Configurable) this.mapredInputFormat).setConf(this.jobConf);
        } else if (this.mapredInputFormat instanceof JobConfigurable) {
            ((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
        }
    }
}
@Override
public void configure(Configuration parameters) {
    // enforce sequential configure() calls
    synchronized (CONFIGURE_MUTEX) {
        // configure MR OutputFormat if necessary
        if (this.mapredOutputFormat instanceof Configurable) {
            ((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
        } else if (this.mapredOutputFormat instanceof JobConfigurable) {
            ((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
        }
    }
}
/**
 * @return an instance of the {@link InputFormat} in this {@link StorageDescriptor}.
 */
@SuppressWarnings("unchecked")
public static InputFormat<?, ?> getInputFormat(StorageDescriptor sd) throws IOException {
    try {
        InputFormat<?, ?> inputFormat =
            ConstructorUtils.invokeConstructor((Class<? extends InputFormat>) Class.forName(sd.getInputFormat()));
        if (inputFormat instanceof JobConfigurable) {
            ((JobConfigurable) inputFormat).configure(new JobConf(getHadoopConfiguration()));
        }
        return inputFormat;
    } catch (ReflectiveOperationException re) {
        throw new IOException("Failed to instantiate input format.", re);
    }
}
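A minimal usage sketch for the helper above, assuming the StorageDescriptor comes from a Hive metastore Table fetched through an IMetaStoreClient. The client variable, database, and table names are placeholders, and the call site for getInputFormat is illustrative only:

// Hypothetical usage: resolve and configure the InputFormat declared in a Hive table's
// StorageDescriptor. Names below (metastoreClient, "my_db", "my_table") are illustrative.
org.apache.hadoop.hive.metastore.api.Table table = metastoreClient.getTable("my_db", "my_table");
InputFormat<?, ?> format = getInputFormat(table.getSd());
// If the format implements JobConfigurable, it has already been configured above.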
@Override
public void open(HadoopInputSplit split) throws IOException {

    this.jobConf = split.getJobConf();
    this.flowProcess = new FlinkFlowProcess(this.jobConf, this.getRuntimeContext(), flowNode.getID());

    processBeginTime = System.currentTimeMillis();
    flowProcess.increment(SliceCounters.Process_Begin_Time, processBeginTime);

    try {
        Set<FlowElement> sources = flowNode.getSourceElements();
        if (sources.size() != 1) {
            throw new RuntimeException("FlowNode for TapInputFormat may only have a single source");
        }
        FlowElement sourceElement = sources.iterator().next();
        if (!(sourceElement instanceof Tap)) {
            throw new RuntimeException("Source of TapInputFormat must be a Tap");
        }
        Tap source = (Tap) sourceElement;

        streamGraph = new SourceStreamGraph(flowProcess, flowNode, source);

        sourceStage = this.streamGraph.getSourceStage();
        sinkStage = this.streamGraph.getSinkStage();

        for (Duct head : streamGraph.getHeads()) {
            LOG.info("sourcing from: " + ((ElementDuct) head).getFlowElement());
        }

        for (Duct tail : streamGraph.getTails()) {
            LOG.info("sinking to: " + ((ElementDuct) tail).getFlowElement());
        }
    } catch (Throwable throwable) {
        if (throwable instanceof CascadingException) {
            throw (CascadingException) throwable;
        }
        throw new FlowException("internal error during TapInputFormat configuration", throwable);
    }

    RecordReader<?, ?> recordReader =
        this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());

    if (recordReader instanceof Configurable) {
        ((Configurable) recordReader).setConf(jobConf);
    } else if (recordReader instanceof JobConfigurable) {
        ((JobConfigurable) recordReader).configure(jobConf);
    }

    try {
        this.sourceStage.setRecordReader(recordReader);
    } catch (Throwable t) {
        if (t instanceof IOException) {
            throw (IOException) t;
        } else {
            throw new RuntimeException(t);
        }
    }
}