/**
 * Builds the record reader for one input split.
 *
 * <p>The reader produced by {@code tableInputFormat} is cast to
 * {@link TableRecordReader}, remembered in {@code tableReader}, and wrapped
 * (together with {@code inputCFBytes}) in a new {@link HBaseBinaryRecordReader},
 * which is remembered in {@code titanRecordReader} and returned.
 *
 * @param inputSplit         the split handed to the underlying table input format
 * @param taskAttemptContext the task context handed to the underlying table input format
 * @return the newly created wrapping record reader
 * @throws IOException          propagated from the underlying table input format
 * @throws InterruptedException propagated from the underlying table input format
 */
@Override
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(
        final InputSplit inputSplit,
        final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    final TableRecordReader delegate =
            (TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
    tableReader = delegate;
    titanRecordReader = new HBaseBinaryRecordReader(delegate, inputCFBytes);
    return titanRecordReader;
}
/**
 * Prepares this input format to read the given split.
 *
 * <p>A new {@link TableRecordReader} is created for the configured table, a new
 * {@link Scan} is built from the configured scan and bounded by the split's start
 * and end rows, and the reader is restarted at the split's start row. The key and
 * result holders are replaced with fresh instances and the end-of-input flag is cleared.
 *
 * @param split the split to read; its start row and end row bound the scan
 * @throws IOException if {@code split} is {@code null}, or if no table or no scan
 *                     has been set on this instance
 */
@Override
public void open(TableInputSplit split) throws IOException {
    if (split == null) {
        throw new IOException("Input split is null!");
    }
    if (this.table == null) {
        throw new IOException("No HTable provided!");
    }
    if (this.scan == null) {
        throw new IOException("No Scan instance provided");
    }

    this.tableRecordReader = new TableRecordReader();
    this.tableRecordReader.setHTable(this.table);

    // The split bounds are applied to a new Scan built from the configured one.
    final Scan sc = new Scan(this.scan);

    final byte[] startRow = split.getStartRow();
    sc.setStartRow(startRow);
    // Decode with an explicit charset so the logged text does not depend on the
    // JVM's platform-default encoding.
    LOG.info("split start row: " + new String(startRow, java.nio.charset.StandardCharsets.UTF_8));

    final byte[] endRow = split.getEndRow();
    sc.setStopRow(endRow);
    LOG.info("split end row: " + new String(endRow, java.nio.charset.StandardCharsets.UTF_8));

    this.tableRecordReader.setScan(sc);
    this.tableRecordReader.restart(startRow);

    this.hbaseKey = new HBaseKey();
    this.hbaseResult = new HBaseResult();
    this.endReached = false;
}
/**
 * Constructs the reader, keeping references to the supplied table reader and
 * column family bytes. The byte array is stored as given; it is not copied.
 *
 * @param reader               the {@link TableRecordReader} to keep a reference to
 * @param edgestoreFamilyBytes the edgestore column family name as bytes
 */
public HBaseBinaryRecordReader(
        final TableRecordReader reader,
        final byte[] edgestoreFamilyBytes) {
    this.edgestoreFamilyBytes = edgestoreFamilyBytes;
    this.reader = reader;
}
/**
 * Returns the {@link TableRecordReader} currently held in {@code tableReader}.
 *
 * @return the current value of {@code tableReader}
 */
public TableRecordReader getTableReader() {
    return this.tableReader;
}
/**
 * Replaces the {@link TableRecordReader} used by this instance; intended for
 * subclasses that want to supply their own implementation.
 *
 * @param tableRecordReader
 *            the {@link TableRecordReader} implementation to use from now on
 */
protected void setTableRecordReader(final TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
}