/**
 * Sets up the replication subsystem for this server if replication is
 * enabled in the configuration; otherwise leaves both the source manager
 * and the ZK helper unset (null).
 *
 * @param server hosting server, used for configuration and ZK access
 * @param fs handle to the filesystem
 * @param logDir directory that contains the live region servers' hlogs
 * @param oldLogDir directory where logs are archived
 * @throws IOException if the replication ZooKeeper handler cannot be created
 */
public void initialize(final Server server, final FileSystem fs,
    final Path logDir, final Path oldLogDir) throws IOException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.replication = isReplication(this.conf);
  if (replication) {
    try {
      this.zkHelper = new ReplicationZookeeper(server, this.replicating);
    } catch (KeeperException ke) {
      // Wrap the checked ZK exception; original message lacked the
      // closing parenthesis — fixed here.
      throw new IOException("Failed replication handler create " +
          "(replicating=" + this.replicating + ")", ke);
    }
    this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
        this.server, fs, this.replicating, logDir, oldLogDir);
  } else {
    // Replication disabled: leave collaborators null so callers can
    // detect the disabled state.
    this.replicationManager = null;
    this.zkHelper = null;
  }
}
/**
 * Instantiate the replication management (if rep is enabled).
 *
 * @param server Hosting server
 * @param fs handle to the filesystem
 * @param logDir directory that contains the live region servers' hlogs
 * @param oldLogDir directory where logs are archived
 * @throws IOException on general setup failure
 * @throws KeeperException if the ZooKeeper replication handler cannot be built
 */
public Replication(final Server server, final FileSystem fs,
    final Path logDir, final Path oldLogDir)
    throws IOException, KeeperException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.replication = isReplication(this.conf);
  if (!this.replication) {
    // Replication disabled: leave both collaborators unset.
    this.replicationManager = null;
    this.zkHelper = null;
    return;
  }
  this.zkHelper = new ReplicationZookeeper(server, this.replicating);
  this.replicationManager = new ReplicationSourceManager(
      zkHelper, conf, this.server, fs, this.replicating, logDir, oldLogDir);
}
/** * Creates a replication manager and sets the watch on all the other * registered region servers * @param zkHelper the zk helper for replication * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use * @param replicating the status of the replication on this cluster * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final Configuration conf, final Stoppable stopper, final FileSystem fs, final AtomicBoolean replicating, final Path logDir, final Path oldLogDir) { this.sources = new ArrayList<ReplicationSourceInterface>(); this.replicating = replicating; this.zkHelper = zkHelper; this.stopper = stopper; this.hlogsById = new HashMap<String, SortedSet<String>>(); this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.zkHelper.registerRegionServerListener( new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.registerRegionServerListener( new PeersWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.listPeersIdsAndWatch(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. 
int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); }
/**
 * Called when a node has been deleted.
 *
 * @param path full path of the deleted node
 */
public void nodeDeleted(String path) {
  if (stopper.isStopped()) {
    return;
  }
  // Only act when the deleted znode is one we track; refresh returns
  // whether processing should continue.
  if (refreshListIfRightPath(path)) {
    LOG.info(path + " znode expired, trying to lock it");
    transferQueues(ReplicationZookeeper.getZNodeName(path));
  }
}
/**
 * Called when a node has been deleted.
 *
 * @param path full path of the deleted node
 */
public void nodeDeleted(String path) {
  List<String> refreshedPeers = refreshPeersList(path);
  // A null refresh means the path was not a peer znode we care about.
  if (refreshedPeers != null) {
    removePeer(ReplicationZookeeper.getZNodeName(path));
  }
}
/**
 * Constructor that creates a connection to the local ZooKeeper ensemble.
 *
 * @param conf Configuration to use
 * @throws IOException if the connection to ZK cannot be made
 * @throws RuntimeException if replication isn't enabled
 */
public ReplicationAdmin(Configuration conf) throws IOException {
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
    throw new RuntimeException("hbase.replication isn't true, please " +
        "enable it in order to use replication");
  }
  this.connection = HConnectionManager.getConnection(conf);
  ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
  try {
    this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw);
  } catch (KeeperException e) {
    // Wrap the checked ZK exception, preserving the cause.
    // (Message fixed: was the ungrammatical "Unable setup ...".)
    throw new IOException("Unable to set up the ZooKeeper connection", e);
  }
}
/**
 * Verifies that a replication source recovering a queue that has been
 * handed off through several dead servers can parse the full chain of
 * dead region-server names out of the recovered queue's znode name.
 *
 * NOTE(review): the hand-off order below is strictly sequential — each
 * copyQueuesFromRSUsingMulti call must consume the previous server's
 * queue — do not reorder these calls.
 */
@Test public void testNodeFailoverDeadServerParsing() throws Exception {
  LOG.debug("testNodeFailoverDeadServerParsing");
  // Use ZK multi-update so queue hand-off is atomic.
  conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
  final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
  AtomicBoolean replicating = new AtomicBoolean(true);
  ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
  // populate some znodes in the peer znode
  files.add("log1");
  files.add("log2");
  for (String file : files) {
    rz.addLogToList(file, "1");
  }
  // create 3 DummyServers
  Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
  Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
  Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
  // simulate three server fail sequentially: server -> s1 -> s2 -> s3,
  // each successor claiming its predecessor's replication queue
  ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true));
  SortedMap<String, SortedSet<String>> testMap =
      rz1.copyQueuesFromRSUsingMulti(server.getServerName().getServerName());
  ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true));
  testMap = rz2.copyQueuesFromRSUsingMulti(s1.getServerName().getServerName());
  ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true));
  testMap = rz3.copyQueuesFromRSUsingMulti(s2.getServerName().getServerName());
  ReplicationSource s = new ReplicationSource();
  // the recovered queue's name encodes the chain of dead servers
  s.checkIfQueueRecovered(testMap.firstKey());
  List<String> result = s.getDeadRegionServers();
  // verify: every server in the fail-over chain must be parsed out
  assertTrue(result.contains(server.getServerName().getServerName()));
  assertTrue(result.contains(s1.getServerName().getServerName()));
  assertTrue(result.contains(s2.getServerName().getServerName()));
  server.abort("", null);
}
/** * Creates a replication manager and sets the watch on all the other * registered region servers * @param zkHelper the zk helper for replication * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use * @param replicating the status of the replication on this cluster * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final Configuration conf, final Stoppable stopper, final FileSystem fs, final AtomicBoolean replicating, final Path logDir, final Path oldLogDir) { this.sources = new ArrayList<ReplicationSourceInterface>(); this.replicating = replicating; this.zkHelper = zkHelper; this.stopper = stopper; this.hlogsById = new HashMap<String, SortedSet<String>>(); this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.zkHelper.registerRegionServerListener( new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); List<String> otherRSs = this.zkHelper.getRegisteredRegionServers(); this.zkHelper.registerRegionServerListener( new PeersWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.listPeersIdsAndWatch(); this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs; // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. 
int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); this.executor.setThreadFactory(tfb.build()); }
/**
 * Called when a node has been deleted.
 *
 * @param path full path of the deleted node
 */
public void nodeDeleted(String path) {
  if (stopper.isStopped()) {
    return;
  }
  // Refresh returns whether this path is one we should act on.
  if (refreshRegionServersList(path)) {
    LOG.info(path + " znode expired, trying to lock it");
    transferQueues(ReplicationZookeeper.getZNodeName(path));
  }
}
/** * Creates a replication manager and sets the watch on all the other * registered region servers * @param zkHelper the zk helper for replication * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use * @param replicating the status of the replication on this cluster * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final Configuration conf, final Stoppable stopper, final FileSystem fs, final AtomicBoolean replicating, final Path logDir, final Path oldLogDir) { this.sources = new ArrayList<ReplicationSourceInterface>(); this.replicating = replicating; this.zkHelper = zkHelper; this.stopper = stopper; this.hlogsById = new HashMap<String, SortedSet<String>>(); this.oldsources = new ArrayList<ReplicationSourceInterface>(); this.conf = conf; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.zkHelper.registerRegionServerListener( new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.registerRegionServerListener( new PeersWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.listPeersIdsAndWatch(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); // use a short 100ms sleep since this could be done inline with a RS startup // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); this.executor.setThreadFactory(tfb.build()); }
/**
 * Sets up the replication subsystem for this server if replication is
 * enabled in the configuration, including a daemon scheduler used for
 * periodic replication statistics; otherwise leaves the source manager
 * and ZK helper unset (null).
 *
 * @param server hosting server, used for configuration and ZK access
 * @param fs handle to the filesystem
 * @param logDir directory that contains the live region servers' hlogs
 * @param oldLogDir directory where logs are archived
 * @throws IOException if the replication ZooKeeper handler cannot be created
 */
public void initialize(final Server server, final FileSystem fs,
    final Path logDir, final Path oldLogDir) throws IOException {
  this.server = server;
  this.conf = this.server.getConfiguration();
  this.replication = isReplication(this.conf);
  // Single-threaded daemon scheduler for the periodic stats task.
  this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
          .setNameFormat(server.getServerName() + "Replication Statistics #%d")
          .setDaemon(true)
          .build());
  if (replication) {
    try {
      this.zkHelper = new ReplicationZookeeper(server, this.replicating);
    } catch (KeeperException ke) {
      // Wrap the checked ZK exception; original message lacked the
      // closing parenthesis — fixed here.
      throw new IOException("Failed replication handler create " +
          "(replicating=" + this.replicating + ")", ke);
    }
    this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
        this.server, fs, this.replicating, logDir, oldLogDir);
    // Stats period in seconds; defaults to 5 minutes.
    this.statsThreadPeriod =
        this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
  } else {
    // Replication disabled: leave collaborators null so callers can
    // detect the disabled state.
    this.replicationManager = null;
    this.zkHelper = null;
  }
}