/**
 * Count the number of live replicas of each parity block in the raided file.
 * If any stripe does not have enough parity block replicas, add the stripe to
 * raidEncodingTasks to schedule encoding.
 * If forceAdd is true, we always add the stripe to raidEncodingTasks
 * without checking.
 * @param sourceINode the raided source file
 * @param raidEncodingTasks the queue of stripes to be (re-)encoded
 * @param fs the namesystem used to count live replicas
 * @param forceAdd if true, enqueue every stripe without checking replica counts
 * @return true if all parity blocks of the file have enough replicas
 * @throws IOException
 */
public boolean checkRaidProgress(INodeFile sourceINode,
    LightWeightLinkedSet<RaidBlockInfo> raidEncodingTasks, FSNamesystem fs,
    boolean forceAdd) throws IOException {
  boolean result = true;
  BlockInfo[] blocks = sourceINode.getBlocks();
  for (int i = 0; i < blocks.length; i += numStripeBlocks) {
    boolean hasParity = true;
    if (!forceAdd) {
      // The first numParityBlocks blocks of each stripe are its parity blocks.
      for (int j = 0; j < numParityBlocks; j++) {
        if (fs.countLiveNodes(blocks[i + j]) < this.parityReplication) {
          hasParity = false;
          break;
        }
      }
    }
    if (!hasParity || forceAdd) {
      raidEncodingTasks.add(new RaidBlockInfo(blocks[i], parityReplication, i));
      result = false;
    }
  }
  return result;
}
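/*
 * A minimal, self-contained sketch (not from the source) of the stripe
 * arithmetic that checkRaidProgress() relies on: blocks are scanned in steps
 * of numStripeBlocks, and the first numParityBlocks entries of each stripe
 * are assumed to be its parity blocks. All names and values here are
 * illustrative.
 */
public class StripeLayoutSketch {
  public static void main(String[] args) {
    int numStripeBlocks = 8;   // assumed stripe width: parity + data blocks
    int numParityBlocks = 2;   // assumed parity blocks per stripe
    int totalBlocks = 24;      // three full stripes

    for (int i = 0; i < totalBlocks; i += numStripeBlocks) {
      System.out.printf("stripe starting at block %d: parity blocks %d..%d%n",
          i, i, i + numParityBlocks - 1);
    }
  }
}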
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
    final DatanodeDescriptor node = storage.getDatanodeDescriptor();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getDatanodeUuid());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (storage.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
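/*
 * Hedged sketch: a stripped-down NumberReplicas-style value object and how a
 * caller might use the grouped counts. This mirrors the grouping done by
 * countNodes() above but is illustrative, not the Hadoop class itself; the
 * record syntax assumes Java 16+.
 */
public class ReplicaCountSketch {
  record Counts(int live, int decommissioned, int corrupt, int excess, int stale) {}

  static boolean needsReplication(Counts c, int expected) {
    // Only live replicas count toward the replication target; decommissioned,
    // corrupt, and excess copies do not.
    return c.live() < expected;
  }

  public static void main(String[] args) {
    Counts c = new Counts(2, 1, 0, 0, 1);
    System.out.println("needs replication: " + needsReplication(c, 3)); // true
  }
}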
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<Block>());
    priorityToReplIdx.put(i, 0);
  }
}
/** Check if a block is in the neededReplication queue. */
synchronized boolean contains(Block block) {
  for (LightWeightLinkedSet<Block> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
/** Check if a block is in the neededReplication queue. */
synchronized boolean contains(BlockInfo block) {
  for (LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
    if (set.contains(block)) {
      return true;
    }
  }
  return false;
}
/**
 * Get a list of block lists to be replicated. The index of block lists
 * represents its replication priority. Iterates each block list in priority
 * order beginning with the highest priority list. Iterators use a bookmark to
 * resume where the previous iteration stopped. Returns when the block count
 * is met or iteration reaches the end of the lowest priority list, in which
 * case bookmarks for each block list are reset to the heads of their
 * respective lists.
 *
 * @param blocksToProcess - number of blocks to fetch from underReplicated
 *          blocks.
 * @return Return a list of block lists to be replicated. The block list index
 *         represents its replication priority.
 */
synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
    int blocksToProcess) {
  final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
  int count = 0;
  int priority = 0;
  for (; count < blocksToProcess && priority < LEVEL; priority++) {
    if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
      // Do not choose corrupted blocks.
      continue;
    }
    // Go through all blocks that need replication at the current priority.
    // Set the iterator to the first unprocessed block at this priority level.
    final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
    final List<BlockInfo> blocks = new LinkedList<>();
    blocksToReplicate.add(blocks);
    // Loop through all remaining blocks in the list.
    for (; count < blocksToProcess && i.hasNext(); count++) {
      blocks.add(i.next());
    }
  }
  if (priority == LEVEL) {
    // Every queue was scanned to its end; reset each bookmark to the head
    // of its list so the next call starts over.
    for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
      q.resetBookmark();
    }
  }
  return blocksToReplicate;
}
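/*
 * Self-contained sketch (assumed names, not the Hadoop class) of the bookmark
 * behavior chooseUnderReplicatedBlocks() depends on: each call resumes
 * iteration where the previous one stopped, and resetBookmark() rewinds to
 * the head.
 */
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BookmarkedListSketch {
  private final List<String> items = new ArrayList<>();
  private int bookmark = 0;

  void add(String s) { items.add(s); }

  // Return an iterator starting at the bookmark, advancing it as it goes.
  Iterator<String> getBookmark() {
    return new Iterator<String>() {
      public boolean hasNext() { return bookmark < items.size(); }
      public String next() { return items.get(bookmark++); }
    };
  }

  void resetBookmark() { bookmark = 0; }

  public static void main(String[] args) {
    BookmarkedListSketch q = new BookmarkedListSketch();
    q.add("b1"); q.add("b2"); q.add("b3");

    Iterator<String> it = q.getBookmark();
    System.out.println(it.next()); // b1
    System.out.println(it.next()); // b2

    // A later call resumes where the previous iteration stopped.
    System.out.println(q.getBookmark().next()); // b3
  }
}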
/** Return an iterator over all under-replicated blocks, in priority order. */
@Override
public synchronized Iterator<BlockInfo> iterator() {
  final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
  return new Iterator<BlockInfo>() {
    private Iterator<BlockInfo> b = q.next().iterator();

    @Override
    public BlockInfo next() {
      hasNext();
      return b.next();
    }

    @Override
    public boolean hasNext() {
      // Skip exhausted queues until one with remaining blocks is found.
      for (; !b.hasNext() && q.hasNext(); ) {
        b = q.next().iterator();
      }
      return b.hasNext();
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  };
}
public void testLightWeightLinkedSetBenchmark() {
  LOG.info("Test LIGHTWEIGHT_LINKED_SET");
  queueL = new LightWeightLinkedSet<Block>();
  insertBlocks(true);
  containsBlocks(true);
  removeBlocks(true);
}
protected void setUp() {
  float maxF = LightWeightLinkedSet.rMaxLoadFactor;
  float minF = LightWeightLinkedSet.rMinLoadFactor;
  int initCapacity = LightWeightLinkedSet.MINIMUM_CAPACITY;
  rand = new Random(System.currentTimeMillis());
  list.clear();
  for (int i = 0; i < NUM; i++) {
    list.add(rand.nextInt());
  }
  set = new LightWeightLinkedSet<Integer>(initCapacity, maxF, minF);
}
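/*
 * Hedged sketch of what the three constructor arguments exercised above
 * suggest: a max load factor past which the hash table grows, and a min load
 * factor below which it can shrink (but never below the minimum capacity).
 * The arithmetic below is illustrative, not the LightWeightLinkedSet
 * internals.
 */
public class LoadFactorSketch {
  static boolean shouldExpand(int size, int capacity, float maxLoad) {
    return size > capacity * maxLoad;
  }

  static boolean shouldShrink(int size, int capacity, float minLoad,
      int minimumCapacity) {
    return capacity > minimumCapacity && size < capacity * minLoad;
  }

  public static void main(String[] args) {
    System.out.println(shouldExpand(13, 16, 0.75f));    // true: 13 > 12
    System.out.println(shouldShrink(3, 16, 0.25f, 16)); // false: already at minimum
  }
}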
/**
 * Return the number of nodes hosting a given block, grouped
 * by the state of those replicas.
 */
public NumberReplicas countNodes(Block b) {
  int decommissioned = 0;
  int live = 0;
  int corrupt = 0;
  int excess = 0;
  int stale = 0;
  Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
  Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  while (nodeIter.hasNext()) {
    DatanodeDescriptor node = nodeIter.next();
    if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
      corrupt++;
    } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      decommissioned++;
    } else {
      LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
          .getStorageID());
      if (blocksExcess != null && blocksExcess.contains(b)) {
        excess++;
      } else {
        live++;
      }
    }
    if (node.areBlockContentsStale()) {
      stale++;
    }
  }
  return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
/**
 * Return the set of excess block replicas recorded for the given datanode,
 * or null if it has none.
 */
public LightWeightLinkedSet<Block> get(String dn) throws IOException {
  Collection<ExcessReplica> excessReplicas =
      getExcessReplicas(datanodeManager.getDatanode(dn).getSId());
  if (excessReplicas == null) {
    return null;
  }
  LightWeightLinkedSet<Block> excessBlocks = new LightWeightLinkedSet<>();
  for (ExcessReplica er : excessReplicas) {
    // FIXME: [M] might need to get the blockinfo from the db, but for now we
    // don't need it
    excessBlocks.add(new Block(er.getBlockId()));
  }
  return excessBlocks;
}
/**
 * Serializes leases.
 */
void saveFilesUnderConstruction(SaveNamespaceContext ctx,
    DataOutputStream out) throws IOException {
  synchronized (leaseManager) {
    out.writeInt(leaseManager.countPath()); // write the size first
    LightWeightLinkedSet<Lease> sortedLeases = leaseManager.getSortedLeases();
    Iterator<Lease> itr = sortedLeases.iterator();
    while (itr.hasNext()) {
      ctx.checkCancelled();
      Lease lease = itr.next();
      for (String path : lease.getPaths()) {
        // Verify that the path exists in the namespace.
        INode node = dir.getFileINode(path);
        if (node == null) {
          throw new IOException("saveLeases found path " + path
              + " but no matching entry in namespace.");
        }
        if (!node.isUnderConstruction()) {
          throw new IOException("saveLeases found path " + path
              + " but it is not under construction.");
        }
        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
        FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
      }
    }
  }
}
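/*
 * Self-contained sketch of the count-then-entries serialization pattern used
 * by saveFilesUnderConstruction(): write the number of records first so the
 * reader knows exactly how many to consume. Names and payloads are
 * illustrative.
 */
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class CountPrefixedSketch {
  public static void main(String[] args) throws IOException {
    List<String> paths = Arrays.asList("/a", "/b");

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(paths.size());          // write the size first
    for (String p : paths) {
      out.writeUTF(p);
    }

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    int n = in.readInt();                // reader consumes exactly n entries
    for (int i = 0; i < n; i++) {
      System.out.println(in.readUTF());
    }
  }
}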
/**
 * Modify (block-->datanode) map. Possibly generate replication tasks, if the
 * removed block is still valid.
 */
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
  blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
  assert (namesystem.hasWriteLock());
  {
    if (!blocksMap.removeNode(block, node)) {
      blockLog.debug("BLOCK* removeStoredBlock: {} has already been"
          + " removed from node {}", block, node);
      return;
    }
    //
    // It's possible that the block was removed because of a datanode
    // failure. If the block is still valid, check if replication is
    // necessary. In that case, put block on a possibly-will-
    // be-replicated list.
    //
    BlockCollection bc = blocksMap.getBlockCollection(block);
    if (bc != null) {
      namesystem.decrementSafeBlockCount(block);
      updateNeededReplications(block, -1, 0);
    }
    //
    // We've removed a block from a node, so it's definitely no longer
    // in "excess" there.
    //
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
        .getDatanodeUuid());
    if (excessBlocks != null) {
      if (excessBlocks.remove(block)) {
        excessBlocksCount.decrementAndGet();
        blockLog.debug("BLOCK* removeStoredBlock: {} is removed from "
            + "excessBlocks", block);
        if (excessBlocks.size() == 0) {
          excessReplicateMap.remove(node.getDatanodeUuid());
        }
      }
    }
    // Remove the replica from corruptReplicas
    corruptReplicas.removeFromCorruptReplicasMap(block, node);
  }
}
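/*
 * Self-contained sketch of the map-of-sets cleanup pattern in
 * removeStoredBlock(): remove the element, then drop the per-node set once it
 * becomes empty so the outer map does not accumulate empty entries. Names are
 * illustrative, not the Hadoop types.
 */
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ExcessMapSketch {
  public static void main(String[] args) {
    Map<String, Set<Long>> excess = new HashMap<>();
    excess.computeIfAbsent("dn-1", k -> new HashSet<>()).add(42L);

    Set<Long> blocks = excess.get("dn-1");
    if (blocks != null && blocks.remove(42L) && blocks.isEmpty()) {
      excess.remove("dn-1"); // avoid leaking empty sets
    }
    System.out.println(excess); // {}
  }
}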
/** Create an object. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
  }
}
/**
 * Tries to get the most up-to-date lengths of files under construction.
 */
void updateLeasedFiles(SnapshotStorage ssStore) throws IOException {
  FSNamesystem fsNamesys = ssStore.getFSNamesystem();
  List<Block> blocksForNN = new ArrayList<Block>();
  leaseUpdateThreadPool = new ThreadPoolExecutor(1, maxLeaseUpdateThreads, 60,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
  ((ThreadPoolExecutor) leaseUpdateThreadPool).allowCoreThreadTimeOut(true);

  // Try to update lengths for leases from the datanodes.
  LightWeightLinkedSet<Lease> sortedLeases =
      fsNamesys.leaseManager.getSortedLeases();
  Iterator<Lease> itr = sortedLeases.iterator();
  while (itr.hasNext()) {
    Lease lease = itr.next();
    for (String path : lease.getPaths()) {
      // Update file lengths using worker threads to increase throughput.
      leaseUpdateThreadPool.execute(
          new LeaseUpdateWorker(conf, path, fsNamesys, blocksForNN));
    }
  }
  try {
    leaseUpdateThreadPool.shutdown();
    // Wait up to 20 minutes for the update tasks to finish.
    if (!leaseUpdateThreadPool.awaitTermination(1200, TimeUnit.SECONDS)) {
      throw new IOException("Updating lease files failed");
    }
  } catch (InterruptedException e) {
    throw new IOException(
        "Snapshot creation interrupted while updating leased files");
  }

  // Fetch block lengths for renamed/deleted leases from the namenode.
  long[] blockIds = new long[blocksForNN.size()];
  for (int i = 0; i < blocksForNN.size(); ++i) {
    blockIds[i] = blocksForNN.get(i).getBlockId();
  }
  long[] lengths = namenode.getBlockLengths(blockIds);
  for (int i = 0; i < blocksForNN.size(); ++i) {
    if (lengths[i] == -1) {
      // Couldn't update the block length; keep the preferred length.
      LOG.error("Couldn't update length for block " + blocksForNN.get(i));
    } else {
      blocksForNN.get(i).setNumBytes(lengths[i]);
    }
  }
}
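/*
 * Self-contained sketch of the shutdown/awaitTermination pattern used by
 * updateLeasedFiles(): submit work, stop accepting new tasks, then bound the
 * wait. The 20-minute bound mirrors the code above; the pool size and task
 * bodies are illustrative.
 */
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolShutdownSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 10; i++) {
      final int task = i;
      pool.execute(() -> System.out.println("updated lease " + task));
    }
    pool.shutdown(); // no new tasks accepted; queued ones still run
    if (!pool.awaitTermination(1200, TimeUnit.SECONDS)) {
      throw new IllegalStateException("lease updates did not finish in time");
    }
  }
}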
/** Create the priority queues plus the separate queue of missing raided blocks. */
UnderReplicatedBlocks() {
  for (int i = 0; i < LEVEL; i++) {
    priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
  }
  raidQueue = new RaidMissingBlocks();
}