public TeraScheduler(FileSplit[] realSplits, Configuration conf) throws IOException {
  this.realSplits = realSplits;
  this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
  Map<String, Host> hostTable = new HashMap<String, Host>();
  splits = new Split[realSplits.length];
  for (FileSplit realSplit : realSplits) {
    Split split = new Split(realSplit.getPath().toString());
    splits[remainingSplits++] = split;
    for (String hostname : realSplit.getLocations()) {
      Host host = hostTable.get(hostname);
      if (host == null) {
        host = new Host(hostname);
        hostTable.put(hostname, host);
        hosts.add(host);
      }
      host.splits.add(split);
      split.locations.add(host);
    }
  }
}
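A minimal construction sketch, not taken from the source: the helper name and the conversion from a List of InputSplit to FileSplit[] are assumptions about how a caller (such as an input format) might feed its splits to the constructor above.

// Hypothetical helper (name invented) showing one way to invoke the constructor
// above; callers are assumed to hold FileSplit instances in their split list.
static TeraScheduler buildScheduler(List<InputSplit> rawSplits, Configuration conf)
    throws IOException {
  FileSplit[] fileSplits = rawSplits.toArray(new FileSplit[rawSplits.size()]);
  return new TeraScheduler(fileSplits, conf);
}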
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
      DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {
      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[]) null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[]) null));
  }

  return splits;
}
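A small sizing sketch with invented numbers, mirroring only the per-split target computation in getSplits() above; the constants referenced are the ones used in the snippet.

// Invented example: 10 task trackers, the default of 2 map slots each, and
// 10 GB of distributed-cache data to generate.
int numTrackers = 10;                                   // ClusterStatus.getTaskTrackers()
int numMapSlotsPerTracker = 2;                          // TTConfig.TT_MAP_SLOTS default above
int numSplits = numTrackers * numMapSlotsPerTracker;    // 20 splits
long totalSize = 10L * 1024 * 1024 * 1024;              // 10 GB
long targetSize = Math.max(totalSize / numSplits,
    DistributedCacheEmulator.AVG_BYTES_PER_MAP);        // ~512 MB per split, unless
                                                        // AVG_BYTES_PER_MAP is larger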
ResourceUsageMatcherRunner(final TaskInputOutputContext context,
                           ResourceUsageMetrics metrics) {
  Configuration conf = context.getConfiguration();

  // set the resource calculator plugin
  Class<? extends ResourceCalculatorPlugin> clazz =
      conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
                    null, ResourceCalculatorPlugin.class);
  ResourceCalculatorPlugin plugin =
      ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);

  // set the other parameters
  this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
  progress = new BoostingProgress(context);

  // instantiate a resource-usage-matcher
  matcher = new ResourceUsageMatcher();
  matcher.configure(conf, plugin, metrics, progress);
}
public void setConf(Configuration conf) {
  if (conf instanceof JobConf) {
    this.conf = (JobConf) conf;
  } else {
    this.conf = new JobConf(conf);
  }
  this.mapOutputFile.setConf(this.conf);
  this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
  // Add the static resolutions (this is required for the junit tests to
  // work on testcases that simulate multiple nodes on a single physical
  // node).
  String hostToResolved[] = conf.getStrings(TTConfig.TT_STATIC_RESOLUTIONS);
  if (hostToResolved != null) {
    for (String str : hostToResolved) {
      String name = str.substring(0, str.indexOf('='));
      String resolvedName = str.substring(str.indexOf('=') + 1);
      NetUtils.addStaticResolution(name, resolvedName);
    }
  }
}
public TrackerDistributedCacheManager(Configuration conf,
    TaskController taskController) throws IOException {
  this.localFs = FileSystem.getLocal(conf);
  this.trackerConf = conf;
  this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
  this.taskController = taskController;

  // setting the cache size to a default of 10GB
  this.allowedCacheSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
      DEFAULT_CACHE_SIZE);
  // setting the cache number of subdirectories limit to a default of 10000
  this.allowedCacheSubdirs = conf.getLong(
      TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);

  double cleanupPct = conf.getFloat(TTConfig.TT_LOCAL_CACHE_KEEP_AROUND_PCT,
      DEFAULT_CACHE_KEEP_AROUND_PCT);
  this.allowedCacheSizeCleanupGoal =
      (long) (this.allowedCacheSize * cleanupPct);
  this.allowedCacheSubdirsCleanupGoal =
      (long) (this.allowedCacheSubdirs * cleanupPct);

  this.cleanupThread = new CleanupThread(conf);
}
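A minimal configuration sketch, with invented values, for the three knobs read by the constructor above; the keys are the TTConfig constants already referenced in the snippet.

// Invented values for illustration only; defaults come from the constructor above.
Configuration conf = new Configuration();
conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, 10L * 1024 * 1024 * 1024);  // 10 GB cache cap
conf.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, 10000);            // subdirectory cap
conf.setFloat(TTConfig.TT_LOCAL_CACHE_KEEP_AROUND_PCT, 0.95f);         // cleanup goal fraction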
/**
 * Verifies that the debug script is launched and runs correctly when a map
 * task fails.
 *
 * @throws Exception
 */
@Test
public void testDebugScript() throws Exception {
  JobConf conf = new JobConf();
  conf.setLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, 0L);
  MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
  Path inputPath = new Path(SCRIPT_DIR);
  Path outputPath = new Path(SCRIPT_DIR, "task_output");

  // Run a failing mapper so the debug script is launched.
  JobID jobId = runFailingMapJob(mrCluster.createJobConf(), inputPath,
      outputPath);
  // Construct the task id of the first map task of failmap.
  TaskAttemptID taskId = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 0), 0);
  // Verify that the debug script was launched and ran correctly.
  verifyDebugScriptOutput(taskId);
}
@Test
public void testCreateInstrumentationWithMultipleClasses() {
  // Set up configuration to create two dummy instrumentation objects
  JobConf conf = new JobConf();
  String dummyClass = DummyTaskTrackerInstrumentation.class.getName();
  String classList = dummyClass + "," + dummyClass;
  conf.set(TTConfig.TT_INSTRUMENTATION, classList);
  TaskTracker tracker = new TaskTracker();

  // Check that a composite instrumentation object is created
  TaskTrackerInstrumentation inst =
      TaskTracker.createInstrumentation(tracker, conf);
  assertEquals(CompositeTaskTrackerInstrumentation.class.getName(),
      inst.getClass().getName());

  // Check that each member of the composite is a dummy instrumentation
  CompositeTaskTrackerInstrumentation comp =
      (CompositeTaskTrackerInstrumentation) inst;
  List<TaskTrackerInstrumentation> insts = comp.getInstrumentations();
  assertEquals(2, insts.size());
  assertEquals(DummyTaskTrackerInstrumentation.class.getName(),
      insts.get(0).getClass().getName());
  assertEquals(DummyTaskTrackerInstrumentation.class.getName(),
      insts.get(1).getClass().getName());
}
/**
 * Test for verifying that tasks that go beyond limits get killed.
 *
 * @throws Exception
 */
@Ignore("Intermittent, unexpected task success causes test to fail.")
@Test
public void testTasksBeyondLimits() throws Exception {
  // Run the test only if memory management is enabled
  if (!isProcfsBasedTreeAvailable()) {
    return;
  }
  // Start cluster with proper configuration.
  JobConf fConf = new JobConf();
  // very small value, so that no task escapes to successful completion.
  fConf.setInt(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 100);
  fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024);
  fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024);
  startCluster(fConf);
  runJobExceedingMemoryLimit(false);
}
/**
 * Test for verifying that tasks that go beyond physical limits get killed.
 *
 * @throws Exception
 */
@Ignore("Intermittent, unexpected task success causes test to fail.")
@Test
public void testTasksBeyondPhysicalLimits() throws Exception {
  // Run the test only if memory management is enabled
  if (!isProcfsBasedTreeAvailable()) {
    return;
  }
  // Start cluster with proper configuration.
  JobConf fConf = new JobConf();
  // very small value, so that no task escapes to successful completion.
  fConf.setInt(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 100);
  // Reserve only 1 MB of memory on the TaskTrackers
  fConf.setLong(TTConfig.TT_RESERVED_PHYSCIALMEMORY_MB, 1L);
  startCluster(fConf);
  runJobExceedingMemoryLimit(true);
}
/**
 * Runs the tasks-beyond-limits test using the old (deprecated) configuration
 * values for the TaskTracker.
 *
 * @throws Exception
 */
@Ignore("Intermittent, unexpected task success causes test to fail.")
@Test
public void testTaskMemoryMonitoringWithDeprecatedConfiguration()
    throws Exception {
  // Run the test only if memory management is enabled
  if (!isProcfsBasedTreeAvailable()) {
    return;
  }
  // Start cluster with proper configuration.
  JobConf fConf = new JobConf();
  // very small value, so that no task escapes to successful completion.
  fConf.setInt(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 100);
  // Set the old properties: the default per-task maximum vmem (2 GB) and the
  // upper limit on task vmem (3 GB).
  fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
      (2L * 1024L * 1024L * 1024L));
  fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
      (3L * 1024L * 1024L * 1024L));
  startCluster(fConf);
  runJobExceedingMemoryLimit(false);
}
protected void startCluster() throws IOException, InterruptedException {
  JobConf conf = new JobConf();
  dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null);
  conf.set(TTConfig.TT_TASK_CONTROLLER,
      MyLinuxTaskController.class.getName());
  conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
  mrCluster = new MiniMRCluster(NUMBER_OF_NODES,
      dfsCluster.getFileSystem().getUri().toString(), 4, null, null, conf);

  clusterConf = mrCluster.createJobConf();

  String ugi = System.getProperty(TASKCONTROLLER_UGI);
  String[] splits = ugi.split(",");
  jobOwner = UserGroupInformation.createUserForTesting(splits[0],
      new String[] { splits[1] });
  createHomeAndStagingDirectory(clusterConf);
}
/**
 * Create taskcontroller.cfg.
 *
 * @param path Path to the taskcontroller binary.
 * @param conf TaskTracker's configuration
 * @return the created conf file
 * @throws IOException
 */
static File createTaskControllerConf(String path, Configuration conf)
    throws IOException {
  File confDirectory = new File(path, "../conf");
  if (!confDirectory.exists()) {
    confDirectory.mkdirs();
  }
  File configurationFile = new File(confDirectory, "taskcontroller.cfg");
  PrintWriter writer =
      new PrintWriter(new FileOutputStream(configurationFile));

  writer.println(String.format(MRConfig.LOCAL_DIR + "=%s",
      conf.get(MRConfig.LOCAL_DIR)));
  writer.println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
  writer.println(String.format(TTConfig.TT_GROUP + "=%s",
      conf.get(TTConfig.TT_GROUP)));

  writer.flush();
  writer.close();
  return configurationFile;
}
@Override
public synchronized void connect() throws IOException {
  if (isConnected()) {
    return;
  }
  String sockAddrStr = getConf().get(TTConfig.TT_REPORT_ADDRESS);
  if (sockAddrStr == null) {
    throw new IllegalArgumentException(
        "TaskTracker report address is not set");
  }
  String[] splits = sockAddrStr.split(":");
  if (splits.length != 2) {
    throw new IllegalArgumentException(TTConfig.TT_REPORT_ADDRESS
        + " is not correctly configured or " + SYSTEM_TEST_FILE
        + " hasn't been found.");
  }
  String port = splits[1];
  String sockAddr = getHostName() + ":" + port;
  InetSocketAddress bindAddr = NetUtils.createSocketAddr(sockAddr);
  proxy = (TTProtocol) RPC.getProxy(TTProtocol.class, TTProtocol.versionID,
      bindAddr, getConf());
  setConnected(true);
}
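A minimal configuration sketch for the report address that connect() above requires before building the TTProtocol proxy; the host and port below are invented for illustration.

// Invented host:port pair; connect() only requires a value of the form
// host:port under TTConfig.TT_REPORT_ADDRESS.
Configuration conf = new Configuration();
conf.set(TTConfig.TT_REPORT_ADDRESS, "127.0.0.1:50060");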
public void testBadIndex() throws Exception {
  final int parts = 30;
  fs.delete(p, true);
  conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
  IndexCache cache = new IndexCache(conf);

  Path f = new Path(p, "badindex");
  FSDataOutputStream out = fs.create(f, false);
  CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
  DataOutputStream dout = new DataOutputStream(iout);
  for (int i = 0; i < parts; ++i) {
    for (int j = 0; j < MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
      if (0 == (i % 3)) {
        dout.writeLong(i);
      } else {
        out.writeLong(i);
      }
    }
  }
  out.writeLong(iout.getChecksum().getValue());
  dout.close();
  try {
    cache.getIndexInformation("badindex", 7, f,
        UserGroupInformation.getCurrentUser().getShortUserName());
    fail("Did not detect bad checksum");
  } catch (IOException e) {
    if (!(e.getCause() instanceof ChecksumException)) {
      throw e;
    }
  }
}