@SuppressWarnings("unchecked") private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
private void uploadJobFiles(JobID id, InputSplit[] splits,
    Path jobSubmitDir, UserGroupInformation ugi, final JobConf conf)
    throws Exception {
  final Path confLocation = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
  FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
    public FileSystem run() throws IOException {
      return confLocation.getFileSystem(conf);
    }
  });
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf, fs, splits);
  FsPermission perm = new FsPermission((short)0700);

  // localize conf
  DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
  conf.writeXml(confOut);
  confOut.close();
}
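For context, the split and split-meta files written by JobSplitWriter.createSplitFiles are later read back through SplitMetaInfoReader. The following is a hypothetical helper sketching that read-back path, assuming the org.apache.hadoop.mapreduce.split API and reusing the parameter names from uploadJobFiles above:

// Hypothetical read-back helper (not part of the snippets above).
// Assumes org.apache.hadoop.mapreduce.split.{SplitMetaInfoReader, JobSplit}.
private JobSplit.TaskSplitMetaInfo[] readBackSplitMetaInfo(JobID id,
    FileSystem fs, JobConf conf, Path jobSubmitDir) throws IOException {
  // Returns one entry per map task: the split's preferred locations plus the
  // offset of its serialized bytes inside the job.split file.
  return SplitMetaInfoReader.readSplitMetaInfo(id, fs, conf, jobSubmitDir);
}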
/**
 * Generate new-api mapreduce InputFormat splits
 * @param jobContext JobContext required by InputFormat
 * @param inputSplitDir Directory in which to generate splits information
 *
 * @return InputSplitInfo containing the split files' information and the
 * location hints for each split generated, used to determine the
 * parallelism of the map stage.
 *
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
    Path inputSplitDir) throws IOException, InterruptedException,
    ClassNotFoundException {

  org.apache.hadoop.mapreduce.InputSplit[] splits =
      generateNewSplits(jobContext, null, 0);

  Configuration conf = jobContext.getConfiguration();

  JobSplitWriter.createSplitFiles(inputSplitDir, conf,
      inputSplitDir.getFileSystem(conf), splits);

  List<TaskLocationHint> locationHints =
      new ArrayList<TaskLocationHint>(splits.length);
  for (int i = 0; i < splits.length; ++i) {
    locationHints.add(
        new TaskLocationHint(new HashSet<String>(
            Arrays.asList(splits[i].getLocations())), null));
  }

  return new InputSplitInfoDisk(
      JobSubmissionFiles.getJobSplitFile(inputSplitDir),
      JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
      splits.length, locationHints, jobContext.getCredentials());
}
/**
 * Generate old-api mapred InputFormat splits
 * @param jobConf JobConf required by InputFormat class
 * @param inputSplitDir Directory in which to generate splits information
 *
 * @return InputSplitInfo containing the split files' information and the
 * number of splits generated, used to determine the parallelism of the
 * map stage.
 *
 * @throws IOException
 */
private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
    Path inputSplitDir) throws IOException {

  org.apache.hadoop.mapred.InputSplit[] splits =
      generateOldSplits(jobConf, null, 0);

  JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
      inputSplitDir.getFileSystem(jobConf), splits);

  List<TaskLocationHint> locationHints =
      new ArrayList<TaskLocationHint>(splits.length);
  for (int i = 0; i < splits.length; ++i) {
    locationHints.add(
        new TaskLocationHint(new HashSet<String>(
            Arrays.asList(splits[i].getLocations())), null));
  }

  return new InputSplitInfoDisk(
      JobSubmissionFiles.getJobSplitFile(inputSplitDir),
      JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
      splits.length, locationHints, jobConf.getCredentials());
}
/**
 * Generate old-api mapred InputFormat splits
 * @param jobConf JobConf required by InputFormat class
 * @param inputSplitDir Directory in which to generate splits information
 *
 * @return InputSplitInfo containing the split files' information and the
 * number of splits generated, used to determine the parallelism of the
 * map stage.
 *
 * @throws IOException
 */
private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
    Path inputSplitDir) throws IOException {

  org.apache.hadoop.mapred.InputSplit[] splits =
      generateOldSplits(jobConf, false, true, 0);

  JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
      inputSplitDir.getFileSystem(jobConf), splits);

  List<TaskLocationHint> locationHints =
      new ArrayList<TaskLocationHint>(splits.length);
  for (int i = 0; i < splits.length; ++i) {
    locationHints.add(
        TaskLocationHint.createTaskLocationHint(new HashSet<String>(
            Arrays.asList(splits[i].getLocations())), null));
  }

  return new InputSplitInfoDisk(
      JobSubmissionFiles.getJobSplitFile(inputSplitDir),
      JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
      splits.length, locationHints, jobConf.getCredentials());
}
private int writeOldSplits(JobConf job, Path jobSubmitDir)
    throws IOException {
  org.apache.hadoop.mapred.InputSplit[] splits =
      job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
    public int compare(org.apache.hadoop.mapred.InputSplit a,
                       org.apache.hadoop.mapred.InputSplit b) {
      try {
        long left = a.getLength();
        long right = b.getLength();
        if (left == right) {
          return 0;
        } else if (left < right) {
          return 1;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("Problem getting input split size", ie);
      }
    }
  });
  JobSplitWriter.createSplitFiles(jobSubmitDir, job,
      jobSubmitDir.getFileSystem(job), splits);
  return splits.length;
}
@Test
public void testMRAppMasterJobLaunchTime() throws IOException,
    InterruptedException {
  String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
  String containerIdStr = "container_1317529182569_0004_000002_1";
  String userName = "TestAppMasterUser";
  JobConf conf = new JobConf();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.setInt(MRJobConfig.NUM_REDUCES, 0);
  conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
  ApplicationAttemptId applicationAttemptId = ConverterUtils
      .toApplicationAttemptId(applicationAttemptIdStr);
  JobId jobId = TypeConverter.toYarn(
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));

  File dir = new File(MRApps.getStagingAreaDir(conf, userName).toString(),
      jobId.toString());
  dir.mkdirs();
  File historyFile = new File(JobHistoryUtils.getStagingJobHistoryFile(
      new Path(dir.toURI().toString()), jobId,
      (applicationAttemptId.getAttemptId() - 1)).toUri().getRawPath());
  historyFile.createNewFile();
  FSDataOutputStream out = new FSDataOutputStream(
      new FileOutputStream(historyFile), null);
  EventWriter writer = new EventWriter(out, EventWriter.WriteMode.JSON);
  writer.close();
  FileSystem fs = FileSystem.get(conf);
  JobSplitWriter.createSplitFiles(new Path(dir.getAbsolutePath()), conf,
      fs, new org.apache.hadoop.mapred.InputSplit[0]);
  ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
  MRAppMasterTestLaunchTime appMaster =
      new MRAppMasterTestLaunchTime(applicationAttemptId, containerId,
          "host", -1, -1, System.currentTimeMillis());
  MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
  appMaster.stop();
  assertTrue("Job launch time should not be negative.",
      appMaster.jobLaunchTime.get() >= 0);
}
@Test
public void testMRAppMasterJobLaunchTime() throws IOException,
    InterruptedException {
  String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
  String containerIdStr = "container_1317529182569_0004_000002_1";
  String userName = "TestAppMasterUser";
  JobConf conf = new JobConf();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.setInt(MRJobConfig.NUM_REDUCES, 0);
  conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
  ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.fromString(
      applicationAttemptIdStr);
  JobId jobId = TypeConverter.toYarn(
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));

  File dir = new File(MRApps.getStagingAreaDir(conf, userName).toString(),
      jobId.toString());
  dir.mkdirs();
  File historyFile = new File(JobHistoryUtils.getStagingJobHistoryFile(
      new Path(dir.toURI().toString()), jobId,
      (applicationAttemptId.getAttemptId() - 1)).toUri().getRawPath());
  historyFile.createNewFile();
  FSDataOutputStream out = new FSDataOutputStream(
      new FileOutputStream(historyFile), null);
  EventWriter writer = new EventWriter(out, EventWriter.WriteMode.JSON);
  writer.close();
  FileSystem fs = FileSystem.get(conf);
  JobSplitWriter.createSplitFiles(new Path(dir.getAbsolutePath()), conf,
      fs, new org.apache.hadoop.mapred.InputSplit[0]);
  ContainerId containerId = ContainerId.fromString(containerIdStr);
  MRAppMasterTestLaunchTime appMaster =
      new MRAppMasterTestLaunchTime(applicationAttemptId, containerId,
          "host", -1, -1, System.currentTimeMillis());
  MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
  appMaster.stop();
  assertTrue("Job launch time should not be negative.",
      appMaster.jobLaunchTime.get() >= 0);
}
@SuppressWarnings("unchecked") private <T extends InputSplit> List<InputSplit> writeNewSplits(Path jobSubmitDir) throws IOException, InterruptedException { List<InputSplit> splits = createNewSplits(); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, getConf(), jobSubmitDir.getFileSystem(getConf()), array); return splits; }