@Before public void setUp() throws Exception { dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); fs = DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws IOException { return dfs.getFileSystem(); } }); // Home directories for users mkdir(fs, "/user", "nobody", "nogroup", (short)01777); mkdir(fs, "/user/alice", "alice", "nogroup", (short)0755); mkdir(fs, "/user/bob", "bob", "nogroup", (short)0755); // staging directory root with sticky bit UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser(); mkdir(fs, "/staging", MR_UGI.getShortUserName(), "nogroup", (short)01777); JobConf mrConf = new JobConf(); mrConf.set(JTConfig.JT_STAGING_AREA_ROOT, "/staging"); mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(), 1, null, null, MR_UGI, mrConf); }
public void testWithDFS() throws IOException { MiniDFSCluster dfs = null; MiniMRCluster mr = null; FileSystem fileSys = null; try { final int taskTrackers = 4; JobConf conf = new JobConf(); conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system"); dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); fileSys = dfs.getFileSystem(); mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf); runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir")); } finally { if (dfs != null) { dfs.shutdown(); } if (mr != null) { mr.shutdown(); } } }
@Test public void testGetClusterStatusWithLocalJobRunner() throws Exception { Configuration conf = new Configuration(); conf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME); conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); JobClient client = new JobClient(conf); ClusterStatus clusterStatus = client.getClusterStatus(true); Collection<String> activeTrackerNames = clusterStatus .getActiveTrackerNames(); Assert.assertEquals(0, activeTrackerNames.size()); int blacklistedTrackers = clusterStatus.getBlacklistedTrackers(); Assert.assertEquals(0, blacklistedTrackers); Collection<BlackListInfo> blackListedTrackersInfo = clusterStatus .getBlackListedTrackersInfo(); Assert.assertEquals(0, blackListedTrackersInfo.size()); }
/** * Sets the high ram job properties in the simulated job's configuration. */ @SuppressWarnings("deprecation") static void configureHighRamProperties(Configuration sourceConf, Configuration destConf) { // set the memory per map task scaleConfigParameter(sourceConf, destConf, MRConfig.MAPMEMORY_MB, MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); // validate and fail early validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB, JTConfig.JT_MAX_MAPMEMORY_MB); // set the memory per reduce task scaleConfigParameter(sourceConf, destConf, MRConfig.REDUCEMEMORY_MB, MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB); // validate and fail early validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB, JTConfig.JT_MAX_REDUCEMEMORY_MB); }
@Test public void testFramework() { JobConf jobConf = new JobConf(); jobConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME); jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); assertFalse("Expected 'isLocal' to be false", StreamUtil.isLocalJobTracker(jobConf)); jobConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME); jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); assertFalse("Expected 'isLocal' to be false", StreamUtil.isLocalJobTracker(jobConf)); jobConf.set(JTConfig.JT_IPC_ADDRESS, "jthost:9090"); jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); assertTrue("Expected 'isLocal' to be true", StreamUtil.isLocalJobTracker(jobConf)); }
protected String[] genArgs(String jobtracker, String mapper, String reducer) { return new String[] { "-input", INPUT_FILE, "-output", OUTPUT_DIR, "-mapper", mapper, "-reducer", reducer, "-jobconf", MRJobConfig.NUM_MAPS + "=1", "-jobconf", MRJobConfig.NUM_REDUCES + "=1", "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true", "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(), "-jobconf", JTConfig.JT_IPC_ADDRESS + "="+jobtracker, "-jobconf", "fs.default.name=file:///", "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR, "-jobconf", "mapreduce.framework.name=yarn" }; }
void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException { boolean mayExit = false; StreamJob job = new StreamJob(genArgs( mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS), map, reduce), mayExit); int returnValue = job.go(); assertEquals(0, returnValue); // If input to reducer is empty, dummy reporter(which ignores all // reporting lines) is set for MRErrorThread in waitOutputThreads(). So // expectedCounterValue is 0 for empty-input-to-reducer case. // Output of reducer is also empty for empty-input-to-reducer case. int expectedCounterValue = 0; if (type == TaskType.MAP || !isEmptyInput) { validateTaskStatus(job, type); // output is from "print STDOUT" statements in perl script validateJobOutput(job.getConf()); expectedCounterValue = 2; } validateUserCounter(job, expectedCounterValue); validateTaskStderr(job, type); deleteOutDir(fs); }
@Test public void testClusterException() { Configuration conf = new Configuration(); conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); conf.set(JTConfig.JT_IPC_ADDRESS, "local"); // initializing a cluster with this conf should throw an error. // However the exception thrown should not be specific to either // the job tracker client provider or the local provider boolean errorThrown = false; try { Cluster cluster = new Cluster(conf); cluster.close(); fail("Not expected - cluster init should have failed"); } catch (IOException e) { errorThrown = true; assert(e.getMessage().contains("Cannot initialize Cluster. Please check")); } assert(errorThrown); }
@Before public void setUp() throws Exception { dfs = new MiniDFSCluster(conf, 4, true, null); fs = DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws IOException { return dfs.getFileSystem(); } }); // Home directories for users mkdir(fs, "/user", "nobody", "nogroup", (short)01777); mkdir(fs, "/user/alice", "alice", "nogroup", (short)0755); mkdir(fs, "/user/bob", "bob", "nogroup", (short)0755); // staging directory root with sticky bit UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser(); mkdir(fs, "/staging", MR_UGI.getShortUserName(), "nogroup", (short)01777); JobConf mrConf = new JobConf(); mrConf.set(JTConfig.JT_STAGING_AREA_ROOT, "/staging"); mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(), 1, null, null, MR_UGI, mrConf); }
public void testWithDFS() throws IOException { MiniDFSCluster dfs = null; MiniMRCluster mr = null; FileSystem fileSys = null; try { final int taskTrackers = 4; JobConf conf = new JobConf(); conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system"); dfs = new MiniDFSCluster(conf, 4, true, null); fileSys = dfs.getFileSystem(); mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf); runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir")); } finally { if (dfs != null) { dfs.shutdown(); } if (mr != null) { mr.shutdown(); } } }