/**
 * Run the test.
 *
 * @throws IOException on error
 */
public static void runTests() throws IOException {
  config.setLong("io.bytes.per.checksum", bytesPerChecksum);

  JobConf job = new JobConf(config, NNBench.class);

  job.setJobName("NNBench-" + operation);
  FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);

  // Explicitly set number of max map attempts to 1.
  job.setMaxMapAttempts(1);

  // Explicitly turn off speculative execution
  job.setSpeculativeExecution(false);

  job.setMapperClass(NNBenchMapper.class);
  job.setReducerClass(NNBenchReducer.class);

  FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks((int) numberOfReduces);
  JobClient.runJob(job);
}
private void runIOTest(
    Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
    Path outputDir) throws IOException {
  JobConf job = new JobConf(config, TestDFSIO.class);

  FileInputFormat.setInputPaths(job, getControlDir(config));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(mapperClass);
  job.setReducerClass(AccumulatingReducer.class);

  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
/**
 * Creates a simple copy job.
 *
 * @param indirs List of input directories.
 * @param outdir Output directory.
 * @return JobConf initialised for a simple copy job.
 * @throws Exception If an error occurs creating job configuration.
 */
static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
  Configuration defaults = new Configuration();
  JobConf theJob = new JobConf(defaults, TestJobControl.class);
  theJob.setJobName("DataMoveJob");

  FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
  theJob.setMapperClass(DataCopy.class);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  theJob.setReducerClass(DataCopy.class);
  theJob.setNumMapTasks(12);
  theJob.setNumReduceTasks(4);
  return theJob;
}
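A minimal usage sketch for createCopyJob, assuming the classic org.apache.hadoop.mapred.jobcontrol API (Job, JobControl); the input/output paths and the group name are hypothetical. It wraps the returned JobConf in a controlled job and waits for the control group to finish.

// Sketch only: paths and group name are made up for illustration.
static void runCopy() throws Exception {
  JobConf copyConf = createCopyJob(
      Arrays.asList(new Path("/tmp/copy/in1"), new Path("/tmp/copy/in2")),
      new Path("/tmp/copy/out"));

  Job copyJob = new Job(copyConf);                 // org.apache.hadoop.mapred.jobcontrol.Job
  JobControl control = new JobControl("copy-group");
  control.addJob(copyJob);

  new Thread(control).start();                     // JobControl implements Runnable
  while (!control.allFinished()) {
    Thread.sleep(500);                             // poll until all controlled jobs finish
  }
  control.stop();
}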
public void validateInput(JobConf job) throws IOException {
  // expecting exactly one path
  Path[] tableNames = FileInputFormat.getInputPaths(job);
  if (tableNames == null || tableNames.length != 1) {
    throw new IOException("expecting one table name");
  }

  // connected to table?
  if (getHTable() == null) {
    throw new IOException("could not connect to table '"
        + tableNames[0].getName() + "'");
  }

  // expecting at least one column
  String colArg = job.get(COLUMN_LIST);
  if (colArg == null || colArg.length() == 0) {
    throw new IOException("expecting at least one column");
  }
}
private static JobConf getOldAPIJobconf(Configuration configuration, String name,
    String input, String output) throws Exception {
  final JobConf jobConf = new JobConf(configuration);
  final FileSystem fs = FileSystem.get(configuration);
  if (fs.exists(new Path(output))) {
    fs.delete(new Path(output), true);
  }
  fs.close();

  jobConf.setJobName(name);
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(IntWritable.class);
  jobConf.setMapperClass(WordCountWithOldAPI.TokenizerMapperWithOldAPI.class);
  jobConf.setCombinerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
  jobConf.setReducerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);

  jobConf.setInputFormat(SequenceFileInputFormat.class);
  jobConf.setOutputFormat(TextOutputFormat.class);

  FileInputFormat.setInputPaths(jobConf, new Path(input));
  FileOutputFormat.setOutputPath(jobConf, new Path(output));
  return jobConf;
}
public void configure(JobConf job) {
  // Set the mapper and reducers
  job.setMapperClass(TestMapper.class);
  // job.setReducerClass(TestReducer.class);

  // Set the output types of the mapper and reducer
  // job.setMapOutputKeyClass(IntWritable.class);
  // job.setMapOutputValueClass(NullWritable.class);
  // job.setOutputKeyClass(NullWritable.class);
  // job.setOutputValueClass(NullWritable.class);

  // Make sure this jar is included
  job.setJarByClass(TestMapper.class);

  // Specify the input and output data formats
  job.setInputFormat(TextInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);

  // Turn off speculative execution
  job.setMapSpeculativeExecution(false);
  job.setReduceSpeculativeExecution(false);

  // Add the job input path
  FileInputFormat.addInputPath(job, new Path(this.input_filename));
}
@Test
public void readEthereumBlockInputFormatGenesisBlock() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "ethgenesis.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for genesis block contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Genesis Block must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in genesis Block");
  reader.close();
}
public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(WeatherData.class);
  conf.setJobName("temp");

  // Note: the mapper's output types are not the defaults, so the
  // following properties must be set explicitly.
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(Text.class);

  conf.setMapperClass(MaxTemperatureMapper.class);
  conf.setReducerClass(MaxTemperatureReducer.class);

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
}
@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 3346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("LinkDb merge: starting at " + sdf.format(start));

  JobConf job = createMergeJob(getConf(), output, normalize, filter);
  for (int i = 0; i < dbs.length; i++) {
    FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME));
  }
  JobClient.runJob(job);
  FileSystem fs = FileSystem.get(getConf());
  fs.mkdirs(output);
  fs.rename(FileOutputFormat.getOutputPath(job), new Path(output, LinkDb.CURRENT_NAME));

  long end = System.currentTimeMillis();
  LOG.info("LinkDb merge: finished at " + sdf.format(end)
      + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
@Test
public void readEthereumBlockInputFormatBlock1346406GzipCompressed() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin.gz";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
private JobConf createJobConf() throws IOException {
  JobConf conf = HdpBootstrap.hadoopConfig();
  conf.setInputFormat(EsInputFormat.class);
  conf.setOutputFormat(PrintStreamOutputFormat.class);
  conf.setOutputKeyClass(Text.class);
  boolean type = random.nextBoolean();
  Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
  conf.setOutputValueClass(mapType);
  HadoopCfgUtils.setGenericOptions(conf);
  conf.set(ConfigurationOptions.ES_QUERY, query);
  conf.setNumReduceTasks(0);

  conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
  conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
  conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));

  QueryTestParams.provisionQueries(conf);
  FileInputFormat.setInputPaths(conf, new Path(TestUtils.sampleArtistsDat()));

  HdpBootstrap.addProperties(conf, TestSettings.TESTING_PROPS, false);
  return conf;
}
@Test
public void readEthereumBlockInputFormatBlock1346406() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
/**
 * Adds all the paths in the provided list to the Job conf object
 * as input paths for the job.
 *
 * @param job the job configuration to update
 * @param pathsToAdd the input paths to append
 */
public static void setInputPaths(JobConf job, List<Path> pathsToAdd) {
  Path[] addedPaths = FileInputFormat.getInputPaths(job);
  if (addedPaths == null) {
    addedPaths = new Path[0];
  }

  Path[] combined = new Path[addedPaths.length + pathsToAdd.size()];
  System.arraycopy(addedPaths, 0, combined, 0, addedPaths.length);

  int i = 0;
  for (Path p : pathsToAdd) {
    combined[addedPaths.length + (i++)] = p;
  }
  FileInputFormat.setInputPaths(job, combined);
}
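A short usage sketch for the setInputPaths helper above; the paths are hypothetical, and the call assumes the helper is visible to the caller (its enclosing utility class is not shown here). Input paths already configured on the JobConf are preserved, because the helper re-reads them and re-sets the combined array.

// Hypothetical paths; any input paths already set on the job are kept.
JobConf job = new JobConf(conf);
List<Path> extraInputs = Arrays.asList(
    new Path("/data/events/2021-01-01"),
    new Path("/data/events/2021-01-02"));
setInputPaths(job, extraInputs);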
@Test
public void readExcelInputFormatExcel2003Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003empty.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contains row 1 and it is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains no further row");
}
@Test
public void readEthereumBlockInputFormatBlock1346406Bzip2Compressed() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin.bz2";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegative() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003encrypt.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  // for decryption simply set the password
  job.set("hadoopoffice.read.security.crypt.password", "test2");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNull(reader, "Null record reader implies invalid password");
}
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 1");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1 contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Block 1 must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1");
  reader.close();
}
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003encrypt.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.read.locale.bcp47", "de");
  // low footprint
  job.set("hadoopoffice.read.lowFootprint", "true");
  // for decryption simply set the password
  job.set("hadoopoffice.read.security.crypt.password", "test2");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNull(reader, "Null record reader implies invalid password");
}
public SparkQuery(ConfUtils config) {
  this.cfg = config;
  SparkConf sconf = new SparkConf()
      .setMaster(cfg.getSPARK_MASTER())
      .setAppName(this.getClass().getName())
      .setSparkHome(cfg.getSPARK_HOME())
      .setJars(new String[] { cfg.getSPARK_APPLICATION_JAR() })
      .set("spark.hadoop.cloneConf", "false")
      .set("spark.executor.memory", cfg.getSPARK_EXECUTOR_MEMORY())
      .set("spark.driver.memory", cfg.getSPARK_DRIVER_MEMORY())
      .set("spark.task.cpus", cfg.getSPARK_TASK_CPUS());

  ctx = new JavaSparkContext(sconf);
  ctx.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
  ctx.hadoopConfiguration().set("fs.hdfs.impl",
      org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

  queryConf = new SparkQueryConf(ctx.hadoopConfiguration());
}
private void splitInput(final Properties properties, final StorageDescriptor sd,
    final Partition partition) throws ReflectiveOperationException, IOException {
  final JobConf job = new JobConf();
  for (final Object obj : properties.keySet()) {
    job.set((String) obj, (String) properties.get(obj));
  }
  for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
    job.set(entry.getKey(), entry.getValue());
  }

  InputFormat<?, ?> format = (InputFormat<?, ?>)
      Class.forName(sd.getInputFormat()).getConstructor().newInstance();
  job.setInputFormat(format.getClass());

  final Path path = new Path(sd.getLocation());
  final FileSystem fs = path.getFileSystem(job);

  if (fs.exists(path)) {
    FileInputFormat.addInputPath(job, path);
    format = job.getInputFormat();
    for (final InputSplit split : format.getSplits(job, 1)) {
      inputSplits.add(split);
      partitionMap.put(split, partition);
    }
  }

  final String numRowsProp = properties.getProperty("numRows");
  logger.trace("HiveScan num rows property = {}", numRowsProp);
  if (numRowsProp != null) {
    final long numRows = Long.valueOf(numRowsProp);
    // starting from hive-0.13, when no statistics are available, this property is set to -1
    // it's important to note that the value returned by hive may not be up to date
    if (numRows > 0) {
      rowCount += numRows;
    }
  }
}
/**
 * Provide the required splits from the specified configuration. By default this
 * method queries (via function execution) the region with the `_meta' suffix,
 * so make sure the region name is passed accordingly.
 *
 * @param conf the job configuration
 * @param numSplits the required number of splits
 * @return the required splits to read/write the data
 * @throws IOException if the table does not exist.
 */
public static InputSplit[] getSplits(final JobConf conf, final int numSplits) throws IOException {
  final Path[] tablePaths = FileInputFormat.getInputPaths(conf);

  /** initialize cache if not done yet.. **/
  final AmpoolClient aClient = MonarchUtils.getConnectionFromConf(conf);
  String tableName = conf.get(MonarchUtils.REGION);
  boolean isFTable = MonarchUtils.isFTable(conf);
  Table table = null;
  if (isFTable) {
    table = aClient.getFTable(tableName);
  } else {
    table = aClient.getMTable(tableName);
  }
  if (table == null) {
    throw new IOException("Table " + tableName + " does not exist.");
  }

  int totalnumberOfSplits = table.getTableDescriptor().getTotalNumOfSplits();
  Map<Integer, Set<ServerLocation>> bucketMap = new HashMap<>(numSplits);
  final AtomicLong start = new AtomicLong(0L);
  MonarchSplit[] splits = MTableUtils
      .getSplitsWithSize(tableName, numSplits, totalnumberOfSplits, bucketMap)
      .stream().map(e -> {
        MonarchSplit ms = convertToSplit(tablePaths, start.get(), e, bucketMap);
        start.addAndGet(e.getSize());
        return ms;
      }).toArray(MonarchSplit[]::new);
  logger.info("numSplits= {}; MonarchSplits= {}", numSplits, Arrays.toString(splits));
  return splits;
}
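A hedged sketch of a caller for the split computation above. Only the MonarchUtils.REGION key visible in the code is set explicitly; the table name and split count are made up, and any connection properties that MonarchUtils.getConnectionFromConf needs (for example locator host and port) are assumed to already be present in the configuration.

// Sketch only: "web_events" and the split count of 4 are illustrative values.
JobConf conf = new JobConf();
conf.set(MonarchUtils.REGION, "web_events");   // table/region name consulted by getSplits
InputSplit[] splits = getSplits(conf, 4);
for (InputSplit split : splits) {
  System.out.println(split);                   // inspect the generated MonarchSplits
}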
@SuppressWarnings("unchecked") public static InputSplit[] getSplits(final JobConf conf, final int numSplits, int dummy) { final Path[] tablePaths = FileInputFormat.getInputPaths(conf); long splitSize = NumberUtils.toLong(conf.get(MonarchUtils.SPLIT_SIZE_KEY), DEFAULT_SPLIT_SIZE); final String regionName = conf.get(MonarchUtils.REGION) + MonarchUtils.META_TABLE_SFX; MPredicateHolder ph = new MPredicateHolder(-1, BasicTypes.STRING, CompareOp.REGEX, ".*"+MonarchUtils.KEY_BLOCKS_SFX); MonarchGetAllFunction func = new MonarchGetAllFunction(); final AmpoolClient aClient = MonarchUtils.getConnectionFromConf(conf); Execution exec = FunctionService.onServer(((GemFireCacheImpl)(aClient.getGeodeCache())).getDefaultPool()) .withArgs(new Object[]{regionName, ph}); ResultCollector rc = exec.execute(func); /** TODO: refactor below code.. change below required in case the function is changed to return in some way **/ List<String[]> output = (List<String[]>)((List) rc.getResult()).get(0); if (output.isEmpty()) { logger.error("No entries found in region= {} with key_prefix= %-{}", regionName, MonarchUtils.KEY_BLOCKS_SFX); return new MonarchSplit[0]; } List<MonarchSplit> list = new ArrayList<>(output.size()); String prefix; long numberOfBlocks; for (final String[] arr : output) { prefix = arr[0].substring(0, arr[0].length() - 6); numberOfBlocks = Long.valueOf(arr[1]); if (numberOfBlocks > splitSize) { Collections.addAll(list, MonarchSplit.getInputSplits(tablePaths[0], prefix, splitSize, numberOfBlocks)); } else { list.add(new MonarchSplit(tablePaths[0], 0, numberOfBlocks, null, prefix)); } } return list.toArray(new MonarchSplit[list.size()]); }
public int run(String[] argv) throws IOException {
  if (argv.length < 2) {
    System.out.println("ExternalMapReduce <input> <output>");
    return -1;
  }
  Path outDir = new Path(argv[1]);
  Path input = new Path(argv[0]);
  JobConf testConf = new JobConf(getConf(), ExternalMapReduce.class);

  // try to load a class from libjar
  try {
    testConf.getClassByName("testjar.ClassWordCount");
  } catch (ClassNotFoundException e) {
    System.out.println("Could not find class from libjar");
    return -1;
  }

  testConf.setJobName("external job");
  FileInputFormat.setInputPaths(testConf, input);
  FileOutputFormat.setOutputPath(testConf, outDir);
  testConf.setMapperClass(MapClass.class);
  testConf.setReducerClass(Reduce.class);
  testConf.setNumReduceTasks(1);
  JobClient.runJob(testConf);
  return 0;
}
public void configure(String keySpec, int expect) throws Exception {
  Path testdir = new Path(TEST_DIR.getAbsolutePath());
  Path inDir = new Path(testdir, "in");
  Path outDir = new Path(testdir, "out");
  FileSystem fs = getFileSystem();
  fs.delete(testdir, true);
  conf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(LongWritable.class);

  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
  conf.setKeyFieldComparatorOptions(keySpec);
  conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
  conf.set(JobContext.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
  conf.setMapperClass(InverseMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  if (!fs.mkdirs(testdir)) {
    throw new IOException("Mkdirs failed to create " + testdir.toString());
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }

  // set up the input data: two lines in one file
  Path inFile = new Path(inDir, "part0");
  FileOutputStream fos = new FileOutputStream(inFile.toString());
  fos.write((line1 + "\n").getBytes());
  fos.write((line2 + "\n").getBytes());
  fos.close();

  JobClient jc = new JobClient(conf);
  RunningJob r_job = jc.submitJob(conf);
  while (!r_job.isComplete()) {
    Thread.sleep(1000);
  }

  if (!r_job.isSuccessful()) {
    fail("Oops! The job broke due to an unexpected error");
  }

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(outDir,
          new Utils.OutputFileUtils.OutputFilesFilter()));
  if (outputFiles.length > 0) {
    InputStream is = getFileSystem().open(outputFiles[0]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    String line = reader.readLine();
    // make sure we get what we expect as the first line, and also
    // that we have two lines
    if (expect == 1) {
      assertTrue(line.startsWith(line1));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line2));
    }
    line = reader.readLine();
    if (expect == 1) {
      assertTrue(line.startsWith(line2));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line1));
    }
    reader.close();
  }
}
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, int numReds)
    throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n"
      + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  // conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  runJob(conf);
}
protected CombineFileRecordReaderWrapper(FileInputFormat<K, V> inputFormat,
    CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx)
    throws IOException {
  FileSplit fileSplit = new FileSplit(split.getPath(idx),
      split.getOffset(idx),
      split.getLength(idx),
      split.getLocations());

  delegate = inputFormat.getRecordReader(fileSplit, (JobConf) conf, reporter);
}
private static boolean addInputPath(StorageDescriptor sd, JobConf job) throws IOException {
  final Path path = new Path(sd.getLocation());
  final FileSystem fs = FileSystemWrapper.get(path, job);

  if (fs.exists(path)) {
    FileInputFormat.addInputPath(job, path);
    return true;
  }

  return false;
}
/**
 * Use this before submitting a TableMap job. It will
 * appropriately set up the JobConf.
 *
 * @param table The table name to read from.
 * @param columns The columns to scan.
 * @param mapper The mapper class to use.
 * @param outputKeyClass The class of the output key.
 * @param outputValueClass The class of the output value.
 * @param job The current job configuration to adjust.
 * @param addDependencyJars upload HBase jars and jars for any of the configured
 *          job classes via the distributed cache (tmpjars).
 * @param inputFormat The input format to use for the scan.
 */
public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

  job.setInputFormat(inputFormat);
  job.setMapOutputValueClass(outputValueClass);
  job.setMapOutputKeyClass(outputKeyClass);
  job.setMapperClass(mapper);
  job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
  FileInputFormat.addInputPaths(job, table);
  job.set(TableInputFormat.COLUMN_LIST, columns);

  if (addDependencyJars) {
    try {
      addDependencyJars(job);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  try {
    initCredentials(job);
  } catch (IOException ioe) {
    // just spit out the stack trace? really?
    ioe.printStackTrace();
  }
}
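A hedged usage sketch for initTableMapJob in the style of the HBase mapred API; the table name, column list, driver class, mapper class, and output path are hypothetical, and the job is left map-only for simplicity.

// Hypothetical wiring; MyTableMapper is assumed to implement TableMap<ImmutableBytesWritable, Result>.
JobConf job = new JobConf(config, MyDriver.class);
initTableMapJob("webtable", "contents: anchor:", MyTableMapper.class,
    ImmutableBytesWritable.class, Result.class, job,
    true, TableInputFormat.class);
job.setNumReduceTasks(0);                                    // map-only scan in this sketch
FileOutputFormat.setOutputPath(job, new Path("/tmp/webtable-scan"));
JobClient.runJob(job);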
@Override
protected void initialize(JobConf job) throws IOException {
  Path[] tableNames = FileInputFormat.getInputPaths(job);
  String colArg = job.get(COLUMN_LIST);
  String[] colNames = colArg.split(" ");
  byte[][] m_cols = new byte[colNames.length][];
  for (int i = 0; i < m_cols.length; i++) {
    m_cols[i] = Bytes.toBytes(colNames[i]);
  }
  setInputColumns(m_cols);
  Connection connection = ConnectionFactory.createConnection(job);
  initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
}
public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(CountFacts.class);
  conf.setJobName("count facts");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(Map.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);

  ////////////////////////////////////////////////////////////////////////////
  // input as pails
  PailSpec spec = PailFormatFactory.getDefaultCopy().setStructure(new DataPailStructure());
  PailFormat format = PailFormatFactory.create(spec);
  String masterPath = FileUtils.prepareMasterFactsPath(false, false);
  // conf.setInputFormat(format.getInputFormatClass());
  FileInputFormat.setInputPaths(conf, new Path(masterPath));
  ////////////////////////////////////////////////////////////////////////////

  ////////////////////////////////////////////////////////////////////////////
  // output as text
  conf.setOutputFormat(TextOutputFormat.class);
  FileSystem fs = FileUtils.getFs(false);
  FileOutputFormat.setOutputPath(conf, new Path(
      FileUtils.getTmpPath(fs, "fact-count", true, false)));
  ////////////////////////////////////////////////////////////////////////////

  JobClient.runJob(conf);
}
@Test
public void readEthereumBlockInputFormatBlock403419() throws IOException,
    EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "block403419.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 403419");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 403419 contains at least one block");
  assertEquals(2, block.getEthereumTransactions().size(), "Block 403419 must have 2 transactions");
  EthereumBlockHeader ethereumBlockHeader = block.getEthereumBlockHeader();
  assertEquals("f8b483dba2c3b7176a3da549ad41a48bb3121069",
      bytesToHex(ethereumBlockHeader.getCoinBase()).toLowerCase(),
      "Block 403419 was mined by f8b483dba2c3b7176a3da549ad41a48bb3121069");
  assertEquals("08741fa532c05804d9c1086a311e47cc024bbc43980f561041ad1fbb3c223322",
      bytesToHex(ethereumBlockHeader.getParentHash()).toLowerCase(),
      "The parent of block 403419 has hash 08741fa532c05804d9c1086a311e47cc024bbc43980f561041ad1fbb3c223322");
  assertFalse(reader.next(key, block), "No further blocks in block 403419");
  reader.close();
}