/** * go to each block on the 2nd DataNode until it fails... * @param path * @param size * @throws IOException */ private void triggerFailure(String path, long size) throws IOException { NamenodeProtocols nn = cluster.getNameNodeRpc(); List<LocatedBlock> locatedBlocks = nn.getBlockLocations(path, 0, size).getLocatedBlocks(); for (LocatedBlock lb : locatedBlocks) { DatanodeInfo dinfo = lb.getLocations()[1]; ExtendedBlock b = lb.getBlock(); try { accessBlock(dinfo, lb); } catch (IOException e) { System.out.println("Failure triggered, on block: " + b.getBlockId() + "; corresponding volume should be removed by now"); break; } } }
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, EnumSet<CreateFlag> flags, int bufferSize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { TraceScope scope = dfsClient.getPathTraceScope("newStreamForAppend", src); try { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock, stat, checksum); if (favoredNodes != null && favoredNodes.length != 0) { out.streamer.setFavoredNodes(favoredNodes); } out.start(); return out; } finally { scope.close(); } }
private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) { LocatedBlock goodLocatedBlock = goodBlockList.get(0); LocatedBlock badLocatedBlock = new LocatedBlock( goodLocatedBlock.getBlock(), new DatanodeInfo[] { DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234) }, goodLocatedBlock.getStartOffset(), false); List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>(); badBlocks.add(badLocatedBlock); return new LocatedBlocks(goodBlockList.getFileLength(), false, badBlocks, null, true, null); }
/** Test to ensure metrics reflects missing blocks */ @Test public void testMissingBlock() throws Exception { // Create a file with single block with two replicas Path file = getTestPath("testMissingBlocks"); createFile(file, 100, (short)1); // Corrupt the only replica of the block to result in a missing block LocatedBlock block = NameNodeAdapter.getBlockLocations( cluster.getNameNode(), file.toString(), 0, 1).get(0); cluster.getNamesystem().writeLock(); try { bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0], "STORAGE_ID", "TEST"); } finally { cluster.getNamesystem().writeUnlock(); } updateMetrics(); MetricsRecordBuilder rb = getMetrics(NS_METRICS); assertGauge("UnderReplicatedBlocks", 1L, rb); assertGauge("MissingBlocks", 1L, rb); assertGauge("MissingReplOneBlocks", 1L, rb); fs.delete(file, true); waitForDnMetricValue(NS_METRICS, "UnderReplicatedBlocks", 0L); }
private ExtendedBlock addBlocks(String fileName, String clientName) throws IOException { ExtendedBlock prevBlock = null; for(int jdx = 0; jdx < blocksPerFile; jdx++) { LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null); prevBlock = loc.getBlock(); for(DatanodeInfo dnInfo : loc.getLocations()) { int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr()); datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock()); ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo( loc.getBlock().getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) }; StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( datanodes[dnIdx].storage.getStorageID(), rdBlocks) }; nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc .getBlock().getBlockPoolId(), report); } } return prevBlock; }
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, final LocatedBlock block, final long start, final long end, final ByteBuffer bb, final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, final int hedgedReadId) { final Span parentSpan = Trace.currentSpan(); return new Callable<ByteBuffer>() { @Override public ByteBuffer call() throws Exception { byte[] buf = bb.array(); int offset = bb.position(); TraceScope scope = Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan); try { actualGetFromOneDataNode(datanode, block, start, end, buf, offset, corruptedBlockMap); return bb; } finally { scope.close(); } } }; }
/** * DFSInputStream reports checksum failure. * Case I : client has tried multiple data nodes and at least one of the * attempts has succeeded. We report the other failures as corrupted block to * namenode. * Case II: client has tried out all data nodes, but all failed. We * only report if the total number of replica is 1. We do not * report otherwise since this maybe due to the client is a handicapped client * (who can not read). * @param corruptedBlockMap map of corrupted blocks * @param dataNodeCount number of data nodes who contains the block replicas */ private void reportCheckSumFailure( Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, int dataNodeCount) { if (corruptedBlockMap.isEmpty()) { return; } Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap .entrySet().iterator(); Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next(); ExtendedBlock blk = entry.getKey(); Set<DatanodeInfo> dnSet = entry.getValue(); if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0)) || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) { DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()]; int i = 0; for (DatanodeInfo dn:dnSet) { locs[i++] = dn; } LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) }; dfsClient.reportChecksumFailure(src, lblocks); } corruptedBlockMap.clear(); }
/** * Connect to the given datanode's datantrasfer port, and return * the resulting IOStreamPair. This includes encryption wrapping, etc. */ private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, LocatedBlock lb) throws IOException { boolean success = false; Socket sock = null; try { sock = socketFactory.createSocket(); String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode " + dnAddr); } NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); sock.setSoTimeout(timeout); OutputStream unbufOut = NetUtils.getOutputStream(sock); InputStream unbufIn = NetUtils.getInputStream(sock); IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, lb.getBlockToken(), dn); success = true; return ret; } finally { if (!success) { IOUtils.closeSocket(sock); } } }
/** * Helper method to combine a list of {@link LocatedBlock} with associated * {@link VolumeId} information to form a list of {@link BlockStorageLocation} * . */ static BlockStorageLocation[] convertToVolumeBlockLocations( List<LocatedBlock> blocks, Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException { // Construct the final return value of VolumeBlockLocation[] BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); List<BlockStorageLocation> volumeBlockLocs = new ArrayList<BlockStorageLocation>(locations.length); for (int i = 0; i < locations.length; i++) { LocatedBlock locBlock = blocks.get(i); List<VolumeId> volumeIds = blockVolumeIds.get(locBlock); BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i], volumeIds.toArray(new VolumeId[0])); volumeBlockLocs.add(bsLoc); } return volumeBlockLocs.toArray(new BlockStorageLocation[] {}); }
/** * The client would like to obtain an additional block for the indicated * filename (which is being written-to). Return an array that consists * of the block, plus a set of machines. The first on this list should * be where the client writes data. Subsequent items in the list must * be provided in the connection to the first datanode. * * Make sure the previous blocks have been reported by datanodes and * are replicated. Will return an empty 2-elt array if we want the * client to "try again later". */ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, ExtendedBlock previous, Set<Node> excludedNodes, List<String> favoredNodes) throws IOException { LocatedBlock[] onRetryBlock = new LocatedBlock[1]; DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId, clientName, previous, excludedNodes, favoredNodes, onRetryBlock); if (targets == null) { assert onRetryBlock[0] != null : "Retry block is null"; // This is a retry. Just return the last block. return onRetryBlock[0]; } LocatedBlock newBlock = storeAllocatedBlock( src, fileId, clientName, previous, targets); return newBlock; }
@Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, String[] favoredNodes) throws IOException { checkNNStartup(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src + " fileId=" + fileId + " for " + clientName); } Set<Node> excludedNodesSet = null; if (excludedNodes != null) { excludedNodesSet = new HashSet<Node>(excludedNodes.length); for (Node node : excludedNodes) { excludedNodesSet.add(node); } } List<String> favoredNodesList = (favoredNodes == null) ? null : Arrays.asList(favoredNodes); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, clientName, previous, excludedNodesSet, favoredNodesList); if (locatedBlock != null) metrics.incrAddBlockOps(); return locatedBlock; }
private static ExtendedBlock[][] generateBlocks(Suite s, long size ) throws IOException, InterruptedException, TimeoutException { final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][]; for(int n = 0; n < s.clients.length; n++) { final long fileLen = size/s.replication; createFile(s, n, fileLen); final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations( FILE_NAME, 0, fileLen).getLocatedBlocks(); final int numOfBlocks = locatedBlocks.size(); blocks[n] = new ExtendedBlock[numOfBlocks]; for(int i = 0; i < numOfBlocks; i++) { final ExtendedBlock b = locatedBlocks.get(i).getBlock(); blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes(), b.getGenerationStamp()); } } return blocks; }
private void testPlacement(String clientMachine, String clientRack) throws IOException { // write 5 files and check whether all times block placed for (int i = 0; i < 5; i++) { String src = "/test-" + i; // Create the file with client machine HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null, null, fileStatus.getFileId(), null); assertEquals("Block should be allocated sufficient locations", REPLICATION_FACTOR, locatedBlock.getLocations().length); if (clientRack != null) { assertEquals("First datanode should be rack local", clientRack, locatedBlock.getLocations()[0].getNetworkLocation()); } nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), src, clientMachine); } }
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr); } // Since we're creating a new UserGroupInformation here, we know that no // future RPC proxies will be able to re-use the same connection. And // usages of this proxy tend to be one-off calls. // // This is a temporary fix: callers should really achieve this by using // RPC.stopProxy() on the resulting object, but this is currently not // working in trunk. See the discussion on HDFS-1965. Configuration confWithNoIpcIdle = new Configuration(conf); confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); UserGroupInformation ticket = UserGroupInformation .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); ticket.addToken(locatedBlock.getBlockToken()); return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle, NetUtils.getDefaultSocketFactory(conf), socketTimeout); }
@Override public AddBlockResponseProto addBlock(RpcController controller, AddBlockRequestProto req) throws ServiceException { try { List<DatanodeInfoProto> excl = req.getExcludeNodesList(); List<String> favor = req.getFavoredNodesList(); LocatedBlock result = server.addBlock( req.getSrc(), req.getClientName(), req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), (favor == null || favor.size() == 0) ? null : favor .toArray(new String[favor.size()])); return AddBlockResponseProto.newBuilder() .setBlock(PBHelper.convert(result)).build(); } catch (IOException e) { throw new ServiceException(e); } }
@Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() .setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) req.setPrevious(PBHelper.convert(previous)); if (excludeNodes != null) req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } try { return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
@Override public LocatedBlock getAdditionalDatanode(String src, long fileId, ExtendedBlock blk, DatanodeInfo[] existings, String[] existingStorageIDs, DatanodeInfo[] excludes, int numAdditionalNodes, String clientName) throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { GetAdditionalDatanodeRequestProto req = GetAdditionalDatanodeRequestProto .newBuilder() .setSrc(src) .setFileId(fileId) .setBlk(PBHelper.convert(blk)) .addAllExistings(PBHelper.convert(existings)) .addAllExistingStorageUuids(Arrays.asList(existingStorageIDs)) .addAllExcludes(PBHelper.convert(excludes)) .setNumAdditionalNodes(numAdditionalNodes) .setClientName(clientName) .build(); try { return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req) .getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
@Test public void testAddBlockRetryShouldReturnBlockWithLocations() throws Exception { final String src = "/testAddBlockRetryShouldReturnBlockWithLocations"; NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc(); // create file nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 3, 1024, null); // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); assertTrue("Block locations should be present", lb1.getLocations().length > 0); cluster.restartNameNode(); nameNodeRpc = cluster.getNameNodeRpc(); LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock()); assertTrue("Wrong locations with retry", lb2.getLocations().length > 0); }
private void verifyFile(final Path parent, final HdfsFileStatus status, final Byte expectedPolicyId) throws Exception { HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status; byte policyId = fileStatus.getStoragePolicy(); BlockStoragePolicy policy = policies.getPolicy(policyId); if (expectedPolicyId != null) { Assert.assertEquals((byte)expectedPolicyId, policy.getId()); } final List<StorageType> types = policy.chooseStorageTypes( status.getReplication()); for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) { final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, lb.getStorageTypes()); Assert.assertTrue(fileStatus.getFullName(parent.toString()) + " with policy " + policy + " has non-empty overlap: " + diff + ", the corresponding block is " + lb.getBlock().getLocalBlock(), diff.removeOverlap(true)); } }
private Replication getOrVerifyReplication(Path file, Replication expected) throws IOException { final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks( file.toString(), 0).getLocatedBlocks(); Assert.assertEquals(1, lbs.size()); LocatedBlock lb = lbs.get(0); StringBuilder types = new StringBuilder(); final Replication r = new Replication(); for(StorageType t : lb.getStorageTypes()) { types.append(t).append(", "); if (t == StorageType.DISK) { r.disk++; } else if (t == StorageType.ARCHIVE) { r.archive++; } else { Assert.fail("Unexpected storage type " + t); } } if (expected != null) { final String s = "file = " + file + "\n types = [" + types + "]"; Assert.assertEquals(s, expected, r); } return r; }
@Override void prepare() throws Exception { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND), null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); LocatedBlocks blks = dfs.getClient() .getLocatedBlocks(file, BlockSize + 1); assertEquals(1, blks.getLocatedBlocks().size()); nodes = blks.get(0).getLocations(); oldBlock = blks.get(0).getBlock(); LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline( oldBlock, client.getClientName()); newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), oldBlock.getNumBytes(), newLbk.getBlock().getGenerationStamp()); }
/** Create a {@link ClientDatanodeProtocol} proxy */ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout, connectToDnViaHostname, locatedBlock); }
@Test (timeout=300000) public void testRetainBlockOnPersistentStorage() throws Exception { cluster = new MiniDFSCluster .Builder(CONF) .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) .numDataNodes(1) .build(); try { cluster.waitActive(); DataNode dataNode = cluster.getDataNodes().get(0); bpid = cluster.getNamesystem().getBlockPoolId(); fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); client = cluster.getFileSystem().getClient(); scanner = new DirectoryScanner(dataNode, fds, CONF); scanner.setRetainDiffs(true); FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0)); // Add a file with 1 block List<LocatedBlock> blocks = createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false); // Ensure no difference between volumeMap and disk. scan(1, 0, 0, 0, 0, 0); // Make a copy of the block on RAM_DISK and ensure that it is // picked up by the scanner. duplicateBlock(blocks.get(0).getBlock().getBlockId()); scan(2, 1, 0, 0, 0, 0, 1); verifyStorageType(blocks.get(0).getBlock().getBlockId(), false); scan(1, 0, 0, 0, 0, 0); } finally { if (scanner != null) { scanner.shutdown(); scanner = null; } cluster.shutdown(); cluster = null; } }
public static LocatedBlock getLastLocatedBlock( ClientProtocol namenode, String src) throws IOException { //get block info for the last block LocatedBlocks locations = namenode.getBlockLocations(src, 0, Long.MAX_VALUE); List<LocatedBlock> blocks = locations.getLocatedBlocks(); DataNode.LOG.info("blocks.size()=" + blocks.size()); assertTrue(blocks.size() > 0); return blocks.get(blocks.size() - 1); }
/** * Convert a List<LocatedBlock> to BlockLocation[] * @param blocks A List<LocatedBlock> to be converted * @return converted array of BlockLocation */ public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) { if (blocks == null) { return new BlockLocation[0]; } int nrBlocks = blocks.size(); BlockLocation[] blkLocations = new BlockLocation[nrBlocks]; if (nrBlocks == 0) { return blkLocations; } int idx = 0; for (LocatedBlock blk : blocks) { assert idx < nrBlocks : "Incorrect index"; DatanodeInfo[] locations = blk.getLocations(); String[] hosts = new String[locations.length]; String[] xferAddrs = new String[locations.length]; String[] racks = new String[locations.length]; for (int hCnt = 0; hCnt < locations.length; hCnt++) { hosts[hCnt] = locations[hCnt].getHostName(); xferAddrs[hCnt] = locations[hCnt].getXferAddr(); NodeBase node = new NodeBase(xferAddrs[hCnt], locations[hCnt].getNetworkLocation()); racks[hCnt] = node.toString(); } DatanodeInfo[] cachedLocations = blk.getCachedLocations(); String[] cachedHosts = new String[cachedLocations.length]; for (int i=0; i<cachedLocations.length; i++) { cachedHosts[i] = cachedLocations[i].getHostName(); } blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts, racks, blk.getStartOffset(), blk.getBlockSize(), blk.isCorrupt()); idx++; } return blkLocations; }
@Test public void testConvertLocatedBlockArray() { LocatedBlock [] lbl = new LocatedBlock[3]; for (int i=0;i<3;i++) { lbl[i] = createLocatedBlock(); } LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl); LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl); assertEquals(lbl.length, lbl2.length); for (int i=0;i<lbl.length;i++) { compare(lbl[i], lbl2[i]); } }
/** * Get block at the specified position. * Fetch it from the namenode if not cached. * * @param offset block corresponding to this offset in file is returned * @return located block * @throws IOException */ private LocatedBlock getBlockAt(long offset) throws IOException { synchronized(infoLock) { assert (locatedBlocks != null) : "locatedBlocks is null"; final LocatedBlock blk; //check offset if (offset < 0 || offset >= getFileLength()) { throw new IOException("offset < 0 || offset >= getFileLength(), offset=" + offset + ", locatedBlocks=" + locatedBlocks); } else if (offset >= locatedBlocks.getFileLength()) { // offset to the portion of the last block, // which is not known to the name-node yet; // getting the last block blk = locatedBlocks.getLastLocatedBlock(); } else { // search cached blocks first int targetBlockIdx = locatedBlocks.findBlock(offset); if (targetBlockIdx < 0) { // block is not cached targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); // fetch more blocks final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset); assert (newBlocks != null) : "Could not find target position " + offset; locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); } blk = locatedBlocks.get(targetBlockIdx); } return blk; } }
/** * Get blocks in the specified range. * Fetch them from the namenode if not cached. This function * will not get a read request beyond the EOF. * @param offset starting offset in file * @param length length of data * @return consequent segment of located blocks * @throws IOException */ private List<LocatedBlock> getBlockRange(long offset, long length) throws IOException { // getFileLength(): returns total file length // locatedBlocks.getFileLength(): returns length of completed blocks if (offset >= getFileLength()) { throw new IOException("Offset: " + offset + " exceeds file length: " + getFileLength()); } synchronized(infoLock) { final List<LocatedBlock> blocks; final long lengthOfCompleteBlk = locatedBlocks.getFileLength(); final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk; final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk; if (readOffsetWithinCompleteBlk) { //get the blocks of finalized (completed) block range blocks = getFinalizedBlockRange(offset, Math.min(length, lengthOfCompleteBlk - offset)); } else { blocks = new ArrayList<LocatedBlock>(1); } // get the blocks from incomplete block range if (readLengthPastCompleteBlk) { blocks.add(locatedBlocks.getLastLocatedBlock()); } return blocks; } }
/** * Get the best node from which to stream the data. * @param block LocatedBlock, containing nodes in priority order. * @param ignoredNodes Do not choose nodes in this array (may be null) * @return The DNAddrPair of the best node. * @throws IOException */ private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, Collection<DatanodeInfo> ignoredNodes) throws IOException { DatanodeInfo[] nodes = block.getLocations(); StorageType[] storageTypes = block.getStorageTypes(); DatanodeInfo chosenNode = null; StorageType storageType = null; if (nodes != null) { for (int i = 0; i < nodes.length; i++) { if (!deadNodes.containsKey(nodes[i]) && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { chosenNode = nodes[i]; // Storage types are ordered to correspond with nodes, so use the same // index to get storage type. if (storageTypes != null && i < storageTypes.length) { storageType = storageTypes[i]; } break; } } } if (chosenNode == null) { throw new IOException("No live nodes contain block " + block.getBlock() + " after checking nodes = " + Arrays.toString(nodes) + ", ignoredNodes = " + ignoredNodes); } final String dnAddr = chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); return new DNAddrPair(chosenNode, targetAddr, storageType); }
/** * Creates and closes a file of certain length. * Calls append to allow next write() operation to add to the end of it * After write() invocation, calls hflush() to make sure that data sunk through * the pipeline and check the state of the last block's replica. * It supposes to be in RBW state * * @throws IOException in case of an error */ @Test public void pipeline_01() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); if(LOG.isDebugEnabled()) { LOG.debug("Running " + METHOD_NAME); } Path filePath = new Path("/" + METHOD_NAME + ".dat"); DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong()); if(LOG.isDebugEnabled()) { LOG.debug("Invoking append but doing nothing otherwise..."); } FSDataOutputStream ofs = fs.append(filePath); ofs.writeBytes("Some more stuff to write"); ((DFSOutputStream) ofs.getWrappedStream()).hflush(); List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations( filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks(); String bpid = cluster.getNamesystem().getBlockPoolId(); for (DataNode dn : cluster.getDataNodes()) { Replica r = DataNodeTestUtils.fetchReplicaInfo(dn, bpid, lb.get(0) .getBlock().getBlockId()); assertTrue("Replica on DN " + dn + " shouldn't be null", r != null); assertEquals("Should be RBW replica on " + dn + " after sequence of calls append()/write()/hflush()", HdfsServerConstants.ReplicaState.RBW, r.getState()); } ofs.close(); }
private void compare(LocatedBlock expected, LocatedBlock actual) { assertEquals(expected.getBlock(), actual.getBlock()); compare(expected.getBlockToken(), actual.getBlockToken()); assertEquals(expected.getStartOffset(), actual.getStartOffset()); assertEquals(expected.isCorrupt(), actual.isCorrupt()); DatanodeInfo [] ei = expected.getLocations(); DatanodeInfo [] ai = actual.getLocations(); assertEquals(ei.length, ai.length); for (int i = 0; i < ei.length ; i++) { compare(ei[i], ai[i]); } }
private void assertBlocks(BlockManager bm, LocatedBlocks lbs, boolean exist) { for (LocatedBlock locatedBlock : lbs.getLocatedBlocks()) { if (exist) { assertTrue(bm.getStoredBlock(locatedBlock.getBlock(). getLocalBlock()) != null); } else { assertTrue(bm.getStoredBlock(locatedBlock.getBlock(). getLocalBlock()) == null); } } }
void reportChecksumFailure(String file, LocatedBlock lblocks[]) { try { reportBadBlocks(lblocks); } catch (IOException ie) { LOG.info("Found corruption while reading " + file + ". Error repairing corrupt blocks. Bad blocks remain.", ie); } }
private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) { Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>(); for (LocatedBlock blk : blks) { for (DatanodeInfo di : blk.getLocations()) { if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) { ret.add(blk.getBlock()); break; } } } return ret; }
/** Initialize the pipeline. */ @Override public synchronized Pipeline initPipeline(LocatedBlock lb) { final Pipeline pl = new Pipeline(lb); if (pipelines.contains(pl)) { throw new IllegalStateException("thepipeline != null"); } pipelines.add(pl); return pl; }
private void waitForBlockReplication(String filename, ClientProtocol namenode, int expected, long maxWaitSec) throws IOException { long start = Time.monotonicNow(); //wait for all the blocks to be replicated; LOG.info("Checking for block replication for " + filename); while (true) { boolean replOk = true; LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE); for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator(); iter.hasNext();) { LocatedBlock block = iter.next(); int actual = block.getLocations().length; if ( actual < expected ) { LOG.info("Not enough replicas for " + block.getBlock() + " yet. Expecting " + expected + ", got " + actual + "."); replOk = false; break; } } if (replOk) { return; } if (maxWaitSec > 0 && (Time.monotonicNow() - start) > (maxWaitSec * 1000)) { throw new IOException("Timedout while waiting for all blocks to " + " be replicated for " + filename); } try { Thread.sleep(500); } catch (InterruptedException ignored) {} } }
/** Test that timeout occurs when DN does not respond to RPC. * Start up a server and ask it to sleep for n seconds. Make an * RPC to the server and set rpcTimeout to less than n and ensure * that socketTimeoutException is obtained */ @Test public void testClientDNProtocolTimeout() throws IOException { final Server server = new TestServer(1, true); server.start(); final InetSocketAddress addr = NetUtils.getConnectAddress(server); DatanodeID fakeDnId = DFSTestUtil.getLocalDatanodeID(addr.getPort()); ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]); ClientDatanodeProtocol proxy = null; try { proxy = DFSUtil.createClientDatanodeProtocolProxy( fakeDnId, conf, 500, false, fakeBlock); proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1)); fail ("Did not get expected exception: SocketTimeoutException"); } catch (SocketTimeoutException e) { LOG.info("Got the expected Exception: SocketTimeoutException"); } finally { if (proxy != null) { RPC.stopProxy(proxy); } server.stop(); } }
/** create a file with a length of <code>fileLen</code> */ private List<LocatedBlock> createFile(String fileNamePrefix, long fileLen, boolean isLazyPersist) throws IOException { FileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/" + fileNamePrefix + ".dat"); DFSTestUtil.createFile( fs, filePath, isLazyPersist, 1024, fileLen, BLOCK_LENGTH, (short) 1, r.nextLong(), false); return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks(); }
private Block findBlock(Path path, long size) throws IOException { Block ret; List<LocatedBlock> lbs = cluster.getNameNodeRpc() .getBlockLocations(path.toString(), FILE_START, size).getLocatedBlocks(); LocatedBlock lb = lbs.get(lbs.size() - 1); // Get block from the first DN ret = cluster.getDataNodes().get(DN_N0). data.getStoredBlock(lb.getBlock() .getBlockPoolId(), lb.getBlock().getBlockId()); return ret; }
private static void checkEquals(LocatedBlocks l1, LocatedBlocks l2) { List<LocatedBlock> list1 = l1.getLocatedBlocks(); List<LocatedBlock> list2 = l2.getLocatedBlocks(); assertEquals(list1.size(), list2.size()); for (int i = 0; i < list1.size(); i++) { LocatedBlock b1 = list1.get(i); LocatedBlock b2 = list2.get(i); assertEquals(b1.getBlock(), b2.getBlock()); assertEquals(b1.getBlockSize(), b2.getBlockSize()); } }