@Override public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { // only keep primary region's edits if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) { entries.add(new Entry(logKey, logEdit)); } }
@Override public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { TableName tableName = info.getTable(); if (IndexUtils.isCatalogOrSystemTable(tableName) || IndexUtils.isIndexTable(tableName)) { return true; } List<IndexSpecification> indices = indexManager.getIndicesForTable(tableName.getNameAsString()); if (indices != null && !indices.isEmpty()) { LOG.trace("Entering preWALWrite for the table " + tableName); String indexTableName = IndexUtils.getIndexTableName(tableName); IndexEdits iEdits = IndexRegionObserver.threadLocal.get(); WALEdit indexWALEdit = iEdits.getWALEdit(); // This size will be 0 when none of the Mutations to the user table to be indexed. // or write to WAL is disabled for the Mutations if (indexWALEdit.getKeyValues().size() == 0) { return true; } LOG.trace("Adding indexWALEdits into WAL for table " + tableName); HRegion indexRegion = iEdits.getRegion(); // TS in all KVs within WALEdit will be the same. So considering the 1st one. Long time = indexWALEdit.getKeyValues().get(0).getTimestamp(); indexRegion.getLog().appendNoSync(indexRegion.getRegionInfo(), TableName.valueOf(indexTableName), indexWALEdit, logKey.getClusterIds(), time, indexRegion.getTableDesc(), indexRegion.getSequenceId(), true, HConstants.NO_NONCE, HConstants.NO_NONCE); LOG.trace("Exiting preWALWrite for the table " + tableName); } return true; }
@Override public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { // only keep primary region's edits if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) { // Presume type is a WALKeyImpl entries.add(new Entry((WALKeyImpl)logKey, logEdit)); } }
/**
 * Logs that the pre-write hook fired for the given region, together with the current value of
 * {@code counters}.
 *
 * @param ctx     observer context (not used by this implementation)
 * @param info    region being written; only its name is read
 * @param logKey  key of the WAL entry (not used)
 * @param logEdit edit payload (not used)
 * @return always {@code false}
 * @throws IOException declared by the overridden method; this body does not throw it itself
 */
@Override
public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info,
    HLogKey logKey, WALEdit logEdit) throws IOException {
  final String regionName = info.getRegionNameAsString();
  final String message = "preWALWrite triggered for " + regionName + ". counters: " + counters;
  LOG.info(message);
  return false;
}
/**
 * Bumps the per-region entry in {@code counters} by one and logs the updated counters.
 *
 * @param ctx     observer context (not used by this implementation)
 * @param info    region that was written; its name is the counter key
 * @param logKey  key of the WAL entry (not used)
 * @param logEdit edit payload (not used)
 * @throws IOException declared by the overridden method; this body does not throw it itself
 */
@Override
public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info,
    HLogKey logKey, WALEdit logEdit) throws IOException {
  final String regionName = info.getRegionNameAsString();
  // Count first so the logged counters already include this write.
  counters.addAndGet(regionName, 1);
  final String message = "postWALWrite triggered for " + regionName + ". counters: " + counters;
  LOG.info(message);
}
@Override public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { String tableNameStr = info.getTableNameAsString(); if (IndexUtils.isCatalogTable(info.getTableName()) || IndexUtils.isIndexTable(tableNameStr)) { return true; } List<IndexSpecification> indices = indexManager.getIndicesForTable(tableNameStr); if (indices != null && !indices.isEmpty()) { LOG.trace("Entering preWALWrite for the table " + tableNameStr); String indexTableName = IndexUtils.getIndexTableName(tableNameStr); IndexEdits iEdits = IndexRegionObserver.threadLocal.get(); WALEdit indexWALEdit = iEdits.getWALEdit(); // This size will be 0 when none of the Mutations to the user table to be indexed. // or write to WAL is disabled for the Mutations if (indexWALEdit.getKeyValues().size() == 0) { return true; } LOG.trace("Adding indexWALEdits into WAL for table " + tableNameStr); HRegion indexRegion = iEdits.getRegion(); // TS in all KVs within WALEdit will be the same. So considering the 1st one. Long time = indexWALEdit.getKeyValues().get(0).getTimestamp(); ctx.getEnvironment() .getWAL() .appendNoSync(indexRegion.getRegionInfo(), Bytes.toBytes(indexTableName), indexWALEdit, logKey.getClusterId(), time, indexRegion.getTableDesc()); LOG.trace("Exiting preWALWrite for the table " + tableNameStr); } return true; }
/**
 * Post-write hook with an empty body: nothing is done after a WAL entry has been written.
 *
 * @param ctx     observer context (not used)
 * @param info    region that was written (not used)
 * @param logKey  key of the WAL entry (not used)
 * @param logEdit edit payload (not used)
 * @throws IOException declared by the overridden method; never thrown by this body
 */
@Override
public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, HRegionInfo info,
    HLogKey logKey, WALEdit logEdit) throws IOException {
  // Intentionally empty.
}