/** * signal the workers that a task was resubmitted by creating the RESCAN node. */ private void rescan(long retries) { // The RESCAN node will be deleted almost immediately by the // SplitLogManager as soon as it is created because it is being // created in the DONE state. This behavior prevents a buildup // of RESCAN nodes. But there is also a chance that a SplitLogWorker // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode()); this.watcher .getRecoverableZooKeeper() .getZooKeeper() .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); }
/**
 * Creates an OWNED task node before the manager exists, then checks that the manager
 * acquires it as an orphan, records a heartbeat, resubmits it and finally rescans.
 */
@Test (timeout=180000)
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Plant the task node directly in ZooKeeper, owned by the dummy master.
  final String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  final SplitLogTask ownedState = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
  zkw.getRecoverableZooKeeper().create(
      orphanPath, ownedState.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  final Task orphan = slm.findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must lie within the past second.
  final long now = System.currentTimeMillis();
  assertTrue(orphan.last_update <= now);
  assertTrue(orphan.last_update > (now - 1000));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Moves a submitted task node to the DONE state and checks that the batch accounts
 * for it and that the task node is removed afterwards.
 */
@Test (timeout=180000)
public void testTaskDone() throws Exception {
  LOG.info("TestTaskDone - cleanup task node once in DONE state");

  slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
  final TaskBatch taskBatch = new TaskBatch();
  final String taskPath = submitTaskAndWait(taskBatch, "foo/1");

  // Overwrite the node's data with a DONE state attributed to worker1.
  final ServerName doneBy = ServerName.valueOf("worker1,1,1");
  final SplitLogTask doneState = new SplitLogTask.Done(doneBy, this.mode);
  ZKUtil.setData(zkw, taskPath, doneState.toByteArray());

  // Block until every installed task in the batch is counted as done.
  synchronized (taskBatch) {
    while (taskBatch.installed != taskBatch.done) {
      taskBatch.wait();
    }
  }

  waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
  final int nodeVersion = ZKUtil.checkExists(zkw, taskPath);
  assertTrue(nodeVersion == -1);
}
/**
 * Moves a submitted task node to the ERR state, with resubmission disabled, and checks
 * that the batch records the error and that the task node is removed afterwards.
 */
@Test (timeout=180000)
public void testTaskErr() throws Exception {
  LOG.info("TestTaskErr - cleanup task node once in ERR state");

  // Disable resubmission for this test only.
  conf.setInt("hbase.splitlog.max.resubmit", 0);
  try {
    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    // Block until every installed task in the batch is counted as an error.
    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
  } finally {
    // Restore the shared configuration even when an assertion or wait above fails, so
    // the override does not leak into other tests using the same conf.
    conf.setInt("hbase.splitlog.max.resubmit",
        ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }
}
@Test (timeout=180000) public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); assertEquals(tot_mgr_resubmit.get(), 0); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(tot_mgr_resubmit.get(), 0); SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode); assertEquals(tot_mgr_resubmit.get(), 0); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); // Could be small race here. if (tot_mgr_resubmit.get() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } assertEquals(tot_mgr_resubmit.get(), 1); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(DUMMY_MASTER)); }
@Test (timeout=180000) public void testWorkerCrash() throws Exception { slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
@Ignore("DLR is broken by HBASE-12751") @Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); SplitLogCounters.resetCounters(); // Not actually enabling DLR for the cluster, just for the ZkCoordinatedStateManager to use. // The test is just manipulating ZK manually anyways. conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), new SplitLogTask.Unassigned( ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); LOG.info("Mode1=" + slm.getRecoveryMode()); assertTrue(slm.isLogSplitting()); zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); LOG.info("Mode2=" + slm.getRecoveryMode()); slm.setRecoveryMode(false); LOG.info("Mode3=" + slm.getRecoveryMode()); assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying()); }
/**
 * Creates an UNASSIGNED task node before the worker starts and checks that the worker
 * acquires it, leaving the node OWNED by the worker's server name.
 */
@Test(timeout=60000)
public void testAcquireTaskAtStartup() throws Exception {
  LOG.info("testAcquireTaskAtStartup");
  SplitLogCounters.resetCounters();

  final String taskName = "tatas";
  final ServerName workerServer = ServerName.valueOf("rs,1,1");
  final RegionServerServices regionServer = getRegionServer(workerServer);

  // The task exists in ZooKeeper before the worker is started.
  final SplitLogTask pendingTask =
      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode);
  zkw.getRecoverableZooKeeper().create(
      ZKSplitLog.getEncodedNodeName(zkw, taskName),
      pendingTask.toByteArray(),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  final SplitLogWorker worker =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), regionServer, neverEndingTask);
  worker.start();
  try {
    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
    final byte[] nodeData = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, taskName));
    final SplitLogTask currentState = SplitLogTask.parseFrom(nodeData);
    assertTrue(currentState.isOwned(workerServer));
  } finally {
    // Always stop the worker, whether or not the assertions passed.
    stopSplitLogWorker(worker);
  }
}
/**
 * Starts a manager, using a private configuration copy with distributed log replay
 * switched on, while a LOG_SPLITTING task node exists and expects log-splitting mode;
 * after deleting the node and calling {@code setRecoveryMode(false)} it expects
 * log-replaying mode.
 */
@Test(timeout=60000)
public void testGetPreviousRecoveryMode() throws Exception {
  LOG.info("testGetPreviousRecoveryMode");
  SplitLogCounters.resetCounters();

  // Work on a copy so the replay flag is not written into the shared test configuration.
  final Configuration replayConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
  replayConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
  ds = new DummyServer(zkw, replayConf);

  // The same encoded node is created here and deleted further down.
  final String recoveryNode = ZKSplitLog.getEncodedNodeName(zkw, "testRecovery");
  final SplitLogTask splittingTask =
      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING);
  zkw.getRecoverableZooKeeper().create(
      recoveryNode, splittingTask.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(ds, replayConf, stopper, master, DUMMY_MASTER);
  LOG.info("Mode1=" + slm.getRecoveryMode());
  assertTrue(slm.isLogSplitting());

  zkw.getRecoverableZooKeeper().delete(recoveryNode, -1);
  LOG.info("Mode2=" + slm.getRecoveryMode());
  slm.setRecoveryMode(false);
  LOG.info("Mode3=" + slm.getRecoveryMode());
  assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
}
/** * signal the workers that a task was resubmitted by creating the * RESCAN node. * @throws KeeperException */ private void createRescanNode(long retries) { // The RESCAN node will be deleted almost immediately by the // SplitLogManager as soon as it is created because it is being // created in the DONE state. This behavior prevents a buildup // of RESCAN nodes. But there is also a chance that a SplitLogWorker // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); SplitLogTask slt = new SplitLogTask.Done(this.serverName); this.watcher.getRecoverableZooKeeper().getZooKeeper(). create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); }
/**
 * Creates an OWNED task node before the manager exists, then checks that the manager
 * acquires it as an orphan, records a heartbeat, resubmits it and finally rescans.
 */
@Test
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Plant the task node directly in ZooKeeper, owned by the dummy master.
  final String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  final SplitLogTask ownedState = new SplitLogTask.Owned(DUMMY_MASTER);
  zkw.getRecoverableZooKeeper().create(
      orphanPath, ownedState.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  final Task orphan = slm.findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must lie within the past second.
  final long now = System.currentTimeMillis();
  assertTrue(orphan.last_update <= now);
  assertTrue(orphan.last_update > (now - 1000));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Moves a submitted task node to the DONE state and checks that the batch accounts
 * for it and that the task node is removed afterwards.
 */
@Test
public void testTaskDone() throws Exception {
  LOG.info("TestTaskDone - cleanup task node once in DONE state");

  slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
  final TaskBatch taskBatch = new TaskBatch();
  final String taskPath = submitTaskAndWait(taskBatch, "foo/1");

  // Overwrite the node's data with a DONE state attributed to worker1.
  final ServerName doneBy = ServerName.valueOf("worker1,1,1");
  final SplitLogTask doneState = new SplitLogTask.Done(doneBy);
  ZKUtil.setData(zkw, taskPath, doneState.toByteArray());

  // Block until every installed task in the batch is counted as done.
  synchronized (taskBatch) {
    while (taskBatch.installed != taskBatch.done) {
      taskBatch.wait();
    }
  }

  waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
  final int nodeVersion = ZKUtil.checkExists(zkw, taskPath);
  assertTrue(nodeVersion == -1);
}
/**
 * Moves a submitted task node to the ERR state, with resubmission disabled, and checks
 * that the batch records the error and that the task node is removed afterwards.
 */
@Test
public void testTaskErr() throws Exception {
  LOG.info("TestTaskErr - cleanup task node once in ERR state");

  // Disable resubmission for this test only.
  conf.setInt("hbase.splitlog.max.resubmit", 0);
  try {
    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    // Block until every installed task in the batch is counted as an error.
    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
  } finally {
    // Restore the shared configuration even when an assertion or wait above fails, so
    // the override does not leak into other tests using the same conf.
    conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
  }
}
@Test public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); assertEquals(tot_mgr_resubmit.get(), 0); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(tot_mgr_resubmit.get(), 0); SplitLogTask slt = new SplitLogTask.Resigned(worker1); assertEquals(tot_mgr_resubmit.get(), 0); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); int version = ZKUtil.checkExists(zkw, tasknode); // Could be small race here. if (tot_mgr_resubmit.get() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } assertEquals(tot_mgr_resubmit.get(), 1); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(DUMMY_MASTER)); }
@Test public void testWorkerCrash() throws Exception { slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.get()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.get()); }
/**
 * Creates an UNASSIGNED task node before the worker starts and checks that the worker
 * acquires it, leaving the node OWNED by the worker's server name.
 */
@Test(timeout=60000)
public void testAcquireTaskAtStartup() throws Exception {
  LOG.info("testAcquireTaskAtStartup");
  SplitLogCounters.resetCounters();

  final String taskName = "tatas";
  final ServerName workerServer = ServerName.valueOf("rs,1,1");
  final RegionServerServices regionServer = getRegionServer(workerServer);

  // The task exists in ZooKeeper before the worker is started.
  final SplitLogTask pendingTask = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
  zkw.getRecoverableZooKeeper().create(
      ZKSplitLog.getEncodedNodeName(zkw, taskName),
      pendingTask.toByteArray(),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  final SplitLogWorker worker =
      new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), regionServer, neverEndingTask);
  worker.start();
  try {
    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
    final byte[] nodeData = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, taskName));
    final SplitLogTask currentState = SplitLogTask.parseFrom(nodeData);
    assertTrue(currentState.isOwned(workerServer));
  } finally {
    // Always stop the worker, whether or not the assertions passed.
    stopSplitLogWorker(worker);
  }
}
/** * signal the workers that a task was resubmitted by creating the RESCAN node. */ private void rescan(long retries) { // The RESCAN node will be deleted almost immediately by the // SplitLogManager as soon as it is created because it is being // created in the DONE state. This behavior prevents a buildup // of RESCAN nodes. But there is also a chance that a SplitLogWorker // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName()); this.watcher .getRecoverableZooKeeper() .getZooKeeper() .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); }
/**
 * Creates an OWNED task node before the manager exists, then checks that the manager
 * acquires it as an orphan, records a heartbeat, resubmits it and finally rescans.
 */
@Test (timeout=180000)
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Plant the task node directly in ZooKeeper, owned by the master's server name.
  final String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  final SplitLogTask ownedState = new SplitLogTask.Owned(master.getServerName());
  zkw.getRecoverableZooKeeper().create(
      orphanPath, ownedState.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(master, conf);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  final Task orphan = findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must lie within the past second.
  final long now = System.currentTimeMillis();
  assertTrue(orphan.last_update <= now);
  assertTrue(orphan.last_update > (now - 1000));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Moves a submitted task node to the DONE state and checks that the batch accounts
 * for it and that the task node is removed afterwards.
 */
@Test (timeout=180000)
public void testTaskDone() throws Exception {
  LOG.info("TestTaskDone - cleanup task node once in DONE state");

  slm = new SplitLogManager(master, conf);
  final TaskBatch taskBatch = new TaskBatch();
  final String taskPath = submitTaskAndWait(taskBatch, "foo/1");

  // Overwrite the node's data with a DONE state attributed to worker1.
  final ServerName doneBy = ServerName.valueOf("worker1,1,1");
  final SplitLogTask doneState = new SplitLogTask.Done(doneBy);
  ZKUtil.setData(zkw, taskPath, doneState.toByteArray());

  // Block until every installed task in the batch is counted as done.
  synchronized (taskBatch) {
    while (taskBatch.installed != taskBatch.done) {
      taskBatch.wait();
    }
  }

  waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
  final int nodeVersion = ZKUtil.checkExists(zkw, taskPath);
  assertTrue(nodeVersion == -1);
}
/**
 * Moves a submitted task node to the ERR state, with resubmission disabled, and checks
 * that the batch records the error and that the task node is removed afterwards.
 */
@Test (timeout=180000)
public void testTaskErr() throws Exception {
  LOG.info("TestTaskErr - cleanup task node once in ERR state");

  // Disable resubmission for this test only.
  conf.setInt("hbase.splitlog.max.resubmit", 0);
  try {
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    // Block until every installed task in the batch is counted as an error.
    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
  } finally {
    // Restore the shared configuration even when an assertion or wait above fails, so
    // the override does not leak into other tests using the same conf.
    conf.setInt("hbase.splitlog.max.resubmit",
        ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }
}
@Test (timeout=180000) public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(0, tot_mgr_resubmit.sum()); slm = new SplitLogManager(master, conf); assertEquals(0, tot_mgr_resubmit.sum()); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); assertEquals(0, tot_mgr_resubmit.sum()); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); assertEquals(0, tot_mgr_resubmit.sum()); SplitLogTask slt = new SplitLogTask.Resigned(worker1); assertEquals(0, tot_mgr_resubmit.sum()); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); ZKUtil.checkExists(zkw, tasknode); // Could be small race here. if (tot_mgr_resubmit.sum() == 0) { waitForCounter(tot_mgr_resubmit, 0, 1, to/2); } assertEquals(1, tot_mgr_resubmit.sum()); byte[] taskstate = ZKUtil.getData(zkw, tasknode); slt = SplitLogTask.parseFrom(taskstate); assertTrue(slt.isUnassigned(master.getServerName())); }
@Test (timeout=180000) public void testWorkerCrash() throws Exception { slm = new SplitLogManager(master, conf); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); SplitLogTask slt = new SplitLogTask.Owned(worker1); ZKUtil.setData(zkw, tasknode, slt.toByteArray()); if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); // Not yet resubmitted. Assert.assertEquals(0, tot_mgr_resubmit.sum()); // This server becomes dead Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). // It has been resubmitted Assert.assertEquals(1, tot_mgr_resubmit.sum()); }
/**
 * Creates an UNASSIGNED task node before the worker starts and checks that the worker
 * acquires it, leaving the node OWNED by the worker's server name.
 */
@Test(timeout=60000)
public void testAcquireTaskAtStartup() throws Exception {
  LOG.info("testAcquireTaskAtStartup");
  SplitLogCounters.resetCounters();

  final String taskName = "tatas";
  final ServerName workerServer = ServerName.valueOf("rs,1,1");
  final RegionServerServices regionServer = getRegionServer(workerServer);

  // The task exists in ZooKeeper before the worker is started.
  final SplitLogTask pendingTask = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
  zkw.getRecoverableZooKeeper().create(
      ZKSplitLog.getEncodedNodeName(zkw, taskName),
      pendingTask.toByteArray(),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  final SplitLogWorker worker =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), regionServer, neverEndingTask);
  worker.start();
  try {
    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
    final byte[] nodeData = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, taskName));
    final SplitLogTask currentState = SplitLogTask.parseFrom(nodeData);
    assertTrue(currentState.isOwned(workerServer));
  } finally {
    // Always stop the worker, whether or not the assertions passed.
    stopSplitLogWorker(worker);
  }
}