/** Initialize the environment */ public void startup() throws IOException { if (state == Coprocessor.State.INSTALLED || state == Coprocessor.State.STOPPED) { state = Coprocessor.State.STARTING; Thread currentThread = Thread.currentThread(); ClassLoader hostClassLoader = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(this.getClassLoader()); impl.start(this); state = Coprocessor.State.ACTIVE; } finally { currentThread.setContextClassLoader(hostClassLoader); } } else { LOG.warn("Not starting coprocessor "+impl.getClass().getName()+ " because not inactive (state="+state.toString()+")"); } }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST", justification="Intentional; FB has trouble detecting isAssignableFrom") public RegionServerEnvironment(final Class<?> implClass, final Coprocessor impl, final int priority, final int seq, final Configuration conf, final RegionServerServices services) { super(impl, priority, seq, conf); this.regionServerServices = services; for (Object itf : ClassUtils.getAllInterfaces(implClass)) { Class<?> c = (Class<?>) itf; if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST this.regionServerServices.registerService( ((SingletonCoprocessorService) impl).getService()); break; } } }
/** * Constructor * @param impl the coprocessor instance * @param priority chaining priority */ public RegionEnvironment(final Coprocessor impl, final int priority, final int seq, final Configuration conf, final Region region, final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { super(impl, priority, seq, conf); this.region = region; this.rsServices = services; this.sharedData = sharedData; // Pick which version of the WAL related events we'll call. // This way we avoid calling the new version on older RegionObservers so // we can maintain binary compatibility. // See notes in javadoc for RegionObserver useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); }
Region reopenRegion(final Region closedRegion, Class<?> ... implClasses) throws IOException { //HRegionInfo info = new HRegionInfo(tableName, null, null, false); Region r = HRegion.openHRegion(closedRegion, null); // this following piece is a hack. currently a coprocessorHost // is secretly loaded at OpenRegionHandler. we don't really // start a region server here, so just manually create cphost // and set it to region. Configuration conf = TEST_UTIL.getConfiguration(); RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf); ((HRegion)r).setCoprocessorHost(host); for (Class<?> implClass : implClasses) { host.load(implClass, Coprocessor.PRIORITY_USER, conf); } // we need to manually call pre- and postOpen here since the // above load() is not the real case for CP loading. A CP is // expected to be loaded by default from 1) configuration; or 2) // HTableDescriptor. If it's loaded after HRegion initialized, // the pre- and postOpen() won't be triggered automatically. // Here we have to call pre and postOpen explicitly. host.preOpen(); host.postOpen(); return r; }
Region initHRegion (TableName tableName, String callingMethod, Configuration conf, Class<?> [] implClasses, byte [][] families) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } HRegionInfo info = new HRegionInfo(tableName, null, null, false); Path path = new Path(DIR + callingMethod); HRegion r = HRegion.createHRegion(info, path, conf, htd); // this following piece is a hack. RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf); r.setCoprocessorHost(host); for (Class<?> implClass : implClasses) { host.load(implClass, Coprocessor.PRIORITY_USER, conf); Coprocessor c = host.findCoprocessor(implClass.getName()); assertNotNull(c); } // Here we have to call pre and postOpen explicitly. host.preOpen(); host.postOpen(); return r; }
@Test public void testRegionObserverScanTimeStacking() throws Exception { byte[] ROW = Bytes.toBytes("testRow"); byte[] TABLE = Bytes.toBytes(getClass().getName()); byte[] A = Bytes.toBytes("A"); byte[][] FAMILIES = new byte[][] { A }; Configuration conf = HBaseConfiguration.create(); Region region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); RegionCoprocessorHost h = region.getCoprocessorHost(); h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf); h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); Put put = new Put(ROW); put.add(A, A, A); region.put(put); Get get = new Get(ROW); Result r = region.get(get); assertNull( "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: " + r, r.listCells()); }
@Test public void testRegionObserverFlushTimeStacking() throws Exception { byte[] ROW = Bytes.toBytes("testRow"); byte[] TABLE = Bytes.toBytes(getClass().getName()); byte[] A = Bytes.toBytes("A"); byte[][] FAMILIES = new byte[][] { A }; Configuration conf = HBaseConfiguration.create(); Region region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); RegionCoprocessorHost h = region.getCoprocessorHost(); h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf); h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); // put a row and flush it to disk Put put = new Put(ROW); put.add(A, A, A); region.put(put); region.flush(true); Get get = new Get(ROW); Result r = region.get(get); assertNull( "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: " + r, r.listCells()); }
/** * A loaded WAL coprocessor won't break existing WAL test cases. */ @Test public void testWALCoprocessorLoaded() throws Exception { // test to see whether the coprocessor is loaded or not. FSHLog log = null; try { log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); WALCoprocessorHost host = log.getCoprocessorHost(); Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); assertNotNull(c); } finally { if (log != null) { log.close(); } } }
protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, List<String> coprocessors) throws Exception { HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); for (byte[] family : columnFamilies) { HColumnDescriptor columnDesc = new HColumnDescriptor(family); columnDesc.setMaxVersions(Integer.MAX_VALUE); columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis desc.addFamily(columnDesc); } if (existingData) { desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); } // Divide individually to prevent any overflow int priority = Coprocessor.PRIORITY_USER; // order in list is the same order that coprocessors will be invoked for (String coprocessor : coprocessors) { desc.addCoprocessor(coprocessor, null, ++priority, null); } hBaseAdmin.createTable(desc); testUtil.waitTableAvailable(tableName, 5000); return new HTable(testUtil.getConfiguration(), tableName); }
/** Initialize the environment */ public void startup() { if (state == Coprocessor.State.INSTALLED || state == Coprocessor.State.STOPPED) { state = Coprocessor.State.STARTING; try { impl.start(this); state = Coprocessor.State.ACTIVE; } catch (IOException ioe) { LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe); } } else { LOG.warn("Not starting coprocessor "+impl.getClass().getName()+ " because not inactive (state="+state.toString()+")"); } }
/** Clean up the environment */ protected void shutdown() { if (state == Coprocessor.State.ACTIVE) { state = Coprocessor.State.STOPPING; try { impl.stop(this); state = Coprocessor.State.STOPPED; } catch (IOException ioe) { LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe); } } else { LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+ " because not active (state="+state.toString()+")"); } // clean up any table references for (HTableInterface table: openTables) { try { ((HTableWrapper)table).internalClose(); } catch (IOException e) { // nothing can be done here LOG.warn("Failed to close " + Bytes.toStringBinary(table.getTableName()), e); } } }
@Test public void testRegionObserverScanTimeStacking() throws Exception { byte[] ROW = Bytes.toBytes("testRow"); byte[] TABLE = Bytes.toBytes(getClass().getName()); byte[] A = Bytes.toBytes("A"); byte[][] FAMILIES = new byte[][] { A }; Configuration conf = HBaseConfiguration.create(); HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); RegionCoprocessorHost h = region.getCoprocessorHost(); h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf); h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); Put put = new Put(ROW); put.add(A, A, A); region.put(put); Get get = new Get(ROW); Result r = region.get(get); assertNull( "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: " + r, r.list()); }
@Test public void testRegionObserverFlushTimeStacking() throws Exception { byte[] ROW = Bytes.toBytes("testRow"); byte[] TABLE = Bytes.toBytes(getClass().getName()); byte[] A = Bytes.toBytes("A"); byte[][] FAMILIES = new byte[][] { A }; Configuration conf = HBaseConfiguration.create(); HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); RegionCoprocessorHost h = region.getCoprocessorHost(); h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf); h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); // put a row and flush it to disk Put put = new Put(ROW); put.add(A, A, A); region.put(put); region.flushcache(); Get get = new Get(ROW); Result r = region.get(get); assertNull( "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: " + r, r.list()); }
/** * Constructor * @param impl the coprocessor instance * @param priority chaining priority */ public RegionEnvironment(final Coprocessor impl, final int priority, final int seq, final Configuration conf, final HRegion region, final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { super(impl, priority, seq, conf); this.region = region; this.rsServices = services; this.sharedData = sharedData; // Pick which version of the WAL related events we'll call. // This way we avoid calling the new version on older RegionObservers so // we can maintain binary compatibility. // See notes in javadoc for RegionObserver useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class, HRegionInfo.class, WALKey.class, WALEdit.class); }
HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses) throws IOException { //HRegionInfo info = new HRegionInfo(tableName, null, null, false); HRegion r = HRegion.openHRegion(closedRegion, null); // this following piece is a hack. currently a coprocessorHost // is secretly loaded at OpenRegionHandler. we don't really // start a region server here, so just manually create cphost // and set it to region. Configuration conf = TEST_UTIL.getConfiguration(); RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf); r.setCoprocessorHost(host); for (Class<?> implClass : implClasses) { host.load(implClass, Coprocessor.PRIORITY_USER, conf); } // we need to manually call pre- and postOpen here since the // above load() is not the real case for CP loading. A CP is // expected to be loaded by default from 1) configuration; or 2) // HTableDescriptor. If it's loaded after HRegion initialized, // the pre- and postOpen() won't be triggered automatically. // Here we have to call pre and postOpen explicitly. host.preOpen(); host.postOpen(); return r; }
HRegion initHRegion (TableName tableName, String callingMethod, Configuration conf, Class<?> [] implClasses, byte [][] families) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } HRegionInfo info = new HRegionInfo(tableName, null, null, false); Path path = new Path(DIR + callingMethod); HRegion r = HRegion.createHRegion(info, path, conf, htd); // this following piece is a hack. RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf); r.setCoprocessorHost(host); for (Class<?> implClass : implClasses) { host.load(implClass, Coprocessor.PRIORITY_USER, conf); Coprocessor c = host.findCoprocessor(implClass.getName()); assertNotNull(c); } // Here we have to call pre and postOpen explicitly. host.preOpen(); host.postOpen(); return r; }
@Test public void testRegionObserverScanTimeStacking() throws Exception { byte[] ROW = Bytes.toBytes("testRow"); byte[] TABLE = Bytes.toBytes(getClass().getName()); byte[] A = Bytes.toBytes("A"); byte[][] FAMILIES = new byte[][] { A }; Configuration conf = HBaseConfiguration.create(); HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); RegionCoprocessorHost h = region.getCoprocessorHost(); h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf); h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); Put put = new Put(ROW); put.add(A, A, A); region.put(put); Get get = new Get(ROW); Result r = region.get(get); assertNull( "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: " + r, r.listCells()); }
@Test public void testRegionObserverFlushTimeStacking() throws Exception { byte[] ROW = Bytes.toBytes("testRow"); byte[] TABLE = Bytes.toBytes(getClass().getName()); byte[] A = Bytes.toBytes("A"); byte[][] FAMILIES = new byte[][] { A }; Configuration conf = HBaseConfiguration.create(); HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES); RegionCoprocessorHost h = region.getCoprocessorHost(); h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf); h.load(EmptyRegionObsever.class, Coprocessor.PRIORITY_USER, conf); // put a row and flush it to disk Put put = new Put(ROW); put.add(A, A, A); region.put(put); region.flushcache(); Get get = new Get(ROW); Result r = region.get(get); assertNull( "Got an unexpected number of rows - no data should be returned with the NoDataFromScan coprocessor. Found: " + r, r.listCells()); }
public static void main(String[] args) throws IOException { Configuration conf = HBaseConfiguration.create(); FileSystem fs = FileSystem.get(conf); //coprocessor所在的jar包的存放路径 Path path = new Path(fs.getUri() + Path.SEPARATOR + "micmiu/coprocessor/demo.jar"); //HTableDescriptor HTableDescriptor htd = new HTableDescriptor("demo_copro"); //addFamily htd.addFamily(new HColumnDescriptor("cf")); // //设置要加载的corpocessor htd.setValue("COPROCESSOR$1", path.toString() + "|" + RegionObserverDemo.class.getCanonicalName() + "|" + Coprocessor.PRIORITY_USER); // HBaseAdmin admin = new HBaseAdmin(conf); //创建表"testtable" admin.createTable(htd); System.out.println("finished."); }
@Test // HBASE-3516: Test CP Class loading from local file system public void testClassLoadingFromLocalFS() throws Exception { File jarFile = buildCoprocessorJar(cpName3); // create a table that references the jar HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName3)); htd.addFamily(new HColumnDescriptor("test")); htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName3 + "|" + Coprocessor.PRIORITY_USER); Admin admin = TEST_UTIL.getAdmin(); admin.createTable(htd); waitForTable(htd.getTableName()); // verify that the coprocessor was loaded boolean found = false; MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster(); for (HRegion region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) { if (region.getRegionInfo().getRegionNameAsString().startsWith(cpName3)) { found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null); } } assertTrue("Class " + cpName3 + " was missing on a region", found); }
/** Initialize the environment */ public void startup() throws IOException { if (state == Coprocessor.State.INSTALLED || state == Coprocessor.State.STOPPED) { state = Coprocessor.State.STARTING; Thread currentThread = Thread.currentThread(); ClassLoader hostClassLoader = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(this.getClassLoader()); impl.start(this); state = Coprocessor.State.ACTIVE; } finally { currentThread.setContextClassLoader(hostClassLoader); } } else { LOG.warn("Not starting coprocessor " + impl.getClass().getName() + " because not inactive (state=" + state.toString() + ")"); } }