Java Class org.apache.hadoop.mapreduce.Reducer Instance Source Code Examples

Project: hadoop    File: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {
  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
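
Note: theJob.setReducerClass(Reducer.class) installs the stock identity Reducer, whose default reduce() simply re-emits every (key, value) pair; since setNumReduceTasks(0) then makes the job map-only, the reducer setting is inert in this test helper. A minimal sketch of a job in which the identity classes actually run (class name and argument handling are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IdentityPassThroughJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "identity-pass-through");
        job.setJarByClass(IdentityPassThroughJob.class);
        // Mapper.class and Reducer.class are the framework's identity
        // implementations: their map()/reduce() re-emit every input pair.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(1);  // > 0, so the identity reducer actually runs
        // With the default TextInputFormat the identity mapper emits
        // (LongWritable offset, Text line) pairs.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}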
Project: HotTopicsApp    File: HotTopicsApp.java
@Override
protected void setup(
        Reducer<NullWritable, Text, org.apache.hadoop.io.Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    this.k = conf.getInt("topk", 1);
    this.type = conf.get("type", "min");
    if("min".equals(this.type)){
        topkSet = new TreeSet<>();
    }else { 
        topkSet = new TreeSet<>(new Comparator<TFIDFWord>() {
            @Override
            public int compare(TFIDFWord o1, TFIDFWord o2) {
                return -o1.compareTo(o2);
            }
        });
    }
}
Project: HotTopicsApp    File: HotTopicsApp.java
@Override
protected void reduce(NullWritable k2, Iterable<Text> v2s,
        Reducer<NullWritable, Text, Text, NullWritable>.Context context)
        throws IOException, InterruptedException {
    for (Text v2 : v2s) {
        String line = v2.toString();
        topkSet.add(new TFIDFWord(line));
        if(topkSet.size()>k){
            topkSet.pollLast();
        }
    }

    for (TFIDFWord v : topkSet) {
        k3.set(v.toString());
        context.write(k3, NullWritable.get());
    }
}
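
The pair of methods above implements a bounded top-k: each incoming TFIDFWord is added to the TreeSet, and pollLast() evicts the current worst element whenever the set grows past k, so at most k elements are ever retained. A plain-Java sketch of the same idea, runnable outside Hadoop (ints stand in for TFIDFWord):

import java.util.Comparator;
import java.util.TreeSet;

public class BoundedTopK {
    public static void main(String[] args) {
        int k = 3;
        // Comparator.reverseOrder() is the idiomatic equivalent of the
        // anonymous "-o1.compareTo(o2)" comparator built in setup().
        TreeSet<Integer> topk = new TreeSet<>(Comparator.reverseOrder());
        for (int v : new int[]{5, 1, 9, 4, 7, 2}) {
            topk.add(v);
            if (topk.size() > k) {
                topk.pollLast();  // evict the current worst element
            }
        }
        System.out.println(topk);  // [9, 7, 5]
    }
}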
Project: hadoop-2.6.0-cdh5.4.3    File: MapReduceTestUtil.java
/**
 * Creates a simple kill job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: openimaj    File: CumulativeTimeWord.java
@Override
protected void reduce(BytesWritable wordtimeb, Iterable<BooleanWritable> wordBools, Reducer<BytesWritable,BooleanWritable,LongWritable,BytesWritable>.Context context) throws IOException ,InterruptedException {
    ReadWritableStringLong wordtime = IOUtils.deserialize(wordtimeb.getBytes(), ReadWritableStringLong.class);
    long time = wordtime.secondObject();
    boolean seenInPresent = false;
    boolean seenInPast = false;
    for (BooleanWritable isfrompast: wordBools) {
        boolean frompast = isfrompast.get();
        seenInPresent |= !frompast;
        seenInPast |= frompast;
        if(seenInPast && seenInPresent){
            // both flags are set, so nothing later in the iterable can change the
            // result; break early (a big saving for frequent words)
            break;
        }
    }
    ReadWritableBooleanBoolean intersectionUnion = new ReadWritableBooleanBoolean(seenInPast && seenInPresent,seenInPast || seenInPresent);
    context.write(new LongWritable(time), new BytesWritable(IOUtils.serialize(intersectionUnion)));
}
Project: openimaj    File: ReduceValuesByTime.java
protected static synchronized void loadOptions(Reducer<LongWritable, BytesWritable, NullWritable, Text>.Context context) throws IOException {
    if (options == null) {
        try {
            options = context.getConfiguration().getStrings(Values.ARGS_KEY);
            matlabOut = context.getConfiguration().getBoolean(Values.MATLAB_OUT, false);
            timeIndex = TimeIndex.readTimeCountLines(options[0]);
            if (matlabOut) {
                wordIndex = WordIndex.readWordCountLines(options[0]);
                valuesLocation = options[0] + "/values/values.%d.mat";
            }
            System.out.println("timeindex loaded: " + timeIndex.size());
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
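
loadOptions() is a synchronized lazy initializer for static fields, so the first reduce task in each JVM pays the parsing cost once and later tasks reuse the cached values. A stripped-down sketch of the pattern (class, field, and argument names are hypothetical):

import java.io.IOException;

public class LazyConfig {
    private static volatile String[] options;

    // Synchronized so concurrent tasks in one JVM initialize exactly once.
    static synchronized void loadOptions(String raw) throws IOException {
        if (options == null) {
            try {
                options = raw.split(",");
            } catch (Exception e) {
                throw new IOException(e);  // surface parse errors as IOException
            }
        }
    }

    public static void main(String[] args) throws IOException {
        loadOptions("a,b,c");
        loadOptions("ignored,on,second,call");  // no-op: already initialized
        System.out.println(options.length);     // 3
    }
}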
Project: titan0.5.4-hbase1.1.1-custom    File: LinkMapReduce.java
@Override
public void setup(final Reducer.Context context) throws IOException, InterruptedException {
    faunusConf = ModifiableHadoopConfiguration.of(DEFAULT_COMPAT.getContextConfiguration(context));

    if (!faunusConf.has(LINK_DIRECTION)) {
        Iterator<Entry<String, String>> it = DEFAULT_COMPAT.getContextConfiguration(context).iterator();
        log.error("Broken configuration missing {}", LINK_DIRECTION);
        log.error("---- Start config dump ----");
        while (it.hasNext()) {
            Entry<String,String> ent = it.next();
            log.error("k:{} -> v:{}", ent.getKey(), ent.getValue());
        }
        log.error("---- End config dump   ----");
        throw new NullPointerException();
    }
    direction = faunusConf.get(LINK_DIRECTION).opposite();
}
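
When the required LINK_DIRECTION key is missing, the setup above dumps every configuration entry before failing, which makes the broken job much easier to diagnose. A self-contained sketch of that dump loop, assuming plain System.err in place of the slf4j logger:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class ConfigDump {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Configuration implements Iterable<Map.Entry<String, String>>,
        // so the full key/value set can be walked directly.
        System.err.println("---- Start config dump ----");
        for (Map.Entry<String, String> ent : conf) {
            System.err.println("k:" + ent.getKey() + " -> v:" + ent.getValue());
        }
        System.err.println("---- End config dump ----");
    }
}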
Project: hadoop-2.6.0-cdh5.4.3    File: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  conf.setInt("mapred.map.max.attempts", 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: openimaj    File: TimeIndex.java
@Override
public SequenceFileTextStage<LongWritable,BytesWritable, LongWritable,LongWritable,NullWritable,Text> stage() {
    return new SequenceFileTextStage<LongWritable,BytesWritable, LongWritable,LongWritable,NullWritable,Text>() {

        @Override
        public void setup(Job job) {
            job.setSortComparatorClass(LongWritable.Comparator.class);
            job.setNumReduceTasks(1);
        }
        @Override
        public Class<? extends Mapper<LongWritable, BytesWritable, LongWritable, LongWritable>> mapper() {
            return TimeIndex.Map.class;
        }
        @Override
        public Class<? extends Reducer<LongWritable, LongWritable,NullWritable,Text>> reducer() {
            return TimeIndex.Reduce.class;
        }

        @Override
        public String outname() {
            return "times";
        }
    };
}
Project: incubator-rya    File: CountPlan.java
@Override
public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
    long sum = 0;
    for(final LongWritable count : counts) {
        sum += count.get();
    }

    final String indexType = prospect.getTripleValueType().getIndexType();

    // not sure if this is the best idea..
    if ((sum >= 0) || indexType.equals(TripleValueType.PREDICATE.getIndexType())) {
        final Mutation m = new Mutation(indexType + DELIM + prospect.getData() + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp));

        final String dataType = prospect.getDataType();
        final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
        final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
        m.put(COUNT, prospect.getDataType(), visibility, timestamp.getTime(), sumValue);

        context.write(null, m);
    }
}
Project: openimaj    File: CumulativeJacardReducer.java
@Override
protected void reduce(LongWritable time, java.lang.Iterable<Text> words, org.apache.hadoop.mapreduce.Reducer<LongWritable,Text,NullWritable,Text>.Context context) throws java.io.IOException ,InterruptedException {
    HashSet<String> unseenwords = new HashSet<String>();
    StringWriter writer = new StringWriter();

    for (Text text : words) {
        unseenwords.add(text.toString());
    }
    long intersection = 0;
    for (String string : unseenwords) {
        if(this.seenwords.contains(string)) intersection += 1;
        this.seenwords.add(string);
    }

    JacardIndex index = new JacardIndex(time.get(),intersection,this.seenwords.size());
    IOUtils.writeASCII(writer, index);
    context.write(NullWritable.get(), new Text(writer.toString()));
}
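
This reducer maintains a cumulative Jaccard-style statistic: intersection counts how many of the current time step's words were already in seenwords, and seenwords then absorbs them, so its size is the running union. A plain-Java sketch of the bookkeeping (batches of strings stand in for the Text values; JacardIndex is replaced by a println):

import java.util.HashSet;
import java.util.Set;

public class CumulativeOverlap {
    public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        String[][] batches = {{"a", "b"}, {"b", "c"}, {"a", "c", "d"}};
        for (String[] batch : batches) {
            Set<String> unseen = new HashSet<>();
            for (String w : batch) unseen.add(w);      // dedupe within the batch
            long intersection = 0;
            for (String w : unseen) {
                if (seen.contains(w)) intersection++;  // already in the union
                seen.add(w);                           // grow the running union
            }
            System.out.println(intersection + "/" + seen.size());  // 0/2, 1/3, 2/4
        }
    }
}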
Project: hadoop-plus    File: KNNCombiner.java
protected void reduce(
        Text key,
        java.lang.Iterable<Vector2SF> value,
        org.apache.hadoop.mapreduce.Reducer<Text, Vector2SF, Text, Vector2SF>.Context context)
        throws java.io.IOException, InterruptedException {
    ArrayList<Vector2SF> vs = new ArrayList<Vector2SF>();
    // collect the vectors, then sort them by similarity (descending)
    for (Vector2SF v : value) {
        vs.add(new Vector2SF(v.getV1(), v.getV2()));
    }
    Collections.sort(vs, new Comparator<Vector2SF>() {
        @Override
        public int compare(Vector2SF o1, Vector2SF o2) {
            return Double.compare(o2.getV2(), o1.getV2());
        }
    });
    int k = context.getConfiguration().getInt("cn.ac.ict.htc.knn.k", 4);

    // emit only the k most similar vectors for this key
    for (int i = 0; i < k && i < vs.size(); i++) {
        context.write(key, vs.get(i));
    }
}
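
Sorting the full list costs O(n log n) even though only k elements are emitted; for large inputs and small k, a bounded min-heap is the usual O(n log k) alternative. A sketch of that variant (the Scored class is a hypothetical stand-in for Vector2SF):

import java.util.PriorityQueue;

public class TopKBySimilarity {
    static class Scored {
        final String id; final double score;
        Scored(String id, double score) { this.id = id; this.score = score; }
    }

    public static void main(String[] args) {
        int k = 2;
        // Min-heap on score: the root is always the weakest of the current top-k.
        PriorityQueue<Scored> heap =
                new PriorityQueue<>((a, b) -> Double.compare(a.score, b.score));
        Scored[] input = {new Scored("x", 0.9), new Scored("y", 0.4),
                          new Scored("z", 0.7), new Scored("w", 0.95)};
        for (Scored s : input) {
            heap.offer(s);
            if (heap.size() > k) heap.poll();  // evict the weakest
        }
        // Prints the surviving top-2, weakest first: x then w.
        while (!heap.isEmpty()) System.out.println(heap.poll().id);
    }
}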
Project: hadoop-plus    File: MapReduceTestUtil.java
/**
 * Creates a simple kill job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: kylin    File: MapReduceUtil.java
public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
        throws IOException {
    KylinConfig kylinConfig = cubeSeg.getConfig();

    Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
    double totalSizeInM = 0;
    for (Double cuboidSize : cubeSizeMap.values()) {
        totalSizeInM += cuboidSize;
    }

    double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
    double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();

    // number of reduce tasks
    int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);

    // at least 1 reducer by default
    numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
    // no more than 500 reducer by default
    numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);

    logger.info("Having total map input MB " + Math.round(totalSizeInM));
    logger.info("Having per reduce MB " + perReduceInputMB);
    logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
    return numReduceTasks;
}
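
So the reducer count is round(totalInputMB / perReduceInputMB × reduceCountRatio), clamped to the configured minimum and maximum. A worked sketch with illustrative numbers (the constants are examples, not Kylin's defaults):

public class ReducerCountExample {
    public static void main(String[] args) {
        double totalSizeInM = 12000.0;   // hypothetical total cuboid size in MB
        double perReduceInputMB = 500.0; // hypothetical per-reducer input target
        double reduceCountRatio = 1.0;   // hypothetical scaling ratio
        int min = 1, max = 500;          // hypothetical clamp bounds

        int numReduceTasks =
                (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
        numReduceTasks = Math.max(min, numReduceTasks);  // at least `min`
        numReduceTasks = Math.min(max, numReduceTasks);  // at most `max`
        System.out.println(numReduceTasks);              // 24
    }
}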
Project: openimaj    File: TimeIndex.java
@Override
public void reduce(LongWritable timeslot, Iterable<LongWritable> counts, Reducer<LongWritable,LongWritable,NullWritable,Text>.Context context){
    try {
        String timeStr = timeslot.toString();
        long total = 0;
        for (LongWritable count : counts) {
            total += count.get();
        }
        StringWriter swriter = new StringWriter();
        CSVPrinter writer = new CSVPrinter(swriter);
        writer.write(new String[]{timeStr,total + ""});
        writer.flush();
        String toWrote = swriter.toString();
        context.write(NullWritable.get(), new Text(toWrote));
        return;

    } catch (Exception e) {
        System.err.println("Couldn't reduce to final file");
    }
}
Project: hadoop    File: MapReduceTestUtil.java
/**
 * Creates a simple kill job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: hadoop    File: TestLineRecordReaderJobs.java
/**
 * Creates and runs an MR job
 *
 * @param conf
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = Job.getInstance(conf);
  job.setJarByClass(TestLineRecordReaderJobs.class);
  job.setMapperClass(Mapper.class);
  job.setReducerClass(Reducer.class);
  FileInputFormat.addInputPath(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.waitForCompletion(true);
}
Project: hadoop    File: Chain.java
ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
    InterruptedException {
  this.reducer = reducer;
  this.chainContext = context;
  this.rw = rw;
}
Project: hadoop    File: Chain.java
/**
 * Create a reduce context that is based on ChainReduceContextImpl and the given
 * record writer
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext = 
    new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
        context, rw, conf);
  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = 
    new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(reduceContext);
  return reducerContext;
}
Project: hadoop    File: Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
Project: hadoop    File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
Project: hadoop    File: JobContextImpl.java
/**
 * Get the combiner class for the job.
 * 
 * @return the combiner class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Reducer<?,?,?,?>>) 
    conf.getClass(COMBINE_CLASS_ATTR, null);
}
Project: hadoop    File: JobContextImpl.java
/**
 * Get the {@link Reducer} class for the job.
 * 
 * @return the {@link Reducer} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Reducer<?,?,?,?>>) 
    conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
}
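
Both getters delegate to Configuration.getClass; the only difference is the fallback: null for the combiner (meaning "no combiner") versus the identity Reducer.class for the reducer. A small sketch of that default behavior, assuming the Hadoop 2.x names for the two attribute keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;

public class ClassDefaultLookup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // No reducer configured, so the identity Reducer.class default is returned.
        Class<?> reducer = conf.getClass("mapreduce.job.reduce.class", Reducer.class);
        // No combiner configured and no default class, so null is returned.
        Class<?> combiner = conf.getClass("mapreduce.job.combine.class", null);
        System.out.println(reducer);   // class org.apache.hadoop.mapreduce.Reducer
        System.out.println(combiner);  // null
    }
}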
Project: ditb    File: HashTable.java
public Job createSubmittableJob(String[] args) throws IOException {
  Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
  generatePartitions(partitionsPath);

  Job job = Job.getInstance(getConf(),
        getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
  Configuration jobConf = job.getConfiguration();
  jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
  job.setJarByClass(HashTable.class);

  TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

  // use a TotalOrderPartitioner and reducers to group region output into hash files
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
  job.setReducerClass(Reducer.class);  // identity reducer
  job.setNumReduceTasks(tableHash.numHashFiles);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(ImmutableBytesWritable.class);
  job.setOutputFormatClass(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

  return job;
}
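
Here the identity Reducer does real work: TotalOrderPartitioner range-partitions the keys so the shuffle produces globally sorted output, and the reducer merely writes it out, one sorted hash file per reduce task. A hedged sketch of wiring a TotalOrderPartitioner from a sampled partition file (sampler parameters are illustrative; the job's input format and key/value types must already be configured):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSketch {
    static void configureTotalOrder(Job job, Path partitionFile, int reducers)
            throws Exception {
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
        job.setReducerClass(Reducer.class);  // identity: the shuffle does the sorting
        job.setNumReduceTasks(reducers);
        // Sample the input (here ~10% of records, up to 10000, over at most
        // 10 splits) to choose the reducers-1 range boundaries.
        InputSampler.writePartitionFile(
                job, new InputSampler.RandomSampler<Object, Object>(0.1, 10000, 10));
    }
}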
Project: ditb    File: IntegrationTestBigLinkedList.java
@Override
protected void cleanup(
    Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
    throws IOException, InterruptedException {
  if (this.connection != null) {
    this.connection.close();
  }
  super.cleanup(context);
}
Project: aliyun-oss-hadoop-fs    File: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {
  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: aliyun-oss-hadoop-fs    File: MapReduceTestUtil.java
/**
 * Creates a simple kill job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: aliyun-oss-hadoop-fs    File: TestLineRecordReaderJobs.java
/**
 * Creates and runs an MR job
 *
 * @param conf
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = Job.getInstance(conf);
  job.setJarByClass(TestLineRecordReaderJobs.class);
  job.setMapperClass(Mapper.class);
  job.setReducerClass(Reducer.class);
  FileInputFormat.addInputPath(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.waitForCompletion(true);
}
Project: aliyun-oss-hadoop-fs    File: Chain.java
ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
    InterruptedException {
  this.reducer = reducer;
  this.chainContext = context;
  this.rw = rw;
}
Project: aliyun-oss-hadoop-fs    File: Chain.java
/**
 * Create a reduce context that is based on ChainReduceContextImpl and the given
 * record writer
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext = 
    new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
        context, rw, conf);
  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = 
    new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(reduceContext);
  return reducerContext;
}
Project: aliyun-oss-hadoop-fs    File: Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
Project: aliyun-oss-hadoop-fs    File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
Project: aliyun-oss-hadoop-fs    File: JobContextImpl.java
/**
 * Get the combiner class for the job.
 * 
 * @return the combiner class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Reducer<?,?,?,?>>) 
    conf.getClass(COMBINE_CLASS_ATTR, null);
}
Project: aliyun-oss-hadoop-fs    File: JobContextImpl.java
/**
 * Get the {@link Reducer} class for the job.
 * 
 * @return the {@link Reducer} class for the job.
 */
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
   throws ClassNotFoundException {
  return (Class<? extends Reducer<?,?,?,?>>) 
    conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
}
Project: elephant56    File: GlobalDistributedDriver.java
public Job createJob(
        Configuration configuration,
        int numberOfNodes,
        long currentGenerationNumber,
        String generationNameFormat,
        Path currentGenerationsBlockReportsFolderPath,
        Schema individualWrapperSchema
) throws IOException {
    // Creates a job.
    Job job = super.createJob(configuration, numberOfNodes, currentGenerationNumber, currentGenerationNumber,
            (currentGenerationNumber - 1L), currentGenerationNumber, generationNameFormat,
            currentGenerationsBlockReportsFolderPath, individualWrapperSchema,
            GlobalMapper.class, Partitioner.class, Reducer.class);

    // Sets the input.
    NodesInputFormat.setInputPopulationFolderPath(job, this.getInputFolderPath());
    NodesInputFormat.activateInitialisation(job, false);

    // Configures the fitness value class.
    job.getConfiguration().setClass(Constants.CONFIGURATION_FITNESS_VALUE_CLASS, this.fitnessValueClass,
            FitnessValue.class);

    // Configures the Fitness Evaluation phase.
    job.getConfiguration().setClass(Constants.CONFIGURATION_FITNESS_EVALUATION_CLASS, this.fitnessEvaluationClass,
            FitnessEvaluation.class);

    // Disables the reducer.
    job.setNumReduceTasks(0);

    // Returns the job.
    return job;
}
Project: big-c    File: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {
  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: big-c    File: MapReduceTestUtil.java
/**
 * Creates a simple kill job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
Project: big-c    File: TestLineRecordReaderJobs.java
/**
 * Creates and runs an MR job
 *
 * @param conf
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = Job.getInstance(conf);
  job.setJarByClass(TestLineRecordReaderJobs.class);
  job.setMapperClass(Mapper.class);
  job.setReducerClass(Reducer.class);
  FileInputFormat.addInputPath(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.waitForCompletion(true);
}
Project: big-c    File: Chain.java
ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
    InterruptedException {
  this.reducer = reducer;
  this.chainContext = context;
  this.rw = rw;
}
Project: big-c    File: Chain.java
/**
 * Create a reduce context that is based on ChainReduceContextImpl and the given
 * record writer
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext = 
    new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
        context, rw, conf);
  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = 
    new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(reduceContext);
  return reducerContext;
}