/** * Calculates the splits that will serve as input for the map tasks. The * number of splits matches the number of regions in a table. * * @param context The current job context. * @return The list of input splits. * @throws IOException When creating the list of splits fails. * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) */ @Override public List<InputSplit> getSplits(JobContext context) throws IOException { if (scans.isEmpty()) { throw new IOException("No scans were provided."); } List<InputSplit> splits = new ArrayList<InputSplit>(); for (Scan scan : scans) { byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); if (tableName == null) throw new IOException("A scan object did not have a table name"); HTable table = null; try { table = new HTable(context.getConfiguration(), tableName); Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new IOException("Expecting at least one region for table : " + Bytes.toString(tableName)); } int count = 0; byte[] startRow = scan.getStartRow(); byte[] stopRow = scan.getStopRow(); RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); for (int i = 0; i < keys.getFirst().length; i++) { if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } HRegionLocation hregionLocation = table.getRegionLocation(keys.getFirst()[i], false); String regionHostname = hregionLocation.getHostname(); HRegionInfo regionInfo = hregionLocation.getRegionInfo(); // determine if the given start and stop keys fall into the range if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { byte[] splitStart = startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys .getFirst()[i] : startRow; byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys .getSecond()[i] : stopRow; long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName()); TableSplit split = new TableSplit(table.getName(), scan, splitStart, splitStop, regionHostname, regionSize); splits.add(split); if (LOG.isDebugEnabled()) LOG.debug("getSplits: split -> " + (count++) + " -> " + split); } } } finally { if (null != table) table.close(); } } return splits; }