/** * Choose the target storage within same Datanode if possible. */ boolean chooseTargetInSameNode(DBlock db, Source source, List<StorageType> targetTypes) { for (StorageType t : targetTypes) { StorageGroup target = storages.getTarget(source.getDatanodeInfo() .getDatanodeUuid(), t); if (target == null) { continue; } final PendingMove pm = source.addPendingMove(db, target); if (pm != null) { dispatcher.executePendingMove(pm); return true; } } return false; }
boolean chooseTarget(DBlock db, Source source, List<StorageType> targetTypes, Matcher matcher) { final NetworkTopology cluster = dispatcher.getCluster(); for (StorageType t : targetTypes) { for(StorageGroup target : storages.getTargetStorages(t)) { if (matcher.match(cluster, source.getDatanodeInfo(), target.getDatanodeInfo())) { final PendingMove pm = source.addPendingMove(db, target); if (pm != null) { dispatcher.executePendingMove(pm); return true; } } } } return false; }
/** * For the given datanode, choose a candidate and then schedule it. * @return true if a candidate is chosen; false if no candidates is chosen. */ private <C extends StorageGroup> boolean choose4One(StorageGroup g, Collection<C> candidates, Matcher matcher) { final Iterator<C> i = candidates.iterator(); final C chosen = chooseCandidate(g, i, matcher); if (chosen == null) { return false; } if (g instanceof Source) { matchSourceWithTargetToMove((Source)g, chosen); } else { matchSourceWithTargetToMove((Source)chosen, g); } if (!chosen.hasSpaceForScheduling()) { i.remove(); } return true; }
/** Choose a candidate for the given datanode. */ private <G extends StorageGroup, C extends StorageGroup> C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) { if (g.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { final C c = candidates.next(); if (!c.hasSpaceForScheduling()) { candidates.remove(); } else if (matcher.match(dispatcher.getCluster(), g.getDatanodeInfo(), c.getDatanodeInfo())) { return c; } } } return null; }
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Set<String> excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth); this.cluster = NetworkTopology.getInstance(conf); this.moveExecutor = Executors.newFixedThreadPool(moverThreads); this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); }
/** * Wait for all block move confirmations. * @return true if there is failed move execution */ public static boolean waitForMoveCompletion( Iterable<? extends StorageGroup> targets) { boolean hasFailure = false; for(;;) { boolean empty = true; for (StorageGroup t : targets) { if (!t.getDDatanode().isPendingQEmpty()) { empty = false; break; } else { hasFailure |= t.getDDatanode().hasFailure; } } if (empty) { return hasFailure; // all pending queues are empty } try { Thread.sleep(blockMoveWaitTime); } catch (InterruptedException ignored) { } } }
DBlock newDBlock(LocatedBlock lb, List<MLocation> locations, ErasureCodingPolicy ecPolicy) { Block blk = lb.getBlock().getLocalBlock(); DBlock db; if (lb.isStriped()) { LocatedStripedBlock lsb = (LocatedStripedBlock) lb; byte[] indices = new byte[lsb.getBlockIndices().length]; for (int i = 0; i < indices.length; i++) { indices[i] = (byte) lsb.getBlockIndices()[i]; } db = new DBlockStriped(blk, indices, (short) ecPolicy.getNumDataUnits(), ecPolicy.getCellSize()); } else { db = new DBlock(blk); } for(MLocation ml : locations) { StorageGroup source = storages.getSource(ml); if (source != null) { db.addLocation(source); } } return db; }
public DBlock getInternalBlock(StorageGroup storage) { int idxInLocs = locations.indexOf(storage); if (idxInLocs == -1) { return null; } byte idxInGroup = indices[idxInLocs]; long blkId = getBlock().getBlockId() + idxInGroup; long numBytes = getInternalBlockLength(getNumBytes(), cellSize, dataBlockNum, idxInGroup); Block blk = new Block(getBlock()); blk.setBlockId(blkId); blk.setNumBytes(numBytes); DBlock dblk = new DBlock(blk); dblk.addLocation(storage); return dblk; }
/** * Wait for all reportedBlock move confirmations. * @return true if there is failed move execution */ public static boolean waitForMoveCompletion( Iterable<? extends StorageGroup> targets) { boolean hasFailure = false; for(;;) { boolean empty = true; for (StorageGroup t : targets) { if (!t.getDDatanode().isPendingQEmpty()) { empty = false; break; } else { hasFailure |= t.getDDatanode().hasFailure; } } if (empty) { return hasFailure; // all pending queues are empty } try { Thread.sleep(1000); } catch (InterruptedException ignored) { } } }
/** * Decide if the block is a good candidate to be moved from source to target. * A block is a good candidate if * 1. the block is not in the process of being moved/has not been moved; * 2. the block does not have a replica on the target; * 3. doing the move does not reduce the number of racks that the block has */ private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, StorageType targetStorageType, DBlock block) { if (target.storageType != targetStorageType) { return false; } // check if the block is moved or not if (movedBlocks.contains(block.getBlock())) { return false; } if (block.isLocatedOn(target)) { return false; } if (cluster.isNodeGroupAware() && isOnSameNodeGroupWithReplicas(source, target, block)) { return false; } if (reduceNumOfRacks(source, target, block)) { return false; } return true; }
private void add(Source source, StorageGroup target) { sources.put(source); if (target != null) { targets.put(target); getTargetStorages(target.getStorageType()).add(target); } }
void init() throws IOException { initStoragePolicies(); final List<DatanodeStorageReport> reports = dispatcher.init(); for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); for(StorageType t : StorageType.getMovableTypes()) { final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); final long maxRemaining = getMaxRemaining(r, t); final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t, maxRemaining) : null; storages.add(source, target); } } }
DBlock newDBlock(Block block, List<MLocation> locations) { final DBlock db = new DBlock(block); for(MLocation ml : locations) { StorageGroup source = storages.getSource(ml); if (source != null) { db.addLocation(source); } } return db; }
/** * For each datanode, choose matching nodes from the candidates. Either the * datanodes or the candidates are source nodes with (utilization > Avg), and * the others are target nodes with (utilization < Avg). */ private <G extends StorageGroup, C extends StorageGroup> void chooseStorageGroups(Collection<G> groups, Collection<C> candidates, Matcher matcher) { for(final Iterator<G> i = groups.iterator(); i.hasNext();) { final G g = i.next(); for(; choose4One(g, candidates, matcher); ); if (!g.hasSpaceForScheduling()) { i.remove(); } } }
private void matchSourceWithTargetToMove(Source source, StorageGroup target) { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); final Task task = new Task(target, size); source.addTask(task); target.incScheduledSize(task.getSize()); dispatcher.add(source, target); LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " + source.getDisplayName() + " to " + target.getDisplayName()); }
/** Remove all blocks except for the moved blocks. */ private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) { for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) { if (!movedBlocks.contains(i.next())) { i.remove(); } } }
/** add to a proxy source for specific block movement */ private boolean addTo(StorageGroup g) { final DDatanode dn = g.getDDatanode(); if (dn.addPendingBlock(this)) { proxySource = dn; return true; } return false; }
@Override public boolean equals(Object obj) { if (this == obj) { return true; } else if (obj == null || !(obj instanceof StorageGroup)) { return false; } else { final StorageGroup that = (StorageGroup) obj; return this.getStorageType() == that.getStorageType() && this.getDatanodeInfo().equals(that.getDatanodeInfo()); } }