/**
 * When no input dir is specified, generate random data.
 */
protected static void confRandom(Job job) throws IOException {
  // from RandomWriter
  job.setInputFormatClass(RandomInputFormat.class);
  job.setMapperClass(RandomMapOutput.class);

  Configuration conf = job.getConfiguration();
  final ClusterStatus cluster = new JobClient(conf).getClusterStatus();
  int numMapsPerHost = conf.getInt(RandomTextWriter.MAPS_PER_HOST, 10);
  long numBytesToWritePerMap =
      conf.getLong(RandomTextWriter.BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
  if (numBytesToWritePerMap == 0) {
    throw new IOException(
        "Cannot have " + RandomTextWriter.BYTES_PER_MAP + " set to 0");
  }
  long totalBytesToWrite = conf.getLong(RandomTextWriter.TOTAL_BYTES,
      numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
  int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
  if (numMaps == 0 && totalBytesToWrite > 0) {
    numMaps = 1;
    conf.setLong(RandomTextWriter.BYTES_PER_MAP, totalBytesToWrite);
  }
  conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
}
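For context, a minimal driver sketch showing how confRandom might be invoked. The GenDataDriver class name and the output-path handling are illustrative assumptions, not part of the original code; since confRandom is protected static, the sketch presumes the driver lives in the same package or a subclass, and a Hadoop 2.x-style Job API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: every name here except confRandom is illustrative.
public class GenDataDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "random-data-gen");
    confRandom(job); // no input dir specified, so generate random data
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}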
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobClient client =
      new JobClient(new JobConf(jobCtxt.getConfiguration()));
  ClusterStatus stat = client.getClusterStatus(true);
  final long toGen =
      jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
  if (toGen < 0) {
    throw new IOException("Invalid/missing generation bytes: " + toGen);
  }
  final int nTrackers = stat.getTaskTrackers();
  final long bytesPerTracker = toGen / nTrackers;
  final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
  final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
  final Matcher m = trackerPattern.matcher("");
  for (String tracker : stat.getActiveTrackerNames()) {
    m.reset(tracker);
    if (!m.find()) {
      System.err.println("Skipping node: " + tracker);
      continue;
    }
    final String name = m.group(1);
    splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
  }
  return splits;
}
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;
  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
      DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {
      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(sequenceFile, splitStartPosition,
            splitSize, (String[]) null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(sequenceFile, splitStartPosition,
        bytesRemaining, (String[]) null));
  }
  return splits;
}
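To make the split-cutting rule concrete (with illustrative numbers only): if targetSize is 100 and the sequence file records per-file sizes of 60, 50, and 40, the loop accumulates 60, sees that 60 + 50 would exceed the target, cuts a split covering just the first entry, then accumulates 50 and 40 into the next span; the trailing bytesRemaining check emits that final span as the second split.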
/**
 * STRESS mode: on notification from the StatsCollector, collect the
 * cluster metrics and update the current loadStatus with the new load
 * status of the JobTracker.
 *
 * @param item the latest cluster statistics
 */
@Override
public void update(Statistics.ClusterStats item) {
  ClusterStatus clusterStatus = item.getStatus();
  try {
    // update the max cluster map/reduce task capacity
    loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks());
    loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks());

    int numTrackers = clusterStatus.getTaskTrackers();
    int jobLoad =
        (int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob();
    loadStatus.updateJobLoad(jobLoad);
  } catch (Exception e) {
    LOG.error("Couldn't get the new Status", e);
  }
}
/**
 * Generates an XML-formatted block that summarizes the state of the
 * JobTracker.
 */
public void generateSummaryTable(JspWriter out, JobTracker tracker)
    throws IOException {
  ClusterStatus status = tracker.getClusterStatus();
  int maxMapTasks = status.getMaxMapTasks();
  int maxReduceTasks = status.getMaxReduceTasks();
  int numTaskTrackers = status.getTaskTrackers();
  String tasksPerNodeStr;
  if (numTaskTrackers > 0) {
    double tasksPerNodePct =
        (double) (maxMapTasks + maxReduceTasks) / (double) numTaskTrackers;
    tasksPerNodeStr = percentFormat.format(tasksPerNodePct);
  } else {
    tasksPerNodeStr = "-";
  }
  out.print("<maps>" + status.getMapTasks() + "</maps>\n"
      + "<reduces>" + status.getReduceTasks() + "</reduces>\n"
      + "<total_submissions>" + tracker.getTotalSubmissions()
      + "</total_submissions>\n"
      + "<nodes>" + status.getTaskTrackers() + "</nodes>\n"
      + "<map_task_capacity>" + status.getMaxMapTasks()
      + "</map_task_capacity>\n"
      + "<reduce_task_capacity>" + status.getMaxReduceTasks()
      + "</reduce_task_capacity>\n"
      + "<avg_tasks_per_node>" + tasksPerNodeStr
      + "</avg_tasks_per_node>\n");
}
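For reference, the emitted block has the following shape. The numbers are placeholders, and the exact rendering of avg_tasks_per_node depends on how percentFormat is configured:

<maps>10</maps>
<reduces>4</reduces>
<total_submissions>42</total_submissions>
<nodes>8</nodes>
<map_task_capacity>16</map_task_capacity>
<reduce_task_capacity>8</reduce_task_capacity>
<avg_tasks_per_node>3.00</avg_tasks_per_node>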
/**
 * STRESS mode: on notification from the StatsCollector, collect the
 * cluster metrics and update the current loadStatus with the new load
 * status of the JobTracker.
 *
 * @param item the latest cluster statistics
 */
@Override
public void update(Statistics.ClusterStats item) {
  lock.lock();
  try {
    ClusterStatus clusterMetrics = item.getStatus();
    try {
      checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
    } catch (IOException e) {
      LOG.error("Couldn't get the new Status", e);
    }
    if (!loadStatus.overloaded()) {
      condUnderloaded.signalAll();
    }
  } finally {
    lock.unlock();
  }
}
/**
 * Analyzes properties of the Hadoop cluster and configuration.
 */
private static void analyzeHadoopCluster() {
  try {
    JobConf job = ConfigurationManager.getCachedJobConf();
    JobClient client = new JobClient(job);
    ClusterStatus stat = client.getClusterStatus();
    if (stat != null) { // if in cluster mode
      // analyze cluster status
      _remotePar = stat.getTaskTrackers();
      _remoteParMap = stat.getMaxMapTasks();
      _remoteParReduce = stat.getMaxReduceTasks();

      // analyze pure configuration properties
      analyzeHadoopConfiguration();
    }
  } catch (IOException e) {
    throw new RuntimeException("Unable to analyze infrastructure.", e);
  }
}
/**
 * Gets the number of input splits. First, tries the corresponding
 * property, then falls back to the number of available map slots.
 *
 * @param context job context
 * @return number of input splits
 */
private int getSuggestedNumberOfSplits(JobContext context) throws IOException {
  int numberOfSplits;
  Configuration conf = context.getConfiguration();
  numberOfSplits = conf.getInt(inputNumberOfSplitsProperty, -1);
  if (numberOfSplits > 0) {
    return numberOfSplits;
  }
  if (HServerParameters.isHServerJob(conf)) {
    // We are running an hServer job, not a Hadoop job
    return HSERVER_JOB_DEFAULT_NUMBER_OF_SPLITS;
  }
  try {
    // Wrap the Configuration in a JobConf rather than casting it:
    // the context's configuration is not guaranteed to be a JobConf.
    ClusterStatus status =
        new JobClient(new JobConf(conf)).getClusterStatus();
    numberOfSplits = status.getMaxMapTasks() - status.getMapTasks();
    if (numberOfSplits > 0) {
      return numberOfSplits;
    }
  } catch (Throwable t) {
    // Do nothing; fall back to the default below.
  }
  return DEFAULT_NUMBER_OF_SPLITS;
}
private String[] getActiveServersList(JobContext context) {
  String[] servers = null;
  try {
    // Wrap the Configuration in a JobConf rather than casting it:
    // the context's configuration is not guaranteed to be a JobConf.
    JobClient jc = new JobClient(new JobConf(context.getConfiguration()));
    ClusterStatus status = jc.getClusterStatus(true);
    Collection<String> atc = status.getActiveTrackerNames();
    servers = new String[atc.size()];
    int s = 0;
    for (String serverInfo : atc) {
      // Active tracker names look like "tracker_<host>:<port>";
      // strip the "tracker_" prefix and the port to get the host name.
      StringTokenizer st = new StringTokenizer(serverInfo, ":");
      String trackerName = st.nextToken();
      StringTokenizer st1 = new StringTokenizer(trackerName, "_");
      st1.nextToken();
      servers[s++] = st1.nextToken();
    }
  } catch (IOException e) {
    e.printStackTrace();
  }
  return servers;
}
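The nested StringTokenizer parsing above breaks if a tracker name ever contains extra underscores. A sketch of an alternative, assuming the same "tracker_<host>:<port>" naming convention and reusing the regex from the GenSplit getSplits example earlier in this section; the method name is hypothetical, and the snippet assumes the same imports as above plus java.util.regex.Matcher and java.util.regex.Pattern:

// Hypothetical variant: extracts host names with a regex rather than
// nested StringTokenizers, skipping names that do not match the pattern.
private String[] getActiveServersListViaRegex(JobContext context)
    throws IOException {
  JobClient jc = new JobClient(new JobConf(context.getConfiguration()));
  ClusterStatus status = jc.getClusterStatus(true);
  List<String> servers = new ArrayList<String>();
  Matcher m = Pattern.compile("tracker_([^:]*):.*").matcher("");
  for (String tracker : status.getActiveTrackerNames()) {
    m.reset(tracker);
    if (m.find()) {
      servers.add(m.group(1)); // the <host> capture group
    }
  }
  return servers.toArray(new String[servers.size()]);
}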