private void testCachingAtLevel(int level) throws Exception { String namenode = null; MiniDFSCluster dfs = null; MiniMRCluster mr = null; FileSystem fileSys = null; String testName = "TestMultiLevelCaching"; try { final int taskTrackers = 1; // generate the racks // use rack1 for data node String rack1 = getRack(0, level); // use rack2 for task tracker String rack2 = getRack(1, level); Configuration conf = new Configuration(); // Run a datanode on host1 under /a/b/c/..../d1/e1/f1 dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, new String[] {"host1.com"}); dfs.waitActive(); fileSys = dfs.getFileSystem(); if (!fileSys.mkdirs(inDir)) { throw new IOException("Mkdirs failed to create " + inDir.toString()); } UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), (short)1); namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + (dfs.getFileSystem()).getUri().getPort(); // Run a job with the (only)tasktracker on host2 under diff topology // e.g /a/b/c/..../d2/e2/f2. JobConf jc = new JobConf(); // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) // + 1 (for host) jc.setInt("mapred.task.cache.levels", level + 2); mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, new String[] {"host2.com"}, jc); /* The job is configured with 1 map for one (non-splittable) file. * Since the datanode is running under different subtree, there is no * node-level data locality but there should be topological locality. */ TestRackAwareTaskPlacement.launchJobAndTestCounters( testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0); mr.shutdown(); } finally { fileSys.delete(inDir, true); fileSys.delete(outputPath, true); if (dfs != null) { dfs.shutdown(); } } }
private void testCachingAtLevel(int level) throws IOException { String namenode = null; MiniDFSCluster dfs = null; MiniMRCluster mr = null; FileSystem fileSys = null; String testName = "TestMultiLevelCaching"; try { final int taskTrackers = 1; // generate the racks // use rack1 for data node String rack1 = getRack(0, level); // use rack2 for task tracker String rack2 = getRack(1, level); Configuration conf = new Configuration(); // Run a datanode on host1 under /a/b/c/..../d1/e1/f1 dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, new String[] {"127.0.0.1"}); dfs.waitActive(); fileSys = dfs.getFileSystem(); if (!fileSys.mkdirs(inDir)) { throw new IOException("Mkdirs failed to create " + inDir.toString()); } UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), (short)1); namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + (dfs.getFileSystem()).getUri().getPort(); // Run a job with the (only)tasktracker on host2 under diff topology // e.g /a/b/c/..../d2/e2/f2. JobConf jc = new JobConf(); // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) // + 1 (for host) jc.setInt("mapred.task.cache.levels", level + 2); mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, new String[] {"127.0.0.1"}, jc); /* The job is configured with 1 map for one (non-splittable) file. * Since the datanode is running under different subtree, there is no * node-level data locality but there should be topological locality. */ TestRackAwareTaskPlacement.launchJobAndTestCounters( testName, mr, fileSys, inDir, outputPath, 1, 0, 1, 0); mr.shutdown(); } finally { fileSys.delete(inDir, true); fileSys.delete(outputPath, true); if (dfs != null) { dfs.shutdown(); } } }
private void testCachingAtLevel(int level) throws IOException { String namenode = null; MiniDFSCluster dfs = null; MiniMRCluster mr = null; FileSystem fileSys = null; String testName = "TestMultiLevelCaching"; try { final int taskTrackers = 1; // generate the racks // use rack1 for data node String rack1 = getRack(0, level); // use rack2 for task tracker String rack2 = getRack(1, level); Configuration conf = new Configuration(); // Run a datanode on host1 under /a/b/c/..../d1/e1/f1 dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, new String[] {"host1.com"}); dfs.waitActive(); fileSys = dfs.getFileSystem(); if (!fileSys.mkdirs(inDir)) { throw new IOException("Mkdirs failed to create " + inDir.toString()); } UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), (short)1); namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + (dfs.getFileSystem()).getUri().getPort(); // Run a job with the (only)tasktracker on host2 under diff topology // e.g /a/b/c/..../d2/e2/f2. JobConf jc = new JobConf(); // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) // + 1 (for host) jc.setInt("mapred.task.cache.levels", level + 2); mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, new String[] {"host2.com"}, jc); /* The job is configured with 1 map for one (non-splittable) file. * Since the datanode is running under different subtree, there is no * node-level data locality but there should be topological locality. */ TestRackAwareTaskPlacement.launchJobAndTestCounters( testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0); mr.shutdown(); } finally { fileSys.delete(inDir, true); fileSys.delete(outputPath, true); if (dfs != null) { dfs.shutdown(); } } }