public void report(long currTime, String userName, String cmd) { LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName); userName = UserGroupInformation.trimLoginMethod(userName); for (RollingWindowManager rollingWindowManager : rollingWindowManagers .values()) { rollingWindowManager.recordMetric(currTime, cmd, userName, 1); rollingWindowManager.recordMetric(currTime, TopConf.ALL_CMDS, userName, 1); } }
@Test(timeout=120000) @SuppressWarnings("unchecked") public void testTopUsers() throws Exception { final Configuration conf = new Configuration(); MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); cluster.waitActive(); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName mxbeanNameFsns = new ObjectName( "Hadoop:service=NameNode,name=FSNamesystemState"); FileSystem fs = cluster.getFileSystem(); final Path path = new Path("/"); final int NUM_OPS = 10; for (int i=0; i< NUM_OPS; i++) { fs.listStatus(path); fs.setTimes(path, 0, 1); } String topUsers = (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts")); ObjectMapper mapper = new ObjectMapper(); Map<String, Object> map = mapper.readValue(topUsers, Map.class); assertTrue("Could not find map key timestamp", map.containsKey("timestamp")); assertTrue("Could not find map key windows", map.containsKey("windows")); List<Map<String, List<Map<String, Object>>>> windows = (List<Map<String, List<Map<String, Object>>>>) map.get("windows"); assertEquals("Unexpected num windows", 3, windows.size()); for (Map<String, List<Map<String, Object>>> window : windows) { final List<Map<String, Object>> ops = window.get("ops"); assertEquals("Unexpected num ops", 3, ops.size()); for (Map<String, Object> op: ops) { final long count = Long.parseLong(op.get("totalCount").toString()); final String opType = op.get("opType").toString(); final int expected; if (opType.equals(TopConf.ALL_CMDS)) { expected = 2*NUM_OPS; } else { expected = NUM_OPS; } assertEquals("Unexpected total count", expected, count); } } } finally { if (cluster != null) { cluster.shutdown(); } } }