@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
  if (getHTable() == null) {
    throw new IOException("Cannot create a record reader because of a"
        + " previous error. Please look at the previous log lines from"
        + " the task's full log for more details.");
  }
  TableSplit tSplit = (TableSplit) split;
  ThemisTableRecordReader trr = this.themisTableRecordReader;
  if (trr == null) {
    trr = new ThemisTableRecordReader();
  }
  Scan sc = new Scan(this.getScan());
  sc.setStartRow(tSplit.getStartRow());
  sc.setStopRow(tSplit.getEndRow());
  trr.setScan(sc);
  trr.setConf(getHTable().getConfiguration());
  trr.setTableName(getHTable().getTableName());
  try {
    trr.initialize(tSplit, context);
  } catch (InterruptedException e) {
    throw new InterruptedIOException(e.getMessage());
  }
  return trr;
}
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  TableSplit tSplit = (TableSplit) split;
  if (tSplit.getTableName() == null) {
    throw new IOException("Cannot create a record reader because of a"
        + " previous error. Please look at the previous log lines from"
        + " the task's full log for more details.");
  }
  ThemisTableRecordReader trr = this.themisTableRecordReader;
  // If no table record reader was provided, use the default.
  if (trr == null) {
    trr = new ThemisTableRecordReader();
  }
  Scan sc = tSplit.getScan();
  sc.setStartRow(tSplit.getStartRow());
  sc.setStopRow(tSplit.getEndRow());
  trr.setScan(sc);
  trr.setConf(context.getConfiguration());
  trr.setTableName(tSplit.getTableName());
  trr.initialize(split, context);
  return trr;
}
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException {
  List<InputSplit> splits = super.getSplits(context);
  ListIterator<InputSplit> splitIter = splits.listIterator();
  while (splitIter.hasNext()) {
    TableSplit split = (TableSplit) splitIter.next();
    byte[] startKey = split.getStartRow();
    byte[] endKey = split.getEndRow();
    // Skip if the region doesn't satisfy the configured options.
    if ((skipRegion(CompareOp.LESS, startKey, lt_))
        || (skipRegion(CompareOp.GREATER, endKey, gt_))
        || (skipRegion(CompareOp.GREATER, endKey, gte_))
        || (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_))) {
      splitIter.remove();
    }
  }
  return splits;
}
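The skipRegion helper and the lt_/gt_/gte_/lte_ bounds are not shown in this listing. The following is a minimal hypothetical sketch of such a helper, assuming the bounds are user-configured row keys (null when unset) and that an empty boundary key means the region is open-ended on that side:

// Hypothetical sketch, not from the original source: prune a region when its
// boundary key proves no row in it can satisfy the configured bound.
private boolean skipRegion(CompareOp op, byte[] regionKey, byte[] bound) {
  if (bound == null || regionKey.length == 0) {
    return false; // no bound configured, or the region is open-ended on this side
  }
  int cmp = Bytes.compareTo(regionKey, bound);
  switch (op) {
    case LESS:             return cmp >= 0; // rows must be < bound, but the region starts at or past it
    case LESS_OR_EQUAL:    return cmp > 0;  // rows must be <= bound, but the region starts past it
    case GREATER:          return cmp <= 0; // rows must be > bound, but the region ends at or before it
    case GREATER_OR_EQUAL: return cmp < 0;
    default:               return false;
  }
}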
@Test
public void completeMemstoreScan() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO,
      sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".A", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".A");
  HTable htable = new HTable(config, tableName);
  Scan scan = new Scan();
  rr.setHTable(htable);
  rr.setScan(scan);
  SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName),
      scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
  rr.initialize(tableSplit, null);
  int i = 0;
  while (rr.nextKeyValue()) {
    i++;
    Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
    Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
    Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
  }
  Assert.assertEquals("incorrect results returned", 1000, i);
}
@Test
public void emptyMemstoreScan() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO,
      sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".D", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".D");
  HTable htable = new HTable(config, tableName);
  Scan scan = new Scan();
  rr.setHTable(htable);
  rr.setScan(scan);
  SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName),
      scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
  rr.initialize(tableSplit, null);
  int i = 0;
  while (rr.nextKeyValue()) {
    i++;
    Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
    Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
    Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
  }
  Assert.assertEquals("incorrect results returned", 1000, i);
}
@Test
public void singleRegionScanWithOneStoreFileAndMemstore() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO,
      sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".B", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".B");
  HTable htable = new HTable(config, tableName);
  Scan scan = new Scan();
  rr.setHTable(htable);
  rr.setScan(scan);
  SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName),
      scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
  rr.initialize(tableSplit, null);
  int i = 0;
  while (rr.nextKeyValue()) {
    i++;
    Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
    Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
    Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
  }
  Assert.assertEquals("incorrect results returned", 1000, i);
}
@Test
public void twoRegionsWithMemstores() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO,
      sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".C", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".C");
  HTable htable = new HTable(config, tableName);
  Scan scan = new Scan();
  rr.setHTable(htable);
  rr.setScan(scan);
  SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName),
      scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
  rr.initialize(tableSplit, null);
  int i = 0;
  while (rr.nextKeyValue()) {
    i++;
    Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
    Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
    Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
  }
  Assert.assertEquals("incorrect results returned", 10000, i);
}
@Test
public void testScanAfterMajorCompaction() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO,
      sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".E", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  String tableName = sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".E");
  HTable htable = new HTable(config, tableName);
  Scan scan = new Scan();
  rr.setHTable(htable);
  rr.setScan(scan);
  SMSplit tableSplit = new SMSplit(new TableSplit(Bytes.toBytes(tableName),
      scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
  rr.initialize(tableSplit, null);
  int i = 0;
  while (rr.nextKeyValue()) {
    i++;
    Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
    Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
    Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
  }
  Assert.assertEquals("incorrect results returned", 5000, i);
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
  if (isMock()) {
    if (table == null) {
      initialize(context);
    }
    // In mock mode, return a single split covering the whole table.
    List<InputSplit> splits = new ArrayList<>(1);
    TableSplit split = new TableSplit(getTable().getName(), getScan(),
        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "", 0);
    splits.add(split);
    return splits;
  } else {
    return super.getSplits(context);
  }
}
/**
 * Calculates the splits that will serve as input for the map tasks. The
 * number of splits matches the number of regions in the table.
 *
 * @param tTable The table to calculate splits for.
 * @param tScan The scan whose start and stop rows bound the splits.
 * @param splits The list to add the resulting input splits to.
 * @throws IOException When creating the list of splits fails.
 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
 */
void setSplitsForTable(HTable tTable, Scan tScan, List<InputSplit> splits) throws IOException {
  if (tTable == null) {
    throw new IOException("No table was provided.");
  }
  Pair<byte[][], byte[][]> keys = tTable.getStartEndKeys();
  if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    throw new IOException("Expecting at least one region.");
  }
  int count = 0;
  for (int i = 0; i < keys.getFirst().length; i++) {
    if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
      continue;
    }
    String regionLocation = tTable.getRegionLocation(keys.getFirst()[i]).getHostname();
    byte[] startRow = tScan.getStartRow();
    byte[] stopRow = tScan.getStopRow();
    // Determine if the given start and stop keys fall into the region.
    if ((startRow.length == 0 || keys.getSecond()[i].length == 0
        || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0)
        && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
      byte[] splitStart = startRow.length == 0
          || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0
          ? keys.getFirst()[i] : startRow;
      byte[] splitStop = (stopRow.length == 0
          || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0)
          && keys.getSecond()[i].length > 0
          ? keys.getSecond()[i] : stopRow;
      InputSplit split = new TableSplit(tTable.getTableName(), splitStart, splitStop, regionLocation);
      splits.add(split);
      if (LOG.isDebugEnabled()) {
        LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
      }
    }
  }
}
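To make the boundary arithmetic concrete, here is a small self-contained sketch (not from the original source; the class and method names are illustrative) that intersects a scan range with a region range using the same empty-array-means-unbounded convention as above:

import org.apache.hadoop.hbase.util.Bytes;

public class SplitBoundsExample {
  // Intersects [scanStart, scanStop) with [regionStart, regionEnd), where an
  // empty array means "unbounded" on that side, mirroring the logic above.
  static byte[][] intersect(byte[] scanStart, byte[] scanStop,
                            byte[] regionStart, byte[] regionEnd) {
    byte[] splitStart = scanStart.length == 0
        || Bytes.compareTo(regionStart, scanStart) >= 0 ? regionStart : scanStart;
    byte[] splitStop = (scanStop.length == 0
        || Bytes.compareTo(regionEnd, scanStop) <= 0)
        && regionEnd.length > 0 ? regionEnd : scanStop;
    return new byte[][] { splitStart, splitStop };
  }

  public static void main(String[] args) {
    // Region [b, f), scan [c, z) -> split [c, f): the narrower bound wins on each side.
    byte[][] bounds = intersect(Bytes.toBytes("c"), Bytes.toBytes("z"),
        Bytes.toBytes("b"), Bytes.toBytes("f"));
    System.out.println(Bytes.toString(bounds[0]) + " .. " + Bytes.toString(bounds[1])); // c .. f
  }
}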
@Override
public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
  if (split instanceof TableSplit) {
    return new TableSplitComparable((TableSplit) split);
  } else {
    throw new RuntimeException("LoadFunc expected split of type TableSplit but was "
        + split.getClass().getName());
  }
}
private void checkAndPatchRegionEndKey(LinkedList<InputSplit> newSplits, byte[] regionEndKey) {
  // If the region's end key is empty (the last region of the table), rebuild
  // the most recent split so that it is open-ended as well.
  if (Arrays.equals(HConstants.EMPTY_BYTE_ARRAY, regionEndKey)) {
    TableSplit tmpSplit = (TableSplit) newSplits.removeLast();
    newSplits.add(new TableSplit(tmpSplit.getTableName(), tmpSplit.getStartRow(),
        regionEndKey, tmpSplit.getLocations()[0]));
  }
}
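A short hypothetical usage sketch of the method above (not from the original source): when the last region of a table is sub-divided, the final sub-split can end at an interior key, so patching with the region's empty end key re-opens it:

// Hypothetical example; table name, keys, and host are illustrative.
LinkedList<InputSplit> newSplits = new LinkedList<>();
// Suppose the last region [k, <open>) was sub-divided and the final sub-split
// ended at an interior sample key "q" instead of the open end.
newSplits.add(new TableSplit(TableName.valueOf("t"), Bytes.toBytes("k"),
    Bytes.toBytes("q"), "host1"));
// The region's end key is empty, so the last split is rebuilt as [k, <open>).
checkAndPatchRegionEndKey(newSplits, HConstants.EMPTY_BYTE_ARRAY);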
@Override
public void initialize() {
  List<Partition> partitions = ((SparkDataSet) dataSet).rdd.partitions();
  tableSplits = new ArrayList<>(partitions.size());
  for (Partition p : partitions) {
    NewHadoopPartition nhp = (NewHadoopPartition) p;
    SMSplit sms = (SMSplit) nhp.serializableHadoopSplit().value();
    TableSplit ts = sms.getSplit();
    if (ts.getStartRow() != null && Bytes.equals(ts.getStartRow(), ts.getEndRow())
        && ts.getStartRow().length > 0) {
      // This would be an empty partition, with the same start and end key, so don't add it.
      continue;
    }
    tableSplits.add(ts);
  }
}
public void init(Configuration config, InputSplit split) throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    SpliceLogUtils.debug(LOG, "init");
  }
  TaskContext.get().addTaskFailureListener(this);
  String tableScannerAsString = config.get(MRConstants.SPLICE_SCAN_INFO);
  if (tableScannerAsString == null) {
    throw new IOException("splice scan info was not serialized to task, failing");
  }
  byte[] scanStartKey = null;
  byte[] scanStopKey = null;
  DataScan scan = null; // declared outside the try block so the catch clause can log it
  try {
    builder = TableScannerBuilder.getTableScannerBuilderFromBase64String(tableScannerAsString);
    if (LOG.isTraceEnabled()) {
      SpliceLogUtils.trace(LOG, "config loaded builder=%s", builder);
    }
    TableSplit tSplit = ((SMSplit) split).getSplit();
    scan = builder.getScan();
    scanStartKey = scan.getStartKey();
    scanStopKey = scan.getStopKey();
    if (Bytes.startComparator.compare(scanStartKey, tSplit.getStartRow()) < 0) {
      // The split itself is more restrictive.
      scan.startKey(tSplit.getStartRow());
    }
    if (Bytes.endComparator.compare(scanStopKey, tSplit.getEndRow()) > 0) {
      // The split itself is more restrictive.
      scan.stopKey(tSplit.getEndRow());
    }
    setScan(((HScan) scan).unwrapDelegate());
    // TODO (wjk): this seems weird (added with DB-4483)
    this.statisticsRun = AbstractSMInputFormat.oneSplitPerRegion(config);
    restart(scan.getStartKey());
  } catch (IOException ioe) {
    LOG.error(String.format("Received exception with scan %s, original start key %s, original stop key %s, split %s",
        scan, Bytes.toStringBinary(scanStartKey), Bytes.toStringBinary(scanStopKey), split), ioe);
    throw ioe;
  } catch (StandardException e) {
    throw new IOException(e);
  }
}
private List<InputSplit> toSMSplits(List<Partition> splits) throws IOException {
  List<InputSplit> sMSplits = Lists.newArrayList();
  HBaseTableInfoFactory infoFactory = HBaseTableInfoFactory.getInstance(HConfiguration.getConfiguration());
  for (Partition split : splits) {
    SMSplit smSplit = new SMSplit(new TableSplit(
        infoFactory.getTableInfo(split.getTableName()),
        split.getStartKey(),
        split.getEndKey(),
        split.owningServer().getHostname()));
    sMSplits.add(smSplit);
  }
  return sMSplits;
}
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row The current table row key.
 * @param value The columns.
 * @param context The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value, Context context) throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    int versions = conf.getInt(NAME + ".versions", -1);
    LOG.info("Setting number of versions inside map as: " + versions);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }
    final TableSplit tableSplit = (TableSplit) (context.getInputSplit());
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX);
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        replicatedTable = new HTable(peerConf, tableName);
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
    currentCompareRowInPeerTable = replicatedScanner.next();
  }
  while (true) {
    if (currentCompareRowInPeerTable == null) {
      // Reached the end of the peer table region; the row exists only in the source table.
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    }
    int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
    if (rowCmpRet == 0) {
      // Row keys are the same; compare the contents of the rows.
      try {
        Result.compareResults(value, currentCompareRowInPeerTable);
        context.getCounter(Counters.GOODROWS).increment(1);
      } catch (Exception e) {
        logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
        LOG.error("Exception while comparing row : " + e);
      }
      currentCompareRowInPeerTable = replicatedScanner.next();
      break;
    } else if (rowCmpRet < 0) {
      // Row exists only in the source table.
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    } else {
      // Row exists only in the peer table.
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable);
      currentCompareRowInPeerTable = replicatedScanner.next();
    }
  }
}
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row The current table row key.
 * @param value The columns.
 * @param context The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value, Context context) throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }
    final TableSplit tableSplit = (TableSplit) (context.getInputSplit());
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.create(conf);
        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        // TODO: This HTable doesn't get closed. Fix!
        Table replicatedTable = new HTable(peerConf, tableName);
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
    currentCompareRowInPeerTable = replicatedScanner.next();
  }
  while (true) {
    if (currentCompareRowInPeerTable == null) {
      // Reached the end of the peer table region; the row exists only in the source table.
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    }
    int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
    if (rowCmpRet == 0) {
      // Row keys are the same; compare the contents of the rows.
      try {
        Result.compareResults(value, currentCompareRowInPeerTable);
        context.getCounter(Counters.GOODROWS).increment(1);
      } catch (Exception e) {
        logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
      }
      currentCompareRowInPeerTable = replicatedScanner.next();
      break;
    } else if (rowCmpRet < 0) {
      // Row exists only in the source table.
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
      break;
    } else {
      // Row exists only in the peer table.
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable);
      currentCompareRowInPeerTable = replicatedScanner.next();
    }
  }
}
@Override
public void setup(Context context) {
  tableName = ((TableSplit) context.getInputSplit()).getTableName();
}
public TableSplitComparable() {
  tsplit = new TableSplit();
}

public TableSplitComparable(TableSplit tsplit) {
  this.tsplit = tsplit;
}

@Override
public int compareTo(TableSplit split) {
  // The parameter is already a TableSplit, so no cast is needed.
  return tsplit.compareTo(split);
}
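A brief hypothetical usage of the wrapper above (not from the original source; assumes an HBase 1.x TableSplit constructor): Pig obtains a WritableComparable per split so splits can be ordered, and the comparison simply delegates to TableSplit, which orders splits by their row range:

// Hypothetical example; table name, keys, and hosts are illustrative.
TableSplit a = new TableSplit(TableName.valueOf("t"), Bytes.toBytes("m"), Bytes.toBytes("z"), "host1");
TableSplit b = new TableSplit(TableName.valueOf("t"), Bytes.toBytes("a"), Bytes.toBytes("m"), "host2");
TableSplitComparable ca = new TableSplitComparable(a);
// b starts at "a", before a's start row "m", so a compares greater than b.
System.out.println(ca.compareTo(b) > 0); // true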
public TableSplit getSplit() {
  return this.split.getSplit();
}

public SMSplit() throws IOException {
  super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0, null);
  split = new TableSplit();
}

public SMSplit(TableSplit split) throws IOException {
  super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0, null);
  this.split = split;
}

public TableSplit getSplit() {
  return this.split;
}