/**
 * Builds a {@link Peer} from a socket and passes it through
 * {@code saslClient.peerSend}.
 *
 * @param saslClient client whose {@code peerSend} is applied to the new peer
 * @param s socket the peer is created from
 * @param keyFactory encryption key factory forwarded to {@code peerSend}
 * @param blockToken block access token forwarded to {@code peerSend}
 * @param datanodeId datanode ID forwarded to {@code peerSend}
 * @return the peer returned by {@code peerSend}
 * @throws IOException if either step fails; whatever peer exists at that
 *         point is handed to {@code IOUtilsClient.cleanup} first
 */
public static Peer peerFromSocketAndKey(
    SaslDataTransferClient saslClient, Socket s,
    DataEncryptionKeyFactory keyFactory,
    Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
    throws IOException {
  Peer result = null;
  boolean completed = false;
  try {
    result = peerFromSocket(s);
    result = saslClient.peerSend(result, keyFactory, blockToken, datanodeId);
    completed = true;
  } finally {
    // Runs on every exit path; only cleans up when setup did not finish.
    if (!completed) {
      IOUtilsClient.cleanup(null, result);
    }
  }
  return result;
}
/**
 * Initializes a reader over the input channel of the given peer.
 *
 * @param file stored as the reader's file name
 * @param bpid not used by this constructor
 * @param blockId ID of the block being read
 * @param checksum checksum descriptor; supplies bytes-per-checksum and
 *        checksum size
 * @param verifyChecksum stored as the verify-checksum flag
 * @param startOffset requested start offset; negative values are stored as 0
 * @param firstChunkOffset offset of the first chunk to be transferred
 * @param bytesToRead number of bytes the caller wants
 * @param peer peer whose input stream channel is read from
 * @param datanodeID datanode on the other end; its transfer address is used
 *        to decide whether the reader is local
 * @param peerCache peer cache kept for later use
 */
protected RemoteBlockReader2(String file, String bpid, long blockId,
    DataChecksum checksum, boolean verifyChecksum,
    long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
    DatanodeID datanodeID, PeerCache peerCache) {
  this.isLocal = DFSClient.isLocalAddress(NetUtils.
      createSocketAddr(datanodeID.getXferAddr()));
  this.peer = peer;
  this.datanodeID = datanodeID;
  // NOTE(review): the assignment target was missing in the source; "in" is
  // assumed by analogy with the sibling RemoteBlockReader constructor.
  // Confirm the field name against the class declaration.
  this.in = peer.getInputStreamChannel();
  this.checksum = checksum;
  this.verifyChecksum = verifyChecksum;
  this.startOffset = Math.max( startOffset, 0 );
  this.filename = file;
  this.peerCache = peerCache;
  this.blockId = blockId;

  // The total number of bytes that we need to transfer from the DN is
  // the amount that the user wants (bytesToRead), plus the padding at
  // the beginning in order to chunk-align. Note that the DN may elect
  // to send more than this amount if the read starts/ends mid-chunk.
  this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();
}
/** * Checks if an address is already trusted and then sends client SASL * negotiation if required. * * @param addr connection address * @param underlyingOut connection output stream * @param underlyingIn connection input stream * @param encryptionKeyFactory for creation of an encryption key * @param accessToken connection block access token * @param datanodeId ID of destination DataNode * @return new pair of streams, wrapped after SASL negotiation * @throws IOException for any error */ private IOStreamPair checkTrustAndSend(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) throws IOException { if (!trustedChannelResolver.isTrusted() && !trustedChannelResolver.isTrusted(addr)) { // The encryption key factory only returns a key if encryption is enabled. DataEncryptionKey encryptionKey = encryptionKeyFactory.newDataEncryptionKey(); return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, datanodeId); } else { LOG.debug( "SASL client skipping handshake on trusted connection for addr = {}, " + "datanodeId = {}", addr, datanodeId); return null; } }
@Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), dfsClientConf.socketTimeout); peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(dfsClientConf.socketTimeout); success = true; return peer; } finally { if (!success) { IOUtils.cleanup(LOG, peer); IOUtils.closeSocket(sock); } } }
private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) { List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain)); if (sockStreamList == null) { return null; } Iterator<Value> iter = sockStreamList.iterator(); while (iter.hasNext()) { Value candidate =; iter.remove(); long ageMs = Time.monotonicNow() - candidate.getTime(); Peer peer = candidate.getPeer(); if (ageMs >= expiryPeriod) { try { peer.close(); } catch (IOException e) { LOG.warn("got IOException closing stale peer " + peer + ", which is " + ageMs + " ms old"); } } else if (!peer.isClosed()) { return peer; } } return null; }
/**
 * Redirects the request to a datanode obtained from
 * {@code NamenodeJspHelper.getRandomDatanode}.
 *
 * If building or sending the redirect raises an {@link IOException}, the
 * response is a 400 error carrying the exception message.
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  final ServletContext servletContext = getServletContext();
  final Configuration conf =
      NameNodeHttpServer.getConfFromContext(servletContext);
  final UserGroupInformation ugi = getUGI(request, conf);
  final NameNode namenode =
      NameNodeHttpServer.getNameNodeFromContext(servletContext);
  final DatanodeID target = NamenodeJspHelper.getRandomDatanode(namenode);
  try {
    final String location =
        createRedirectURL(ugi, target, request, namenode).toString();
    response.sendRedirect(location);
  } catch (IOException e) {
    response.sendError(400, e.getMessage());
  }
}
/**
 * Creates an {@link InterDatanodeProtocol} proxy to the given datanode,
 * constructing it inside {@code doAs} of the login user.
 *
 * @param datanodeid datanode whose IPC address is connected to
 * @param conf configuration passed to the translator and used to obtain the
 *        default socket factory
 * @param socketTimeout socket timeout passed to the translator
 * @param connectToDnViaHostname passed to {@code getIpcAddr} when resolving
 *        the datanode address
 * @return the new proxy
 * @throws IOException if the proxy cannot be created, or if the calling
 *         thread is interrupted while creating it (the interrupt status is
 *         restored and the {@link InterruptedException} is the cause)
 */
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
    DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
    final boolean connectToDnViaHostname) throws IOException {
  final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
  final InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
  }
  final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
  try {
    return loginUgi
        .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
          @Override
          public InterDatanodeProtocol run() throws IOException {
            return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
                conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
          }
        });
  } catch (InterruptedException ie) {
    // Restore the interrupt status for callers further up the stack, and
    // keep the original exception as the cause instead of only its message.
    Thread.currentThread().interrupt();
    throw new IOException(ie.getMessage(), ie);
  }
}
@Override public void commitBlockSynchronization(ExtendedBlock block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages ) throws IOException { CommitBlockSynchronizationRequestProto.Builder builder = CommitBlockSynchronizationRequestProto.newBuilder() .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp) .setNewLength(newlength).setCloseFile(closeFile) .setDeleteBlock(deleteblock); for (int i = 0; i < newtargets.length; i++) { builder.addNewTaragets(PBHelper.convert(newtargets[i])); builder.addNewTargetStorages(newtargetstorages[i]); } CommitBlockSynchronizationRequestProto req =; try { rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req); } catch (ServiceException se) { throw ProtobufHelper.getRemoteException(se); } }
/**
 * Server-side translation of a commit-block-synchronization request:
 * converts the protobuf fields and forwards them to {@code impl}.
 *
 * @param controller not used by this method
 * @param request request to translate
 * @return the shared void response constant
 * @throws ServiceException wrapping any {@link IOException} thrown by
 *         {@code impl}
 */
@Override
public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
    RpcController controller, CommitBlockSynchronizationRequestProto request)
    throws ServiceException {
  final List<DatanodeIDProto> targetProtos = request.getNewTaragetsList();
  final DatanodeID[] targets = new DatanodeID[targetProtos.size()];
  int next = 0;
  for (DatanodeIDProto proto : targetProtos) {
    targets[next++] = PBHelper.convert(proto);
  }

  final List<String> storageList = request.getNewTargetStoragesList();
  final String[] storageIDs =
      storageList.toArray(new String[storageList.size()]);

  try {
    impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
        request.getNewGenStamp(), request.getNewLength(),
        request.getCloseFile(), request.getDeleteBlock(), targets, storageIDs);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
}
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr); } // Since we're creating a new UserGroupInformation here, we know that no // future RPC proxies will be able to re-use the same connection. And // usages of this proxy tend to be one-off calls. // // This is a temporary fix: callers should really achieve this by using // RPC.stopProxy() on the resulting object, but this is currently not // working in trunk. See the discussion on HDFS-1965. Configuration confWithNoIpcIdle = new Configuration(conf); confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); UserGroupInformation ticket = UserGroupInformation .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); ticket.addToken(locatedBlock.getBlockToken()); return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle, NetUtils.getDefaultSocketFactory(conf), socketTimeout); }
/**
 * Builds an {@code UpdatePipelineRequestProto} from the arguments and sends
 * it through the RPC proxy.
 *
 * @param clientName name of the client
 * @param oldBlock block before the update
 * @param newBlock block after the update
 * @param newNodes datanodes to put in the request
 * @param storageIDs storage IDs to put in the request; may be {@code null},
 *        in which case the request carries no storage IDs
 * @throws IOException the remote exception unwrapped from a
 *         {@code ServiceException} raised by the proxy
 */
@Override
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
    ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs)
    throws IOException {
  UpdatePipelineRequestProto.Builder builder =
      UpdatePipelineRequestProto.newBuilder()
      .setClientName(clientName)
      .setOldBlock(PBHelper.convert(oldBlock))
      .setNewBlock(PBHelper.convert(newBlock))
      .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)));
  // The builder's addAll* methods reject a null Iterable, so a null array
  // must be skipped rather than forwarded as null.
  if (storageIDs != null) {
    builder.addAllStorageIDs(Arrays.asList(storageIDs));
  }
  UpdatePipelineRequestProto req = builder.build();
  try {
    rpcProxy.updatePipeline(null, req);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Test public void testCommitBlockSynchronization2() throws IOException { INodeFile file = mockFileUnderConstruction(); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); DatanodeID[] newTargets = new DatanodeID[0]; ExtendedBlock lastBlock = new ExtendedBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, false, false, newTargets, null); // Make sure the call fails if the generation stamp does not match // the block recovery ID. try { namesystemSpy.commitBlockSynchronization( lastBlock, genStamp - 1, length, false, false, newTargets, null); fail("Failed to get expected IOException on generation stamp/" + "recovery ID mismatch"); } catch (IOException ioe) { // Expected exception. } }
@Test public void testCommitBlockSynchronizationWithDelete() throws IOException { INodeFile file = mockFileUnderConstruction(); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); DatanodeID[] newTargets = new DatanodeID[0]; ExtendedBlock lastBlock = new ExtendedBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, false, true, newTargets, null); // Simulate removing the last block from the file. doReturn(false).when(file).removeLastBlock(any(Block.class)); // Repeat the call to make sure it does not throw namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, false, true, newTargets, null); }
@Test public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget() throws IOException { INodeFile file = mockFileUnderConstruction(); Block block = new Block(blockId, length, genStamp); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); DatanodeID[] newTargets = new DatanodeID[]{ new DatanodeID("", "nonexistantHost", "1", 0, 0, 0, 0)}; ExtendedBlock lastBlock = new ExtendedBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, true, false, newTargets, null); // Repeat the call to make sure it returns true namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, true, false, newTargets, null); }
void register() throws IOException { // get versions from the namenode nsInfo = nameNodeProto.versionRequest(); dnRegistration = new DatanodeRegistration( new DatanodeID(DNS.getDefaultIP("default"), DNS.getDefaultHost("default", "default"), DataNode.generateUuid(), getNodePort(dnIdx), DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT), new DataStorage(nsInfo), new ExportedBlockKeys(), VersionInfo.getVersion()); // register datanode dnRegistration = nameNodeProto.registerDatanode(dnRegistration); //first block reports storage = new DatanodeStorage(DatanodeStorage.generateUuid()); final StorageBlockReport[] reports = { new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; nameNodeProto.blockReport(dnRegistration, nameNode.getNamesystem().getBlockPoolId(), reports, new BlockReportContext(1, 0, System.nanoTime())); }
/**
 * Verifies that an InterDatanode RPC times out as expected when the server
 * DN does not respond: the call must end in a
 * {@link SocketTimeoutException}.
 */
@Test(expected=SocketTimeoutException.class)
public void testInterDNProtocolTimeout() throws Throwable {
  final Server server = new TestServer(1, true);
  server.start();

  final InetSocketAddress addr = NetUtils.getConnectAddress(server);
  final DatanodeInfo dInfo =
      new DatanodeInfo(DFSTestUtil.getLocalDatanodeID(addr.getPort()));
  InterDatanodeProtocol proxy = null;

  try {
    proxy = DataNode.createInterDataNodeProtocolProxy(
        dInfo, conf, 500, false);
    final RecoveringBlock recovering = new RecoveringBlock(
        new ExtendedBlock("bpid", 1), null, 100);
    proxy.initReplicaRecovery(recovering);
    fail ("Expected SocketTimeoutException exception, but did not get.");
  } finally {
    // Always release the proxy (if one was created) and stop the server.
    if (proxy != null) {
      RPC.stopProxy(proxy);
    }
    server.stop();
  }
}
/**
 * BlockRecoveryFI_07. max replica length from all DNs is zero.
 *
 * Stubs {@code initReplicaRecovery} on a spied datanode to report a
 * finalized replica of length 0, runs block recovery, and verifies that
 * {@code commitBlockSynchronization} is called with length 0 and both the
 * close-file and delete-block flags set.
 *
 * @throws IOException in case of an error
 * @throws InterruptedException if interrupted while joining the recovery
 *         daemon
 */
@Test
public void testZeroLenReplicas() throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }
  final DataNode spyDN = spy(dn);
  final ReplicaRecoveryInfo zeroLength = new ReplicaRecoveryInfo(
      block.getBlockId(), 0, block.getGenerationStamp(),
      ReplicaState.FINALIZED);
  doReturn(zeroLength).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recovery =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recovery.join();

  final DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
  verify(dnP).commitBlockSynchronization(
      block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
/**
 * BlockRecoveryFI_10. DN has no ReplicaUnderRecovery.
 *
 * Expects {@code dn.syncBlock} to throw an {@link IOException} and then
 * verifies that {@code commitBlockSynchronization} was never invoked on the
 * active namenode for the block pool.
 *
 * NOTE(review): the statement just before the {@code try} is truncated to
 * {@code , block, false);} and does not parse; it looks like the tail of a
 * call that takes {@code block} and {@code false} as its last arguments.
 * Restore it from version control.
 *
 * NOTE(review): the catch block evaluates
 * {@code e.getMessage().startsWith("Cannot recover ")} but discards the
 * result, so the message prefix is not actually asserted. Confirm whether
 * an assertion was intended.
 *
 * @throws IOException in case of an error
 */
@Test public void testNoReplicaUnderRecovery() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); }, block, false); try { dn.syncBlock(rBlock, initBlockRecords(dn)); fail("Sync should fail"); } catch (IOException e) { e.getMessage().startsWith("Cannot recover "); } DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), anyBoolean(), any(DatanodeID[].class), any(String[].class)); }
/**
 * Creates a {@link ClientDatanodeProtocol} proxy for a located block.
 *
 * All arguments are forwarded unchanged to the
 * {@code ClientDatanodeProtocolTranslatorPB} constructor.
 *
 * @throws IOException if the translator cannot be constructed
 */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
    DatanodeID datanodeid, Configuration conf, int socketTimeout,
    boolean connectToDnViaHostname, LocatedBlock locatedBlock)
    throws IOException {
  final ClientDatanodeProtocol proxy = new ClientDatanodeProtocolTranslatorPB(
      datanodeid, conf, socketTimeout, connectToDnViaHostname, locatedBlock);
  return proxy;
}
/**
 * Creates a {@link ClientDatanodeProtocol} proxy using kerberos ticket.
 *
 * All arguments are forwarded unchanged to the
 * {@code ClientDatanodeProtocolTranslatorPB} constructor.
 *
 * @throws IOException if the translator cannot be constructed
 */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
    DatanodeID datanodeid, Configuration conf, int socketTimeout,
    boolean connectToDnViaHostname) throws IOException {
  final ClientDatanodeProtocol proxy = new ClientDatanodeProtocolTranslatorPB(
      datanodeid, conf, socketTimeout, connectToDnViaHostname);
  return proxy;
}
/**
 * {@inheritDoc}
 *
 * Throws a {@code DiskErrorException} when a pipeline exists for the
 * datanode and contains it at this action's index. Does nothing otherwise.
 */
public void run(DatanodeID id) throws IOException {
  final Pipeline pipeline = getPipelineTest().getPipelineForDatanode(id);
  if (pipeline != null && pipeline.contains(index, id)) {
    final String message = super.toString(id);
    throw new DiskErrorException(message);
  }
}
/**
 * Initializes a reader over an already opened data stream from a datanode.
 *
 * @param file file name, used only to build the path given to the superclass
 * @param bpid block pool ID, used only to build the path given to the
 *        superclass
 * @param blockId ID of the block being read
 * @param in stream the block data is read from
 * @param checksum checksum descriptor; passed to the superclass only when
 *        its checksum size is positive
 * @param verifyChecksum passed to the superclass
 * @param startOffset requested start offset; negative values are stored as 0
 * @param firstChunkOffset offset of the first chunk to be transferred
 * @param bytesToRead number of bytes the caller wants
 * @param peer peer the stream belongs to
 * @param datanodeID datanode on the other end; its transfer address is used
 *        to decide whether the reader is local
 * @param peerCache peer cache kept for later use
 */
private RemoteBlockReader(String file, String bpid, long blockId,
    DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
    long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
    DatanodeID datanodeID, PeerCache peerCache) {
  // Path is used only for printing block and file information in debug
  super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
                  ":" + bpid + ":of:"+ file)/*too non path-like?*/,
        1, verifyChecksum,
        checksum.getChecksumSize() > 0? checksum : null,
        checksum.getBytesPerChecksum(),
        checksum.getChecksumSize());

  this.isLocal = DFSClient.isLocalAddress(NetUtils.
      createSocketAddr(datanodeID.getXferAddr()));

  this.peer = peer;
  this.datanodeID = datanodeID;
  // The assignment target was missing in the source; restored to match the
  // surrounding "this.x = x" pattern and the constructor parameter name.
  this.in = in;
  this.checksum = checksum;
  this.startOffset = Math.max( startOffset, 0 );
  this.blockId = blockId;

  // The total number of bytes that we need to transfer from the DN is
  // the amount that the user wants (bytesToRead), plus the padding at
  // the beginning in order to chunk-align. Note that the DN may elect
  // to send more than this amount if the read starts/ends mid-chunk.
  this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);

  this.firstChunkOffset = firstChunkOffset;
  lastChunkOffset = firstChunkOffset;
  lastChunkLen = -1;

  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();
  this.peerCache = peerCache;
}
/**
 * Performs the general-purpose client SASL handshake.
 *
 * The SASL properties come from the properties resolver for {@code addr}.
 * The user name and password are both built from {@code accessToken}.
 *
 * @param addr connection address
 * @param underlyingOut connection output stream
 * @param underlyingIn connection input stream
 * @param accessToken connection block access token
 * @param datanodeId ID of destination DataNode (not used by this method)
 * @return new pair of streams, as returned by {@code doSaslHandshake}
 * @throws IOException for any error
 */
private IOStreamPair getSaslStreams(InetAddress addr,
    OutputStream underlyingOut, InputStream underlyingIn,
    Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
    throws IOException {
  final Map<String, String> clientProps =
      saslPropsResolver.getClientProperties(addr);
  final String userName = buildUserName(accessToken);
  final CallbackHandler handler = new SaslClientCallbackHandler(
      userName, buildClientPassword(accessToken));
  return doSaslHandshake(
      underlyingOut, underlyingIn, userName, clientProps, handler);
}
/** * Give an unused socket to the cache. */ public void put(DatanodeID dnId, Peer peer) { Preconditions.checkNotNull(dnId); Preconditions.checkNotNull(peer); if (peer.isClosed()) return; if (capacity <= 0) { // Cache disabled. IOUtils.cleanup(LOG, peer); return; } putInternal(dnId, peer); }
/**
 * Stores a peer in the cache, timestamped with the current monotonic time.
 *
 * Calls {@code startExpiryDaemon} first. If the cache size equals its
 * capacity, the oldest entry is evicted before the new one is added.
 *
 * @param dnId datanode the peer is connected to
 * @param peer peer to store; whether it has a domain socket is part of the
 *        cache key
 */
private synchronized void putInternal(DatanodeID dnId, Peer peer) {
  startExpiryDaemon();

  if (multimap.size() == capacity) {
    evictOldest();
  }
  final boolean hasDomainSocket = peer.getDomainSocket() != null;
  final Key key = new Key(dnId, hasDomainSocket);
  final Value entry = new Value(peer, Time.monotonicNow());
  multimap.put(key, entry);
}
/**
 * Builds the URL that redirects a file checksum request to a datanode.
 *
 * @param ugi user whose short name (and, with security enabled, first
 *        token) is placed in the query string
 * @param host datanode to redirect to
 * @param request incoming request; supplies the scheme and the raw path
 * @param nn namenode whose address is appended as a query parameter
 * @return the redirect URL
 * @throws IOException if the URL cannot be built
 */
private URL createRedirectURL(UserGroupInformation ugi, DatanodeID host,
    HttpServletRequest request, NameNode nn) throws IOException {
  final String hostname;
  if (host instanceof DatanodeInfo) {
    hostname = host.getHostName();
  } else {
    hostname = host.getIpAddr();
  }

  final String scheme = request.getScheme();
  int port = host.getInfoPort();
  if ("https".equals(scheme)) {
    // For https, a port stored in the servlet context overrides the info port.
    final Integer httpsPort = (Integer) getServletContext().getAttribute(
        DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
    if (httpsPort != null) {
      port = httpsPort;
    }
  }

  final String encodedPath = ServletUtil.getRawPath(request, "/fileChecksum");

  String dtParam = "";
  if (UserGroupInformation.isSecurityEnabled()) {
    final String tokenString =
        ugi.getTokens().iterator().next().encodeToUrlString();
    dtParam = JspHelper.getDelegationTokenUrlParam(tokenString);
  }

  final String addr = nn.getNameNodeAddressHostPortString();
  final String addrParam =
      JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);

  final String file = "/getFileChecksum" + encodedPath + '?' +
      "ugi=" + ServletUtil.encodeQueryValue(ugi.getShortUserName()) +
      dtParam + addrParam;
  return new URL(scheme, hostname, port, file);
}
/**
 * Forwards an incremental block report to the block manager while holding
 * the write lock.
 *
 * @param nodeID datanode that sent the report
 * @param srdb received and deleted blocks of one storage
 * @throws IOException if the block manager rejects the report
 */
public void processIncrementalBlockReport(final DatanodeID nodeID,
    final StorageReceivedDeletedBlocks srdb)
    throws IOException {
  writeLock();
  try {
    blockManager.processIncrementalBlockReport(nodeID, srdb);
  } finally {
    // Released on both normal and exceptional exit.
    writeUnlock();
  }
}
/**
 * Update a pipeline for a block under construction.
 *
 * The update itself is delegated to {@code updatePipelineInternal} under
 * the write lock, after the operation category and safe mode checks. The
 * edit log is synced after the lock is released.
 *
 * NOTE(review): two statements in the body (the one right after the first
 * {@code checkOperation} call and the one right after {@code logSync()})
 * consist of a bare string expression followed by {@code )}. They appear to
 * have lost the call they were arguments of, presumably a logging call.
 * As written the method does not parse; restore from version control.
 *
 * @param clientName the name of the client
 * @param oldBlock an old block
 * @param newBlock a new block with a new generation stamp and length
 * @param newNodes datanodes in the pipeline
 * @param newStorageIDs forwarded to {@code updatePipelineInternal}
 * @param logRetryCache forwarded to {@code updatePipelineInternal}
 * @throws IOException if any error occurs
 */
void updatePipeline( String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache) throws IOException { checkOperation(OperationCategory.WRITE);"updatePipeline(" + oldBlock.getLocalBlock() + ", newGS=" + newBlock.getGenerationStamp() + ", newLength=" + newBlock.getNumBytes() + ", newNodes=" + Arrays.asList(newNodes) + ", client=" + clientName + ")"); waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Pipeline not updated"); assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and " + oldBlock + " has different block identifier"; updatePipelineInternal(clientName, oldBlock, newBlock, newNodes, newStorageIDs, logRetryCache); } finally { writeUnlock(); } getEditLog().logSync();"updatePipeline(" + oldBlock.getLocalBlock() + " => " + newBlock.getLocalBlock() + ") success"); }
/**
 * Creates a {@code ConstraintSatisfactionAction} that pairs an
 * {@code OomAction} with a {@code CountdownConstraint}.
 *
 * @param currentTest first argument of the {@code OomAction}
 * @param i second argument of the {@code OomAction}
 * @param count initial value given to the {@code CountdownConstraint}
 * @return the combined action
 */
public static ConstraintSatisfactionAction<DatanodeID, IOException>
    createCountdownOomAction(
      String currentTest, int i, int count) {
  final OomAction oomAction = new OomAction(currentTest, i);
  final CountdownConstraint countdown = new CountdownConstraint(count);
  return new ConstraintSatisfactionAction<DatanodeID, IOException>(
      oomAction, countdown);
}
@Override // DatanodeProtocol public void commitBlockSynchronization(ExtendedBlock block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages) throws IOException { checkNNStartup(); namesystem.commitBlockSynchronization(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets, newtargetstorages); }
/** Create a redirection URL */ private URL createRedirectURL(String path, String encodedPath, HdfsFileStatus status, UserGroupInformation ugi, ClientProtocol nnproxy, HttpServletRequest request, String dt) throws IOException { String scheme = request.getScheme(); final LocatedBlocks blks = nnproxy.getBlockLocations( status.getFullPath(new Path(path)).toUri().getPath(), 0, 1); final Configuration conf = NameNodeHttpServer.getConfFromContext( getServletContext()); final DatanodeID host = pickSrcDatanode(blks, status, conf); final String hostname; if (host instanceof DatanodeInfo) { hostname = host.getHostName(); } else { hostname = host.getIpAddr(); } int port = "https".equals(scheme) ? host.getInfoSecurePort() : host .getInfoPort(); String dtParam = ""; if (dt != null) { dtParam = JspHelper.getDelegationTokenUrlParam(dt); } // Add namenode address to the url params NameNode nn = NameNodeHttpServer.getNameNodeFromContext( getServletContext()); String addr = nn.getNameNodeAddressHostPortString(); String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr); return new URL(scheme, hostname, port, "/streamFile" + encodedPath + '?' + "ugi=" + ServletUtil.encodeQueryValue(ugi.getShortUserName()) + dtParam + addrParam); }
/** Select a datanode to service this request. * Currently, this looks at no more than the first five blocks of a file, * selecting a datanode randomly from the most represented. * @param conf */ private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i, Configuration conf) throws IOException { if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) { // pick a random datanode NameNode nn = NameNodeHttpServer.getNameNodeFromContext( getServletContext()); return NamenodeJspHelper.getRandomDatanode(nn); } return JspHelper.bestNode(blks, conf); }
public final void processCacheReport(final DatanodeID datanodeID, final List<Long> blockIds) throws IOException { namesystem.writeLock(); final long startTime = Time.monotonicNow(); final long endTime; try { final DatanodeDescriptor datanode = blockManager.getDatanodeManager().getDatanode(datanodeID); if (datanode == null || !datanode.isAlive) { throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); } processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); namesystem.writeUnlock(); } // Log the block report processing stats from Namenode perspective final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); if (metrics != null) { metrics.addCacheBlockReport((int) (endTime - startTime)); } LOG.debug("Processed cache report from {}, blocks: {}, " + "processing time: {} msecs", datanodeID, blockIds.size(), (endTime - startTime)); }
/**
 * Throws an {@link IOException} describing this action when the data
 * transfer test reports {@code isNotSuccessAndLastPipelineContains} for
 * this action's index and the given datanode. Does nothing otherwise.
 *
 * @param id datanode the action is run for
 * @throws IOException when the condition above holds
 */
@Override
public void run(DatanodeID id) throws IOException {
  final DataTransferTest transferTest = getDataTransferTest();
  if (!transferTest.isNotSuccessAndLastPipelineContains(index, id)) {
    return;
  }
  final String message = toString(id);
  throw new IOException(message);
}
/**
 * Creates a registration that reuses the storage info, exported keys and
 * software version of {@code dnr}, with a {@code DatanodeID} built from
 * {@code uuid} and {@code dnr}.
 *
 * @param uuid passed to the {@code DatanodeID} constructor
 * @param dnr registration to take the remaining values from
 */
@VisibleForTesting
public DatanodeRegistration(String uuid, DatanodeRegistration dnr) {
  this(
      new DatanodeID(uuid, dnr),
      dnr.getStorageInfo(),
      dnr.getExportedKeys(),
      dnr.getSoftwareVersion());
}
/**
 * Initializes a data transfer test, installs {@code a} as its
 * {@code fiCallWritePacketToDisk} action and a {@code VerificationAction}
 * as its {@code fiPipelineErrorAfterInit} action, writes one byte, and
 * asserts that the test reports success.
 *
 * NOTE(review): the first statement of the body is a bare string expression
 * followed by {@code )}. It appears to have lost the call it was an
 * argument of, presumably a logging call. As written the method does not
 * parse; restore from version control.
 *
 * @param methodName name passed to the verification action and to
 *        {@code write1byte}
 * @param errorIndex index passed to the verification action
 * @param a action installed on {@code fiCallWritePacketToDisk}
 * @throws IOException if the write fails
 */
private static void runCallWritePacketToDisk(String methodName, int errorIndex, Action<DatanodeID, IOException> a) throws IOException {"Running " + methodName + " ..."); final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); t.fiCallWritePacketToDisk.set(a); t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex)); write1byte(methodName); Assert.assertTrue(t.isSuccess()); }
/**
 * Initializes a data transfer test, installs a {@code DatanodeMarkingAction}
 * on {@code fiPipelineClose} and, on {@code fiPipelineAck}, the action
 * {@code a} guarded by the same {@code MarkerConstraint}, then writes one
 * byte.
 *
 * NOTE(review): the first statement of the body is a bare string expression
 * followed by {@code )}. It appears to have lost the call it was an
 * argument of, presumably a logging call. As written the method does not
 * parse; restore from version control.
 *
 * @param name name given to the marker constraint, the marking action and
 *        {@code write1byte}
 * @param i index passed to the marking action
 * @param a action run once the marker constraint is satisfied
 * @throws IOException if the write fails
 */
private static void runPipelineCloseAck(String name, int i, DataNodeAction a ) throws IOException {"Running " + name + " ..."); final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); final MarkerConstraint marker = new MarkerConstraint(name); t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker)); t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker)); TestFiDataTransferProtocol.write1byte(name); }
/** * @param nodeReg DatanodeID to update registration for. */ @Override public void updateRegInfo(DatanodeID nodeReg) { super.updateRegInfo(nodeReg); // must re-process IBR after re-registration for(DatanodeStorageInfo storage : getStorageInfos()) { storage.setBlockReportCount(0); } heartbeatedSinceRegistration = false; }
/**
 * Bundles a datanode ID, the protocol handle for that datanode and the
 * replica recovery info it reported.
 *
 * @param id ID of the datanode
 * @param datanode protocol handle for the datanode
 * @param rInfo replica recovery info
 */
BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
    ReplicaRecoveryInfo rInfo) {
  // The assignment target was missing in the source; restored to match the
  // "this.x = x" pattern of the other two fields.
  this.id = id;
  this.datanode = datanode;
  this.rInfo = rInfo;
}