/** * STRESS Once you get the notification from StatsCollector.Collect the * clustermetrics. Update current loadStatus with new load status of JT. * * @param item */ @Override public void update(Statistics.ClusterStats item) { ClusterStatus clusterStatus = item.getStatus(); try { // update the max cluster map/reduce task capacity loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks()); loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks()); int numTrackers = clusterStatus.getTaskTrackers(); int jobLoad = (int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob(); loadStatus.updateJobLoad(jobLoad); } catch (Exception e) { LOG.error("Couldn't get the new Status",e); } }
/**
 * STRESS mode: callback that receives a cluster statistics snapshot (per the
 * original notes, delivered by the StatsCollector) and re-evaluates the load.
 *
 * <p>While holding {@code lock}, the snapshot is passed to
 * {@code checkLoadAndGetSlotsToBackfill}; an {@link IOException} from that
 * call is logged and not propagated. Afterwards, if {@code loadStatus} does
 * not report an overload, every thread waiting on {@code condUnderloaded} is
 * signalled. The lock is always released on exit.
 *
 * @param item the cluster statistics snapshot to read from
 */
@Override
public void update(Statistics.ClusterStats item) {
  lock.lock();
  try {
    // Read outside the inner try: only the load check itself is guarded.
    final ClusterStatus status = item.getStatus();
    try {
      checkLoadAndGetSlotsToBackfill(item, status);
    } catch (IOException ioe) {
      LOG.error("Couldn't get the new Status", ioe);
    }

    // Evaluated even when the check above failed and was only logged.
    final boolean overloaded = loadStatus.overloaded();
    if (!overloaded) {
      condUnderloaded.signalAll();
    }
  } finally {
    lock.unlock();
  }
}
/**
 * STRESS mode: callback that receives a cluster statistics snapshot (per the
 * original notes, delivered by the StatsCollector) and re-evaluates the load.
 *
 * <p>While holding {@code lock}, the snapshot is passed to
 * {@code checkLoadAndGetSlotsToBackfill}; any {@link Exception} from that
 * call is logged and not propagated. Afterwards, if {@code loadStatus} does
 * not report an overload, every thread waiting on {@code condUnderloaded} is
 * signalled. The lock is always released on exit.
 *
 * @param item the cluster statistics snapshot to read from
 */
@Override
public void update(Statistics.ClusterStats item) {
  lock.lock();
  try {
    // Read outside the inner try: only the load check itself is guarded.
    final ClusterStatus status = item.getStatus();
    try {
      checkLoadAndGetSlotsToBackfill(item, status);
    } catch (Exception ex) {
      LOG.error("Couldn't get the new Status", ex);
    }

    // Evaluated even when the check above failed and was only logged.
    final boolean overloaded = loadStatus.overloaded();
    if (!overloaded) {
      condUnderloaded.signalAll();
    }
  } finally {
    lock.unlock();
  }
}
/**
 * Records the tracker counts and task capacities reported by the given
 * cluster statistics snapshot.
 *
 * <p>Copies the blacklisted-tracker count, the task-tracker count and the
 * maximum map/reduce task counts from {@code item.getStatus()} into this
 * object's fields. If reading any of them fails, the failure is logged
 * together with a timestamp and the exception itself, and is not propagated;
 * fields assigned before the failure keep their new values.
 *
 * @param item the cluster statistics snapshot to read from
 */
@Override
@SuppressWarnings("deprecation")
public void update(ClusterStats item) {
  try {
    numBlacklistedTrackers = item.getStatus().getBlacklistedTrackers();
    numActiveTrackers = item.getStatus().getTaskTrackers();
    maxMapTasks = item.getStatus().getMaxMapTasks();
    maxReduceTasks = item.getStatus().getMaxReduceTasks();
  } catch (Exception e) {
    long time = System.currentTimeMillis();
    // Pass the exception to the logger so the cause and stack trace are
    // recorded instead of being silently dropped.
    LOG.info("Error in processing cluster status at "
             + FastDateFormat.getInstance().format(time), e);
  }
}
/** * Test {@link ClusterSummarizer}. */ @Test (timeout=20000) public void testClusterSummarizer() throws IOException { ClusterSummarizer cs = new ClusterSummarizer(); Configuration conf = new Configuration(); String jt = "test-jt:1234"; String nn = "test-nn:5678"; conf.set(JTConfig.JT_IPC_ADDRESS, jt); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn); cs.start(conf); assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo()); assertEquals("NN name mismatch", nn, cs.getNamenodeInfo()); ClusterStats cStats = ClusterStats.getClusterStats(); conf.set(JTConfig.JT_IPC_ADDRESS, "local"); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local"); JobClient jc = new JobClient(conf); cStats.setClusterMetric(jc.getClusterStatus()); cs.update(cStats); // test assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks()); assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks()); assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers()); assertEquals("Cluster summary test failed!", 0, cs.getNumBlacklistedTrackers()); }
/**
 * Asserts that {@link ClusterStats} currently reports the expected totals.
 *
 * @param numSubmittedMapTasks expected value of
 *          {@code ClusterStats.getSubmittedMapTasks()}
 * @param numSubmittedReduceTasks expected value of
 *          {@code ClusterStats.getSubmittedReduceTasks()}
 * @param numSubmittedJobs expected size of
 *          {@code ClusterStats.getRunningJobStats()}
 */
private static void testClusterStats(int numSubmittedMapTasks,
                                     int numSubmittedReduceTasks,
                                     int numSubmittedJobs) {
  final String mapMismatch =
      "Incorrect count of total number of submitted map tasks";
  final String reduceMismatch =
      "Incorrect count of total number of submitted reduce tasks";
  final String jobMismatch = "Incorrect submitted jobs";

  // Checked in the same order as before: maps, reduces, then running jobs.
  assertEquals(mapMismatch,
               numSubmittedMapTasks, ClusterStats.getSubmittedMapTasks());
  assertEquals(reduceMismatch,
               numSubmittedReduceTasks,
               ClusterStats.getSubmittedReduceTasks());
  assertEquals(jobMismatch,
               numSubmittedJobs, ClusterStats.getRunningJobStats().size());
}
/** * Test {@link ClusterSummarizer}. */ @Test public void testClusterSummarizer() throws IOException { ClusterSummarizer cs = new ClusterSummarizer(); Configuration conf = new Configuration(); String jt = "test-jt:1234"; String nn = "test-nn:5678"; conf.set(JTConfig.JT_IPC_ADDRESS, jt); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn); cs.start(conf); assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo()); assertEquals("NN name mismatch", nn, cs.getNamenodeInfo()); ClusterStats cStats = ClusterStats.getClusterStats(); conf.set(JTConfig.JT_IPC_ADDRESS, "local"); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local"); JobClient jc = new JobClient(conf); cStats.setClusterMetric(jc.getClusterStatus()); cs.update(cStats); // test assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks()); assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks()); assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers()); assertEquals("Cluster summary test failed!", 0, cs.getNumBlacklistedTrackers()); }