/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { checkOpen(); if (regionServer.replicationSinkHandler != null) { requestCount.increment(); List<WALEntry> entries = request.getEntryList(); CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); return ReplicateWALEntryResponse.newBuilder().build(); } else { throw new ServiceException("Replication services are not initialized yet"); } } catch (IOException ie) { throw new ServiceException(ie); } }
/** * Insert a mix of puts and deletes * @throws Exception */ @Test public void testMixedPutDelete() throws Exception { List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2); List<Cell> cells = new ArrayList<Cell>(); for(int i = 0; i < BATCH_SIZE/2; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells)); entries = new ArrayList<WALEntry>(BATCH_SIZE); cells = new ArrayList<Cell>(); for(int i = 0; i < BATCH_SIZE; i++) { entries.add(createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length); }
/** * Insert to 2 different tables * @throws Exception */ @Test public void testMixedPutTables() throws Exception { List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2); List<Cell> cells = new ArrayList<Cell>(); for(int i = 0; i < BATCH_SIZE; i++) { entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); Scan scan = new Scan(); ResultScanner scanRes = table2.getScanner(scan); for(Result res : scanRes) { assertTrue(Bytes.toInt(res.getRow()) % 2 == 0); } }
/** * Insert then do different types of deletes * @throws Exception */ @Test public void testMixedDeletes() throws Exception { List<WALEntry> entries = new ArrayList<WALEntry>(3); List<Cell> cells = new ArrayList<Cell>(); for(int i = 0; i < 3; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); entries = new ArrayList<WALEntry>(3); cells = new ArrayList<Cell>(); entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(0, scanRes.next(3).length); }
/** * Puts are buffered, but this tests when a delete (not-buffered) is applied * before the actual Put that creates it. * @throws Exception */ @Test public void testApplyDeleteBeforePut() throws Exception { List<WALEntry> entries = new ArrayList<WALEntry>(5); List<Cell> cells = new ArrayList<Cell>(); for(int i = 0; i < 2; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); for(int i = 3; i < 5; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); Get get = new Get(Bytes.toBytes(1)); Result res = table1.get(get); assertEquals(0, res.size()); }
/** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override @QosPriority(priority = HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { if (regionServer.replicationSinkHandler != null) { checkOpen(); requestCount.increment(); List<WALEntry> entries = request.getEntryList(); CellScanner cellScanner = ((PayloadCarryingRpcController) controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner); regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); } }
/** * Tag original sequence number for each edit to be replayed * @param entry * @param cell * @return */ private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) { // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet boolean needAddRecoveryTag = true; if (cell.getTagsLength() > 0) { Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(), TagType.LOG_REPLAY_TAG_TYPE); if (tmpTag != null) { // found an existing log replay tag so reuse it needAddRecoveryTag = false; } } if (needAddRecoveryTag) { List<Tag> newTags = new ArrayList<Tag>(); Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey() .getLogSequenceNumber())); newTags.add(replayTag); return KeyValue.cloneAndAddTags(cell, newTags); } return cell; }
/** * Tag original sequence number for each edit to be replayed * @param entry * @param cell */ private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) { // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet boolean needAddRecoveryTag = true; if (cell.getTagsLength() > 0) { Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(), TagType.LOG_REPLAY_TAG_TYPE); if (tmpTag != null) { // found an existing log replay tag so reuse it needAddRecoveryTag = false; } } if (needAddRecoveryTag) { List<Tag> newTags = new ArrayList<Tag>(); Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey() .getLogSequenceNumber())); newTags.add(replayTag); return KeyValue.cloneAndAddTags(cell, newTags); } return cell; }
public void preReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells) throws IOException { execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { @Override public void call(RegionServerObserver oserver, ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { oserver.preReplicateLogEntries(ctx, entries, cells); } }); }
public void postReplicateLogEntries(final List<WALEntry> entries, final CellScanner cells) throws IOException { execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { @Override public void call(RegionServerObserver oserver, ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { oserver.postReplicateLogEntries(ctx, entries, cells); } }); }
/** * Insert a whole batch of entries * @throws Exception */ @Test public void testBatchSink() throws Exception { List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE); List<Cell> cells = new ArrayList<Cell>(); for(int i = 0; i < BATCH_SIZE; i++) { entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator())); Scan scan = new Scan(); ResultScanner scanRes = table1.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); }
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) { byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); // Just make sure we don't get the same ts for two consecutive rows with // same key try { Thread.sleep(1); } catch (InterruptedException e) { LOG.info("Was interrupted while sleep, meh", e); } final long now = System.currentTimeMillis(); KeyValue kv = null; if(type.getCode() == KeyValue.Type.Put.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); } WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); UUID.Builder uuidBuilder = UUID.newBuilder(); uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); keyBuilder.setClusterId(uuidBuilder.build()); keyBuilder.setTableName(ByteStringer.wrap(table.getName())); keyBuilder.setWriteTime(now); keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); builder.setKey(keyBuilder.build()); cells.add(kv); return builder.build(); }
private WALEntry createEntry(byte [] table, int row, KeyValue.Type type, List<Cell> cells) { byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); // Just make sure we don't get the same ts for two consecutive rows with // same key try { Thread.sleep(1); } catch (InterruptedException e) { LOG.info("Was interrupted while sleep, meh", e); } final long now = System.currentTimeMillis(); KeyValue kv = null; if(type.getCode() == KeyValue.Type.Put.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); } WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); UUID.Builder uuidBuilder = UUID.newBuilder(); uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); keyBuilder.setClusterId(uuidBuilder.build()); keyBuilder.setTableName(HBaseZeroCopyByteString.wrap(table)); keyBuilder.setWriteTime(now); keyBuilder.setEncodedRegionName(HBaseZeroCopyByteString.wrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); builder.setKey(keyBuilder.build()); cells.add(kv); return builder.build(); }
private WALEntry createEntry(byte [] table, int row, KeyValue.Type type, List<Cell> cells) { byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); // Just make sure we don't get the same ts for two consecutive rows with // same key try { Thread.sleep(1); } catch (InterruptedException e) { LOG.info("Was interrupted while sleep, meh", e); } final long now = System.currentTimeMillis(); KeyValue kv = null; if(type.getCode() == KeyValue.Type.Put.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); } WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); UUID.Builder uuidBuilder = UUID.newBuilder(); uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); keyBuilder.setClusterId(uuidBuilder.build()); keyBuilder.setTableName(ZeroCopyLiteralByteString.wrap(table)); keyBuilder.setWriteTime(now); keyBuilder.setEncodedRegionName(ZeroCopyLiteralByteString.wrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); builder.setKey(keyBuilder.build()); cells.add(kv); return builder.build(); }
/** * Get the HLog entries from a list of protocol buffer WALEntry * * @param protoList the list of protocol buffer WALEntry * @return an array of HLog entries */ public static HLog.Entry[] toHLogEntries(final List<WALEntry> protoList) { List<HLog.Entry> entries = new ArrayList<HLog.Entry>(); for (WALEntry entry: protoList) { WALKey walKey = entry.getKey(); java.util.UUID clusterId = HConstants.DEFAULT_CLUSTER_ID; if (walKey.hasClusterId()) { UUID protoUuid = walKey.getClusterId(); clusterId = new java.util.UUID( protoUuid.getMostSigBits(), protoUuid.getLeastSigBits()); } HLogKey key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), walKey.getTableName().toByteArray(), walKey.getLogSequenceNumber(), walKey.getWriteTime(), clusterId); WALEntry.WALEdit walEdit = entry.getEdit(); WALEdit edit = new WALEdit(); for (ByteString keyValue: walEdit.getKeyValueBytesList()) { edit.add(new KeyValue(keyValue.toByteArray())); } if (walEdit.getFamilyScopeCount() > 0) { TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); for (FamilyScope scope: walEdit.getFamilyScopeList()) { scopes.put(scope.getFamily().toByteArray(), Integer.valueOf(scope.getScopeType().ordinal())); } edit.setScopes(scopes); } entries.add(new HLog.Entry(key, edit)); } return entries.toArray(new HLog.Entry[entries.size()]); }
@Override public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx, List<WALEntry> entries, CellScanner cells) throws IOException { requirePermission("replicateLogEntries", Action.WRITE); }
@Override public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx, List<WALEntry> entries, CellScanner cells) throws IOException { }