/** * Create replication peer for replicating to region replicas if needed. * @param conf configuration to use * @throws IOException */ public static void setupRegionReplicaReplication(Configuration conf) throws IOException { if (!isRegionReplicaReplicationEnabled(conf)) { return; } ReplicationAdmin repAdmin = new ReplicationAdmin(conf); try { if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) { ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)); peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null); } } catch (ReplicationException ex) { throw new IOException(ex); } finally { repAdmin.close(); } }
@Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { admin.addPeer("testWALEntryFilterFromReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); // now replicate some data. try (Connection connection = ConnectionFactory.createConnection(conf1)) { doPut(connection, Bytes.toBytes("row1")); doPut(connection, row); doPut(connection, Bytes.toBytes("row2")); } Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.replicateCount.get() >= 1; } }); Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); }
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, StdOutSink sink, ExecutorService executor, boolean treatFailureAsError) { super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError); Configuration configuration = connection.getConfiguration(); znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); timeout = configuration .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); ConnectStringParser parser = new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); hosts = Lists.newArrayList(); for (InetSocketAddress server : parser.getServerAddresses()) { hosts.add(server.toString()); } }
@Test (timeout=120000) public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); // check whether the class has been constructed and started Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; } }); Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; } }); Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); // now replicate some data. doPut(Bytes.toBytes("row42")); Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.replicateCount.get() >= 1; } }); doAssert(Bytes.toBytes("row42")); admin.removePeer("testCustomReplicationEndpoint"); }
@Test (timeout=120000) public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); int peerCount = admin.getPeersCount(); final String id = "testReplicationEndpointReturnsFalseOnReplicate"; admin.addPeer(id, new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); // This test is flakey and then there is so much stuff flying around in here its, hard to // debug. Peer needs to be up for the edit to make it across. This wait on // peer count seems to be a hack that has us not progress till peer is up. if (admin.getPeersCount() <= peerCount) { LOG.info("Waiting on peercount to go up from " + peerCount); Threads.sleep(100); } // now replicate some data doPut(row); Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { // Looks like replication endpoint returns false unless we put more than 10 edits. We // only send over one edit. int count = ReplicationEndpointForTest.replicateCount.get(); LOG.info("count=" + count); return ReplicationEndpointReturningFalse.replicated.get(); } }); if (ReplicationEndpointReturningFalse.ex.get() != null) { throw ReplicationEndpointReturningFalse.ex.get(); } admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); }
@Test public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { // create a table with region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor( "testReplicationPeerIsCreated_no_region_replicas"); HTU.getHBaseAdmin().createTable(htd); ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); htd.setRegionReplication(2); HTU.getHBaseAdmin().createTable(htd); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
@Test (timeout=240000) public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { // modify a table by adding region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); HTU.getHBaseAdmin().createTable(htd); // assert that replication peer is not created yet ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); HTU.getHBaseAdmin().disableTable(htd.getTableName()); htd.setRegionReplication(2); HTU.getHBaseAdmin().modifyTable(htd.getTableName(), htd); HTU.getHBaseAdmin().enableTable(htd.getTableName()); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
private static String initPeerClusterState(String baseZKNode) throws IOException, KeeperException { // Add a dummy region server and set up the cluster id Configuration testConf = new Configuration(conf); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null); String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234"); ZKUtil.createWithParents(zkw1, fakeRs); ZKClusterId.setClusterId(zkw1, new ClusterId()); return ZKConfig.getZooKeeperClusterKey(testConf); }
@Before @Override public void setUp() { super.setUp(); DummyServer ds1 = new DummyServer(server1); DummyServer ds2 = new DummyServer(server2); DummyServer ds3 = new DummyServer(server3); rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1); rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2); rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3); rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); }
public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, Abortable abortable) { this.zookeeper = zookeeper; this.conf = conf; this.abortable = abortable; String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); }
/** * Apply the settings in the given key to the given configuration, this is * used to communicate with distant clusters * @param conf configuration object to configure * @param key string that contains the 3 required configuratins * @throws IOException */ private static void applyClusterKeyToConf(Configuration conf, String key) throws IOException{ ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key); conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString()); conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort()); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent()); }
/** * * @param configuration * @param peerName * @param tableCFs * @throws ReplicationException * @throws IOException */ protected void addPeer(final Configuration configuration,String peerName, Map<TableName, List<String>> tableCFs) throws ReplicationException, IOException { try (ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration)) { ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() .setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration)) .setReplicationEndpointImpl(HbaseEndpoint.class.getName()); replicationAdmin.addPeer(peerName, peerConfig, tableCFs); } }
private static String initPeerClusterState(String baseZKNode) throws IOException, KeeperException { // Add a dummy region server and set up the cluster id Configuration testConf = new Configuration(conf); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); ZKUtil.createWithParents(zkw1, fakeRs); ZKClusterId.setClusterId(zkw1, new ClusterId()); return ZKConfig.getZooKeeperClusterKey(testConf); }
@Before public void setUp() { zkTimeoutCount = 0; rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); rp = ReplicationFactory.getReplicationPeers(zkw, conf); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); }
private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { try { ZKConfig.validateClusterKey(clusterKey); } catch (IOException e) { throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); } }
@Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); //test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, EverythingPassesWALEntryFilter.class.getName() + "," + EverythingPassesWALEntryFilterSubclass.class.getName()); admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); // now replicate some data. try (Connection connection = ConnectionFactory.createConnection(conf1)) { doPut(connection, Bytes.toBytes("row1")); doPut(connection, row); doPut(connection, Bytes.toBytes("row2")); } Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { return ReplicationEndpointForTest.replicateCount.get() >= 1; } }); Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); //make sure our reflectively created filter is in the filter chain Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); }
@Test (timeout=120000, expected=IOException.class) public void testWALEntryFilterAddValidation() throws Exception { ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); //test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, "IAmNotARealWalEntryFilter"); admin.addPeer("testWALEntryFilterAddValidation", rpc); }
@Test (timeout=120000, expected=IOException.class) public void testWALEntryFilterUpdateValidation() throws Exception { ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); //test that we can create mutliple WALFilters reflectively rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, "IAmNotARealWalEntryFilter"); admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc); }
@Test public void testWithoutConfigurationObject() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, ZKConfig.getZKQuorumServersString(testUtility.getConfiguration())); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); AsyncHBaseSink sink = new AsyncHBaseSink(); Configurables.configure(sink, ctx); // Reset context to values usable by other tests. ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, null); ctx.put("batchSize", "100"); Channel channel = new MemoryChannel(); Configurables.configure(channel, ctx); sink.setChannel(channel); sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } tx.commit(); tx.close(); int count = 0; Status status = Status.READY; while (status != Status.BACKOFF) { count++; status = sink.process(); } /* * Make sure that the configuration was picked up from the context itself * and not from a configuration object which was created by the sink. */ Assert.assertTrue(sink.isConfNull()); sink.stop(); Assert.assertEquals(2, count); HTable table = new HTable(testUtility.getConfiguration(), tableName); byte[][] results = getResults(table, 3); byte[] out; int found = 0; for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } } } Assert.assertEquals(3, found); out = results[3]; Assert.assertArrayEquals(Longs.toByteArray(3), out); }
/** * Use this before submitting a TableReduce job. It will * appropriately set up the JobConf. * * @param table The output table. * @param reducer The reducer class to use. * @param job The current job to adjust. Make sure the passed job is * carrying all necessary HBase configuration. * @param partitioner Partitioner to use. Pass <code>null</code> to use * default partitioner. * @param quorumAddress Distant cluster to write to; default is null for * output to the cluster that is designated in <code>hbase-site.xml</code>. * Set this String to the zookeeper ensemble of an alternate remote cluster * when you would have the reduce write a cluster that is other than the * default; e.g. copying tables between clusters, the source would be * designated by <code>hbase-site.xml</code> and this param would have the * ensemble address of the remote cluster. The format to pass is particular. * Pass <code> <hbase.zookeeper.quorum>:< * hbase.zookeeper.client.port>:<zookeeper.znode.parent> * </code> such as <code>server,server2,server3:2181:/hbase</code>. * @param serverClass redefined hbase.regionserver.class * @param serverImpl redefined hbase.regionserver.impl * @param addDependencyJars upload HBase jars and jars for any of the configured * job classes via the distributed cache (tmpjars). * @throws IOException When determining the region count fails. */ public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl, boolean addDependencyJars) throws IOException { Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(TableOutputFormat.class); if (reducer != null) job.setReducerClass(reducer); conf.set(TableOutputFormat.OUTPUT_TABLE, table); conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()); // If passed a quorum/ensemble address, pass it on to TableOutputFormat. if (quorumAddress != null) { // Calling this will validate the format ZKConfig.validateClusterKey(quorumAddress); conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); } if (serverClass != null && serverImpl != null) { conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); } job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); if (partitioner == HRegionPartitioner.class) { job.setPartitionerClass(HRegionPartitioner.class); int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table)); if (job.getNumReduceTasks() > regions) { job.setNumReduceTasks(regions); } } else if (partitioner != null) { job.setPartitionerClass(partitioner); } if (addDependencyJars) { addDependencyJars(job); } initCredentials(job); }
@Test (timeout=120000) public void testInterClusterReplication() throws Exception { final String id = "testInterClusterReplication"; List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName); int totEdits = 0; // Make sure edits are spread across regions because we do region based batching // before shipping edits. for(HRegion region: regions) { HRegionInfo hri = region.getRegionInfo(); byte[] row = hri.getStartKey(); for (int i = 0; i < 100; i++) { if (row.length > 0) { Put put = new Put(row); put.addColumn(famName, row, row); region.put(put); totEdits++; } } } admin.addPeer(id, new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2)) .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), null); final int numEdits = totEdits; Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() { @Override public boolean evaluate() throws Exception { return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; } @Override public String explainFailure() throws Exception { String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get(); return failure; } }); admin.removePeer("testInterClusterReplication"); utility1.deleteTableData(tableName); }
@Test (timeout=120000) public void testInterClusterReplication() throws Exception { final String id = "testInterClusterReplication"; List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName); int totEdits = 0; // Make sure edits are spread across regions because we do region based batching // before shipping edits. for(HRegion region: regions) { RegionInfo hri = region.getRegionInfo(); byte[] row = hri.getStartKey(); for (int i = 0; i < 100; i++) { if (row.length > 0) { Put put = new Put(row); put.addColumn(famName, row, row); region.put(put); totEdits++; } } } admin.addPeer(id, new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2)) .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), null); final int numEdits = totEdits; Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() { @Override public boolean evaluate() throws Exception { return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; } @Override public String explainFailure() throws Exception { String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get(); return failure; } }); admin.removePeer("testInterClusterReplication"); utility1.deleteTableData(tableName); }
public static void validateClusterKey(String quorumAddress) throws IOException { ZKConfig.validateClusterKey(quorumAddress); }