Java class org.apache.hadoop.mapred.lib.MultipleInputs: example source code
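
The snippets below show how several projects wire org.apache.hadoop.mapred.lib.MultipleInputs into old-API (JobConf) jobs: each input path gets its own InputFormat and Mapper, and all map outputs are shuffled to the same reducer. As a minimal, self-contained sketch of the API (not taken from any of the projects below; the paths are placeholders and the SequenceFile input is assumed to hold LongWritable/Text records so the identity mapper types line up):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class MultipleInputsExample {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MultipleInputsExample.class);
        conf.setJobName("multiple-inputs-example");

        // Both mappers emit LongWritable/Text, so the shuffle types stay uniform.
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // One plain-text input and one SequenceFile input, each with its own
        // InputFormat/Mapper pair; both feed the same reducer.
        MultipleInputs.addInputPath(conf, new Path("/example/text-input"),
                TextInputFormat.class, IdentityMapper.class);
        MultipleInputs.addInputPath(conf, new Path("/example/seq-input"),
                SequenceFileInputFormat.class, IdentityMapper.class);

        conf.setReducerClass(IdentityReducer.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path("/example/output"));

        JobClient.runJob(conf);
    }
}

Each addInputPath call records the path-to-InputFormat and path-to-Mapper mapping in the JobConf, and the framework then routes the splits of each path to the mapper registered for it.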

Project: fst-bench    File: HiveData.java
private void createUserVisitsTableDirectly() throws IOException, URISyntaxException {

        log.info("Creating user visits...");

        Path rankings = new Path(options.getResultPath(), RANKINGS);
        Path fout = new Path(options.getResultPath(), USERVISITS);

        JobConf job = new JobConf(HiveData.class);
        String jobname = "Create uservisits";
        job.setJobName(jobname);
        setVisitsOptions(job);

        /**
         * Set distributed cache files for table generation.
         * The cache files include:
         * 1. user agents
         * 2. country code and language code
         * 3. search keys
         */

        Path uagentPath = new Path(options.getWorkPath(), uagentf);
        DistributedCache.addCacheFile(uagentPath.toUri(), job);

        Path countryPath = new Path(options.getWorkPath(), countryf);
        DistributedCache.addCacheFile(countryPath.toUri(), job);

        Path searchkeyPath = new Path(options.getWorkPath(), searchkeyf);
        DistributedCache.addCacheFile(searchkeyPath.toUri(), job);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(JoinBytesInt.class);

        MultipleInputs.addInputPath(job, dummy.getPath(),
                NLineInputFormat.class, DummyToAccessNoMapper.class);

        if (options.isSequenceOut()) {
            MultipleInputs.addInputPath(job, rankings,
                    SequenceFileInputFormat.class, SequenceRankingsToUrlsMapper.class);
        } else {
            MultipleInputs.addInputPath(job, rankings,
                    TextInputFormat.class, TextRankingsToUrlsMapper.class);
        }

        job.setCombinerClass(JoinBytesIntCombiner.class);
        job.setReducerClass(CreateUserVisitsReducer.class);

        if (options.getNumReds() > 0) {
            job.setNumReduceTasks(options.getNumReds());
        } else {
            job.setNumReduceTasks(Utils.getMaxNumReds());
        }

//      job.setNumReduceTasks(options.slots/2);

        if (options.isSequenceOut()) {
            job.setOutputFormat(SequenceFileOutputFormat.class);
        } else {
            job.setOutputFormat(TextOutputFormat.class);
        }

        if (null != options.getCodecClass()) {
            job.set("mapred.output.compression.type","BLOCK");
                        job.set("mapreduce.output.fileoutputformat.compress.type","BLOCK");
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
        }

        FileOutputFormat.setOutputPath(job, fout);

        log.info("Running Job: " +jobname);
        log.info("Dummy file " + dummy.getPath() + " as input");
        log.info("Rankings file " + rankings + " as input");
        log.info("Ouput file " + fout);
        JobClient.runJob(job);
        log.info("Finished Running Job: " + jobname);
    }
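
The DistributedCache.addCacheFile calls above only register the side files with the job; the map side has to read the local copies back, typically in configure(). A hedged sketch of that pattern follows (this is not the actual fst-bench mapper; the class name, field, and line-by-line parsing are illustrative only):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

public class CacheAwareMapperBase extends MapReduceBase {
    // Illustrative: real mappers would keep one table per cache file
    // (user agents, country/language codes, search keys).
    protected List<String> cachedLines = new ArrayList<String>();

    @Override
    public void configure(JobConf job) {
        try {
            // Local copies of the files registered with DistributedCache.addCacheFile(...)
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
            if (cacheFiles != null) {
                for (Path p : cacheFiles) {
                    loadLines(p);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to read distributed cache files", e);
        }
    }

    private void loadLines(Path localPath) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(localPath.toString()));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                cachedLines.add(line);
            }
        } finally {
            reader.close();
        }
    }
}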
Project: hadoop-EAR    File: HadoopArchives.java
private void appendFromArchive(Path harSrc, List<Path> relativePaths, Path harDst) throws IOException {
  Path outputPath = harDst;
  FileOutputFormat.setOutputPath(conf, outputPath);
  FileSystem outFs = outputPath.getFileSystem(conf);

  if (!outFs.exists(outputPath)) {
    throw new IOException("Invalid Output. HAR File " + outputPath + "doesn't exist");
  }
  if (outFs.isFile(outputPath)) {
    throw new IOException("Invalid Output. HAR File " + outputPath
        + "must be represented as directory");
  }
  long totalSize = writeFromArchiveFilesToProcess(harSrc, relativePaths);

  //make it a har path
  FileSystem fs1 = harSrc.getFileSystem(conf);
  URI uri = fs1.getUri();
  Path parentPath = new Path("har://" + "hdfs-" + uri.getHost() + ":" +
      uri.getPort() + fs1.makeQualified(harSrc).toUri().getPath());
  FileSystem fs = parentPath.getFileSystem(conf);

  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
  conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  long partSize = conf.getLong(HAR_PARTSIZE_LABEL, HAR_PARTSIZE_DEFAULT);
  int numMaps = (int) (totalSize / partSize);
  // run at least one map.
  conf.setNumMapTasks(numMaps == 0 ? 1 : numMaps);
  conf.setNumReduceTasks(1);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  //make sure no speculative execution is done
  conf.setSpeculativeExecution(false);

  // set starting offset for mapper
  int partId = findFirstAvailablePartId(outputPath);
  conf.setInt(PART_ID_OFFSET, partId);

  Path index = new Path(outputPath, HarFileSystem.INDEX_NAME);
  Path indexDirectory = new Path(outputPath, HarFileSystem.INDEX_NAME + ".copy");
  outFs.mkdirs(indexDirectory);
  Path indexCopy = new Path(indexDirectory, "data");
  outFs.rename(index, indexCopy);

  MultipleInputs.addInputPath(conf, jobDirectory, HArchiveInputFormat.class,
      HArchivesMapper.class);
  MultipleInputs.addInputPath(conf, indexDirectory, TextInputFormat.class,
      HArchivesConvertingMapper.class);
  conf.setReducerClass(HArchivesMergingReducer.class);

  JobClient.runJob(conf);

  cleanJobDirectory();
}
Project: Acacia    File: CSRConverter.java
public static void main(String[] args) throws Exception {
    if (!validArgs(args)) {
        printUsage();
        return;
    }
    // These are the temp paths that are created on HDFS
    String dir1 = "/user/miyuru/csrconverter-output";
    String dir2 = "/user/miyuru/csrconverter-output-sorted";

    // We first delete the temporary directories if they exist on the HDFS
    FileSystem fs1 = FileSystem.get(new JobConf());

    System.out.println("Deleting the dir : " + dir1);

    if (fs1.exists(new Path(dir1))) {
        fs1.delete(new Path(dir1), true);
    }

    System.out.println("Done deleting the dir : " + dir1);
    System.out.println("Deleting the dir : " + dir2);
    if (fs1.exists(new Path(dir2))) {
        fs1.delete(new Path(dir2), true);
    }

    Path notinPath = new Path("/user/miyuru/notinverts/notinverts");

    if (!fs1.exists(notinPath)) {
        fs1.create(notinPath);
    }

    System.out.println("Done deleting the dir : " + dir2);

    // Note (Aug 23, 2014): sometimes the MapReduce job hangs after this point;
    // need to investigate why.

    VertexCounterClient.setDefaultGraphID(args[3], args[2]);

    // First job creates the inverted index

    JobConf conf = new JobConf(CSRConverter.class);
    conf.set("org.acacia.partitioner.hbase.zookeeper.quorum", args[1]);
    conf.set("org.acacia.partitioner.hbase.table", args[2]);
    conf.set("org.acacia.partitioner.hbase.contacthost", args[3]);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    // conf.setMapperClass(InvertedMapper.class);
    conf.setReducerClass(InvertedReducer.class);
    // conf.setInputFormat(TextInputFormat.class);
    conf.setInputFormat(NLinesInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // FileInputFormat.setInputPaths(conf, new Path(args[0]));
    MultipleInputs.addInputPath(conf, new Path(args[0]),
            NLinesInputFormat.class, InvertedMapper.class);
    MultipleInputs.addInputPath(conf, new Path(
            "/user/miyuru/notinverts/notinverts"), TextInputFormat.class,
            InvertedMapper.class);
    FileOutputFormat.setOutputPath(conf, new Path(dir1));

    // Also, for the moment, we turn off speculative execution
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    conf.setNumMapTasks(96);
    conf.setNumReduceTasks(96);
    conf.setPartitionerClass(VertexPartitioner.class);
    conf.set("vertex-count", args[4]);
    conf.set("zero-flag", args[5]);
    Job job = new Job(conf, "csr_inverter");
    job.setSortComparatorClass(SortComparator.class);
    job.waitForCompletion(true);
}
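
CSRConverter mixes the old JobConf-based MultipleInputs with the newer Job API. For comparison, a minimal sketch of the same pattern written entirely against the newer API uses org.apache.hadoop.mapreduce.lib.input.MultipleInputs instead (this sketch is not part of the listings above; paths are placeholders, the base Mapper and Reducer classes serve as identity pass-throughs, and the SequenceFile input is assumed to hold LongWritable/Text records):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiMultipleInputsExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "new-api-multiple-inputs");
        job.setJarByClass(NewApiMultipleInputsExample.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Each path gets its own InputFormat/Mapper pair; the raw Mapper class
        // simply passes (key, value) records through unchanged.
        MultipleInputs.addInputPath(job, new Path("/example/text-input"),
                TextInputFormat.class, Mapper.class);
        MultipleInputs.addInputPath(job, new Path("/example/seq-input"),
                SequenceFileInputFormat.class, Mapper.class);

        job.setReducerClass(Reducer.class);
        FileOutputFormat.setOutputPath(job, new Path("/example/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}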