@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(conf);

  Reporter reporter = Reporter.NULL;

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = " + seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);
  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  // create files with various lengths
  createFiles(length, numFiles, random);

  // create a combine split for the files
  InputFormat<IntWritable, BytesWritable> format =
    new CombineSequenceFileInputFormat<IntWritable, BytesWritable>();
  IntWritable key = new IntWritable();
  BytesWritable value = new BytesWritable();
  for (int i = 0; i < 3; i++) {
    int numSplits =
      random.nextInt(length / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got = " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one split!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be a CombineFileSplit",
      CombineFileSplit.class, split.getClass());

    // check the split: every key must occur exactly once across all records
    BitSet bits = new BitSet(length);
    RecordReader<IntWritable, BytesWritable> reader =
      format.getRecordReader(split, job, reporter);
    try {
      while (reader.next(key, value)) {
        assertFalse("Key in multiple partitions.", bits.get(key.get()));
        bits.set(key.get());
      }
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
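// A minimal sketch of what the createFiles helper presumably does; the helper is not
// shown in this section, so the key partitioning and value sizes below are assumptions
// for illustration (workDir, localFs, and conf are taken from the surrounding test class).
// It writes numFiles SequenceFiles into workDir, spreading the keys 0..length-1 across
// the files so that every key occurs exactly once, with small random byte values.
private static void createFiles(int length, int numFiles, Random random)
  throws IOException {
  int range = length / numFiles;
  for (int i = 0; i < numFiles; i++) {
    Path file = new Path(workDir, "test_" + i + ".seq");
    SequenceFile.Writer writer = SequenceFile.createWriter(localFs, conf, file,
      IntWritable.class, BytesWritable.class);
    try {
      int begin = i * range;
      // hypothetical choice: the last file takes the remainder of the key range
      int end = (i == numFiles - 1) ? length : begin + range;
      for (int j = begin; j < end; j++) {
        byte[] data = new byte[random.nextInt(10)];
        random.nextBytes(data);
        writer.append(new IntWritable(j), new BytesWritable(data));
      }
    } finally {
      writer.close();
    }
  }
}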
public static void setUpMultipleInputs(JobConf job, byte[] inputIndexes, String[] inputs,
    InputInfo[] inputInfos, int[] brlens, int[] bclens, boolean[] distCacheOnly,
    boolean setConverter, ConvertTarget target)
  throws Exception
{
  if( inputs.length != inputInfos.length )
    throw new Exception("Number of inputs and inputInfos does not match.");

  //set up names of the input matrices and their input format information
  job.setStrings(INPUT_MATRICIES_DIRS_CONFIG, inputs);
  MRJobConfiguration.setMapFunctionInputMatrixIndexes(job, inputIndexes);

  //set up converter infos (converter determined implicitly)
  if( setConverter ) {
    for( int i=0; i<inputs.length; i++ )
      setInputInfo(job, inputIndexes[i], inputInfos[i], brlens[i], bclens[i], target);
  }

  //remove redundant inputs and pure broadcast variables
  ArrayList<Path> lpaths = new ArrayList<>();
  ArrayList<InputInfo> liinfos = new ArrayList<>();
  for( int i=0; i<inputs.length; i++ ) {
    Path p = new Path(inputs[i]);

    //check and skip redundant inputs
    if(   lpaths.contains(p) //path already included
       || distCacheOnly[i] ) //input only required in dist cache
    {
      continue;
    }

    lpaths.add(p);
    liinfos.add(inputInfos[i]);
  }

  boolean combineInputFormat = false;
  if( OptimizerUtils.ALLOW_COMBINE_FILE_INPUT_FORMAT ) {
    //determine total input sizes
    double totalInputSize = 0;
    for( int i=0; i<inputs.length; i++ )
      totalInputSize += MapReduceTool.getFilesizeOnHDFS(new Path(inputs[i]));

    //set max split size (default blocksize) to 2x blocksize if (1) the sort buffer is
    //large enough, (2) the degree of parallelism is not hurt, and (3) there is only a
    //single input (except broadcasts)
    //(the sort buffer size is relevant for pass-through of, potentially modified, inputs to the reducers)
    //(the single input constraint stems from internal runtime assumptions used to relate meta data to inputs)
    long sizeSortBuff = InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer();
    long sizeHDFSBlk = InfrastructureAnalyzer.getHDFSBlockSize();
    long newSplitSize = sizeHDFSBlk * 2;
    //use generic config api for backwards compatibility
    double spillPercent = Double.parseDouble(
      job.get(MRConfigurationNames.MR_MAP_SORT_SPILL_PERCENT, "1.0"));
    int numPMap = OptimizerUtils.getNumMappers();
    if(   numPMap < totalInputSize/newSplitSize
       && sizeSortBuff*spillPercent >= newSplitSize
       && lpaths.size() == 1 )
    {
      job.setLong(MRConfigurationNames.MR_INPUT_FILEINPUTFORMAT_SPLIT_MAXSIZE, newSplitSize);
      combineInputFormat = true;
    }
  }

  //add inputs to job inputs (incl input format configuration)
  for( int i=0; i<lpaths.size(); i++ ) {
    //for binary block we use CombineSequenceFileInputFormat to reduce task latency
    if( combineInputFormat && liinfos.get(i) == InputInfo.BinaryBlockInputInfo )
      MultipleInputs.addInputPath(job, lpaths.get(i), CombineSequenceFileInputFormat.class);
    else
      MultipleInputs.addInputPath(job, lpaths.get(i), liinfos.get(i).inputFormatClass);
  }
}
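// A minimal standalone sketch of the combine gating arithmetic above, using hypothetical
// cluster numbers (block size, sort buffer, input size, and mapper count are assumptions
// for illustration, not values taken from the source):
class CombineGatingSketch {
  public static void main(String[] args) {
    long sizeHDFSBlk = 128L << 20;        // assume 128MB HDFS blocks
    long sizeSortBuff = 512L << 20;       // assume a 512MB map-side sort buffer
    double spillPercent = 1.0;            // default used by the code above
    long newSplitSize = sizeHDFSBlk * 2;  // 256MB max combined split
    long totalInputSize = 10L << 30;      // assume a single 10GB binary-block input
    int numPMap = 20;                     // assume 20 parallel map tasks
    boolean combine =
         numPMap < (double) totalInputSize / newSplitSize // 20 < 40: parallelism not hurt
      && sizeSortBuff * spillPercent >= newSplitSize;     // 512MB >= 256MB: buffer large enough
    System.out.println("combineInputFormat = " + combine); // prints true
  }
}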