@SuppressWarnings("unchecked") public void configure(JobConf jobConf) { int numberOfThreads = jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10); if (LOG.isDebugEnabled()) { LOG.debug("Configuring jobConf " + jobConf.getJobName() + " to use " + numberOfThreads + " threads"); } this.job = jobConf; //increment processed counter only if skipping feature is enabled this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && SkipBadRecords.getAutoIncrMapperProcCount(job); this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); // Creating a threadpool of the configured size to execute the Mapper // map method in parallel. executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue (numberOfThreads)); }
/**
 * Entry point: configures and submits the multithreaded zip-content loading job.
 *
 * <p>Expected positional arguments (after generic Hadoop options are stripped
 * by {@link GenericOptionsParser}): {@code configFile inputDir threadCount}.
 *
 * @param args command-line arguments; may include generic Hadoop options
 * @throws Exception if job setup or submission fails
 */
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  // Three positional arguments are required (configFile, inputDir,
  // threadCount); the original guard only checked for two, which let
  // otherArgs[2] below throw ArrayIndexOutOfBoundsException.
  if (otherArgs.length < 3) {
    System.err.println(
        "Usage: MultithreadedZipContentLoader configFile inputDir threadCount");
    System.exit(2);
  }
  Job job = Job.getInstance(conf);
  job.setJarByClass(MultithreadedZipContentLoader.class);
  job.setInputFormatClass(ZipContentInputFormat.class);
  job.setMapperClass(MultithreadedMapper.class);
  MultithreadedMapper.setMapperClass(job, ZipContentMapper.class);
  // Use the parsed argument list, not the raw args array, so generic Hadoop
  // options (-D, -conf, ...) do not shift the thread-count position.
  MultithreadedMapper.setNumberOfThreads(job, Integer.parseInt(otherArgs[2]));
  job.setMapOutputKeyClass(DocumentURI.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(ContentOutputFormat.class);
  ZipContentInputFormat.setInputPaths(job, new Path(otherArgs[1]));

  // Layer the user-supplied config file onto the job's configuration.
  conf = job.getConfiguration();
  conf.addResource(otherArgs[0]);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Builds the mapper-mode options object: runs the LocalFeaturesMapper inside a
 * MultithreadedMapper with a thread count settable via {@code --threads}/{@code -j},
 * defaulting to the number of available processors.
 *
 * @return a MapperModeOp that configures the job for multithreaded mapping
 */
@Override
public MapperModeOp getOptions() {
  return new MapperModeOp() {
    @Option(
        name = "--threads",
        aliases = "-j",
        required = false,
        usage = "Use NUMBER threads per mapper. defaults n processors.",
        metaVar = "NUMBER")
    private int concurrency = Runtime.getRuntime().availableProcessors();

    @Override
    public void prepareJobMapper(Job job, Class<LocalFeaturesMapper> mapperClass) {
      // Guard against a non-positive value supplied on the command line:
      // fall back to one thread per available core.
      if (concurrency <= 0)
        concurrency = Runtime.getRuntime().availableProcessors();

      job.setMapperClass(MultithreadedMapper.class);
      MultithreadedMapper.setNumberOfThreads(job, concurrency);
      MultithreadedMapper.setMapperClass(job, mapperClass);
      System.out.println("Using multithreaded mapper");
    }
  };
}
/**
 * Configures and runs the download job: text input of URLs, sequence-file
 * output of (Text, BytesWritable) pairs, block-compressed with DefaultCodec.
 * Uses a MultithreadedMapper when more than one download thread is requested.
 *
 * @param args command-line arguments parsed by HadoopDownloaderOptions
 * @return 0 on job success, 1 on failure (per the Tool.run contract)
 * @throws Exception if job setup or execution fails
 */
@Override
public int run(String[] args) throws Exception {
  final HadoopDownloaderOptions options = new HadoopDownloaderOptions(args);
  options.prepare(true);

  // Job.getInstance replaces the deprecated Job(Configuration) constructor.
  final Job job = Job.getInstance(getConf());
  job.setJarByClass(HadoopDownloader.class);
  job.setJobName("Hadoop Downloader Utility");

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(BytesWritable.class);

  // Single thread: run the mapper directly. Otherwise wrap it in a
  // MultithreadedMapper with the requested concurrency.
  if (options.getNumberOfThreads() <= 1) {
    job.setMapperClass(DownloadMapper.class);
  } else {
    job.setMapperClass(MultithreadedMapper.class);
    MultithreadedMapper.setMapperClass(job, DownloadMapper.class);
    MultithreadedMapper.setNumberOfThreads(job, options.getNumberOfThreads());
  }

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setNumReduceTasks(options.getNumberOfReducers());
  // Record the raw arguments so mappers can re-parse the options themselves.
  job.getConfiguration().setStrings(ARGS_KEY, args);

  FileInputFormat.setInputPaths(job, options.getInputPaths());
  SequenceFileOutputFormat.setOutputPath(job, options.getOutputPath());
  SequenceFileOutputFormat.setCompressOutput(job, true);
  SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
  SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

  // Propagate job failure to the caller instead of always returning 0,
  // which silently masked failed runs.
  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Wraps the given mapper in a MultithreadedMapper, using the configured
 * concurrency, or one thread per available core when the setting is
 * non-positive. Prints the effective thread count.
 *
 * @param job the job being configured
 * @param mapperClass the real mapper to run inside the MultithreadedMapper
 */
@Override
public void prepareJobMapper(Job job, Class<SimpleTwitterPreprocessingMapper> mapperClass) {
  // A non-positive setting means "use all available cores".
  if (concurrency <= 0) {
    concurrency = Runtime.getRuntime().availableProcessors();
  }

  job.setMapperClass(MultithreadedMapper.class);
  MultithreadedMapper.setNumberOfThreads(job, concurrency);
  MultithreadedMapper.setMapperClass(job, mapperClass);

  final int effectiveThreads = MultithreadedMapper.getNumberOfThreads(job);
  System.out.println("NThreads = " + effectiveThreads);
}
/**
 * Wraps the cluster-quantiser mapper in a MultithreadedMapper. The thread
 * count comes from the options; a non-positive value means one thread per
 * available core. Prints the effective thread count.
 *
 * @param job the job being configured
 * @param mapperClass the real mapper to run inside the MultithreadedMapper
 * @param opts options supplying the requested concurrency
 */
@Override
public void prepareJobMapper(Job job, Class<ClusterQuantiserMapper> mapperClass,
    AbstractClusterQuantiserOptions opts) {
  // Non-positive concurrency option -> default to the processor count.
  int threads = opts.getConcurrency();
  if (threads <= 0) {
    threads = Runtime.getRuntime().availableProcessors();
  }

  job.setMapperClass(MultithreadedMapper.class);
  MultithreadedMapper.setNumberOfThreads(job, threads);
  MultithreadedMapper.setMapperClass(job, mapperClass);

  System.out.println("NThreads = " + MultithreadedMapper.getNumberOfThreads(job));
}
public int run(String[] args) throws Exception { getConf().set(CSVLineRecordReader.FORMAT_DELIMITER, "\""); getConf().set(CSVLineRecordReader.FORMAT_SEPARATOR, ";"); getConf().setInt(CSVNLineInputFormat.LINES_PER_MAP, 40000); getConf().setBoolean(CSVLineRecordReader.IS_ZIPFILE, false); Job csvJob = new Job(getConf(), "csv_test_job"); csvJob.setJarByClass(CSVTestRunner.class); csvJob.setNumReduceTasks(0); MultithreadedMapper.setMapperClass(csvJob, TestMapper.class); MultithreadedMapper.setNumberOfThreads(csvJob, 8); MultithreadedMapper.setMapperClass(csvJob, TestMapper.class); MultithreadedMapper.setNumberOfThreads(csvJob, 1); csvJob.setMapperClass(MultithreadedMapper.class); // To run without multithread, use the following line instead of the 3 // above // csvJob.setMapperClass(TestMapper.class); csvJob.setInputFormatClass(CSVNLineInputFormat.class); csvJob.setOutputFormatClass(NullOutputFormat.class); FileInputFormat.setInputPaths(csvJob, new Path(INPUT_PATH_PREFIX)); logger.info("Process will begin"); csvJob.waitForCompletion(true); logger.info("Process ended"); return 0; }
/**
 * Drives a small MapReduce job through the MultithreadedMapRunner and asserts
 * that the job's success or failure matches whether a mapper fault was injected.
 *
 * @param ioEx when true, configures the test mapper to throw an IOException
 * @param rtEx when true, configures the test mapper to throw a RuntimeException
 * @throws Exception if filesystem setup or job submission fails
 */
private void run(boolean ioEx, boolean rtEx) throws Exception {
  Path inDir = new Path("testing/mt/input");
  Path outDir = new Path("testing/mt/output");

  // Hack for local FS that does not have the concept of a 'mounting point'
  if (isLocalFS()) {
    String localPathRoot = System.getProperty("test.build.data", "/tmp")
        .replace(' ', '+');
    inDir = new Path(localPathRoot, inDir);
    outDir = new Path(localPathRoot, outDir);
  }

  JobConf conf = createJobConf();
  FileSystem fs = FileSystem.get(conf);

  // Start from a clean output directory and a freshly created input directory.
  fs.delete(outDir, true);
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }

  {
    // Small fixture: a handful of single-letter lines, including an empty one.
    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
    file.writeBytes("a\nb\n\nc\nd\ne");
    file.close();
  }

  conf.setJobName("mt");
  conf.setInputFormat(TextInputFormat.class);

  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(Text.class);

  conf.setOutputFormat(TextOutputFormat.class);
  // NOTE(review): the output key/value classes are set a second time below;
  // the repeat is redundant but harmless.
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  conf.setMapperClass(IDMap.class);
  conf.setReducerClass(IDReduce.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);

  // Run the map tasks through the multithreaded runner with two threads.
  conf.setMapRunnerClass(MultithreadedMapRunner.class);
  conf.setInt(MultithreadedMapper.NUM_THREADS, 2);

  // Optional fault injection flags, read by the test mapper.
  if (ioEx) {
    conf.setBoolean("multithreaded.ioException", true);
  }
  if (rtEx) {
    conf.setBoolean("multithreaded.runtimeException", true);
  }

  JobClient jc = new JobClient(conf);
  RunningJob job = jc.submitJob(conf);
  // Poll until the job finishes; an injected exception should fail the job.
  while (!job.isComplete()) {
    Thread.sleep(100);
  }

  // Success is expected exactly when no fault was injected.
  if (job.isSuccessful()) {
    assertFalse(ioEx || rtEx);
  } else {
    assertTrue(ioEx || rtEx);
  }
}