/**
 * Constructs a split with host and cached-blocks information.
 *
 * <p>For every entry of {@code hosts} a {@link SplitLocationInfo} is recorded whose in-memory
 * flag is set when the same host name also appears in {@code inMemoryHosts}.
 *
 * @param file the file name
 * @param start the position of the first byte in the file to process
 * @param length the number of bytes in the file to process
 * @param hosts the list of hosts containing the block; must not be {@code null}
 * @param inMemoryHosts the list of hosts containing the block in memory; a {@code null} array
 *     or {@code null} entries are treated as "no host caches the block"
 */
public FileSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
  this(file, start, length, hosts);
  hostInfos = new SplitLocationInfo[hosts.length];
  for (int i = 0; i < hosts.length; i++) {
    boolean inMemory = false;
    if (inMemoryHosts != null) {
      // Because N will be tiny, a linear scan is probably faster than building a HashSet.
      for (String inMemoryHost : inMemoryHosts) {
        if (inMemoryHost != null && inMemoryHost.equals(hosts[i])) {
          inMemory = true;
          break;
        }
      }
    }
    hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
  }
}
/**
 * Checks the first split produced for {@code test:///a1/a2}: it must report exactly two hosts,
 * with "localhost" holding the data on disk and in memory and "otherhost" on disk only.
 */
@Test
public void testSplitLocationInfo() throws Exception {
  Configuration config = getConfiguration();
  config.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1/a2");
  Job job = Job.getInstance(config);
  TextInputFormat inputFormat = new TextInputFormat();
  List<InputSplit> allSplits = inputFormat.getSplits(job);
  InputSplit firstSplit = allSplits.get(0);

  String[] hostNames = firstSplit.getLocations();
  Assert.assertEquals(2, hostNames.length);
  SplitLocationInfo[] infos = firstSplit.getLocationInfo();
  Assert.assertEquals(2, infos.length);

  // Assumes getLocationInfo() lists hosts in the same order as getLocations(), so the
  // name in slot 0 tells us which info entry belongs to which host.
  String leadingHost = hostNames[0];
  SplitLocationInfo local;
  if (leadingHost.equals("localhost")) {
    local = infos[0];
  } else {
    local = infos[1];
  }
  SplitLocationInfo other;
  if (leadingHost.equals("otherhost")) {
    other = infos[0];
  } else {
    other = infos[1];
  }

  Assert.assertTrue(local.isOnDisk());
  Assert.assertTrue(local.isInMemory());
  Assert.assertTrue(other.isOnDisk());
  Assert.assertFalse(other.isInMemory());
}
/**
 * Groups split indices by the first location reported in each split's location info.
 *
 * <p>A split's index is its position in iteration order over {@code this.splits}. Only
 * {@code getLocationInfo()[0]} is considered for each split. Splits whose
 * {@code getLocationInfo()} returns {@code null} or an empty array have no location to group
 * by and are omitted from the result. Their indices are still consumed, so the remaining
 * indices keep matching positions in {@code this.splits}.
 *
 * @return a map from host name to the indices of the splits whose first location is that host
 * @throws IOException if querying a split's location info fails
 */
public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException {
  HashMap<String, ArrayList<Integer>> splitsMap = new HashMap<>();
  int index = 0;
  for (InputSplit split : this.splits) {
    SplitLocationInfo[] info = split.getLocationInfo();
    // null is a documented return value of getLocationInfo(); guard it and the empty case
    // instead of dereferencing info[0] unconditionally.
    if (info != null && info.length > 0) {
      String hostname = info[0].getLocation();
      ArrayList<Integer> indices = splitsMap.get(hostname);
      if (indices == null) {
        indices = new ArrayList<>();
        splitsMap.put(hostname, indices);
      }
      indices.add(index);
    }
    index++;
  }
  return splitsMap;
}
/**
 * Returns the per-host location information recorded for this split.
 *
 * <p>A shallow copy of the internal array is returned, so callers cannot replace or reorder
 * this split's entries. If {@code hostInfos} was never assigned, {@code null} is returned
 * unchanged.
 *
 * @return a copy of the recorded location info, or {@code null} if none was recorded
 * @throws IOException declared by the overridden method; not thrown here
 */
@Override
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
  return hostInfos == null ? null : hostInfos.clone();
}
/**
 * Describes, per node, where this input split's data lives and how it is stored there
 * (for example on disk or cached in memory).
 *
 * <p>This default implementation provides no per-location detail and returns {@code null}.
 * Callers must read a {@code null} result as "every location stores the data on disk".
 *
 * @return one {@code SplitLocationInfo} per location, or {@code null} when all locations
 *     hold the data on disk
 * @throws IOException if a subclass fails to determine the location information
 */
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
  return null;
}