Example source code for the Java class org.apache.hadoop.mapred.OutputCollector

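The snippets below, collected from a number of open-source projects, show how the old (org.apache.hadoop.mapred) MapReduce API hands an OutputCollector to map() and reduce() implementations so they can emit key/value pairs. For orientation, the interface itself is tiny; the sketch below reproduces its single method as it is used throughout the examples (the anonymous implementation in the MaxTemperatureMapRedTest snippet overrides exactly this method):

// org.apache.hadoop.mapred.OutputCollector: the framework passes an
// implementation to Mapper.map() and Reducer.reduce(), and user code calls
// collect() once per emitted key/value pair.
public interface OutputCollector<K, V> {
  void collect(K key, V value) throws IOException;
}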
Project: ditb    File: TestTableInputFormat.java
@Override
public void map(ImmutableBytesWritable key, Result value,
    OutputCollector<NullWritable,NullWritable> output,
    Reporter reporter) throws IOException {
  for (Cell cell : value.listCells()) {
    reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
        Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
        .increment(1L);
    reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
        .increment(1L);
    reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
        .increment(1L);
  }
}
Project: mumu-mapreduce    File: MaxTemperatureMapRedTest.java
@Test
public void reduce() {
    MaxTemperatureMapRed.MaxTemperatureReduce maxTemperatureReduce = new MaxTemperatureMapRed.MaxTemperatureReduce();
    try {
        List<IntWritable> list = new ArrayList<IntWritable>();
        list.add(new IntWritable(12));
        list.add(new IntWritable(31));
        list.add(new IntWritable(45));
        list.add(new IntWritable(23));
        list.add(new IntWritable(21));
        maxTemperatureReduce.reduce(new Text("1901"), list.iterator(), new OutputCollector<Text, IntWritable>() {
            @Override
            public void collect(final Text text, final IntWritable intWritable) throws IOException {
                log.info(text.toString() + "  " + intWritable.get());
            }
        }, null);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
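The test above satisfies the reduce() signature by passing an anonymous OutputCollector that only logs. A slightly more reusable variant for unit tests is a collector that records every emitted pair in memory; the following is a minimal sketch (the class name and accessor are illustrative only, not taken from any of the projects listed here):

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapred.OutputCollector;

// Test-only OutputCollector that keeps every collected (key, value) pair in a
// list so assertions can be made on a mapper's or reducer's output afterwards.
public class CollectingOutputCollector<K, V> implements OutputCollector<K, V> {
  private final List<Map.Entry<K, V>> collected = new ArrayList<>();

  @Override
  public void collect(K key, V value) throws IOException {
    collected.add(new SimpleEntry<>(key, value));
  }

  public List<Map.Entry<K, V>> getCollected() {
    return collected;
  }
}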
Project: hadoop    File: TestDFSIO.java
@Override // IOMapperBase
void collectStats(OutputCollector<Text, Text> output, 
                  String name,
                  long execTime, 
                  Long objSize) throws IOException {
  long totalSize = objSize.longValue();
  float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
  LOG.info("Number of bytes processed = " + totalSize);
  LOG.info("Exec time = " + execTime);
  LOG.info("IO rate = " + ioRateMbSec);

  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
      new Text(String.valueOf(1)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
      new Text(String.valueOf(totalSize)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
      new Text(String.valueOf(execTime)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
      new Text(String.valueOf(ioRateMbSec*1000)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
      new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
Project: hadoop    File: PipesReducer.java
/**
 * Process all of the keys and values. Start up the application if we haven't
 * started it yet.
 */
public void reduce(K2 key, Iterator<V2> values, 
                   OutputCollector<K3, V3> output, Reporter reporter
                   ) throws IOException {
  isOk = false;
  startApplication(output, reporter);
  downlink.reduceKey(key);
  while (values.hasNext()) {
    downlink.reduceValue(values.next());
  }
  if(skipping) {
    //flush the streams on every record input if running in skip mode
    //so that we don't buffer other records surrounding a bad record.
    downlink.flush();
  }
  isOk = true;
}
Project: hadoop    File: PipesReducer.java
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
  if (application == null) {
    try {
      LOG.info("starting application");
      application = 
        new Application<K2, V2, K3, V3>(
            job, null, output, reporter, 
            (Class<? extends K3>) job.getOutputKeyClass(), 
            (Class<? extends V3>) job.getOutputValueClass());
      downlink = application.getDownlink();
    } catch (InterruptedException ie) {
      throw new RuntimeException("interrupted", ie);
    }
    int reduce=0;
    downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
  }
}
Project: hadoop    File: ValueAggregatorCombiner.java
/** Combines values for a given key.  
 * @param key the key is expected to be a Text object, whose prefix indicates
 * the type of aggregation used to aggregate the values.
 * @param values the values to combine
 * @param output to collect combined values
 */
public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  String keyStr = key.toString();
  int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
  String type = keyStr.substring(0, pos);
  ValueAggregator aggregator = ValueAggregatorBaseDescriptor
    .generateValueAggregator(type);
  while (values.hasNext()) {
    aggregator.addNextValue(values.next());
  }
  Iterator outputs = aggregator.getCombinerOutput().iterator();

  while (outputs.hasNext()) {
    Object v = outputs.next();
    if (v instanceof Text) {
      output.collect(key, (Text)v);
    } else {
      output.collect(key, new Text(v.toString()));
    }
  }
}
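In this combiner the aggregation type is parsed from the key prefix in front of ValueAggregatorDescriptor.TYPE_SEPARATOR. As a rough illustration of what feeds it, here is a sketch of a compatible mapper; it assumes the conventional "aggregatorType:keyName" key layout with ":" as the separator and "LongValueSum" as one of the built-in aggregator types, and the class name is a made-up stand-in:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hedged sketch of a mapper emitting keys in the "aggregatorType:keyName" form
// consumed by the ValueAggregator combiner/reducer shown in this listing;
// "LongValueSum" selects a summing aggregator and the value "1" is what gets summed.
public class WordCountAggregatorMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    for (String word : value.toString().split("\\s+")) {
      output.collect(new Text("LongValueSum:" + word), new Text("1"));
    }
  }
}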
Project: hadoop    File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out,
    Reporter reporter) throws IOException {
  keyVal = key.get();
  while(values.hasNext()) {
    Text value = values.next();
    String towrite = value.toString() + "\n";
    indexStream.write(towrite.getBytes(Charsets.UTF_8));
    written++;
    if (written > numIndexes -1) {
      // report status and write a master index entry every numIndexes values
      reporter.setStatus("Creating index for archives");
      reporter.progress();
      endIndex = keyVal;
      String masterWrite = startIndex + " " + endIndex + " " + startPos 
                          +  " " + indexStream.getPos() + " \n" ;
      outStream.write(masterWrite.getBytes(Charsets.UTF_8));
      startPos = indexStream.getPos();
      startIndex = endIndex;
      written = 0;
    }
  }
}
Project: hadoop    File: DistCh.java
/** Run a FileOperation */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    ++succeedcount;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);

    String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
    out.collect(null, new Text(s));
    LOG.info(s);
  } finally {
    reporter.setStatus(getCountString());
  }
}
Project: hadoop    File: DataJoinMapperBase.java
public void map(Object key, Object value,
                OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }
  addLongValue("totalCount", 1);
  TaggedMapOutput aRecord = generateTaggedMapOutput(value);
  if (aRecord == null) {
    addLongValue("discardedCount", 1);
    return;
  }
  Text groupKey = generateGroupKey(aRecord);
  if (groupKey == null) {
    addLongValue("nullGroupKeyCount", 1);
    return;
  }
  output.collect(groupKey, aRecord);
  addLongValue("collectedCount", 1);
}
Project: hadoop    File: DataJoinReducerBase.java
public void reduce(Object key, Iterator values,
                   OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }

  SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
  Object[] tags = groups.keySet().toArray();
  ResetableIterator[] groupValues = new ResetableIterator[tags.length];
  for (int i = 0; i < tags.length; i++) {
    groupValues[i] = groups.get(tags[i]);
  }
  joinAndCollect(tags, groupValues, key, output, reporter);
  addLongValue("groupCount", 1);
  for (int i = 0; i < tags.length; i++) {
    groupValues[i].close();
  }
}
Project: hadoop    File: DataJoinReducerBase.java
/**
 * Perform the actual join recursively.
 * 
 * @param tags
 *          a list of input tags
 * @param values
 *          a list of value lists, each corresponding to one input source
 * @param pos
 *          indicating the next value list to be joined
 * @param partialList
 *          a list of values, each from one value list considered so far.
 * @param key
 * @param output
 * @throws IOException
 */
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
                            int pos, Object[] partialList, Object key,
                            OutputCollector output, Reporter reporter) throws IOException {

  if (values.length == pos) {
    // get a value from each source. Combine them
    TaggedMapOutput combined = combine(tags, partialList);
    collect(key, combined, output, reporter);
    return;
  }
  ResetableIterator nextValues = values[pos];
  nextValues.reset();
  while (nextValues.hasNext()) {
    Object v = nextValues.next();
    partialList[pos] = v;
    joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
  }
}
Project: ditb    File: TestTableMapReduceUtil.java
@Override
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
    OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
    throws IOException {
  String strKey = Bytes.toString(key.get());
  List<Put> result = new ArrayList<Put>();
  while (values.hasNext())
    result.add(values.next());

  if (relation.keySet().contains(strKey)) {
    Set<String> set = relation.get(strKey);
    if (set != null) {
      assertEquals(set.size(), result.size());
    } else {
      throwAccertionError("Test infrastructure error: set is null");
    }
  } else {
    throwAccertionError("Test infrastructure error: key not found in map");
  }
}
Project: ditb    File: TestTableMapReduceUtil.java
@Override
public void map(ImmutableBytesWritable row, Result result,
    OutputCollector<ImmutableBytesWritable, Put> outCollector,
    Reporter reporter) throws IOException {
  String rowKey = Bytes.toString(result.getRow());
  final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
      Bytes.toBytes(PRESIDENT_PATTERN));
  final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
      Bytes.toBytes(ACTOR_PATTERN));
  ImmutableBytesWritable outKey = null;

  if (rowKey.startsWith(PRESIDENT_PATTERN)) {
    outKey = pKey;
  } else if (rowKey.startsWith(ACTOR_PATTERN)) {
    outKey = aKey;
  } else {
    throw new AssertionError("unexpected rowKey");
  }

  String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
      COLUMN_QUALIFIER));
  outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
      COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
}
Project: ditb    File: TestIdentityTableMap.java
@Test
@SuppressWarnings({ "deprecation", "unchecked" })
public void shouldCollectPredefinedTimes() throws IOException {
  int recordNumber = 999;
  Result resultMock = mock(Result.class);
  IdentityTableMap identityTableMap = null;
  try {
    Reporter reporterMock = mock(Reporter.class);
    identityTableMap = new IdentityTableMap();
    ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
    OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
        mock(OutputCollector.class);

    for (int i = 0; i < recordNumber; i++)
      identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
          reporterMock);

    verify(outputCollectorMock, times(recordNumber)).collect(
        Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
  } finally {
    if (identityTableMap != null)
      identityTableMap.close();
  }
}
Project: big-c    File: DataJoinReducerBase.java
public void reduce(Object key, Iterator values,
                   OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }

  SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
  Object[] tags = groups.keySet().toArray();
  ResetableIterator[] groupValues = new ResetableIterator[tags.length];
  for (int i = 0; i < tags.length; i++) {
    groupValues[i] = groups.get(tags[i]);
  }
  joinAndCollect(tags, groupValues, key, output, reporter);
  addLongValue("groupCount", 1);
  for (int i = 0; i < tags.length; i++) {
    groupValues[i].close();
  }
}
Project: big-c    File: PipesReducer.java
/**
 * Process all of the keys and values. Start up the application if we haven't
 * started it yet.
 */
public void reduce(K2 key, Iterator<V2> values, 
                   OutputCollector<K3, V3> output, Reporter reporter
                   ) throws IOException {
  isOk = false;
  startApplication(output, reporter);
  downlink.reduceKey(key);
  while (values.hasNext()) {
    downlink.reduceValue(values.next());
  }
  if(skipping) {
    //flush the streams on every record input if running in skip mode
    //so that we don't buffer other records surrounding a bad record.
    downlink.flush();
  }
  isOk = true;
}
Project: gemfirexd-oss    File: VerifyHdfsDataUsingMR.java
@Override
public void map(Key key, Row value, OutputCollector<Text, MyRow> output, Reporter reporter) throws IOException {      
  String tableName = null;
  try {
    ResultSet rs = value.getRowAsResultSet();
    tableName = rs.getMetaData().getTableName(1);

    Log.getLogWriter().info("i am in a mapper and table Name is " + tableName);

    int cid = rs.getInt("cid");
    String cname = rs.getString("cust_name");
    String addr = rs.getString("addr");
    int tid = rs.getInt("tid");

    Log.getLogWriter().info("mapper procesing record from " + tableName + ": " + cid + ": " + cname + ": "  + addr + ": " + tid);                 

    Text myKey = new Text(Integer.toString(cid));
    MyRow myRow = new MyRow(cid, cname, addr, tid);
    Log.getLogWriter().info("MAPPER writing intermediate record " + myRow.toString());

    output.collect(myKey, myRow);

  } catch (SQLException se) {
    System.err.println("Error logging result set" + se);
  }
}
Project: es-hadoop-v2.2.0    File: EsHadoopScheme.java
@Override
public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {

    conf.setOutputFormat(EsOutputFormat.class);
    // define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat
    Settings set = loadSettings(conf, false);

    Log log = LogFactory.getLog(EsTap.class);
    InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, log);
    InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, log);
    InitializationUtils.setBytesConverterIfNeeded(set, CascadingLocalBytesConverter.class, log);
    InitializationUtils.setFieldExtractorIfNotSet(set, CascadingFieldExtractor.class, log);

    // NB: we need to set this property even though it is not being used - and since a URI causes problems, use only the resource/file
    //conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
    HadoopCfgUtils.setFileOutputFormatDir(conf, set.getResourceWrite());
    HadoopCfgUtils.setOutputCommitterClass(conf, EsOutputFormat.EsOldAPIOutputCommitter.class.getName());

    if (log.isTraceEnabled()) {
        log.trace("Initialized (sink) configuration " + HadoopCfgUtils.asProperties(conf));
    }
}
Project: aliyun-oss-hadoop-fs    File: PipesReducer.java
/**
 * Process all of the keys and values. Start up the application if we haven't
 * started it yet.
 */
public void reduce(K2 key, Iterator<V2> values, 
                   OutputCollector<K3, V3> output, Reporter reporter
                   ) throws IOException {
  isOk = false;
  startApplication(output, reporter);
  downlink.reduceKey(key);
  while (values.hasNext()) {
    downlink.reduceValue(values.next());
  }
  if(skipping) {
    //flush the streams on every record input if running in skip mode
    //so that we don't buffer other records surrounding a bad record.
    downlink.flush();
  }
  isOk = true;
}
Project: GeoCrawler    File: NodeDumper.java
/**
 * Outputs the url with the appropriate number of inlinks, outlinks, or for
 * score.
 */
public void map(Text key, Node node,
    OutputCollector<FloatWritable, Text> output, Reporter reporter)
    throws IOException {

  float number = 0;
  if (inlinks) {
    number = node.getNumInlinks();
  } else if (outlinks) {
    number = node.getNumOutlinks();
  } else {
    number = node.getInlinkScore();
  }

  // collect the negated number so results sort in descending order
  output.collect(new FloatWritable(-number), key);
}
Project: gemfirexd-oss    File: BusyAirports.java
@Override
public void map(Object key, ResultSet rs,
    OutputCollector<Text, IntWritable> output,
    Reporter reporter) throws IOException {

  String origAirport;
  String destAirport;

  try {
    while (rs.next()) {
      origAirport = rs.getString("ORIG_AIRPORT");
      destAirport = rs.getString("DEST_AIRPORT");
      reusableText.set(origAirport);
      output.collect(reusableText, countOne);
      reusableText.set(destAirport);
      output.collect(reusableText, countOne);
    }
  } catch (SQLException e) {
    e.printStackTrace();
  }
}
Project: GeoCrawler    File: NodeDumper.java
/**
 * Outputs the host or domain as key for this record and numInlinks,
 * numOutlinks or score as the value.
 */
public void map(Text key, Node node,
    OutputCollector<Text, FloatWritable> output, Reporter reporter)
    throws IOException {

  float number = 0;
  if (inlinks) {
    number = node.getNumInlinks();
  } else if (outlinks) {
    number = node.getNumOutlinks();
  } else {
    number = node.getInlinkScore();
  }

  if (host) {
    key.set(URLUtil.getHost(key.toString()));
  } else {
    key.set(URLUtil.getDomainName(key.toString()));
  }

  output.collect(key, new FloatWritable(number));
}
Project: aliyun-oss-hadoop-fs    File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out,
    Reporter reporter) throws IOException {
  keyVal = key.get();
  while(values.hasNext()) {
    Text value = values.next();
    String towrite = value.toString() + "\n";
    indexStream.write(towrite.getBytes(Charsets.UTF_8));
    written++;
    if (written > numIndexes -1) {
      // report status and write a master index entry every numIndexes values
      reporter.setStatus("Creating index for archives");
      reporter.progress();
      endIndex = keyVal;
      String masterWrite = startIndex + " " + endIndex + " " + startPos 
                          +  " " + indexStream.getPos() + " \n" ;
      outStream.write(masterWrite.getBytes(Charsets.UTF_8));
      startPos = indexStream.getPos();
      startIndex = endIndex;
      written = 0;
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: DataJoinMapperBase.java
public void map(Object key, Object value,
                OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }
  addLongValue("totalCount", 1);
  TaggedMapOutput aRecord = generateTaggedMapOutput(value);
  if (aRecord == null) {
    addLongValue("discardedCount", 1);
    return;
  }
  Text groupKey = generateGroupKey(aRecord);
  if (groupKey == null) {
    addLongValue("nullGroupKeyCount", 1);
    return;
  }
  output.collect(groupKey, aRecord);
  addLongValue("collectedCount", 1);
}
Project: aliyun-oss-hadoop-fs    File: DataJoinReducerBase.java
/**
 * Perform the actual join recursively.
 * 
 * @param tags
 *          a list of input tags
 * @param values
 *          a list of value lists, each corresponding to one input source
 * @param pos
 *          indicating the next value list to be joined
 * @param partialList
 *          a list of values, each from one value list considered so far.
 * @param key
 * @param output
 * @throws IOException
 */
private void joinAndCollect(Object[] tags, ResetableIterator[] values,
                            int pos, Object[] partialList, Object key,
                            OutputCollector output, Reporter reporter) throws IOException {

  if (values.length == pos) {
    // get a value from each source. Combine them
    TaggedMapOutput combined = combine(tags, partialList);
    collect(key, combined, output, reporter);
    return;
  }
  ResetableIterator nextValues = values[pos];
  nextValues.reset();
  while (nextValues.hasNext()) {
    Object v = nextValues.next();
    partialList[pos] = v;
    joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
  }
}
Project: THUTag    File: ImportDouban.java
public void map(LongWritable key, Text value,
    OutputCollector<Text, Text> collector, Reporter r) throws IOException {
  String json = value.toString();
  if (json.contains("\"tag\":")) {
    // This is a douban raw tag.
    DoubanRawTag tag = J.fromTextAsJson(value, DoubanRawTag.class);
    outkey.set(Long.toString(tag.getSubject_id()));
    r.incrCounter(MRCounter.NUM_TAGS, 1);
  } else {
    // This is a douban subject.
    DoubanRawSubject subject =
      J.fromTextAsJson(value, DoubanRawSubject.class);
    // We use books only.
    if (subject.getCat_id() != DOUBAN_BOOK_CATID) {
      return;
    }
    outkey.set(Long.toString(subject.getId()));
    r.incrCounter(MRCounter.NUM_SUBJECTS, 1);
  }
  collector.collect(outkey, value);
}
Project: big-c    File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out,
    Reporter reporter) throws IOException {
  keyVal = key.get();
  while(values.hasNext()) {
    Text value = values.next();
    String towrite = value.toString() + "\n";
    indexStream.write(towrite.getBytes(Charsets.UTF_8));
    written++;
    if (written > numIndexes -1) {
      // report status and write a master index entry every numIndexes values
      reporter.setStatus("Creating index for archives");
      reporter.progress();
      endIndex = keyVal;
      String masterWrite = startIndex + " " + endIndex + " " + startPos 
                          +  " " + indexStream.getPos() + " \n" ;
      outStream.write(masterWrite.getBytes(Charsets.UTF_8));
      startPos = indexStream.getPos();
      startIndex = endIndex;
      written = 0;
    }
  }
}
Project: gemfirexd-oss    File: TopBusyAirportGemfirexd.java
@Override
public void map(Object key, Row row,
    OutputCollector<Text, IntWritable> output,
    Reporter reporter) throws IOException {

  String origAirport;
  String destAirport;

  try {
    ResultSet rs = row.getRowAsResultSet();
    origAirport = rs.getString("ORIG_AIRPORT");
    destAirport = rs.getString("DEST_AIRPORT");
    reusableText.set(origAirport);
    output.collect(reusableText, countOne);
    reusableText.set(destAirport);
    output.collect(reusableText, countOne);
  } catch (SQLException e) {
    e.printStackTrace();
  }
}
Project: big-c    File: ValueAggregatorReducer.java
/**
 * @param key
 *          the key is expected to be a Text object, whose prefix indicates
 *          the type of aggregation used to aggregate the values. In effect,
 *          data-driven computing is achieved. It is assumed that each
 *          aggregator's getReport method emits appropriate output for the
 *          aggregator. This may be further customized.
 * @param values
 *          the values to be aggregated
 */
public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  String keyStr = key.toString();
  int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
  String type = keyStr.substring(0, pos);
  keyStr = keyStr.substring(pos
                            + ValueAggregatorDescriptor.TYPE_SEPARATOR.length());

  ValueAggregator aggregator = ValueAggregatorBaseDescriptor
    .generateValueAggregator(type);
  while (values.hasNext()) {
    aggregator.addNextValue(values.next());
  }

  String val = aggregator.getReport();
  key = new Text(keyStr);
  output.collect(key, new Text(val));
}
Project: aliyun-maxcompute-data-collectors    File: ExplicitSetMapper.java
public void map(LongWritable key, Text val,
    OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {

  // Try to set the field.
  userRecord.setField(setCol, setVal);
  Map<String, Object> fieldVals = userRecord.getFieldMap();
  if (!fieldVals.get(setCol).equals(setVal)) {
    throw new IOException("Could not set column value! Got back "
        + fieldVals.get(setCol));
  } else {
    LOG.info("Correctly changed value for col " + setCol + " to " + setVal);
  }
}
Project: aliyun-maxcompute-data-collectors    File: ReparseMapper.java
public void map(LongWritable key, Text val,
    OutputCollector<Text, NullWritable> out, Reporter r) throws IOException {

  LOG.info("Mapper input line: " + val.toString());

  try {
    // Use the user's record class to parse the line back in.
    userRecord.parse(val);
  } catch (RecordParser.ParseError pe) {
    LOG.error("Got parse error: " + pe.toString());
    throw new IOException(pe);
  }

  LOG.info("Mapper output line: " + userRecord.toString());

  out.collect(new Text(userRecord.toString()), NullWritable.get());

  if (!userRecord.toString(false).equals(val.toString())) {
    // Could not format record w/o end-of-record delimiter.
    throw new IOException("Returned string w/o EOR has value ["
        + userRecord.toString(false) + "] when ["
        + val.toString() + "] was expected.");
  }

  if (!userRecord.toString().equals(val.toString() + "\n")) {
    // misparsed.
    throw new IOException("Returned string has value ["
        + userRecord.toString() + "] when ["
        + val.toString() + "\n] was expected.");
  }
}
Project: hadoop    File: ExternalMapReduce.java
public void map(WritableComparable key, Writable value,
                OutputCollector<WritableComparable, IntWritable> output,
                Reporter reporter)
  throws IOException {
  //check for classpath
  String classpath = System.getProperty("java.class.path");
  if (classpath.indexOf("testjob.jar") == -1) {
    throw new IOException("failed to find in the library " + classpath);
  }
  if (classpath.indexOf("test.jar") == -1) {
    throw new IOException("failed to find the library test.jar in" 
        + classpath);
  }
  //fork off ls to see if the file exists.
  // java file.exists() will not work on 
  // Windows since it is a symlink
  String[] argv = new String[7];
  argv[0] = "ls";
  argv[1] = "files_tmp";
  argv[2] = "localfilelink";
  argv[3] = "dfsfilelink";
  argv[4] = "tarlink";
  argv[5] = "ziplink";
  argv[6] = "test.tgz";
  Process p = Runtime.getRuntime().exec(argv);
  int ret = -1;
  try {
    ret = p.waitFor();
  } catch(InterruptedException ie) {
    //do nothing here.
  }
  if (ret != 0) {
    throw new IOException("files_tmp does not exist");
  }
}
Project: hadoop    File: SliveReducer.java
@Override // Reducer
public void reduce(Text key, Iterator<Text> values,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  OperationOutput collector = null;
  int reduceAm = 0;
  int errorAm = 0;
  logAndSetStatus(reporter, "Iterating over reduction values for key " + key);
  while (values.hasNext()) {
    Text value = values.next();
    try {
      OperationOutput val = new OperationOutput(key, value);
      if (collector == null) {
        collector = val;
      } else {
        collector = OperationOutput.merge(collector, val);
      }
      LOG.info("Combined " + val + " into/with " + collector);
      ++reduceAm;
    } catch (Exception e) {
      ++errorAm;
      logAndSetStatus(reporter, "Error iterating over reduction input "
          + value + " due to : " + StringUtils.stringifyException(e));
      if (getConfig().shouldExitOnFirstError()) {
        break;
      }
    }
  }
  logAndSetStatus(reporter, "Reduced " + reduceAm + " values with " + errorAm
      + " errors");
  if (collector != null) {
    logAndSetStatus(reporter, "Writing output " + collector.getKey() + " : "
        + collector.getOutputValue());
    output.collect(collector.getKey(), collector.getOutputValue());
  }
}
Project: hadoop    File: SliveMapper.java
/**
 * Runs the given operation and reports on its results
 * 
 * @param op
 *          the operation to run
 * @param reporter
 *          the status reporter to notify
 * @param output
 *          the output to write to
 * @throws IOException
 */
private void runOperation(Operation op, Reporter reporter,
    OutputCollector<Text, Text> output, long opNum) throws IOException {
  if (op == null) {
    return;
  }
  logAndSetStatus(reporter, "Running operation #" + opNum + " (" + op + ")");
  List<OperationOutput> opOut = op.run(filesystem);
  logAndSetStatus(reporter, "Finished operation #" + opNum + " (" + op + ")");
  if (opOut != null && !opOut.isEmpty()) {
    for (OperationOutput outData : opOut) {
      output.collect(outData.getKey(), outData.getOutputValue());
    }
  }
}
Project: hadoop    File: LoadGeneratorMR.java
@Override
public void reduce(Text key, Iterator<IntWritable> values,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();
  }
  if (key.equals(OPEN_EXECTIME)){
    executionTime[OPEN] = sum;
  } else if (key.equals(NUMOPS_OPEN)){
    numOfOps[OPEN] = sum;
  } else if (key.equals(LIST_EXECTIME)){
    executionTime[LIST] = sum;
  } else if (key.equals(NUMOPS_LIST)){
    numOfOps[LIST] = sum;
  } else if (key.equals(DELETE_EXECTIME)){
    executionTime[DELETE] = sum;
  } else if (key.equals(NUMOPS_DELETE)){
    numOfOps[DELETE] = sum;
  } else if (key.equals(CREATE_EXECTIME)){
    executionTime[CREATE] = sum;
  } else if (key.equals(NUMOPS_CREATE)){
    numOfOps[CREATE] = sum;
  } else if (key.equals(WRITE_CLOSE_EXECTIME)){
    System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
    executionTime[WRITE_CLOSE]= sum;
  } else if (key.equals(NUMOPS_WRITE_CLOSE)){
    numOfOps[WRITE_CLOSE] = sum;
  } else if (key.equals(TOTALOPS)){
    totalOps = sum;
  } else if (key.equals(ELAPSED_TIME)){
    totalTime = sum;
  }
  result.set(sum);
  output.collect(key, result);
  // System.out.println("Key = " + key + " Sum is =" + sum);
  // printResults(System.out);
}
Project: hadoop    File: MRCaching.java
public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
  String line = value.toString();
  StringTokenizer itr = new StringTokenizer(line);
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    output.collect(word, one);
  }

}
Project: hadoop    File: MRCaching.java
public void reduce(Text key, Iterator<IntWritable> values,
                   OutputCollector<Text, IntWritable> output,
                   Reporter reporter) throws IOException {
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();
  }
  output.collect(key, new IntWritable(sum));
}
Project: hadoop    File: WordCount.java
public void map(LongWritable key, Text value, 
                OutputCollector<Text, IntWritable> output, 
                Reporter reporter) throws IOException {
  String line = value.toString();
  StringTokenizer itr = new StringTokenizer(line);
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    output.collect(word, one);
  }
}
Project: hadoop    File: WordCount.java
public void reduce(Text key, Iterator<IntWritable> values,
                   OutputCollector<Text, IntWritable> output, 
                   Reporter reporter) throws IOException {
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();
  }
  output.collect(key, new IntWritable(sum));
}
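The two WordCount snippets above cover only the map and reduce methods. For context, a minimal old-API driver that wires such classes together might look like the sketch below; WordCountDriver, WordCountMapper and WordCountReducer are assumed stand-in names for the enclosing classes, not names taken from the hadoop project.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Minimal sketch of an old-API (org.apache.hadoop.mapred) driver; the mapper
// and reducer class names are hypothetical stand-ins for the classes that
// contain the map() and reduce() methods shown above.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);          // key type passed to the OutputCollector
    conf.setOutputValueClass(IntWritable.class); // value type passed to the OutputCollector
    conf.setMapperClass(WordCountMapper.class);
    conf.setReducerClass(WordCountReducer.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);                      // submit and wait for completion
  }
}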
Project: hadoop    File: TestDatamerge.java
public void reduce(IntWritable key, Iterator<IntWritable> values,
                   OutputCollector<Text, Text> output,
                   Reporter reporter) throws IOException {
  int seen = 0;
  while (values.hasNext()) {
    seen += values.next().get();
  }
  assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
}