void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(Time.now()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { // a WrappedRpcServerException is an exception that has been sent // to the client, so the stacktrace is unnecessary; any other // exceptions are unexpected internal server errors and thus the // stacktrace should be logged LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + c.getHostAddress() + " threw exception [" + e + "]", (e instanceof WrappedRpcServerException) ? null : e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { closeConnection(c); c = null; } else { c.setLastContact(Time.now()); } }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { long startTime = 0; if (LOG.isDebugEnabled()) { startTime = Time.now(); } TraceScope traceScope = null; if (Trace.isTracing()) { traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method)); } ObjectWritable value; try { value = (ObjectWritable) client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId, fallbackToSimpleAuth); } finally { if (traceScope != null) traceScope.close(); } if (LOG.isDebugEnabled()) { long callTime = Time.now() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); }
@Test public void testThreadFails() throws Exception { TestContext ctx = new TestContext(); ctx.addThread(new TestingThread(ctx) { @Override public void doWork() throws Exception { fail(FAIL_MSG); } }); ctx.startThreads(); long st = Time.now(); try { ctx.waitFor(30000); fail("waitFor did not throw"); } catch (RuntimeException rte) { // expected assertEquals(FAIL_MSG, rte.getCause().getMessage()); } long et = Time.now(); // Test shouldn't have waited the full 30 seconds, since // the thread throws faster than that assertTrue("Test took " + (et - st) + "ms", et - st < 5000); }
@Override protected synchronized byte[] createPassword(TokenIdent identifier) { int sequenceNum; long now = Time.now(); sequenceNum = incrementDelegationTokenSeqNum(); identifier.setIssueDate(now); identifier.setMaxDate(now + tokenMaxLifetime); identifier.setMasterKeyId(currentKey.getKeyId()); identifier.setSequenceNumber(sequenceNum); LOG.info("Creating password for identifier: " + identifier + ", currentKey: " + currentKey.getKeyId()); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); try { storeToken(identifier, tokenInfo); } catch (IOException ioe) { LOG.error("Could not store token !!", ioe); } return password; }
private void doIO(InputStream in, OutputStream out, int expectedTimeout) throws IOException { /* Keep on writing or reading until we get SocketTimeoutException. * It expects this exception to occur within 100 millis of TIMEOUT. */ byte buf[] = new byte[PAGE_SIZE + 19]; while (true) { long start = Time.now(); try { if (in != null) { in.read(buf); } else { out.write(buf); } } catch (SocketTimeoutException e) { long diff = Time.now() - start; LOG.info("Got SocketTimeoutException as expected after " + diff + " millis : " + e.getMessage()); assertTrue(Math.abs(expectedTimeout - diff) <= TestNetUtils.TIME_FUDGE_MILLIS); break; } } }
/** * Publish a metrics snapshot to all the sinks * @param buffer the metrics snapshot to publish * @param immediate indicates that we should publish metrics immediately * instead of using a separate thread. */ synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) { int dropped = 0; for (MetricsSinkAdapter sa : sinks.values()) { long startTime = Time.now(); boolean result; if (immediate) { result = sa.putMetricsImmediate(buffer); } else { result = sa.putMetrics(buffer, logicalTime); } dropped += result ? 0 : 1; publishStat.add(Time.now() - startTime); } droppedPubAll.incr(dropped); }
@Test public void testException() throws Throwable { Exception e = new NoRouteToHostException("that box caught fire 3 years ago"); ThrowableInformation ti = new ThrowableInformation(e); Log4Json l4j = new Log4Json(); long timeStamp = Time.now(); String outcome = l4j.toJson(new StringWriter(), "testException", timeStamp, "INFO", "quoted\"", "new line\n and {}", ti) .toString(); println("testException", outcome); }
@Override synchronized void touch(final String bpid, final long blockId) { Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid); RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId); if (ramDiskReplicaLru == null) { return; } ramDiskReplicaLru.numReads.getAndIncrement(); // Reinsert the replica with its new timestamp. if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) { ramDiskReplicaLru.lastUsedTime = Time.monotonicNow(); replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru); } }
private void output(Configuration conf, FileSummary summary, FileInputStream fin, ArrayList<FileSummary.Section> sections) throws IOException { InputStream is; long startTime = Time.monotonicNow(); out.println(getHeader()); for (FileSummary.Section section : sections) { if (SectionName.fromString(section.getName()) == SectionName.INODE) { fin.getChannel().position(section.getOffset()); is = FSImageUtil.wrapInputStreamForCompression(conf, summary.getCodec(), new BufferedInputStream(new LimitInputStream( fin, section.getLength()))); outputINodes(is); } } long timeTaken = Time.monotonicNow() - startTime; LOG.debug("Time to output inodes: {}ms", timeTaken); }
/** * Gets initial volume failure information for all volumes that failed * immediately at startup. The method works by determining the set difference * between all configured storage locations and the actual storage locations in * use after attempting to put all of them into service. * * @return each storage location that has failed */ private static List<VolumeFailureInfo> getInitialVolumeFailureInfos( Collection<StorageLocation> dataLocations, DataStorage storage) { Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize( dataLocations.size()); for (StorageLocation sl: dataLocations) { failedLocationSet.add(sl.getFile().getAbsolutePath()); } for (Iterator<Storage.StorageDirectory> it = storage.dirIterator(); it.hasNext(); ) { Storage.StorageDirectory sd = it.next(); failedLocationSet.remove(sd.getRoot().getAbsolutePath()); } List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity( failedLocationSet.size()); long failureDate = Time.now(); for (String failedStorageLocation: failedLocationSet) { volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation, failureDate)); } return volumeFailureInfos; }
/** * Evict and close sockets older than expiry period from the cache. */ private synchronized void evictExpired(long expiryPeriod) { while (multimap.size() != 0) { Iterator<Entry<Key, Value>> iter = multimap.entries().iterator(); Entry<Key, Value> entry = iter.next(); // if oldest socket expired, remove it if (entry == null || Time.monotonicNow() - entry.getValue().getTime() < expiryPeriod) { break; } IOUtils.cleanup(LOG, entry.getValue().getPeer()); iter.remove(); } }
/** * Randomly expire the ZK sessions of the two ZKFCs. This differs * from the above test in that it is not a controlled failover - * we just do random expirations and expect neither one to ever * generate fatal exceptions. */ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000) public void testRandomExpirations() throws Exception { cluster.start(); long st = Time.now(); long runFor = STRESS_RUNTIME_SECS * 1000; Random r = new Random(); while (Time.now() - st < runFor) { cluster.getTestContext().checkException(); int targetIdx = r.nextInt(2); ActiveStandbyElector target = cluster.getElector(targetIdx); long sessId = target.getZKSessionIdForTests(); if (sessId != -1) { LOG.info(String.format("Expiring session %x for svc %d", sessId, targetIdx)); getServer(serverFactory).closeSession(sessId); } Thread.sleep(r.nextInt(300)); } }
public static void waitFor(Supplier<Boolean> check, int checkEveryMillis, int waitForMillis) throws TimeoutException, InterruptedException { long st = Time.now(); do { boolean result = check.get(); if (result) { return; } Thread.sleep(checkEveryMillis); } while (Time.now() - st < waitForMillis); throw new TimeoutException("Timed out waiting for condition. " + "Thread diagnostics:\n" + TimedOutTestsListener.buildThreadDiagnosticString()); }
/** Load the directories in the INode section. */ private void loadDirectories( FileInputStream fin, List<FileSummary.Section> sections, FileSummary summary, Configuration conf) throws IOException { LOG.info("Loading directories"); long startTime = Time.monotonicNow(); for (FileSummary.Section section : sections) { if (SectionName.fromString(section.getName()) == SectionName.INODE) { fin.getChannel().position(section.getOffset()); InputStream is = FSImageUtil.wrapInputStreamForCompression(conf, summary.getCodec(), new BufferedInputStream(new LimitInputStream( fin, section.getLength()))); loadDirectoriesInINodeSection(is); } } long timeTaken = Time.monotonicNow() - startTime; LOG.info("Finished loading directories in {}ms", timeTaken); }
/** * Simply fail back and forth between two services for the * configured amount of time, via expiring their ZK sessions. */ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000) public void testExpireBackAndForth() throws Exception { cluster.start(); long st = Time.now(); long runFor = STRESS_RUNTIME_SECS * 1000; int i = 0; while (Time.now() - st < runFor) { // flip flop the services back and forth int from = i % 2; int to = (i + 1) % 2; // Expire one service, it should fail over to the other LOG.info("Failing over via expiration from " + from + " to " + to); cluster.expireAndVerifyFailover(from, to); i++; } }
/** Create a file with a length of <code>fileSize</code>. * The file is filled with 'a'. */ private void genFile(Path file, long fileSize) throws IOException { long startTime = Time.now(); FSDataOutputStream out = null; try { out = fc.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), CreateOpts.createParent(), CreateOpts.bufferSize(4096), CreateOpts.repFac((short) 3)); executionTime[CREATE] += (Time.now() - startTime); numOfOps[CREATE]++; long i = fileSize; while (i > 0) { long s = Math.min(fileSize, WRITE_CONTENTS.length); out.write(WRITE_CONTENTS, 0, (int) s); i -= s; } startTime = Time.now(); executionTime[WRITE_CLOSE] += (Time.now() - startTime); numOfOps[WRITE_CLOSE]++; } finally { IOUtils.cleanup(LOG, out); } }
private void loadINodeDirSection( FileInputStream fin, List<FileSummary.Section> sections, FileSummary summary, Configuration conf, List<Long> refIdList) throws IOException { LOG.info("Loading INode directory section."); long startTime = Time.monotonicNow(); for (FileSummary.Section section : sections) { if (SectionName.fromString(section.getName()) == SectionName.INODE_DIR) { fin.getChannel().position(section.getOffset()); InputStream is = FSImageUtil.wrapInputStreamForCompression(conf, summary.getCodec(), new BufferedInputStream( new LimitInputStream(fin, section.getLength()))); buildNamespace(is, refIdList); } } long timeTaken = Time.monotonicNow() - startTime; LOG.info("Finished loading INode directory section in {}ms", timeTaken); }
/** * Test that repeated calls to getting the local host are fairly fast, and * hence that caching is being used * @throws Exception if hostname lookups fail */ @Test public void testGetLocalHostIsFast() throws Exception { String hostname1 = DNS.getDefaultHost(DEFAULT); assertNotNull(hostname1); String hostname2 = DNS.getDefaultHost(DEFAULT); long t1 = Time.now(); String hostname3 = DNS.getDefaultHost(DEFAULT); long t2 = Time.now(); assertEquals(hostname3, hostname2); assertEquals(hostname2, hostname1); long interval = t2 - t1; assertTrue( "Took too long to determine local host - caching is not working", interval < 20000); }
@Test public void testThreadThrowsCheckedException() throws Exception { TestContext ctx = new TestContext(); ctx.addThread(new TestingThread(ctx) { @Override public void doWork() throws Exception { throw new IOException("my ioe"); } }); ctx.startThreads(); long st = Time.now(); try { ctx.waitFor(30000); fail("waitFor did not throw"); } catch (RuntimeException rte) { // expected assertEquals("my ioe", rte.getCause().getMessage()); } long et = Time.now(); // Test shouldn't have waited the full 30 seconds, since // the thread throws faster than that assertTrue("Test took " + (et - st) + "ms", et - st < 5000); }
@Test public void testRepeatingThread() throws Exception { final AtomicInteger counter = new AtomicInteger(); TestContext ctx = new TestContext(); ctx.addThread(new RepeatingTestThread(ctx) { @Override public void doAnAction() throws Exception { counter.incrementAndGet(); } }); ctx.startThreads(); long st = Time.now(); ctx.waitFor(3000); ctx.stop(); long et = Time.now(); long elapsed = et - st; // Test should have waited just about 3 seconds assertTrue("Test took " + (et - st) + "ms", Math.abs(elapsed - 3000) < 500); // Counter should have been incremented lots of times in 3 full seconds assertTrue("Counter value = " + counter.get(), counter.get() > 1000); }
long connectToServerUsingDelegationToken( final Configuration conf, final InetSocketAddress addr) throws IOException { MiniProtocol client = null; try { long start = Time.now(); try { client = currentUgi.doAs(new PrivilegedExceptionAction<MiniProtocol>() { @Override public MiniProtocol run() throws IOException { return RPC.getProxy(MiniProtocol.class, MiniProtocol.versionID, addr, conf); } }); } catch (InterruptedException e) { e.printStackTrace(); } long end = Time.now(); return end - start; } finally { RPC.stopProxy(client); } }
public synchronized MapHost getHost() throws InterruptedException { while(pendingHosts.isEmpty()) { wait(); } MapHost host = null; Iterator<MapHost> iter = pendingHosts.iterator(); int numToPick = random.nextInt(pendingHosts.size()); for (int i=0; i <= numToPick; ++i) { host = iter.next(); } pendingHosts.remove(host); host.markBusy(); LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName()); shuffleStart.set(Time.monotonicNow()); return host; }
/** Initialize block keys */ private synchronized void generateKeys() { if (!isMaster) return; /* * Need to set estimated expiry dates for currentKey and nextKey so that if * NN crashes, DN can still expire those keys. NN will stop using the newly * generated currentKey after the first keyUpdateInterval, however it may * still be used by DN and Balancer to generate new tokens before they get a * chance to sync their keys with NN. Since we require keyUpdInterval to be * long enough so that all live DN's and Balancer will sync their keys with * NN at least once during the period, the estimated expiry date for * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime. * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval * more. */ setSerialNo(serialNo + 1); currentKey = new BlockKey(serialNo, Time.now() + 2 * keyUpdateInterval + tokenLifetime, generateSecret()); setSerialNo(serialNo + 1); nextKey = new BlockKey(serialNo, Time.now() + 3 * keyUpdateInterval + tokenLifetime, generateSecret()); allKeys.put(currentKey.getKeyId(), currentKey); allKeys.put(nextKey.getKeyId(), nextKey); }
@Override public void destroy() { try { long limit = Time.now() + 30 * 1000; scheduler.shutdownNow(); while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { LOG.debug("Waiting for scheduler to shutdown"); if (Time.now() > limit) { LOG.warn("Gave up waiting for scheduler to shutdown"); break; } } if (scheduler.isTerminated()) { LOG.debug("Scheduler shutdown"); } } catch (InterruptedException ex) { LOG.warn(ex.getMessage(), ex); } }
/** Remove expired delegation tokens from cache */ private void removeExpiredToken() throws IOException { long now = Time.now(); Set<TokenIdent> expiredTokens = new HashSet<TokenIdent>(); synchronized (this) { Iterator<Map.Entry<TokenIdent, DelegationTokenInformation>> i = currentTokens.entrySet().iterator(); while (i.hasNext()) { Map.Entry<TokenIdent, DelegationTokenInformation> entry = i.next(); long renewDate = entry.getValue().getRenewDate(); if (renewDate < now) { expiredTokens.add(entry.getKey()); i.remove(); } } } // don't hold lock on 'this' to avoid edit log updates blocking token ops for (TokenIdent ident : expiredTokens) { logExpireToken(ident); removeStoredToken(ident); } }
/** * Do mkdirs operation. */ @Override long executeOp(int daemonId, int inputIdx, String clientName) throws IOException { long start = Time.now(); nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx], FsPermission.getDefault(), true); long end = Time.now(); return end-start; }
synchronized private void updateMapIncr(final String name, final boolean isGrp) throws IOException { if (!checkSupportedPlatform()) { return; } if (isInteger(name) && isGrp) { loadFullGroupMap(); return; } boolean updated = false; updateStaticMapping(); if (OS.startsWith("Linux")) { if (isGrp) { updated = updateMapInternal(gidNameMap, "group", getName2IdCmdLinux(name, true), ":", staticMapping.gidMapping); } else { updated = updateMapInternal(uidNameMap, "user", getName2IdCmdLinux(name, false), ":", staticMapping.uidMapping); } } else { // Mac if (isGrp) { updated = updateMapInternal(gidNameMap, "group", getName2IdCmdMac(name, true), "\\s+", staticMapping.gidMapping); } else { updated = updateMapInternal(uidNameMap, "user", getName2IdCmdMac(name, false), "\\s+", staticMapping.uidMapping); } } if (updated) { lastUpdateTime = Time.monotonicNow(); } }
synchronized private void updateMapIncr(final int id, final boolean isGrp) throws IOException { if (!checkSupportedPlatform()) { return; } boolean updated = false; updateStaticMapping(); if (OS.startsWith("Linux")) { if (isGrp) { updated = updateMapInternal(gidNameMap, "group", getId2NameCmdLinux(id, true), ":", staticMapping.gidMapping); } else { updated = updateMapInternal(uidNameMap, "user", getId2NameCmdLinux(id, false), ":", staticMapping.uidMapping); } } else { // Mac if (isGrp) { updated = updateMapInternal(gidNameMap, "group", getId2NameCmdMac(id, true), "\\s+", staticMapping.gidMapping); } else { updated = updateMapInternal(uidNameMap, "user", getId2NameCmdMac(id, false), "\\s+", staticMapping.uidMapping); } } if (updated) { lastUpdateTime = Time.monotonicNow(); } }
@Override long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = Time.now(); nameNodeProto.delete(fileNames[daemonId][inputIdx], false); long end = Time.now(); return end-start; }
public Call(int id, int retryCount, Writable param, Connection connection, RPC.RpcKind kind, byte[] clientId, Span span) { this.callId = id; this.retryCount = retryCount; this.rpcRequest = param; this.connection = connection; this.timestamp = Time.now(); this.rpcResponse = null; this.rpcKind = kind; this.clientId = clientId; this.traceSpan = span; }
/** * Log a user in from a keytab file. Loads a user identity from a keytab * file and logs them in. They become the currently logged-in user. * @param user the principal name to load from the keytab * @param path the path to the keytab file * @throws IOException if the keytab file can't be read */ @InterfaceAudience.Public @InterfaceStability.Evolving public synchronized static void loginUserFromKeytab(String user, String path ) throws IOException { if (!isSecurityEnabled()) return; keytabFile = path; keytabPrincipal = user; Subject subject = new Subject(); LoginContext login; long start = 0; try { login = newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject, new HadoopConfiguration()); start = Time.now(); login.login(); metrics.loginSuccess.add(Time.now() - start); loginUser = new UserGroupInformation(subject); loginUser.setLogin(login); loginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS); } catch (LoginException le) { if (start > 0) { metrics.loginFailure.add(Time.now() - start); } throw new IOException("Login failure for " + user + " from keytab " + path+ ": " + le, le); } LOG.info("Login successful for user " + keytabPrincipal + " using keytab file " + keytabFile); }
@Test public void ensureInvalidBlockTokensAreRejected() throws IOException, URISyntaxException { cluster.transitionToActive(0); FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA); assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH)); DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs); DFSClient spyDfsClient = Mockito.spy(dfsClient); Mockito.doAnswer( new Answer<LocatedBlocks>() { @Override public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable { LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod(); for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { Token<BlockTokenIdentifier> token = lb.getBlockToken(); BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier(); // This will make the token invalid, since the password // won't match anymore id.setExpiryDate(Time.now() + 10); Token<BlockTokenIdentifier> newToken = new Token<BlockTokenIdentifier>(id.getBytes(), token.getPassword(), token.getKind(), token.getService()); lb.setBlockToken(newToken); } return locatedBlocks; } }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong()); DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient); try { assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH)); fail("Shouldn't have been able to read a file with invalid block tokens"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains("Could not obtain block", ioe); } }
private void updateJmxCache() { boolean getAllMetrics = false; synchronized(this) { if (Time.now() - jmxCacheTS >= jmxCacheTTL) { // temporarilly advance the expiry while updating the cache jmxCacheTS = Time.now() + jmxCacheTTL; // lastRecs might have been set to an object already by another thread. // Track the fact that lastRecs has been reset once to make sure refresh // is correctly triggered. if (lastRecsCleared) { getAllMetrics = true; lastRecsCleared = false; } } else { return; } } if (getAllMetrics) { MetricsCollectorImpl builder = new MetricsCollectorImpl(); getMetrics(builder, true); } synchronized(this) { updateAttrCache(); if (getAllMetrics) { updateInfoCache(); } jmxCacheTS = Time.now(); lastRecs = null; // in case regular interval update is not running lastRecsCleared = true; } }
/** * This function ensures that writing causes TotalWritetime to increment * and reading causes totalReadTime to move. * @throws Exception */ @Test public void testDataNodeTimeSpend() throws Exception { Configuration conf = new HdfsConfiguration(); SimulatedFSDataset.setFactory(conf); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); try { FileSystem fs = cluster.getFileSystem(); List<DataNode> datanodes = cluster.getDataNodes(); assertEquals(datanodes.size(), 1); DataNode datanode = datanodes.get(0); MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name()); final long LONG_FILE_LEN = 1024 * 1024 * 10; long startWriteValue = getLongCounter("TotalWriteTime", rb); long startReadValue = getLongCounter("TotalReadTime", rb); for (int x =0; x < 50; x++) { DFSTestUtil.createFile(fs, new Path("/time.txt."+ x), LONG_FILE_LEN, (short) 1, Time.monotonicNow()); } for (int x =0; x < 50; x++) { String s = DFSTestUtil.readFile(fs, new Path("/time.txt." + x)); } MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name()); long endWriteValue = getLongCounter("TotalWriteTime", rbNew); long endReadValue = getLongCounter("TotalReadTime", rbNew); assertTrue(endReadValue > startReadValue); assertTrue(endWriteValue > startWriteValue); } finally { if (cluster != null) { cluster.shutdown(); } } }
@Override public void consume(MetricsBuffer buffer) { long ts = 0; for (MetricsBuffer.Entry entry : buffer) { if (sourceFilter == null || sourceFilter.accepts(entry.name())) { for (MetricsRecordImpl record : entry.records()) { if ((context == null || context.equals(record.context())) && (recordFilter == null || recordFilter.accepts(record))) { if (LOG.isDebugEnabled()) { LOG.debug("Pushing record "+ entry.name() +"."+ record.context() + "."+ record.name() +" to "+ name); } sink.putMetrics(metricFilter == null ? record : new MetricsRecordFiltered(record, metricFilter)); if (ts == 0) ts = record.timestamp(); } } } } if (ts > 0) { sink.flush(); latency.add(Time.now() - ts); } if (buffer instanceof WaitableMetricsBuffer) { ((WaitableMetricsBuffer)buffer).notifyAnyWaiters(); } LOG.debug("Done"); }
long connectToServer(Configuration conf, InetSocketAddress addr) throws IOException { MiniProtocol client = null; try { long start = Time.now(); client = RPC.getProxy(MiniProtocol.class, MiniProtocol.versionID, addr, conf); long end = Time.now(); return end - start; } finally { RPC.stopProxy(client); } }
/** Add a snapshot. */ public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name) throws SnapshotException, QuotaExceededException { //check snapshot quota final int n = getNumSnapshots(); if (n + 1 > snapshotQuota) { throw new SnapshotException("Failed to add snapshot: there are already " + n + " snapshot(s) and the snapshot quota is " + snapshotQuota); } final Snapshot s = new Snapshot(id, name, snapshotRoot); final byte[] nameBytes = s.getRoot().getLocalNameBytes(); final int i = searchSnapshot(nameBytes); if (i >= 0) { throw new SnapshotException("Failed to add snapshot: there is already a " + "snapshot with the same name \"" + Snapshot.getSnapshotName(s) + "\"."); } final DirectoryDiff d = getDiffs().addDiff(id, snapshotRoot); d.setSnapshotRoot(s.getRoot()); snapshotsByNames.add(-i - 1, s); // set modification time final long now = Time.now(); snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID); s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID); return s; }