Java 类org.apache.hadoop.mapreduce.Reducer.Context 实例源码

项目:SOAPgaea    文件:VCFRecalibrator.java   
/**
 * recal one vcf file
 * @param id
 * @param values
 * @throws InterruptedException 
 * @throws IOException
 */
public void recalVCF(int id, Context context) throws IOException, InterruptedException{
    long start,end;
    start = System.currentTimeMillis();
    recalTable.getRecalibrationTable();
    end = System.currentTimeMillis();
    System.err.println("recal table time:"+(end-start)/1000+"s");

    start = System.currentTimeMillis();
    recalTable.indexData();
    end = System.currentTimeMillis();
    System.err.println("recal table index time:"+(end-start)/1000+"s");
    for (final Tranche t : recalTable.getTranches()) {
        if (t.ts >= options.getTSFilterLevel()) {
            tranches.add(t);
        }
    }
    // this algorithm wants the tranches ordered from best (lowest truth sensitivity) to worst (highest truth sensitivity)
    Collections.reverse(tranches); 
}
项目:big-c    文件:HistogramRatings.java   
public void map(Object key, Text value, 
    Context context) throws IOException, InterruptedException{

  int rating, reviewIndex, movieIndex;
  String reviews = new String();
  String tok = new String();
  String ratingStr = new String();

  String line = ((Text)value).toString();
  movieIndex = line.indexOf(":");
  if (movieIndex > 0) {
    reviews = line.substring(movieIndex + 1);
    StringTokenizer token = new StringTokenizer(reviews, ",");
    while (token.hasMoreTokens()) {
      tok = token.nextToken();
      reviewIndex = tok.indexOf("_");
      ratingStr = tok.substring(reviewIndex + 1);
      rating = Integer.parseInt(ratingStr);
      context.write(new IntWritable(rating), one);
    }
  }
}
项目:newsRecommender    文件:TFIDF2.java   
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            // 同一个单词会被分成同一个group
            float sum = 0;
            List<String> vals = new ArrayList<String>();
            for (Text str : values) {
                int index = str.toString().lastIndexOf(" ");
                sum += Integer.parseInt(str.toString().substring(index + 1)); // 统计此单词在所有文件中出现的次数
                vals.add(str.toString().substring(0, index)); // 保存
            }
            double tmp = Math.log10(totalArticle * 1.0 / (sum * 1.0)); // 单词在所有文件中出现的次数除以总文件数IDF
            for (int j = 0; j < vals.size(); j++) {
                String val = vals.get(j);
                String newsID=val.substring(0,val.indexOf(" "));
                String end = val.substring(val.lastIndexOf(" "));
                float f_end = Float.parseFloat(end); // 读取TF
                val += " ";
                val += f_end * tmp; // tf-idf值
//              context.write(key, new Text(val));
                context.write(new Text(newsID), new Text(key+" "+val.substring(val.indexOf(" ")+1)));
            }
        }
项目:ldbc_snb_datagen    文件:PersonActivityGenerator.java   
public void generateActivityForBlock(int seed, ArrayList<Person> block, Context context) throws IOException {
    randomFarm_.resetRandomGenerators(seed);
    forumId = 0;
    messageId = 0;
    SN.machineId = seed;
    personActivitySerializer_.reset();
    int counter = 0;
    float personGenerationTime = 0.0f;
    for (Person p : block) {
        long start = System.currentTimeMillis();
        generateActivity(p, block);
        if (DatagenParams.updateStreams) {
            updateSerializer_.changePartition();
        }
        if (counter % 1000 == 0) {
            context.setStatus("Generating activity of person " + counter + " of block" + seed);
            context.progress();
        }
        float time = (System.currentTimeMillis() - start) / 1000.0f;
        personGenerationTime += time;
        counter++;
    }
    System.out.println("Average person activity generation time " + personGenerationTime / (float) block.size());
}
项目:bigdata-fingerprint    文件:Util.java   
@SuppressWarnings("rawtypes")
    public static LocalStructure [] readDistributedCacheFingerprint(Context context, String fpid, boolean discarding) throws IOException {

        URI[] input_files = context.getCacheFiles();

        @SuppressWarnings("unchecked")
        Class<? extends LocalStructure> MatcherClass = (Class<? extends LocalStructure>) Util.getClassFromProperty(context, "matcher");

        // Compute the localstructures of the input fingerprint
        // and store so that all maps and reduces can access.
        for(URI input_file : input_files) {
//          String[] lines = Util.readFileByLines(FilenameUtils.getName(input_file.getPath()));
            String[] lines = Util.readFileByLines(input_file.getPath());

            for(String line : lines) {
                if(LocalStructure.decodeFpid(line).equals(fpid))
                    return LocalStructure.extractLocalStructures(MatcherClass, line);
            }
        }

        System.err.println("readDistributedCacheFingerprint: input fingerprint " + fpid + " not found");
        return null;
    }
项目:htools    文件:HashPartitioner.java   
public static String getOutfile(Context context) {
    try {
        if (HashPartitioner.class.isAssignableFrom(context.getPartitionerClass())) {
            String outpath = context.getConfiguration().get(HPCONFIG);
            if (outpath != null) {
            int partition = Job.getReducerId(context);
            return PrintTools.sprintf("%s/partition.%5d", outpath, partition);
            } else {
                log.info("must use setOutPath on HashPartitioner");
            }
        }
    } catch (ClassNotFoundException ex) {
        log.exception(ex, "HashPartitioner");
    }
    throw new RuntimeException("fatal");
}
项目:SentimentAnalysis    文件:WriteIndicesSetDriver.java   
@Override
public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
 Text resText = null;
 String prim_key = new String(row.get());
 for(KeyValue kv : value.raw()) {
  try{
      double norm = (double)Integer.parseInt(new String(kv.getValue())) / prim_key.length();
      //double norm = (double)Integer.parseInt(new String(kv.getValue()));
      //double norm = (double)Integer.parseInt(new String(kv.getValue())) / kv.getQualifier().toString().length();
      resText = new Text(prim_key + "," + String.valueOf(norm));      
      String qual = new String (kv.getQualifier());
      context.write(new Text(qual), resText);
      //System.out.println("WriteIndicesMapper: w_i = " + prim_key + " w_c = " + qual + " <w_c>, <w_i, norm_ic> = " + resText);
  }
  catch(Exception e) {
      System.out.println("Exception in mapper for key = " + prim_key);
  }
      }
}
项目:recsys-offline    文件:Step32.java   
public void map(VarLongWritable key,VectorWritable value,Context context) throws IOException, InterruptedException{  

                long userID=key.get();  
                Vector userVector=value.get();  
                Iterator<Vector.Element> it=userVector.nonZeroes().iterator();  
                IntWritable itemi=new IntWritable();  
                while(it.hasNext()){  
                    Vector.Element e=it.next();  
                    int itemIndex=e.index();  
                    float preferenceValue=(float)e.get();  
                    itemi.set(itemIndex);  
                    context.write(itemi, new VectorOrPrefWritable(userID,preferenceValue));  
                   System.out.println("item :"+itemi+",userand val:"+userID+","+preferenceValue);  
                } 

        }
项目:chinesesegmentor    文件:ParallelTraining.java   
@Override
protected void reduce(NullWritable key, Iterable<TrainingWeights> values, Context context)
    throws IOException, InterruptedException {
  TrainingWeights result = null;
  int total = 0;
  for (TrainingWeights weights : values) {
    if (result == null) {
      result = weights;
    } else {
      addWeights(result, weights);
    }
    total++;
  }
  if (total > 1) {
    divideWeights(result, total);
  }
  context.write(NullWritable.get(), result);
}
项目:chinesesegmentor    文件:CalcFeatureWeights.java   
@Override
protected void reduce(IntWritable key, Iterable<MyKey> values, Context context)
    throws IOException, InterruptedException {
  double w = 0;
  int total = 0;
  double[] array = new double[6];
  for (MyKey value : values) {
    total++;
    w += value.score * value.score;
    array[value.id] = value.score;
  }
  if (total != 6) {
    throw new IOException("not 6 for: " + key.get());
  }

  MyKey k = new MyKey(key.get(), w);
  MyValue v = new MyValue(array);
  context.write(k, v);
}
项目:mutual-information-words    文件:wordcountReduce.java   
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    Set docs = new HashSet();
    Text output = new Text();
    StringBuffer result = new StringBuffer();
    for (Text val : values) {
        docs.add(val.toString());
    }       
    result.append(docs.size()+"#");
    Iterator setIter = docs.iterator();
    while(setIter.hasNext())
    {           
        Object setValue = setIter.next();
        result.append(setValue.toString() + "#");
    }
    output.set(result.toString().substring(0, result.length() - 1));
    context.write(key, output);
}
项目:SOAPgaea    文件:BasicReport.java   
public boolean constructMapReport(SamRecordDatum datum, ReferenceShare genome, String chrName, Context context) {
    ChromosomeInformationShare chrInfo = genome.getChromosomeInfo(chrName);
    rTracker.setTrackerAttribute(ReadType.TOTALREADS);
    // 当位点坐标值+read长度大于染色体的长度时,则不处理该read,进入下一次循环
    if(datum.getEnd() >= chrInfo.getLength()) {
        context.getCounter("Exception", "read end pos more than chr end pos").increment(1);
        return false;
    }

    rTracker.setTrackerAttribute(ReadType.MAPPED);
    bTracker.setTrackerAttribute(BaseType.TOTALBASE.setCount(datum.getBaseCount()));

    if(datum.isUniqueAlignment()) {
        rTracker.setTrackerAttribute(ReadType.UNIQUE);
    }

    if ((datum.getFlag() & 0x400) != 0) {
        rTracker.setTrackerAttribute(ReadType.DUP);
    }

    if ((datum.getFlag() & 0x40) != 0 && (datum.getFlag() & 0x8) == 0) {
        rTracker.setTrackerAttribute(ReadType.PE);
    }

    String cigar = datum.getCigarString();
    if (cigar.contains("S") || cigar.contains("H")) {
        rTracker.setTrackerAttribute(ReadType.CLIPPED);
    }

    if (cigar.contains("D") || cigar.contains("I")) {
        rTracker.setTrackerAttribute(ReadType.INDEL);
    }

    if (isMismatch(datum, chrInfo)) {
        rTracker.setTrackerAttribute(ReadType.MISMATCHREADS);
    }
    return true;
}
项目:SOAPgaea    文件:RecalibratorContextWriter.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
public RecalibratorContextWriter(Context ctx,boolean multiple) {
    if(multiple)
        mos = new MultipleOutputs<NullWritable, Text>(ctx);
    this.context = ctx;
    value = new SamRecordWritable();
}
项目:Apriori_Hadoop    文件:Reduce.java   
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int countItemFreq = 0;

    for (IntWritable value : values){
        countItemFreq += value.get();
    }

    int minsup = Integer.parseInt(context.getConfiguration().get("minsup"));

    if (countItemFreq >= minsup)
    {
        context.write(key, new IntWritable(countItemFreq));
    }
}
项目:big-c    文件:MutiWordcount.java   
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
项目:big-c    文件:MutiWordcount.java   
public void reduce(Text key, Iterable<IntWritable> values, 
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
项目:big-c    文件:HistogramRatings.java   
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context
        ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
      sum += val.get();
  }
  context.write(key, new IntWritable(sum));
}
项目:HadoopKMeansClustering    文件:KReducer.java   
@Override
public void setup(Context context) throws IOException{

 //get job configuration
 Configuration conf = context.getConfiguration();
 columns = Arrays.stream(conf.getStrings("columns"))
.map( s -> Integer.parseInt(s)).toArray(Integer[]::new);
 k = (int) conf.getInt("k", 10);
 currentIteration = conf.getInt("currentIteration", 0);
 lastIteration = conf.getBoolean("lastIteration", false);
 mos = new MultipleOutputs(context);

}
项目:newsRecommender    文件:TFIDF2.java   
public void map(LongWritable key, Text value, Context context) throws IOException,
        InterruptedException {
    String[] lineSplits = value.toString().split("\t");
    String newsID = lineSplits[1];
    String content = lineSplits[4];
    String publishTime=lineSplits[5];
    Calendar cal1 = Calendar.getInstance();
    try {
        cal1.setTime(new SimpleDateFormat("yyyy年MM月dd日HH:mm").parse(publishTime));
        publishTime=Long.toString(cal1.getTimeInMillis());
    } catch (Exception e) {
        publishTime="0";
    }
    context.write(new Text(newsID+"|"+publishTime+"|"+content),new Text(""));
}
项目:newsRecommender    文件:TFIDF2.java   
public void map(LongWritable key, Text value, Context context) throws IOException,
                InterruptedException {
            int all = 0; // 单词总数统计
            String[] lineSplits = value.toString().split("\\|");
            newsID = lineSplits[0];
            publishTime = lineSplits[1];
            content = lineSplits[2];

            Analyzer analyzer = new IKAnalyzer(false);
            TokenStream ts = analyzer.tokenStream("", new StringReader(content));
            ts.reset();
            CharTermAttribute cta = ts.getAttribute(CharTermAttribute.class);
            Map<String, Long> splitWordMap = new HashMap<String, Long>();
            while (ts.incrementToken()) {
                word = cta.toString();
                word += " ";
                word += newsID;
                all++;
                if (splitWordMap.containsKey(word))
                    splitWordMap.put(word, splitWordMap.get(word) + 1);
                else
                    splitWordMap.put(word, 1L);
            }
            Iterator iter = splitWordMap.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Long> entry = (Map.Entry<String, Long>) iter.next();
                String key1 = entry.getKey();
                Long val = entry.getValue();
//              下面的key值要加上一个单词的总字数 ,在生成每篇文章的词频矩阵时会用到。
                context.write(new Text(key1+"|"+all+"|"+publishTime), new Text((Float.parseFloat(val.toString()) / all)
                        + ""));
            }
        }
项目:newsRecommender    文件:TFIDF2.java   
public void map(LongWritable key, Text value, Context context) throws IOException,
        InterruptedException {
    String val = value.toString().replaceAll("  ", " "); // 将vlaue中的TAB分割符换成空格
    int index = val.indexOf(" ");
    String s1 = val.substring(0, index); // 获取单词 作为key es: hello
    String s2 = val.substring(index + 1); // 其余部分 作为value es: test1 0.11764706
    s2 += " ";
    s2 += "1"; // 统计单词在所有文章中出现的次数, “1” 表示出现一次。 es: test1 0.11764706 1
    context.write(new Text(s1), new Text(s2));
}
项目:newsRecommender    文件:TFIDF2.java   
public void map(LongWritable key, Text value, Context cxt) throws IOException,
        InterruptedException {
    String[] values = value.toString().split("\t");
    String[] values2=values[1].split(" ");
    newKey.setSymbol(values[0]);
    newKey.setSymbol2(values2[0]);
    newKey.setValue(Double.parseDouble(values2[1]));
    newKey.setValue2(Double.parseDouble(values2[2]));
    cxt.write(newKey, NullWritable.get());
}
项目:newsRecommender    文件:TFIDF2.java   
public void reduce(CustomKey key, Iterable<NullWritable> values, Context cxt)
        throws IOException, InterruptedException {
    int limit=0;
    for (NullWritable v : values) {
        if(++limit<=50)
            cxt.write(key, v);
    }
}
项目:search    文件:RejectingUpdateConflictResolver.java   
@Override
public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {    
  SolrInputDocument firstUpdate = null;
  while (updates.hasNext()) {
    if (firstUpdate == null) {
      firstUpdate = updates.next();
      assert firstUpdate != null;
    } else {
      throw new IllegalArgumentException("Update conflict! Documents with the same unique key are forbidden: "
          + key);
    }
  }
  assert firstUpdate != null;
  return Collections.singletonList(firstUpdate).iterator();
}
项目:search    文件:RetainMostRecentUpdateConflictResolver.java   
/** Returns the most recent document among the colliding updates */
protected Iterator<SolrInputDocument> getMaximum(Iterator<SolrInputDocument> updates, String fieldName,
    Comparator child, Context context) {

  SolrInputDocumentComparator comp = new SolrInputDocumentComparator(fieldName, child);
  SolrInputDocument max = null;
  long numDupes = 0;
  long numOutdated = 0;
  while (updates.hasNext()) {
    SolrInputDocument next = updates.next(); 
    assert next != null;
    if (max == null) {
      max = next;
    } else {
      int c = comp.compare(next, max);
      if (c == 0) {
        LOG.debug("Ignoring document version because it is a duplicate: {}", next);
        numDupes++;
      } else if (c > 0) {
        LOG.debug("Ignoring document version because it is outdated: {}", max);
        max = next;
        numOutdated++;
      } else {
        LOG.debug("Ignoring document version because it is outdated: {}", next);        
        numOutdated++;
      }
    }
  }

  assert max != null;
  if (numDupes > 0) {
    context.getCounter(COUNTER_GROUP, DUPLICATES_COUNTER_NAME).increment(numDupes);
  }
  if (numOutdated > 0) {
    context.getCounter(COUNTER_GROUP, OUTDATED_COUNTER_NAME).increment(numOutdated);
  }
  return Collections.singletonList(max).iterator();
}
项目:FlexMap    文件:MutiWordcount.java   
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
项目:FlexMap    文件:MutiWordcount.java   
public void reduce(Text key, Iterable<IntWritable> values, 
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
项目:htools    文件:DayPartitioner.java   
/**
 * @param context Reducer context
 * @return a YYYY-MM-DD datestring based on the configured startdate, enddates 
 * and the reducer number.
 */
public static String getDate(Context context) {
    Configuration conf = context.getConfiguration();
    int reducer = ContextTools.getTaskID(context);
    long start = conf.getLong(starttimelabel, 0) + reducer * secperday;
    return DateTools.FORMAT.Y_M_D.formatEpoch(start); 
}
项目:iis    文件:ImportInformationSpaceReducerTest.java   
@Before
public void init() throws Exception {
    reducer = new ImportInformationSpaceReducer() {

        @Override
        protected MultipleOutputs instantiateMultipleOutputs(Context context) {
            return multipleOutputs;
        }

    };
}
项目:Ankus    文件:MahoutDotProductDistributedCache.java   
@Override
public void map(IntWritable r, VectorWritable v, Context context) throws IOException {
    try {
        for (Entry<String, Vector> w : classWeights.entrySet()) {
            context.write(new Text(String.valueOf(r.get())+"_"+w.getKey()), new DoubleWritable(v.get().dot(w.getValue())));
        }

    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
项目:Ankus    文件:SubtractColumnMeans.java   
@Override
public void map(LongWritable r, VectorWritable v, Context context) throws IOException {
    try {
        Vector newV = v.get().minus(columnMeans);
        context.write(new IntWritable((int)r.get()), new VectorWritable(newV));
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
项目:SentimentAnalysis    文件:CalculateBetaDriver.java   
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String [] words = line.split("\t");
    String [] sentence = words[1].split("=");
    String [] scores = sentence[1].split(";");
    String posScore = scores[0];
    String negScore = scores[1];

    context.write(new Text(words[0]), new Text(posScore + ";" + negScore));     
}
项目:SentimentAnalysis    文件:CalculateBetaDriver.java   
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    double sumPos = 0;
    double sumNeg = 0;
    for(Text word: values) {
        String [] scores = word.toString().split(";");
        sumPos += Double.valueOf(scores[0]);
        sumNeg += Double.valueOf(scores[1]);                
    }
    context.write(key, new Text(sumPos + ";" + sumNeg));
}
项目:SentimentAnalysis    文件:FinalScoreCalculation.java   
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String [] words = line.split("\t");         

    context.write(new Text(words[0]), new Text(words[1]));
}
项目:SentimentAnalysis    文件:FinalScoreCalculation.java   
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {             
    for(Text text : values) {
        Put put = new Put(key.getBytes());
        String word = text.toString();
        String [] sentence = word.split("=");
        String [] scores = sentence[1].split(";");
        String posScore = scores[0];
        String negScore = scores[1];
        double beta = GeneralDriver.getBeta();
        double finScore = Double.valueOf(posScore) - beta * Double.valueOf(negScore);
        put.add(Bytes.toBytes("score"), Bytes.toBytes(sentence[0]), Bytes.toBytes(finScore));
        context.write(null, put);
    }
}
项目:SentimentAnalysis    文件:WriteIndicesSetDriver.java   
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 StringBuilder sb = new StringBuilder();
 for(Text str : values) {
     sb.append(str.toString() + "///"); 
 }
 context.write(key, new Text(sb.toString()));            
}
项目:recsys-offline    文件:Step32.java   
public void reduce(IntWritable key,Iterable<VectorOrPrefWritable> values ,Context context ) throws IOException, InterruptedException{  

    for(VectorOrPrefWritable va:values){  
        context.write(key, va);  
        System.err.println("key"+key.toString()+",vlaue"+va);  
    }  
}
项目:SEARUM    文件:RuleMiningReducer.java   
@Override
protected void setup(Context context) throws IOException,
        InterruptedException {
    super.setup(context);

    Parameters params = new Parameters(context.getConfiguration().get(
            "minConfidence", ""));

    /* Get Item Frequent List from DC */
    for (Pair<String, Long> e : ARM.readFList(context.getConfiguration())) {
        freqItemMap.put(e.getFirst(), e.getSecond());
    }

    minConfidence = Double.valueOf(params.get("minConfidence", "0.1"));
}
项目:chinesesegmentor    文件:ParallelTraining2.java   
@Override
public void map(Object key, Text value, final Context context) throws IOException,
    InterruptedException {
  String s = value.toString().split("\t", 2)[1];
  Instance instance = gson.fromJson(s, Instance.class);
  instances.add(instance);
}
项目:chinesesegmentor    文件:ParallelTraining2.java   
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
    throws IOException, InterruptedException {
  double sum = 0;
  int num = 0;
  for (DoubleWritable weights : values) {
    sum += weights.get();
    num++;
  }
  context.getCounter("TrainingReducer", "num" + num).increment(1);
  context.write(key, new DoubleWritable(sum / num));
}