/**
 * Initializes the input resources and related parameters; this work could
 * also be done in initialize().
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
  // Get the start and end tags passed in via the configuration.
  startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
  endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
  FileSplit fileSplit = (FileSplit) inputSplit;
  // Determine where this split starts and ends.
  start = fileSplit.getStart();
  end = start + fileSplit.getLength();
  Path file = fileSplit.getPath();
  FileSystem fs = file.getFileSystem(context);
  // Open an HDFS input stream for the split's file.
  fsin = fs.open(file);
  // Seek to the beginning of the split.
  fsin.seek(start);
}
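/*
 * A minimal sketch, not taken from the original class, of how this reader
 * could emit one record per start/end tag pair. It assumes the fields set up
 * by the constructor above (fsin, start, end, startTag, endTag) and introduces
 * hypothetical buffer/key/value fields of its own (requires
 * org.apache.hadoop.io.DataOutputBuffer, LongWritable, Text).
 */
private final DataOutputBuffer buffer = new DataOutputBuffer();
private final LongWritable currentKey = new LongWritable();
private final Text currentValue = new Text();

public boolean nextKeyValue() throws IOException {
  // Only start a new record while we are still inside this split.
  if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
    try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
        currentKey.set(fsin.getPos());
        currentValue.set(buffer.getData(), 0, buffer.getLength());
        return true;
      }
    } finally {
      buffer.reset();
    }
  }
  return false;
}

// Scans forward byte by byte until `match` is seen; buffers bytes when
// withinBlock is true so the record body is captured.
private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
  int i = 0;
  while (true) {
    int b = fsin.read();
    if (b == -1) {
      return false; // end of file
    }
    if (withinBlock) {
      buffer.write(b);
    }
    if (b == match[i]) {
      if (++i >= match.length) {
        return true; // the whole tag matched
      }
    } else {
      i = 0;
    }
    // Give up looking for a new start tag once we pass the split boundary.
    if (!withinBlock && i == 0 && fsin.getPos() >= end) {
      return false;
    }
  }
}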
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks)
    throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());
  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<>(nSplits);

  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
  List<InputSplit> allSplits = new ArrayList<InputSplit>();
  Scan originalScan = getScan();

  Scan[] scans = rowKeyDistributor.getDistributedScans(originalScan);

  for (Scan scan : scans) {
    // Internally super.getSplits(...) uses the scan object stored in a private
    // variable; to re-use the superclass code we swap in each distributed scan.
    setScan(scan);
    List<InputSplit> splits = super.getSplits(context);
    allSplits.addAll(splits);
  }

  // Set the original scan back.
  setScan(originalScan);

  return allSplits;
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  int numMappers = ConfigurationHelper.getJobNumMaps(job);
  String boundaryQuery = getDBConf().getInputBoundingQuery();

  // Resort to the base class if:
  //   - dataslice-aligned import is not requested
  //   - this is not a table extract
  //   - a custom boundary query has been supplied
  //   - only one mapper is used
  if (!getConf().getBoolean(NetezzaManager.NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false)
      || getDBConf().getInputTableName() == null
      || numMappers == 1
      || (boundaryQuery != null && !boundaryQuery.isEmpty())) {
    return super.getSplits(job);
  }

  // Generate a splitter that splits only on datasliceid. It is an
  // integer split. We will just use the lower bounding query to specify
  // the restriction of dataslice and set the upper bound to a constant.
  NetezzaDBDataSliceSplitter splitter = new NetezzaDBDataSliceSplitter();

  return splitter.split(getConf(), null, null);
}
@Test
public void testSplitLocationInfo() throws Exception {
  Configuration conf = getConfiguration();
  conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
      "test:///a1/a2");
  Job job = Job.getInstance(conf);
  TextInputFormat fileInputFormat = new TextInputFormat();
  List<InputSplit> splits = fileInputFormat.getSplits(job);
  String[] locations = splits.get(0).getLocations();
  Assert.assertEquals(2, locations.length);
  SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
  Assert.assertEquals(2, locationInfo.length);
  SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
      locationInfo[0] : locationInfo[1];
  SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
      locationInfo[0] : locationInfo[1];
  Assert.assertTrue(localhostInfo.isOnDisk());
  Assert.assertTrue(localhostInfo.isInMemory());
  Assert.assertTrue(otherhostInfo.isOnDisk());
  Assert.assertFalse(otherhostInfo.isInMemory());
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // Set the max split size based on the number of map tasks we want.
  long numTasks = getNumMapTasks(job);
  long numFileBytes = getJobSize(job);
  long maxSplitSize = numFileBytes / numTasks;

  setMaxSplitSize(maxSplitSize);

  LOG.debug("Target numMapTasks=" + numTasks);
  LOG.debug("Total input bytes=" + numFileBytes);
  LOG.debug("maxSplitSize=" + maxSplitSize);

  List<InputSplit> splits = super.getSplits(job);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Generated splits:");
    for (InputSplit split : splits) {
      LOG.debug("  " + split);
    }
  }

  return splits;
}
@Override
public List<InputSplit> split(Configuration conf, ResultSet results, String colName) {
  // For each map we add a split such that datasliceid modulo the number of
  // splits equals the mapper index. The query is restricted only by the
  // lower-bound where clause; for the upper bound we supply a constant clause
  // that always evaluates to true.
  int numSplits = ConfigurationHelper.getConfNumMaps(conf);
  List<InputSplit> splitList = new ArrayList<InputSplit>(numSplits);
  for (int i = 0; i < numSplits; ++i) {
    StringBuilder lowerBoundClause = new StringBuilder(128);
    lowerBoundClause.append(" datasliceid % ").append(numSplits)
        .append(" = ").append(i);
    splitList.add(new DataDrivenDBInputSplit(lowerBoundClause.toString(), "1 = 1"));
  }
  return splitList;
}
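/*
 * Illustration only (derived from the splitter above): with 4 map tasks it
 * produces four DataDrivenDBInputSplits whose lower-bound clauses are
 *   " datasliceid % 4 = 0" ... " datasliceid % 4 = 3"
 * and whose upper-bound clause is always "1 = 1", so each mapper reads exactly
 * the rows held by its share of Netezza data slices.
 */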
/** Partitions the summation into parts and then returns them as splits. */
@Override
public List<InputSplit> getSplits(JobContext context) {
  // read sigma from conf
  final Configuration conf = context.getConfiguration();
  final Summation sigma = SummationWritable.read(DistSum.class, conf);
  final int nParts = conf.getInt(N_PARTS, 0);

  // create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nParts);
  final Summation[] parts = sigma.partition(nParts);
  for (int i = 0; i < parts.length; ++i) {
    splits.add(new SummationSplit(parts[i]));
    //LOG.info("parts[" + i + "] = " + parts[i]);
  }
  return splits;
}
/**
 * {@inheritDoc}
 * @throws IOException If the child InputSplit cannot be read, typically
 *         for failing access checks.
 */
@SuppressWarnings("unchecked")  // Generic array assignment
public void readFields(DataInput in) throws IOException {
  int card = WritableUtils.readVInt(in);
  if (splits == null || splits.length != card) {
    splits = new InputSplit[card];
  }
  Class<? extends InputSplit>[] cls = new Class[card];
  try {
    for (int i = 0; i < card; ++i) {
      cls[i] = Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
    }
    for (int i = 0; i < card; ++i) {
      splits[i] = ReflectionUtils.newInstance(cls[i], null);
      SerializationFactory factory = new SerializationFactory(conf);
      Deserializer deserializer = factory.getDeserializer(cls[i]);
      deserializer.open((DataInputStream) in);
      splits[i] = (InputSplit) deserializer.deserialize(splits[i]);
    }
  } catch (ClassNotFoundException e) {
    throw new IOException("Failed split init", e);
  }
}
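/*
 * A sketch of the write() half implied by the readFields() above, shown for
 * context rather than copied from the original class: it must emit the split
 * count, then every class name, then each serialized split, mirroring the
 * read order exactly.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, splits.length);
  for (InputSplit s : splits) {
    Text.writeString(out, s.getClass().getName());
  }
  for (InputSplit s : splits) {
    SerializationFactory factory = new SerializationFactory(conf);
    Serializer serializer = factory.getSerializer(s.getClass());
    serializer.open((DataOutputStream) out);
    serializer.serialize(s);
  }
}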
/**
 * Solve the schedule and modify the FileSplit array to reflect the new
 * schedule. It will move placed splits to the front and unplaceable splits
 * to the end.
 * @return a new list of FileSplits that are modified to have the
 *         best host as the only host.
 * @throws IOException
 */
public List<InputSplit> getNewFileSplits() throws IOException {
  solve();
  FileSplit[] result = new FileSplit[realSplits.length];
  int left = 0;
  int right = realSplits.length - 1;
  for (int i = 0; i < splits.length; ++i) {
    if (splits[i].isAssigned) {
      // copy the split and fix up the locations
      String[] newLocations = {splits[i].locations.get(0).hostname};
      realSplits[i] = new FileSplit(realSplits[i].getPath(),
          realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
      result[left++] = realSplits[i];
    } else {
      result[right--] = realSplits[i];
    }
  }
  List<InputSplit> ret = new ArrayList<InputSplit>();
  for (FileSplit fs : result) {
    ret.add(fs);
  }
  return ret;
}
@Override
public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
  int taskId = context.getTaskAttemptID().getTaskID().getId();
  int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
  int numIterations =
      context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
  int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

  taskId = taskId + iteration * numMapTasks;
  numMapTasks = numMapTasks * numIterations;

  long chainId = Math.abs(new Random().nextLong());
  // ensure that chainId is unique per task and across iterations
  chainId = chainId - (chainId % numMapTasks) + taskId;

  LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

  return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
}
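/*
 * Illustration only: rounding chainId down to a multiple of numMapTasks and
 * then adding taskId forces chainId % numMapTasks == taskId, so every task
 * (across all iterations) owns a distinct residue class of chain ids. The
 * concrete values below are hypothetical.
 */
long numMapTasks = 8, taskId = 3;
long chainId = Math.abs(new Random().nextLong());
chainId = chainId - (chainId % numMapTasks) + taskId;
assert chainId % numMapTasks == taskId;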
public void map(LongWritable key, Record val, Context context)
    throws IOException, InterruptedException {
  try {
    odpsImpl.parse(val);
    context.write(odpsImpl, NullWritable.get());
  } catch (Exception e) {
    LOG.error("Exception raised during data export");
    LOG.error("Exception: ", e);
    LOG.error("On input: " + val);
    LOG.error("At position " + key);

    InputSplit is = context.getInputSplit();
    LOG.error("");
    LOG.error("Currently processing split:");
    LOG.error(is);
    LOG.error("");
    LOG.error("This issue might not necessarily be caused by current input");
    LOG.error("due to the batching nature of export.");
    LOG.error("");
    throw new IOException("Can't export data, please check failed map task logs", e);
  }
}
private void testRandomLocation(int locations, int njobs, UserGroupInformation ugi)
    throws Exception {
  Configuration configuration = new Configuration();

  DebugJobProducer jobProducer = new DebugJobProducer(njobs, configuration);
  Configuration jconf = GridmixTestUtils.mrvl.getConfig();
  jconf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);

  JobStory story;
  int seq = 1;
  while ((story = jobProducer.getNextJob()) != null) {
    GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
        story, new Path("ignored"), ugi, seq++);
    gridmixJob.buildSplits(null);
    List<InputSplit> splits = new SleepJob.SleepInputFormat()
        .getSplits(gridmixJob.getJob());
    for (InputSplit split : splits) {
      assertEquals(locations, split.getLocations().length);
    }
  }
  jobProducer.close();
}
@Test
public void testNumInputFilesRecursively() throws Exception {
  Configuration conf = getConfiguration();
  conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
  conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
  Job job = Job.getInstance(conf);
  FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
  List<InputSplit> splits = fileInputFormat.getSplits(job);
  Assert.assertEquals("Input splits are not correct", 3, splits.size());
  verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
      "test:/a1/file1"), splits);

  // Using the deprecated configuration
  conf = getConfiguration();
  conf.set("mapred.input.dir.recursive", "true");
  job = Job.getInstance(conf);
  splits = fileInputFormat.getSplits(job);
  verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
      "test:/a1/file1"), splits);
}
@Test
public void testRetrieveDatasets() throws IOException {
  JobConf conf = new JobConf();
  conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345");
  conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
  conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");

  // set the password in the secure credentials object
  Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
  conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());

  String dsName = "dsName1";
  conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
  Job job = new Job(conf);
  ConfigurationHelper.setJobNumMaps(job, 2);
  //format.getSplits(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  splits = ((MainframeDatasetInputFormat<SqoopRecord>) format).getSplits(job);
  Assert.assertEquals("test1",
      ((MainframeDatasetInputSplit) splits.get(0)).getNextDataset().toString());
  Assert.assertEquals("test2",
      ((MainframeDatasetInputSplit) splits.get(1)).getNextDataset().toString());
}
/**
 * Create a new reader from the split, and match the edits against the passed columns.
 */
private void testSplit(InputSplit split, byte[]... columns) throws Exception {
  final WALRecordReader reader = getReader();
  reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

  for (byte[] column : columns) {
    assertTrue(reader.nextKeyValue());
    Cell cell = reader.getCurrentValue().getCells().get(0);
    if (!Bytes.equals(column, cell.getQualifier())) {
      assertTrue("expected [" + Bytes.toString(column) + "], actual ["
          + Bytes.toString(cell.getQualifier()) + "]", false);
    }
  }
  assertFalse(reader.nextKeyValue());
  reader.close();
}
@Override
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  List<InputSplit> splits = null;
  try {
    splits = getSplits(Job.getInstance(job));
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  org.apache.hadoop.mapred.InputSplit[] retVals =
      new org.apache.hadoop.mapred.InputSplit[splits.size()];
  for (int i = 0; i < splits.size(); ++i) {
    MapredSequentialSplit split = new MapredSequentialSplit(
        ((SequentialSplit) splits.get(i)).getInit());
    retVals[i] = split;
  }
  return retVals;
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }

  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  List<InputSplit> splits = new ArrayList<>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
      DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {
      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[]) null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[]) null));
  }

  return splits;
}
/**
 * Create a single split from the list of blocks specified in validBlocks.
 * Add this new split into splitList.
 */
private void addCreatedSplit(List<InputSplit> splitList,
                             Collection<String> locations,
                             ArrayList<OneBlockInfo> validBlocks) {
  // create an input split
  Path[] fl = new Path[validBlocks.size()];
  long[] offset = new long[validBlocks.size()];
  long[] length = new long[validBlocks.size()];
  for (int i = 0; i < validBlocks.size(); i++) {
    fl[i] = validBlocks.get(i).onepath;
    offset[i] = validBlocks.get(i).offset;
    length[i] = validBlocks.get(i).length;
  }

  // add this split to the list that is returned
  CombineFileSplit thissplit = new CombineFileSplit(fl, offset, length,
      locations.toArray(new String[0]));
  splitList.add(thissplit);
}
/** {@inheritDoc} */
public List<InputSplit> getSplits(JobContext context) {
  // get the property values
  final int startDigit = context.getConfiguration().getInt(DIGIT_START_PROPERTY, 1);
  final int nDigits = context.getConfiguration().getInt(DIGIT_SIZE_PROPERTY, 100);
  final int nMaps = context.getConfiguration().getInt(DIGIT_PARTS_PROPERTY, 1);

  // create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nMaps);
  final int[] parts = partition(startDigit - 1, nDigits, nMaps);
  for (int i = 0; i < parts.length; ++i) {
    final int k = i < parts.length - 1 ? parts[i + 1] : nDigits + startDigit - 1;
    splits.add(new BbpSplit(i, parts[i], k - parts[i]));
  }
  return splits;
}
/**
 * Tests a MR scan using data skew auto-balance.
 *
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
public void testNumOfSplits(String ratio, int expectedNumOfSplits)
    throws IOException, InterruptedException, ClassNotFoundException {
  String jobName = "TestJobForNumOfSplits";
  LOG.info("Before map/reduce startup - job " + jobName);
  Configuration c = new Configuration(TEST_UTIL.getConfiguration());
  Scan scan = new Scan();
  scan.addFamily(INPUT_FAMILY);
  c.set("hbase.mapreduce.input.autobalance", "true");
  c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
  c.set(KEY_STARTROW, "");
  c.set(KEY_LASTROW, "");
  Job job = new Job(c, jobName);
  TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan,
      ScanMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
  TableInputFormat tif = new TableInputFormat();
  tif.setConf(job.getConfiguration());
  Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
  List<InputSplit> splits = tif.getSplits(job);
  Assert.assertEquals(expectedNumOfSplits, splits.size());
}
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  // it really should be the same file split at the time the wrapper instance
  // was created
  assert fileSplitIsValid(context);

  delegate.initialize(fileSplit, context);
}
@Override
public void initialize(final InputSplit inputSplit, final TaskAttemptContext context)
    throws IOException, InterruptedException {
  FileSplit fileSplit = (FileSplit) inputSplit;
  Configuration config = context.getConfiguration();
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem(config);

  long fileLength = fs.getLength(path);
  long start = fileSplit.getStart();
  long length = fileSplit.getLength();

  InputStream in = fs.open(path);
}
public void getMoreSplits(Configuration conf, List<FileStatus> stats, long maxSize,
    long minSizeNode, long minSizeRack, List<InputSplit> splits) throws IOException {
  // all blocks for all the files in input set
  OneFileInfo[] files;

  // mapping from a rack name to the list of blocks it has
  HashMap<String, List<OneBlockInfo>> rackToBlocks =
      new HashMap<String, List<OneBlockInfo>>();

  // mapping from a block to the nodes on which it has replicas
  HashMap<OneBlockInfo, String[]> blockToNodes =
      new HashMap<OneBlockInfo, String[]>();

  // mapping from a node to the list of blocks that it contains
  HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
      new HashMap<String, Set<OneBlockInfo>>();

  files = new OneFileInfo[stats.size()];
  if (stats.size() == 0) {
    return;
  }

  // populate all the blocks for all files
  long totLength = 0;
  int i = 0;
  for (FileStatus stat : stats) {
    files[i] = new OneFileInfo(stat, conf, isSplitable(conf, stat.getPath()),
        rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes, 256 * 1024 * 1024);
    totLength += files[i].getLength();
    i++;
  }

  int partNum = (int) (totLength / maxSize);

  createSplitsGreedy(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
      partNum, minSizeNode, minSizeRack, splits);
}
/** @return a list containing a single split of summation */
@Override
public List<InputSplit> getSplits(JobContext context) {
  // read sigma from conf
  final Configuration conf = context.getConfiguration();
  final Summation sigma = SummationWritable.read(DistSum.class, conf);

  // create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(1);
  splits.add(new SummationSplit(sigma));
  return splits;
}
@Test(timeout = 2000)
public void testBooleanSplitter() throws Exception {
  BooleanSplitter splitter = new BooleanSplitter();
  ResultSet result = mock(ResultSet.class);

  when(result.getString(1)).thenReturn("result1");

  List<InputSplit> splits = splitter.split(configuration, result, "column");
  assertSplits(new String[] {"column = FALSE column = FALSE",
      "column IS NULL column IS NULL"}, splits);

  when(result.getString(1)).thenReturn("result1");
  when(result.getString(2)).thenReturn("result2");
  when(result.getBoolean(1)).thenReturn(true);
  when(result.getBoolean(2)).thenReturn(false);

  splits = splitter.split(configuration, result, "column");
  assertEquals(0, splits.size());

  when(result.getString(1)).thenReturn("result1");
  when(result.getString(2)).thenReturn("result2");
  when(result.getBoolean(1)).thenReturn(false);
  when(result.getBoolean(2)).thenReturn(true);

  splits = splitter.split(configuration, result, "column");
  assertSplits(new String[] {"column = FALSE column = FALSE", ".*column = TRUE"}, splits);
}
@Override
public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  this.lrr = new LineRecordReader();
  this.lrr.initialize(inputSplit, taskAttemptContext);
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit,
    TaskAttemptContext context) {
  try {
    return new XMLRecordReader(inputSplit, context.getConfiguration());
  } catch (IOException e) {
    return null;
  }
}
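/*
 * A minimal driver-side sketch showing how this format/reader pair might be
 * wired up. The class name XmlInputFormat and the literal key names below are
 * assumptions; the snippets above only reference the constants START_TAG_KEY
 * and END_TAG_KEY, not their values.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class XmlJobDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("xmlinput.start", "<record>");   // assumed value behind START_TAG_KEY
    conf.set("xmlinput.end", "</record>");    // assumed value behind END_TAG_KEY
    Job job = Job.getInstance(conf, "xml-parse");
    job.setInputFormatClass(XmlInputFormat.class); // assumed enclosing format class
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // ...configure mapper, reducer, and output as the job requires...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}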
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  FileSplit fileSplit = (FileSplit) inputSplit;
  if (fileSplit != null && fileSplit.getPath() != null
      && fileSplit.getPath().toString().endsWith(TEMP_FILE_SUFFIX)) {
    LOG.info("Not processing Avro tmp file {}", fileSplit.getPath());
  } else {
    super.initialize(inputSplit, context);
  }
}
/**
 * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
 * such that the number of bytes to be copied for each split is approximately
 * equal.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException On failure.
 * @throws InterruptedException
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS);

  if (numSplits == 0) {
    return new ArrayList<>();
  }

  return getSplits(configuration, numSplits,
      ConfigurationUtil.getLong(configuration,
          S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
@SuppressWarnings("unchecked")
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (kids != null) {
    for (int i = 0; i < kids.length; ++i) {
      kids[i].initialize(((CompositeInputSplit) split).get(i), context);
      if (kids[i].key() == null) {
        continue;
      }

      // get keyclass
      if (keyclass == null) {
        keyclass = kids[i].createKey().getClass().asSubclass(WritableComparable.class);
      }
      // create priority queue
      if (null == q) {
        cmp = WritableComparator.get(keyclass, conf);
        q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
            new Comparator<ComposableRecordReader<K,?>>() {
              public int compare(ComposableRecordReader<K,?> o1,
                                 ComposableRecordReader<K,?> o2) {
                return cmp.compare(o1.key(), o2.key());
              }
            });
      }
      // Explicit check for key class agreement
      if (!keyclass.equals(kids[i].key().getClass())) {
        throw new ClassCastException("Child key classes fail to agree");
      }

      // add the kid to priority queue if it has any elements
      if (kids[i].hasNext()) {
        q.add(kids[i]);
      }
    }
  }
}
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  rr.initialize(split, context);
  conf = context.getConfiguration();
  nextKeyValue();
  if (!empty) {
    keyclass = key.getClass().asSubclass(WritableComparable.class);
    valueclass = value.getClass();
    if (cmp == null) {
      cmp = WritableComparator.get(keyclass, conf);
    }
  }
}
public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
    throws SQLException {
  List<InputSplit> splits = new ArrayList<InputSplit>();

  if (results.getString(1) == null && results.getString(2) == null) {
    // Range is null to null. Return a null split accordingly.
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " IS NULL", colName + " IS NULL"));
    return splits;
  }

  boolean minVal = results.getBoolean(1);
  boolean maxVal = results.getBoolean(2);

  // Use one or two splits.
  if (!minVal) {
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " = FALSE", colName + " = FALSE"));
  }

  if (maxVal) {
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " = TRUE", colName + " = TRUE"));
  }

  if (results.getString(1) == null || results.getString(2) == null) {
    // Include a null value.
    splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " IS NULL", colName + " IS NULL"));
  }

  return splits;
}
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  final Path file = split.getPath();
  initialize(job, split.getStart(), split.getLength(), file);
}
@SuppressWarnings("unchecked")
private static <T extends InputSplit> SplitMetaInfo[] writeNewSplits(Configuration conf,
    T[] array, FSDataOutputStream out) throws IOException, InterruptedException {

  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length != 0) {
    SerializationFactory factory = new SerializationFactory(conf);
    int i = 0;
    int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
        MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
    long offset = out.getPos();
    for (T split : array) {
      long prevCount = out.getPos();
      Text.writeString(out, split.getClass().getName());
      Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);
      long currCount = out.getPos();
      String[] locations = split.getLocations();
      if (locations.length > maxBlockLocations) {
        LOG.warn("Max block location exceeded for split: " + split + " splitsize: "
            + locations.length + " maxsize: " + maxBlockLocations);
        locations = Arrays.copyOf(locations, maxBlockLocations);
      }
      info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
      offset += currCount - prevCount;
    }
  }
  return info;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
  DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;

  this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
      + dbSplit.getUpperClause() + ")";
}
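/*
 * Illustration only: for the data-slice split shown earlier (lower clause
 * " datasliceid % 4 = 2", upper clause "1 = 1"), the conjunction assembled
 * above becomes
 *   ( datasliceid % 4 = 2) AND (1 = 1)
 * which the record reader can then splice into the WHERE clause of its SELECT.
 */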
public static void createSplitFiles(Path jobSubmitDir, Configuration conf,
    FileSystem fs, org.apache.hadoop.mapred.InputSplit[] splits) throws IOException {
  FSDataOutputStream out = createFile(fs,
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
  SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
  out.close();
  writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  super.initialize(inputSplit, taskAttemptContext);
  Configuration conf = getConfiguration();
  ftp = MainframeFTPClientUtils.getFTPConnection(conf);
  if (ftp != null) {
    String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
    ftp.changeWorkingDirectory("'" + dsName + "'");
  }
}