public boolean preMergeCommit(final HRegion regionA, final HRegion regionB, final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException { boolean bypass = false; ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; for (RegionServerEnvironment env : coprocessors) { if (env.getInstance() instanceof RegionServerObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); ((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB, metaEntries); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } } return bypass; }
/** * Post merge region action * @param env MasterProcedureEnv **/ private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { @MetaMutationAnnotation final List<Mutation> metaEntries = new ArrayList<Mutation>(); cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser()); try { for (Mutation p : metaEntries) { RegionInfo.parseRegionName(p.getRow()); } } catch (IOException e) { LOG.error("Row key of mutation from coprocessor is not parsable as region name." + "Mutations from coprocessor should only be for hbase:meta table.", e); throw e; } } }
public boolean preMergeCommit(final HRegion regionA, final HRegion regionB, final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { @Override public void call(RegionServerObserver oserver, ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { oserver.preMergeCommit(ctx, regionA, regionB, metaEntries); } }); }
/** * Invoked before merge regions operation writes the new region to hbase:meta * @param regionsToMerge the regions to merge * @param metaEntries the meta entry * @param user the user * @throws IOException */ public void preMergeRegionsCommit( final RegionInfo[] regionsToMerge, final @MetaMutationAnnotation List<Mutation> metaEntries, final User user) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) { @Override public void call(MasterObserver observer) throws IOException { observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries); } }); }
/** * Prepare the merged region and region files. * @param server Hosting server instance. Can be null when testing * @param services Used to online/offline regions. * @return merged region * @throws IOException If thrown, transaction failed. Call * {@link #rollback(Server, RegionServerServices)} */ HRegion createMergedRegion(final Server server, final RegionServerServices services) throws IOException { LOG.info("Starting merge of " + region_a + " and " + region_b.getRegionNameAsString() + ", forcible=" + forcible); if ((server != null && server.isStopped()) || (services != null && services.isStopping())) { throw new IOException("Server is stopped or stopping"); } if (rsCoprocessorHost != null) { if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } } // If true, no cluster to write meta edits to or to use coordination. boolean testing = server == null ? true : server.getConfiguration() .getBoolean("hbase.testing.nocluster", false); HRegion mergedRegion = stepsBeforePONR(server, services, testing); @MetaMutationAnnotation List<Mutation> metaEntries = new ArrayList<Mutation>(); if (rsCoprocessorHost != null) { if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } try { for (Mutation p : metaEntries) { HRegionInfo.parseRegionName(p.getRow()); } } catch (IOException e) { LOG.error("Row key of mutation from coprocessor is not parsable as region name." + "Mutations from coprocessor should only be for hbase:meta table.", e); throw e; } } // This is the point of no return. Similar with SplitTransaction. // IF we reach the PONR then subsequent failures need to crash out this // regionserver this.journal.add(JournalEntry.PONR); // Add merged region and delete region_a and region_b // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region // will determine whether the region is merged or not in case of failures. // If it is successful, master will roll-forward, if not, master will // rollback if (!testing && useCoordinationForAssignment) { if (metaEntries.isEmpty()) { MetaTableAccessor.mergeRegions(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName()); } else { mergeRegionsAndPutMetaEntries(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries); } } else if (services != null && !useCoordinationForAssignment) { if (!services.reportRegionStateTransition(TransitionCode.MERGE_PONR, mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { // Passed PONR, let SSH clean it up throw new IOException("Failed to notify master that merge passed PONR: " + region_a.getRegionInfo().getRegionNameAsString() + " and " + region_b.getRegionInfo().getRegionNameAsString()); } } return mergedRegion; }
/** * Prepare the merged region and region files. * @param server Hosting server instance. Can be null when testing (won't try * and update in zk if a null server) * @param services Used to online/offline regions. * @return merged region * @throws IOException If thrown, transaction failed. Call * {@link #rollback(Server, RegionServerServices)} */ HRegion createMergedRegion(final Server server, final RegionServerServices services) throws IOException { LOG.info("Starting merge of " + region_a + " and " + region_b.getRegionNameAsString() + ", forcible=" + forcible); if ((server != null && server.isStopped()) || (services != null && services.isStopping())) { throw new IOException("Server is stopped or stopping"); } if (rsCoprocessorHost != null) { if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } } // If true, no cluster to write meta edits to or to update znodes in. boolean testing = server == null ? true : server.getConfiguration() .getBoolean("hbase.testing.nocluster", false); HRegion mergedRegion = stepsBeforePONR(server, services, testing); @MetaMutationAnnotation List<Mutation> metaEntries = new ArrayList<Mutation>(); if (rsCoprocessorHost != null) { if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } try { for (Mutation p : metaEntries) { HRegionInfo.parseRegionName(p.getRow()); } } catch (IOException e) { LOG.error("Row key of mutation from coprocessor is not parsable as region name." + "Mutations from coprocessor should only be for hbase:meta table.", e); throw e; } } // This is the point of no return. Similar with SplitTransaction. // IF we reach the PONR then subsequent failures need to crash out this // regionserver this.journal.add(JournalEntry.PONR); // Add merged region and delete region_a and region_b // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region // will determine whether the region is merged or not in case of failures. // If it is successful, master will roll-forward, if not, master will // rollback if (!testing) { if (metaEntries.isEmpty()) { MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a .getRegionInfo(), region_b.getRegionInfo(), server.getServerName()); } else { mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries); } } return mergedRegion; }
@Override public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx, HRegion regionA, HRegion regionB, @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException { }
/** * This will be called before PONR step as part of regions merge transaction. Calling * {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the merge * @param ctx * @param regionA * @param regionB * @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates. * Any puts or deletes to execute on hbase:meta can be added to the mutations. * @throws IOException */ void preMergeCommit(final ObserverContext<RegionServerCoprocessorEnvironment> ctx, final Region regionA, final Region regionB, @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException;
/** * This will be called before PONR step as part of regions merge transaction. Calling * {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} rollback the merge * @param ctx * @param regionA * @param regionB * @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates. * Any puts or deletes to execute on hbase:meta can be added to the mutations. * @throws IOException */ void preMergeCommit(final ObserverContext<RegionServerCoprocessorEnvironment> ctx, final HRegion regionA, final HRegion regionB, @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException;
/** * This will be called before update META step as part of regions merge transaction. * @param ctx the environment to interact with the framework and master * @param metaEntries mutations to execute on hbase:meta atomically with regions merge updates. * Any puts or deletes to execute on hbase:meta can be added to the mutations. */ default void preMergeRegionsCommitAction( final ObserverContext<MasterCoprocessorEnvironment> ctx, final RegionInfo[] regionsToMerge, @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {}
@Override public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> regionServerCoprocessorEnvironmentObserverContext, Region region, Region region2, @MetaMutationAnnotation List<Mutation> mutations) throws IOException { }