/** * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take * weights into account, thus will treat every location passed from the input split as equal. We * do not want to blindly pass all the locations, since we are creating one split per region, and * the region's blocks are all distributed throughout the cluster unless favorite node assignment * is used. On the expected stable case, only one location will contain most of the blocks as * local. * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here * we are doing a simple heuristic, where we will pass all hosts which have at least 80% * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top * host with the best locality. */ public static List<String> getBestLocations( Configuration conf, HDFSBlocksDistribution blockDistribution) { List<String> locations = new ArrayList<String>(3); HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); if (hostAndWeights.length == 0) { return locations; } HostAndWeight topHost = hostAndWeights[0]; locations.add(topHost.getHost()); // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality double cutoffMultiplier = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); double filterWeight = topHost.getWeight() * cutoffMultiplier; for (int i = 1; i < hostAndWeights.length; i++) { if (hostAndWeights[i].getWeight() >= filterWeight) { locations.add(hostAndWeights[i].getHost()); } else { break; } } return locations; }
/** * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take * weights into account, thus will treat every location passed from the input split as equal. We * do not want to blindly pass all the locations, since we are creating one split per region, and * the region's blocks are all distributed throughout the cluster unless favorite node assignment * is used. On the expected stable case, only one location will contain most of the blocks as local. * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here * we are doing a simple heuristic, where we will pass all hosts which have at least 80% * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top * host with the best locality. */ public static List<String> getBestLocations( Configuration conf, HDFSBlocksDistribution blockDistribution) { List<String> locations = new ArrayList<String>(3); HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); if (hostAndWeights.length == 0) { return locations; } HostAndWeight topHost = hostAndWeights[0]; locations.add(topHost.getHost()); // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality double cutoffMultiplier = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); double filterWeight = topHost.getWeight() * cutoffMultiplier; for (int i = 1; i < hostAndWeights.length; i++) { if (hostAndWeights[i].getWeight() >= filterWeight) { locations.add(hostAndWeights[i].getHost()); } else { break; } } return locations; }
/** * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take * weights into account, thus will treat every location passed from the input split as equal. We * do not want to blindly pass all the locations, since we are creating one split per region, and * the region's blocks are all distributed throughout the cluster unless favorite node assignment * is used. On the expected stable case, only one location will contain most of the blocks as * local. * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here * we are doing a simple heuristic, where we will pass all hosts which have at least 80% * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top * host with the best locality. * Return at most numTopsAtMost locations if there are more than that. */ private static List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution, int numTopsAtMost) { HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is return null; } if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1 numTopsAtMost = 1; } int top = Math.min(numTopsAtMost, hostAndWeights.length); List<String> locations = new ArrayList<>(top); HostAndWeight topHost = hostAndWeights[0]; locations.add(topHost.getHost()); if (top == 1) { // only care about the top host return locations; } // When top >= 2, // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality double cutoffMultiplier = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); double filterWeight = topHost.getWeight() * cutoffMultiplier; for (int i = 1; i <= top - 1; i++) { if (hostAndWeights[i].getWeight() >= filterWeight) { locations.add(hostAndWeights[i].getHost()); } else { // As hostAndWeights is in descending order, // we could break the loop as long as we meet a weight which is less than filterWeight. break; } } return locations; }