/** * Creates a {@link Configuration} for the Map or Reduce in the chain. * * <p> * It creates a new Configuration using the chain job's Configuration as base * and adds to it the configuration properties for the chain element. The keys * of the chain element Configuration have precedence over the given * Configuration. * </p> * * @param jobConf * the chain job's Configuration. * @param confKey * the key for chain element configuration serialized in the chain * job's Configuration. * @return a new Configuration aggregating the chain job's Configuration with * the chain element configuration properties. */ protected static Configuration getChainElementConf(Configuration jobConf, String confKey) { Configuration conf = null; try (Stringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class);) { String confString = jobConf.get(confKey, null); if (confString != null) { conf = stringifier.fromString(jobConf.get(confKey, null)); } } catch (IOException ioex) { throw new RuntimeException(ioex); } // we have to do this because the Writable desearialization clears all // values set in the conf making not possible do a // new Configuration(jobConf) in the creation of the conf above jobConf = new Configuration(jobConf); if (conf != null) { for (Map.Entry<String, String> entry : conf) { jobConf.set(entry.getKey(), entry.getValue()); } } return jobConf; }
/** * Creates a {@link Configuration} for the Map or Reduce in the chain. * * <p> * It creates a new Configuration using the chain job's Configuration as base * and adds to it the configuration properties for the chain element. The keys * of the chain element Configuration have precedence over the given * Configuration. * </p> * * @param jobConf * the chain job's Configuration. * @param confKey * the key for chain element configuration serialized in the chain * job's Configuration. * @return a new Configuration aggregating the chain job's Configuration with * the chain element configuration properties. */ protected static Configuration getChainElementConf(Configuration jobConf, String confKey) { Configuration conf = null; try { Stringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class); String confString = jobConf.get(confKey, null); if (confString != null) { conf = stringifier.fromString(jobConf.get(confKey, null)); } } catch (IOException ioex) { throw new RuntimeException(ioex); } // we have to do this because the Writable desearialization clears all // values set in the conf making not possible do a // new Configuration(jobConf) in the creation of the conf above jobConf = new Configuration(jobConf); if (conf != null) { for (Map.Entry<String, String> entry : conf) { jobConf.set(entry.getKey(), entry.getValue()); } } return jobConf; }
/** * Creates a {@link JobConf} for one of the Maps or Reduce in the chain. * <p/> * It creates a new JobConf using the chain job's JobConf as base and adds to * it the configuration properties for the chain element. The keys of the * chain element jobConf have precedence over the given JobConf. * * @param jobConf the chain job's JobConf. * @param confKey the key for chain element configuration serialized in the * chain job's JobConf. * @return a new JobConf aggregating the chain job's JobConf with the chain * element configuration properties. */ private static JobConf getChainElementConf(JobConf jobConf, String confKey) { JobConf conf; try { Stringifier<JobConf> stringifier = new DefaultStringifier<JobConf>(jobConf, JobConf.class); conf = stringifier.fromString(jobConf.get(confKey, null)); } catch (IOException ioex) { throw new RuntimeException(ioex); } // we have to do this because the Writable desearialization clears all // values set in the conf making not possible do do a new JobConf(jobConf) // in the creation of the conf above jobConf = new JobConf(jobConf); for(Map.Entry<String, String> entry : conf) { jobConf.set(entry.getKey(), entry.getValue()); } return jobConf; }
protected static void setMapperConf(boolean isMap, Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration mapperConf, int index, String prefix) { // if the Mapper does not have a configuration, create an empty one if (mapperConf == null) { // using a Configuration without defaults to make it lightweight. // still the chain's conf may have all defaults and this conf is // overlapped to the chain configuration one. mapperConf = new Configuration(true); } // store the input/output classes of the mapper in the mapper conf mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class); mapperConf .setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, Object.class); mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class); mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass, Object.class); // serialize the mapper configuration in the chain configuration. Stringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class); try { jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, stringifier .toString(new Configuration(mapperConf))); } catch (IOException ioEx) { throw new RuntimeException(ioEx); } // increment the chain counter jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1); }
protected static void setReducerConf(Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration reducerConf, String prefix) { // if the Reducer does not have a Configuration, create an empty one if (reducerConf == null) { // using a Configuration without defaults to make it lightweight. // still the chain's conf may have all defaults and this conf is // overlapped to the chain's Configuration one. reducerConf = new Configuration(false); } // store the input/output classes of the reducer in // the reducer configuration reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class); reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass, Object.class); reducerConf .setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class); reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass, Object.class); // serialize the reducer configuration in the chain's configuration. Stringifier<Configuration> stringifier = new DefaultStringifier<Configuration>(jobConf, Configuration.class); try { jobConf.set(prefix + CHAIN_REDUCER_CONFIG, stringifier .toString(new Configuration(reducerConf))); } catch (IOException ioEx) { throw new RuntimeException(ioEx); } }
/**
 * Sets the Reducer class to the chain job's JobConf.
 * <p/>
 * The configuration properties given in {@code reducerConf} are serialized
 * into the chain job's JobConf. When the Reducer's JobConf is rebuilt, those
 * properties are set on top of a copy of the chain job's JobConf, so the
 * Reducer's properties take precedence over the chain job's.
 *
 * @param jobConf chain job's JobConf to add the Reducer class.
 * @param klass the Reducer class to add.
 * @param inputKeyClass reducer input key class.
 * @param inputValueClass reducer input value class.
 * @param outputKeyClass reducer output key class.
 * @param outputValueClass reducer output value class.
 * @param byValue indicates if key/values should be passed by value
 * to the next Mapper in the chain, if any.
 * @param reducerConf a JobConf with the configuration for the Reducer
 * class. It is recommended to use a JobConf without default values using the
 * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE. If
 * {@code null}, an empty JobConf without defaults is used; if non-null it is
 * modified in place.
 * @throws IllegalStateException if a Reducer class has already been set in
 * {@code jobConf}.
 * @throws RuntimeException wrapping the {@link IOException} raised if the
 * Reducer's JobConf cannot be serialized.
 */
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
    Class<? extends Reducer<K1, V1, K2, V2>> klass,
    Class<? extends K1> inputKeyClass,
    Class<? extends V1> inputValueClass,
    Class<? extends K2> outputKeyClass,
    Class<? extends V2> outputValueClass,
    boolean byValue, JobConf reducerConf) {
  String prefix = getPrefix(false);

  if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
    throw new IllegalStateException("Reducer has been already set");
  }

  jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);

  // if the Reducer does not have a private JobConf create an empty one
  if (reducerConf == null) {
    // using a JobConf without defaults to make it lightweight.
    // still the chain JobConf may have all defaults and this conf is
    // overlaid on the chain JobConf.
    reducerConf = new JobConf(false);
  }

  // store in the private reducer conf the input/output classes of the reducer
  // and if it works by value or by reference
  reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
      Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);

  // serialize the private reducer jobconf in the chain jobconf; the
  // stringifier is closed to release its serializer/deserializer
  try (Stringifier<JobConf> stringifier =
      new DefaultStringifier<JobConf>(jobConf, JobConf.class)) {
    jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
        stringifier.toString(new JobConf(reducerConf)));
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}