public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach(splits -> {
        Path path = splits._1.getPath();
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
        FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);

        writeInterleavedSplits(fqreader, fqreader2, new Configuration(),
                splitDir + "/" + path.getParent().getName() + "_" + splits._1.getStart() + ".fq");
    });
}
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    Path fqpath = new Path(fqPath);
    String fqname = fqpath.getName();
    String[] ns = fqname.split("\\.");
    // TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    splitRDD.foreach(split -> {
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
        writeFastqFile(fqreader, new Configuration(), splitDir + "/split_" + split.getStart() + "." + ns[1]);
    });
}
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    String[] ns = fst.getPath().getName().split("\\.");
    // TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach(splits -> {
        Path path = splits._1.getPath();
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
        FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);

        writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir,
                path.getParent().getName() + "_" + splits._1.getStart() + ".fq");
    });
}
@SuppressWarnings("deprecation") @Override public void initializeMemberVariables() { xmlFilename = new String("mapred-default.xml"); configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class, JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class, FileInputFormat.class, Job.class, NLineInputFormat.class, JobConf.class, FileOutputCommitter.class }; // Initialize used variables configurationPropsToSkipCompare = new HashSet<String>(); // Set error modes errorIfMissingConfigProps = true; errorIfMissingXmlProps = false; // Ignore deprecated MR1 properties in JobConf configurationPropsToSkipCompare .add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY); configurationPropsToSkipCompare .add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY); }
public static void setupJob(Job job, int minFeaturesPerSplit, long featureCount) {
    if (minFeaturesPerSplit > 0) {
        if (featureCount < 0) {
            throw new IllegalArgumentException("Expected a feature count");
        }

        int maxMapTasks = job.getConfiguration().getInt("mapred.tasktracker.map.tasks.maximum", -1);
        if (maxMapTasks > 0) {
            int featuresPerSplit = (int) (featureCount / maxMapTasks);
            if (featuresPerSplit < minFeaturesPerSplit) {
                featuresPerSplit = minFeaturesPerSplit;
            }

            job.getConfiguration().setBoolean(USE_NLINE_FORMAT, true);
            NLineInputFormat.setNumLinesPerSplit(job, featuresPerSplit);
        }
    }
}
/**
 * Java wrapper for {@link NLineInputFormat#getNumLinesPerSplit(org.apache.hadoop.mapreduce.JobContext)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 *
 * @return the number of lines per split
 */
@JSStaticFunction
public static Object getNumLinesPerSplit(final Context ctx, final Scriptable thisObj, final Object[] args,
                                         final Function func) {
    final Object arg0 = args.length >= 1 ? args[0] : Undefined.instance;

    if (args.length < 1) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.ONE_ARG_EXPECTED);
    } else if (!JavaScriptUtils.isDefined(arg0)) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.FIRST_ARG_REQUIRED);
    } else if (!(arg0 instanceof JobWrap)) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.FIRST_ARG_MUST_BE_JOB);
    }

    return NLineInputFormat.getNumLinesPerSplit(((JobWrap) arg0).getJob());
}
/**
 * Java wrapper for {@link NLineInputFormat#setNumLinesPerSplit(org.apache.hadoop.mapreduce.Job, int)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function called (unused)
 */
@JSStaticFunction
public static void setNumLinesPerSplit(final Context ctx, final Scriptable thisObj, final Object[] args,
                                       final Function func) {
    final Object arg0 = args.length >= 1 ? args[0] : Undefined.instance;
    final Object arg1 = args.length >= 2 ? args[1] : Undefined.instance;

    if (args.length < 2) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.TWO_ARGS_EXPECTED);
    } else if (!JavaScriptUtils.isDefined(arg0)) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.FIRST_ARG_REQUIRED);
    } else if (!JavaScriptUtils.isDefined(arg1)) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.SECOND_ARG_REQUIRED);
    } else if (!(arg0 instanceof JobWrap)) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.FIRST_ARG_MUST_BE_JOB);
    } else if (!(arg1 instanceof Number)) {
        throw Utils.makeError(ctx, thisObj, LembosMessages.SECOND_ARG_ARG_MUST_BE_NUM);
    }

    NLineInputFormat.setNumLinesPerSplit(((JobWrap) arg0).getJob(), JavaScriptUtils.fromNumber(arg1).intValue());
}
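For reference, a minimal sketch (not part of the original wrappers) of the plain Hadoop calls the two wrappers above delegate to; the job name and split count used here are assumptions chosen for illustration.

// Minimal sketch (assumption): the standard Hadoop API behind the JavaScript wrappers above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineSplitConfigSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "nline-demo");

        // Store the lines-per-split value in the job configuration ...
        NLineInputFormat.setNumLinesPerSplit(job, 10);

        // ... and read it back (NLineInputFormat defaults to 1 when unset).
        int linesPerSplit = NLineInputFormat.getNumLinesPerSplit(job);
        System.out.println("Lines per split: " + linesPerSplit);
    }
}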
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    Path fqpath = new Path(fqPath);
    String fqname = fqpath.getName();
    String[] ns = fqname.split("\\.");

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    splitRDD.foreach(split -> {
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
        writeFastqFile(fqreader, new Configuration(), splitDir + "/split_" + split.getStart() + "." + ns[1]);
    });
}
@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(Phase3Step4LocalDeDuplication.class);
    job.setJobName(Phase3Step4LocalDeDuplication.class.getName());

    // paths
    String inputPath = args[0];   // text files of ids to be deleted
    String outputPath = args[1];

    // input: reading max N lines for each mapper
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(job, new Path(inputPath));
    job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", LINES);

    // mapper
    job.setMapperClass(LocalGreedyDeDuplicationMapper.class);

    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    // reducer
    job.setReducerClass(IDCollectorReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Generates a Person hadoop sequence file containing key-value pairs
 * where the key is the person id and the value is the person itself.
 *
 * @param outputFileName The name of the file to store the persons.
 * @throws Exception
 */
public void run(String outputFileName, String postKeySetterName) throws Exception {

    String hadoopDir = new String(conf.get("ldbc.snb.datagen.serializer.hadoopDir"));
    String tempFile = hadoopDir + "/mrInputFile";

    FileSystem dfs = FileSystem.get(conf);
    dfs.delete(new Path(tempFile), true);
    writeToOutputFile(tempFile, Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads")), conf);

    int numThreads = Integer.parseInt(conf.get("ldbc.snb.datagen.generator.numThreads"));
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 1);
    conf.set("postKeySetterName", postKeySetterName);

    Job job = Job.getInstance(conf, "SIB Generate Users & 1st Dimension");
    job.setMapOutputKeyClass(TupleKey.class);
    job.setMapOutputValueClass(Person.class);
    job.setOutputKeyClass(TupleKey.class);
    job.setOutputValueClass(Person.class);
    job.setJarByClass(HadoopPersonGeneratorMapper.class);
    job.setMapperClass(HadoopPersonGeneratorMapper.class);
    job.setReducerClass(HadoopPersonGeneratorReducer.class);
    job.setNumReduceTasks(numThreads);
    job.setInputFormatClass(NLineInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(tempFile));
    FileOutputFormat.setOutputPath(job, new Path(outputFileName));

    if (!job.waitForCompletion(true)) {
        throw new Exception();
    }
}
private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts)
        throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
    Job job = new Job(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation");

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            DescriptiveStatistics.class, // commons-math
            ObjectMapper.class);         // jackson-mapper-asl

    TableMapReduceUtil.initCredentials(job);
    job.waitForCompletion(true);
}
/**
 * Run method called for starting a MapReduce Job
 */
public int run(String[] args)
        throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    checkRequiredPaths();

    long startTime = 0;
    if (measureTime) {
        startTime = System.nanoTime();
    }

    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "ImageSearcher");
    job.setJarByClass(ImageSearcher.class);
    job.setMapperClass(ImageSearchMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(ImageDistanceMap.class);
    job.setReducerClass(ImageSearchReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(NLineInputFormat.class);
    job.setNumReduceTasks(1);

    FileInputFormat.addInputPath(job, new Path(conf.get("ImageFeatures")));
    FileOutputFormat.setOutputPath(job, new Path(conf.get("Output")));

    boolean res = job.waitForCompletion(true);

    if (measureTime) {
        long elapsedTime = System.nanoTime() - startTime;
        System.out.println("== MapReduce Execution Time: " + (double) elapsedTime / 1000000000.0 + "s ==");
    }
    return res ? 0 : 1;
}
@Override
public void initializeMemberVariables() {
    xmlFilename = new String("mapred-default.xml");
    configurationClasses = new Class[] {
            MRJobConfig.class, MRConfig.class, JHAdminConfig.class, ShuffleHandler.class,
            FileOutputFormat.class, FileInputFormat.class, Job.class, NLineInputFormat.class,
            JobConf.class, FileOutputCommitter.class };

    // Initialize used variables
    configurationPropsToSkipCompare = new HashSet<String>();
    xmlPropsToSkipCompare = new HashSet<String>();

    // Set error modes
    errorIfMissingConfigProps = true;
    errorIfMissingXmlProps = false;

    // Ignore deprecated MR1 properties in JobConf
    configurationPropsToSkipCompare.add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY);
    configurationPropsToSkipCompare.add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);

    // Obsolete entries listed in MAPREDUCE-6057 were removed from trunk
    // but not removed from branch-2.
    xmlPropsToSkipCompare.add("map.sort.class");
    xmlPropsToSkipCompare.add("mapreduce.local.clientfactory.class.name");
    xmlPropsToSkipCompare.add("mapreduce.jobtracker.system.dir");
    xmlPropsToSkipCompare.add("mapreduce.jobtracker.staging.root.dir");
}
private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts)
        throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
    Job job = new Job(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation");

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            Histogram.class,     // yammer metrics
            ObjectMapper.class); // jackson-mapper-asl

    TableMapReduceUtil.initCredentials(job);
    job.waitForCompletion(true);
}
/**
 * Create the index Hadoop job.
 * @param conf the Hadoop configuration
 * @param submitFile the path to the submit file
 * @param jobDescription the job description
 * @return a Job object
 * @throws IOException if an error occurs while creating the index
 */
private Job createIndexJob(final Configuration conf, final DataFile submitFile,
                           final String jobDescription) throws IOException {

    final Configuration jobConf = new Configuration(conf);

    // Set one task per map
    jobConf.set("mapreduce.input.lineinputformat.linespermap", "" + 1);

    // Create the job and set its name
    final Job job = Job.getInstance(jobConf, jobDescription);

    // Set the jar
    job.setJarByClass(IndexerMapper.class);

    // Set input path
    FileInputFormat.addInputPath(job, new Path(submitFile.getSource()));

    job.setInputFormatClass(NLineInputFormat.class);

    // Set the Mapper class
    job.setMapperClass(IndexerMapper.class);

    // Set the output key class
    job.setOutputKeyClass(NullWritable.class);

    // Set the output value class
    job.setOutputValueClass(NullWritable.class);

    // Set the output format
    job.setOutputFormatClass(NullOutputFormat.class);

    // Set the number of reducers
    job.setNumReduceTasks(0);

    return job;
}
/**
 * Defines how to read data from a file into the Mapper instances.
 * This method sets the input format to the 'NLineInputFormat' implementation.
 *
 * @return The 'this' object
 */
@JSFunction
public Egg nLineInputFormat() {
    job.setInputFormatClass(NLineInputFormat.class);
    return this;
}
/**
 * Main method
 * @param args the first and only expected item in the String[] is the path to the textfile
 *             containing a list of paths of files to be examined, each path on a single line.
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // set up the configuration
    // NOTE: apparently it's important to set the conf parameters prior to instantiating
    //       the job with this config, which is somewhere cloned.
    Configuration conf = new Configuration();

    // ***** list of config parameters, verified to work *****
    conf.setInt(gHadoopVersion.linesPerMapKey(), 50);
    // 60 * 60 * 1000ms == 1h; this is very long but necessary for some files :-(
    conf.set(gHadoopVersion.taskTimeoutKey(), Integer.toString(60 * 60 * 1000));

    // set up the job
    // String to use for name and output folder in HDFS
    String name = "FLintHadoop_" + System.currentTimeMillis();
    Job job = new Job(conf, name);
    job.setJarByClass(FlintHadoop.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(name));

    // set the mapper to this class' mapper
    job.setMapperClass(FlintMap.class);
    job.setReducerClass(FlintReduce.class);

    // this input format should split the input by one line per map by default.
    job.setInputFormatClass(NLineInputFormat.class);

    // sets how the output is written cf. OutputFormat
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CheckResultText.class);

    // TODO: shouldn't the number of allowed tasks be set in the config on the cluster,
    //       as it's sensitive to the hardware setup rather than to this code?
    job.setNumReduceTasks(28);

    job.waitForCompletion(true);
}
private static JavaPairRDD<Text, SequencedFragment> interleaveReads(String fastq, String fastq2, int splitlen,
        JavaSparkContext sc) throws IOException {

    FileSystem fs = FileSystem.get(new Configuration());

    FileStatus fst = fs.getFileStatus(new Path(fastq));
    FileStatus fst2 = fs.getFileStatus(new Path(fastq2));

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    return zips.flatMapToPair(splits -> {
        FastqInputFormat.FastqRecordReader fqreader = new FastqInputFormat.FastqRecordReader(new Configuration(), splits._1);
        FastqInputFormat.FastqRecordReader fqreader2 = new FastqInputFormat.FastqRecordReader(new Configuration(), splits._2);

        ArrayList<Tuple2<Text, SequencedFragment>> reads = new ArrayList<Tuple2<Text, SequencedFragment>>();

        while (fqreader.nextKeyValue()) {
            String key = fqreader.getCurrentKey().toString();
            String[] keysplit = key.split(" ");
            key = keysplit[0];

            SequencedFragment sf = new SequencedFragment();
            sf.setQuality(new Text(fqreader.getCurrentValue().getQuality().toString()));
            sf.setSequence(new Text(fqreader.getCurrentValue().getSequence().toString()));

            if (fqreader2.nextKeyValue()) {
                String key2 = fqreader2.getCurrentKey().toString();
                String[] keysplit2 = key2.split(" ");
                key2 = keysplit2[0];
                //key2 = key2.replace(" 2:N:0:1","/2");

                SequencedFragment sf2 = new SequencedFragment();
                sf2.setQuality(new Text(fqreader2.getCurrentValue().getQuality().toString()));
                sf2.setSequence(new Text(fqreader2.getCurrentValue().getSequence().toString()));

                reads.add(new Tuple2<Text, SequencedFragment>(new Text(key), sf));
                reads.add(new Tuple2<Text, SequencedFragment>(new Text(key2), sf2));
            }
        }

        return reads.iterator();
    });
}
@Override
public JavaRDD<Quad> parseQuads(String path) {

    Configuration conf = new Configuration();

    Integer batchSize = config.getBatchSize();
    conf.set(NLineInputFormat.LINES_PER_MAP, batchSize.toString());

    if (config.getErrorHandling() == ParseErrorHandling.Throw) {
        conf.set(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, "false");
    } else {
        conf.set(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, "true");
    }

    Boolean isLineBased = config.getLineBasedFormat();
    if (isLineBased == null) {
        isLineBased = guessIsLineBasedFormat(path);
    }

    JavaRDD<Quad> quads;
    Integer partitions = config.getRepartition();

    if (isLineBased) {
        log.info("Parsing RDF in parallel with batch size: {}", batchSize);
        quads = sc.newAPIHadoopFile(path,
                NQuadsInputFormat.class,
                LongWritable.class, // position
                QuadWritable.class, // value
                conf).values().map(QuadWritable::get);
    } else {
        // let Jena guess the format, load whole files
        log.info("Input format is not line based, parsing RDF by Master node only.");
        quads = sc.newAPIHadoopFile(path,
                TriplesOrQuadsInputFormat.class,
                LongWritable.class, // position
                QuadWritable.class, // value
                conf).values().map(QuadWritable::get);

        if (partitions == null) {
            log.warn("Reading non-line based formats by master node only, consider setting --parsing.repartition to redistribute work to other nodes.");
        }
    }

    if (partitions != null) {
        log.info("Distributing workload, repartitioning into {} partitions", partitions);
        quads = quads.repartition(partitions);
    }

    final List<String> acceptedLanguages = config.getAcceptedLanguages();
    // if only some languages are accepted
    if (!acceptedLanguages.isEmpty()) {
        // filter out literals of unsupported languages
        quads = quads.filter(quad -> !quad.getObject().isLiteral()
                || quad.getObject().getLiteralLanguage() == null
                || quad.getObject().getLiteralLanguage().isEmpty()
                || acceptedLanguages.contains(quad.getObject().getLiteralLanguage()));
    }

    return quads;
}
@Override
public int run(String[] args) throws Exception {
    if (args.length != 3) {
        System.err.println("Usage: bulkupdate [-D" + MRJobConfig.QUEUE_NAME + "=proofofconcepts] <input_file_with_SPARQL_queries> <output_path> <table_name>");
        return -1;
    }

    TableMapReduceUtil.addDependencyJars(getConf(),
            HalyardExport.class,
            NTriplesUtil.class,
            Rio.class,
            AbstractRDFHandler.class,
            RDFFormat.class,
            RDFParser.class,
            HTable.class,
            HBaseConfiguration.class,
            AuthenticationProtos.class,
            Trace.class,
            Gauge.class);
    HBaseConfiguration.addHbaseResources(getConf());
    getConf().setStrings(TABLE_NAME_PROPERTY, args[2]);
    getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis()));

    Job job = Job.getInstance(getConf(), "HalyardBulkUpdate -> " + args[1] + " -> " + args[2]);
    NLineInputFormat.setNumLinesPerSplit(job, 1);
    job.setJarByClass(HalyardBulkUpdate.class);
    job.setMapperClass(SPARQLMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setInputFormatClass(NLineInputFormat.class);
    job.setSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    try (HTable hTable = HalyardTableUtils.getTable(getConf(), args[2], false, 0)) {
        HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initCredentials(job);
        if (job.waitForCompletion(true)) {
            new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(args[1]), hTable);
            LOG.info("Bulk Update Completed..");
            return 0;
        }
    }
    return -1;
}
/**
 * Prepare the Hadoop MR job, including configuring the job and setting up the input/output paths.
 */
private Path prepareHadoopJob(List<WorkUnit> workUnits) throws IOException {
    TimingEvent mrJobSetupTimer = this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.MR_JOB_SETUP);

    this.job.setJarByClass(MRJobLauncher.class);
    this.job.setMapperClass(TaskRunner.class);

    // The job is mapper-only
    this.job.setNumReduceTasks(0);

    this.job.setInputFormatClass(NLineInputFormat.class);
    this.job.setOutputFormatClass(GobblinOutputFormat.class);
    this.job.setMapOutputKeyClass(NullWritable.class);
    this.job.setMapOutputValueClass(NullWritable.class);

    // Turn off speculative execution
    this.job.setSpeculativeExecution(false);

    // Job input path is where input work unit files are stored
    Path jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);

    // Prepare job input
    Path jobInputFile = prepareJobInput(jobInputPath, workUnits);
    NLineInputFormat.addInputPath(this.job, jobInputFile);

    // Job output path is where serialized task states are stored
    Path jobOutputPath = new Path(this.mrJobDir, OUTPUT_DIR_NAME);
    SequenceFileOutputFormat.setOutputPath(this.job, jobOutputPath);

    // Serialize source state to a file which will be picked up by the mappers
    Path jobStateFilePath = new Path(this.mrJobDir, JOB_STATE_FILE_NAME);
    SerializationUtils.serializeState(this.fs, jobStateFilePath, this.jobContext.getJobState());
    job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString());

    if (this.jobProps.containsKey(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
        // When there is a limit on the number of mappers, each mapper may run
        // multiple tasks if the total number of tasks is larger than the limit.
        int maxMappers = Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY));
        if (workUnits.size() > maxMappers) {
            int numTasksPerMapper = workUnits.size() % maxMappers == 0
                    ? workUnits.size() / maxMappers
                    : workUnits.size() / maxMappers + 1;
            NLineInputFormat.setNumLinesPerSplit(this.job, numTasksPerMapper);
        }
    }

    mrJobSetupTimer.stop();

    return jobOutputPath;
}
@Override
public String launch(String transformation, ModelSpec source, ModelSpec target)
        throws InvalidTransformation, InvalidModelSpec, TException {
    try {
        Job job = Job.getInstance(getConfiguration(), ATLMRMaster.DEFAULT_JOB_NAME);
        Configuration conf = job.getConfiguration();
        conf.set("mapreduce.app-submission.cross-platform", "true");

        // Configure classes
        Bundle bundle = Platform.getBundle(CloudAtlServletPlugin.PLUGIN_ID);
        final File fJar = new File(FileLocator.toFileURL(bundle.getResource("libs/atl-mr.jar")).toURI());
        job.setJar(fJar.getAbsolutePath());
        job.setMapperClass(ATLMRMapper.class);
        job.setReducerClass(ATLMRReducer.class);
        job.setInputFormatClass(NLineInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setNumReduceTasks(1);

        // Configure MapReduce input/outputs
        ResourceSet resourceSet = new ResourceSetImpl();
        ATLMRUtils.configureRegistry(conf);

        Builder builder = new RecordBuilder.Builder(
                URI.createURI(source.getUri()),
                Arrays.asList(new URI[] { URI.createURI(source.getMetamodelUris().get(0)) }));

        Path recordsPath = new Path("/tmp/" + UUID.randomUUID().toString() + ".rec");
        FileSystem recordsFileSystem = FileSystem.get(recordsPath.toUri(), conf);
        builder.save(recordsFileSystem.create(recordsPath));

        FileInputFormat.setInputPaths(job, recordsPath);

        String timestamp = new SimpleDateFormat("yyyyMMddhhmm").format(new Date());
        String outDirName = "atlmr-out-" + timestamp + "-" + UUID.randomUUID();
        FileOutputFormat.setOutputPath(job,
                new Path(job.getWorkingDirectory().suffix(Path.SEPARATOR + outDirName).toUri()));

        // Configure records per map
        InputStream inputStream = recordsFileSystem.open(recordsPath);
        long linesPerMap = (long) Math.ceil((double) ATLMRMaster.countLines(inputStream) / 1);
        job.getConfiguration().setLong(NLineInputFormat.LINES_PER_MAP, linesPerMap);
        recordsFileSystem.close();

        // Configure ATL related inputs/outputs
        job.getConfiguration().set(ATLMRMaster.TRANSFORMATION, transformation);
        job.getConfiguration().set(ATLMRMaster.SOURCE_METAMODEL, source.getMetamodelUris().get(0));
        job.getConfiguration().set(ATLMRMaster.TARGET_METAMODEL, target.getMetamodelUris().get(0));
        job.getConfiguration().set(ATLMRMaster.INPUT_MODEL, source.getUri());
        job.getConfiguration().set(ATLMRMaster.OUTPUT_MODEL, target.getUri());

        // Copy libraries to populate the job's classpath
        IPath path = new org.eclipse.core.runtime.Path("libs");
        URL fileURL = FileLocator.find(bundle, path, null);
        String localJarsDir = new File(FileLocator.toFileURL(fileURL).toURI()).getAbsolutePath();
        String hdfsJarsDir = "/temp/hadoop/atlrm/libs";

        // TODO: This JobHelper needs to be updated to the new API
        JobHelper.copyLocalJarsToHdfs(localJarsDir, hdfsJarsDir, conf);
        JobHelper.addHdfsJarsToDistributedCache(hdfsJarsDir, job);

        Logger.getGlobal().log(Level.INFO, "Sending Job");
        job.submit();
        Logger.getGlobal().log(Level.INFO, "Job sent");

        return job.getJobID().toString();
    } catch (IOException | InterruptedException | ClassNotFoundException | URISyntaxException e) {
        throw new TException(e);
    }
}
/**
 * To uniformly spread load across all mappers we randomize fullInputList
 * with a separate small Mapper & Reducer preprocessing step. This way
 * each input line ends up on a random position in the output file list.
 * Each mapper indexes a disjoint consecutive set of files such that each
 * set has roughly the same size, at least from a probabilistic
 * perspective.
 *
 * For example an input file with the following input list of URLs:
 *
 * A
 * B
 * C
 * D
 *
 * might be randomized into the following output list of URLs:
 *
 * C
 * A
 * D
 * B
 *
 * The implementation sorts the list of lines by randomly generated numbers.
 */
private Job randomizeManyInputFiles(Configuration baseConfig, Path fullInputList, Path outputStep2Dir,
                                    int numLinesPerSplit) throws IOException {

    Job job2 = Job.getInstance(baseConfig);
    job2.setJarByClass(getClass());
    job2.setJobName(getClass().getName() + "/" + Utils.getShortClassName(LineRandomizerMapper.class));
    job2.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(job2, fullInputList);
    NLineInputFormat.setNumLinesPerSplit(job2, numLinesPerSplit);
    job2.setMapperClass(LineRandomizerMapper.class);
    job2.setReducerClass(LineRandomizerReducer.class);
    job2.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job2, outputStep2Dir);
    job2.setNumReduceTasks(1);
    job2.setOutputKeyClass(LongWritable.class);
    job2.setOutputValueClass(Text.class);
    return job2;
}
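The LineRandomizerMapper and LineRandomizerReducer referenced above are not shown in this listing. A minimal sketch of the idea the comment describes (sorting lines by randomly generated keys) might look like the following; the class and method names are assumptions for illustration, not the project's actual implementation.

// Hedged sketch, not the actual LineRandomizerMapper/LineRandomizerReducer:
// emit each line under a random long key so that the shuffle's sort by key
// yields a random permutation of the input lines; the single reducer then
// replaces the random keys with running positions.
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class LineShuffleSketch {

    public static class RandomKeyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        private final Random random = new Random();
        private final LongWritable outKey = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            outKey.set(random.nextLong());
            context.write(outKey, value); // random key, original line as value
        }
    }

    public static class PositionKeyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        private long position = 0;
        private final LongWritable outKey = new LongWritable();

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                outKey.set(position++);
                context.write(outKey, value); // replace the random key with a running position
            }
        }
    }
}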
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    boolean useNLineFormat = context.getConfiguration().getBoolean(USE_NLINE_FORMAT, false);
    if (useNLineFormat) {
        List<InputSplit> splits = new NLineInputFormat().getSplits(context);
        // This is a workaround to what appears to be a bug in how NLineInputFormat
        // computes its splits. When there are multiple splits in a file, it seems
        // the start position in the last split is off by one. Note that this corrective
        // code needs to check the last split for each different file that appears
        // in the list of splits.
        for (int index = 2; index < splits.size(); index++) {
            FileSplit previousSplit = (FileSplit) splits.get(index - 1);
            FileSplit currSplit = (FileSplit) splits.get(index);

            // If this index is the last split, or we've moved on to splits from a different
            // file, then we need to adjust the last split for that file.
            int lastFileIndex = -1;
            if (index == splits.size() - 1) {
                lastFileIndex = index;
            } else if (!currSplit.getPath().equals(previousSplit.getPath())) {
                lastFileIndex = index - 1;
            }

            if (lastFileIndex >= 2) {
                FileSplit lastFileSplit = (FileSplit) splits.get(lastFileIndex);
                FileSplit priorSplit = (FileSplit) splits.get(lastFileIndex - 1);

                if (lastFileSplit.getPath().equals(priorSplit.getPath())) {
                    if (priorSplit.getPath().equals(lastFileSplit.getPath())
                            && priorSplit.getStart() + priorSplit.getLength() < lastFileSplit.getStart()) {
                        // Adjust the start of previous split
                        FileSplit replacement = new FileSplit(lastFileSplit.getPath(),
                                priorSplit.getStart() + priorSplit.getLength(),
                                lastFileSplit.getLength() + 1,
                                lastFileSplit.getLocations());
                        log.info("Replacing split: " + lastFileSplit);
                        log.info("  With split: " + replacement);
                        splits.set(lastFileIndex, replacement);
                    }
                }
            }
        }
        return splits;
    } else {
        return new TextInputFormat().getSplits(context);
    }
}
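A brief driver-side sketch (an assumption, not taken from the original source) of how the USE_NLINE_FORMAT flag checked above could be turned on, alongside the standard lines-per-split setting.

// Hedged sketch: enabling the NLine-based split path selected by the
// getSplits() override above. The USE_NLINE_FORMAT key below is a stand-in;
// the real constant and its string value live in the class the snippet was
// taken from, and the lines-per-split value here is arbitrary.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineToggleSketch {
    // Placeholder for the enclosing class's USE_NLINE_FORMAT constant (value assumed).
    private static final String USE_NLINE_FORMAT = "use.nline.format";

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "nline-toggle-demo");
        job.getConfiguration().setBoolean(USE_NLINE_FORMAT, true); // take the NLine branch
        NLineInputFormat.setNumLinesPerSplit(job, 1000);           // lines handled per mapper
    }
}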
private Job createHadoopJob(final Configuration conf, final DataFile submitFile, final int requiredMemory,
                            final String jobDescription) throws IOException {

    final Configuration jobConf = new Configuration(conf);

    // Set one task per map
    jobConf.set("mapreduce.input.lineinputformat.linespermap", "" + 1);

    if (requiredMemory > 0) {
        // Set the memory required by the reads mapper
        jobConf.set("mapreduce.map.memory.mb", "" + requiredMemory);

        int jvmMemory = requiredMemory - 128;
        if (jvmMemory <= 0) {
            jvmMemory = requiredMemory;
        }

        // Set the memory required by JVM
        jobConf.set("mapreduce.map.java.opts", "-Xmx" + jvmMemory + "M");
    }

    // Create the job and set its name
    final Job job = Job.getInstance(jobConf, jobDescription);

    // Set the jar
    job.setJarByClass(HadoopCompatibleTaskScheduler.class);

    // Set input path
    FileInputFormat.addInputPath(job, new Path(submitFile.getSource()));

    job.setInputFormatClass(NLineInputFormat.class);

    // Set the Mapper class
    job.setMapperClass(HadoopCompatibleMapper.class);

    // Set the output key class
    job.setOutputKeyClass(NullWritable.class);

    // Set the output value class
    job.setOutputValueClass(NullWritable.class);

    // Set the output format
    job.setOutputFormatClass(NullOutputFormat.class);

    // Set the number of reducers
    job.setNumReduceTasks(0);

    return job;
}
private static void splitFastq(FileStatus fst, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    // TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, new Configuration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    splitRDD.foreach(split -> {
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
        writeFastqFile(fqreader, new Configuration(),
                splitDir + "/" + split.getPath().getName() + "_" + split.getStart() + ".fq");
    });
}
@Override
public void setupJob(Job job) {
    NLineInputFormat.setNumLinesPerSplit(job, 1);
    job.setMapperClass(ImputationMapperMinimac3.class);
    job.setInputFormatClass(NLineInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setNumReduceTasks(0);
}
/**
 * Java wrapper for
 * {@link NLineInputFormat#addInputPath(org.apache.hadoop.mapreduce.Job, org.apache.hadoop.fs.Path)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 */
@JSStaticFunction
public static void addInputPath(final Context ctx, final Scriptable thisObj, final Object[] args,
                                final Function func) {
    FileInputFormatHelper.addInputPath(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Java wrapper for {@link NLineInputFormat#addInputPaths(org.apache.hadoop.mapreduce.Job, String)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 */
@JSStaticFunction
public static void addInputPaths(final Context ctx, final Scriptable thisObj, final Object[] args,
                                 final Function func) {
    FileInputFormatHelper.addInputPaths(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Java wrapper for {@link NLineInputFormat#getInputPathFilter(org.apache.hadoop.mapreduce.JobContext)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 *
 * @return class name for the input path filter or undefined
 */
@JSStaticFunction
public static Object getInputPathFilter(final Context ctx, final Scriptable thisObj, final Object[] args,
                                        final Function func) {
    return FileInputFormatHelper.getInputPathFilter(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Java wrapper for {@link NLineInputFormat#getInputPaths(org.apache.hadoop.mapreduce.JobContext)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 *
 * @return array of input paths
 */
@JSStaticFunction
public static Object getInputPaths(final Context ctx, final Scriptable thisObj, final Object[] args,
                                   final Function func) {
    return FileInputFormatHelper.getInputPaths(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Java wrapper for {@link NLineInputFormat#getMaxSplitSize(org.apache.hadoop.mapreduce.JobContext)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 *
 * @return the max split size
 */
@JSStaticFunction
public static Object getMaxSplitSize(final Context ctx, final Scriptable thisObj, final Object[] args,
                                     final Function func) {
    return FileInputFormatHelper.getMaxSplitSize(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Java wrapper for {@link NLineInputFormat#getMinSplitSize(org.apache.hadoop.mapreduce.JobContext)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 *
 * @return the min split size
 */
@JSStaticFunction
public static Object getMinSplitSize(final Context ctx, final Scriptable thisObj, final Object[] args,
                                     final Function func) {
    return FileInputFormatHelper.getMinSplitSize(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Wraps {@link NLineInputFormat#setInputPathFilter(org.apache.hadoop.mapreduce.Job, Class)}.
 *
 * @param ctx the JavaScript context (unused)
 * @param thisObj the 'this' object of the caller
 * @param args the arguments for the call
 * @param func the function called (unused)
 */
@JSStaticFunction
public static void setInputPathFilter(final Context ctx, final Scriptable thisObj, final Object[] args,
                                      final Function func) {
    FileInputFormatHelper.setInputPathFilter(NLineInputFormat.class, ctx, thisObj, args);
}
/**
 * Java wrapper for
 * {@link NLineInputFormat#setInputPaths(org.apache.hadoop.mapreduce.Job, org.apache.hadoop.fs.Path...)} and
 * {@link NLineInputFormat#setInputPaths(org.apache.hadoop.mapreduce.Job, String)}.
 *
 * @param ctx the JavaScript context
 * @param thisObj the 'this' object
 * @param args the function arguments
 * @param func the function being called
 */
@JSStaticFunction
public static void setInputPaths(final Context ctx, final Scriptable thisObj, final Object[] args,
                                 final Function func) {
    FileInputFormatHelper.setInputPaths(NLineInputFormat.class, ctx, thisObj, args);
}