/**
 * Updates in-memory bookkeeping after the node at {@code path} has been deleted.
 * <p>
 * Does nothing when {@code ignoreZKDeleteForTesting} is set. Otherwise the task is removed from
 * the task map; if one was present it is marked {@code DELETED} and a thread waiting on it is
 * notified. If none was present, only the relevant counters are bumped.
 *
 * @param path path of the deleted node
 */
private void deleteNodeSuccess(String path) {
  if (ignoreZKDeleteForTesting) {
    return;
  }
  Task removed = details.getTasks().remove(path);
  if (removed != null) {
    // Flip the state and wake a waiter while holding the task's monitor.
    synchronized (removed) {
      removed.status = DELETED;
      removed.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
    return;
  }
  // No in-memory state for this path: account for it and move on.
  if (ZKSplitLog.isRescanNode(watcher, path)) {
    SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
  }
  SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
  LOG.debug("deleted task without in memory state " + path);
}
private void heartbeat(String path, int new_version, ServerName workerName) { Task task = findOrCreateOrphanTask(path); if (new_version != task.last_version) { if (task.isUnassigned()) { LOG.info("task " + path + " acquired by " + workerName); } task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); } else { // duplicate heartbeats - heartbeats w/o zk node version // changing - are possible. The timeout thread does // getDataSetWatch() just to check whether a node still // exists or not } return; }
/**
 * Creates an owned task node before the manager starts, then checks that the manager picks it
 * up as an orphan, records a heartbeat for it and eventually resubmits it.
 */
@Test (timeout=180000)
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Seed ZK with a task node that is already owned.
  String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  SplitLogTask ownedState = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
  zkw.getRecoverableZooKeeper().create(orphanPath, ownedState.toByteArray(),
    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  Task orphan = slm.findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must fall within the past second.
  long now = System.currentTimeMillis();
  boolean notInFuture = orphan.last_update <= now;
  assertTrue(notInFuture && (orphan.last_update > (now - 1000)));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Creates an owned task node before the manager starts, then checks that the manager picks it
 * up as an orphan, records a heartbeat for it and eventually resubmits it.
 */
@Test
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Seed ZK with a task node that is already owned.
  String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  zkw.getRecoverableZooKeeper().create(orphanPath,
    TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null);
  slm.finishInitialization();
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  Task orphan = slm.findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must fall within the past second.
  long now = System.currentTimeMillis();
  boolean notInFuture = orphan.last_update <= now;
  assertTrue(notInFuture && (orphan.last_update > (now - 1000)));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Creates an owned task node before the manager starts, then checks that the manager picks it
 * up as an orphan, records a heartbeat for it and eventually resubmits it.
 */
@Test
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Seed ZK with a task node that is already owned.
  String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  SplitLogTask ownedState = new SplitLogTask.Owned(DUMMY_MASTER);
  zkw.getRecoverableZooKeeper().create(orphanPath, ownedState.toByteArray(),
    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  Task orphan = slm.findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must fall within the past second.
  long now = System.currentTimeMillis();
  boolean notInFuture = orphan.last_update <= now;
  assertTrue(notInFuture && (orphan.last_update > (now - 1000)));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Updates in-memory bookkeeping after the node at {@code path} has been deleted.
 * <p>
 * Does nothing when {@code ignoreZKDeleteForTesting} is set. Otherwise the task is removed from
 * the task map; if one was present it is marked {@code DELETED} and a thread waiting on it is
 * notified. If none was present, only the relevant counters are bumped.
 *
 * @param path path of the deleted node
 */
private void deleteNodeSuccess(String path) {
  if (ignoreZKDeleteForTesting) {
    return;
  }
  Task removed = details.getTasks().remove(path);
  if (removed != null) {
    // Flip the state and wake a waiter while holding the task's monitor.
    synchronized (removed) {
      removed.status = DELETED;
      removed.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
    return;
  }
  // No in-memory state for this path: account for it and move on.
  if (ZKSplitLog.isRescanNode(watcher, path)) {
    SplitLogCounters.tot_mgr_rescan_deleted.increment();
  }
  SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
  LOG.debug("Deleted task without in memory state " + path);
}
private void heartbeat(String path, int new_version, ServerName workerName) { Task task = findOrCreateOrphanTask(path); if (new_version != task.last_version) { if (task.isUnassigned()) { LOG.info("Task " + path + " acquired by " + workerName); } task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); SplitLogCounters.tot_mgr_heartbeat.increment(); } else { // duplicate heartbeats - heartbeats w/o zk node version // changing - are possible. The timeout thread does // getDataSetWatch() just to check whether a node still // exists or not } return; }
/**
 * Creates an owned task node before the manager starts, then checks that the manager picks it
 * up as an orphan, records a heartbeat for it and eventually resubmits it.
 */
@Test (timeout=180000)
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Seed ZK with a task node that is already owned.
  String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  SplitLogTask ownedState = new SplitLogTask.Owned(master.getServerName());
  zkw.getRecoverableZooKeeper().create(orphanPath, ownedState.toByteArray(),
    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(master, conf);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  Task orphan = findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must fall within the past second.
  long now = System.currentTimeMillis();
  boolean notInFuture = orphan.last_update <= now;
  assertTrue(notInFuture && (orphan.last_update > (now - 1000)));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Creates an owned task node before the manager starts, then checks that the manager picks it
 * up as an orphan, records a heartbeat for it and eventually resubmits it.
 */
@Test
public void testOrphanTaskAcquisition() throws Exception {
  LOG.info("TestOrphanTaskAcquisition");

  // Seed ZK with a task node that is already owned.
  String orphanPath = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
  SplitLogTask ownedState = new SplitLogTask.Owned(DUMMY_MASTER);
  zkw.getRecoverableZooKeeper().create(orphanPath, ownedState.toByteArray(),
    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
  slm.finishInitialization();
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  Task orphan = slm.findOrCreateOrphanTask(orphanPath);
  assertTrue(orphan.isOrphan());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  assertFalse(orphan.isUnassigned());

  // The last update must fall within the past second.
  long now = System.currentTimeMillis();
  boolean notInFuture = orphan.last_update <= now;
  assertTrue(notInFuture && (orphan.last_update > (now - 1000)));

  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
  assertTrue(orphan.isUnassigned());
  waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
}
/**
 * Bundles the references passed in; each argument is stored as-is, without copying.
 *
 * @param tasks           map of tasks keyed by string
 * @param master          master services handle
 * @param failedDeletions set of strings stored in the {@code failedDeletions} field
 * @param serverName      server name stored in the {@code serverName} field
 */
public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
    Set<String> failedDeletions, ServerName serverName) {
  this.master = master;
  this.serverName = serverName;
  this.tasks = tasks;
  this.failedDeletions = failedDeletions;
}
/** * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. * @param path */ private void handleUnassignedTask(String path) { if (ZKSplitLog.isRescanNode(watcher, path)) { return; } Task task = findOrCreateOrphanTask(path); if (task.isOrphan() && (task.incarnation.get() == 0)) { LOG.info("resubmitting unassigned orphan task " + path); // ignore failure to resubmit. The timeout-monitor will handle it later // albeit in a more crude fashion resubmitTask(path, task, FORCE); } }
private void setDone(String path, TerminationStatus status) { Task task = details.getTasks().get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); LOG.debug("unacquired orphan task is done " + path); } } else { synchronized (task) { if (task.status == IN_PROGRESS) { if (status == SUCCESS) { SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); LOG.info("Done splitting " + path); } else { SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); LOG.warn("Error splitting " + path); } task.status = status; if (task.batch != null) { synchronized (task.batch) { if (status == SUCCESS) { task.batch.done++; } else { task.batch.error++; } task.batch.notify(); } } } } } // delete the task node in zk. It's an async // call and no one is blocked waiting for this node to be deleted. All // task names are unique (log.<timestamp>) there is no risk of deleting // a future task. // if a deletion fails, TimeoutMonitor will retry the same deletion later deleteNode(path, zkretries); return; }
/**
 * Returns the task registered for {@code path}, registering a fresh one if none exists.
 *
 * @param path path of the task node
 * @return the previously registered task, or the newly inserted one
 */
Task findOrCreateOrphanTask(String path) {
  Task candidate = new Task();
  Task existing = details.getTasks().putIfAbsent(path, candidate);
  if (existing != null) {
    return existing;
  }
  // Our candidate was inserted: this path was not tracked before.
  LOG.info("creating orphan task " + path);
  SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
  return candidate;
}
/**
 * Reacts to a data change on {@code path}. Paths that are neither tracked tasks nor RESCAN
 * nodes are ignored; for the rest the data is fetched again via {@code getDataSetWatch}, and a
 * tracked task additionally gets a detail-less heartbeat.
 *
 * @param path path of the node whose data changed
 */
@Override
public void nodeDataChanged(String path) {
  Task task = details.getTasks().get(path);
  if (task == null && !ZKSplitLog.isRescanNode(watcher, path)) {
    return;
  }
  if (task != null) {
    task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
  }
  getDataSetWatch(path, zkretries);
}
@Test (timeout=180000) public void testUnassignedOrphan() throws Exception { LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created waitForCounter(tot_mgr_rescan, 0, 1, to/2); Task task2 = slm.findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); assertEquals(1L, tot_mgr_resubmit.get()); assertEquals(1, task.incarnation.get()); assertEquals(0, task.unforcedResubmits.get()); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); }
@Test public void testUnassignedOrphan() throws Exception { LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task zkw.getRecoverableZooKeeper().create(tasknode, TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created waitForCounter(tot_mgr_rescan, 0, 1, to/2); Task task2 = slm.findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); assertEquals(1L, tot_mgr_resubmit.get()); assertEquals(1, task.incarnation); assertEquals(0, task.unforcedResubmits); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); }
/** * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. * @param path */ private void handleUnassignedTask(String path) { if (ZKSplitLog.isRescanNode(watcher, path)) { return; } Task task = findOrCreateOrphanTask(path); if (task.isOrphan() && (task.incarnation == 0)) { LOG.info("resubmitting unassigned orphan task " + path); // ignore failure to resubmit. The timeout-monitor will handle it later // albeit in a more crude fashion resubmitTask(path, task, FORCE); } }
@Test (timeout=180000) public void testUnassignedOrphan() throws Exception { LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created waitForCounter(tot_mgr_rescan, 0, 1, to/2); Task task2 = slm.findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); assertEquals(1L, tot_mgr_resubmit.get()); assertEquals(1, task.incarnation); assertEquals(0, task.unforcedResubmits.get()); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); }
@Test public void testUnassignedOrphan() throws Exception { LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created waitForCounter(tot_mgr_rescan, 0, 1, to/2); Task task2 = slm.findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); assertEquals(1L, tot_mgr_resubmit.get()); assertEquals(1, task.incarnation); assertEquals(0, task.unforcedResubmits.get()); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); }
/** * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. * @param path */ private void handleUnassignedTask(String path) { if (ZKSplitLog.isRescanNode(watcher, path)) { return; } Task task = findOrCreateOrphanTask(path); if (task.isOrphan() && (task.incarnation.get() == 0)) { LOG.info("Resubmitting unassigned orphan task " + path); // ignore failure to resubmit. The timeout-monitor will handle it later // albeit in a more crude fashion resubmitTask(path, task, FORCE); } }
private void setDone(String path, TerminationStatus status) { Task task = details.getTasks().get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { SplitLogCounters.tot_mgr_unacquired_orphan_done.increment(); LOG.debug("Unacquired orphan task is done " + path); } } else { synchronized (task) { if (task.status == IN_PROGRESS) { if (status == SUCCESS) { SplitLogCounters.tot_mgr_log_split_success.increment(); LOG.info("Done splitting " + path); } else { SplitLogCounters.tot_mgr_log_split_err.increment(); LOG.warn("Error splitting " + path); } task.status = status; if (task.batch != null) { synchronized (task.batch) { if (status == SUCCESS) { task.batch.done++; } else { task.batch.error++; } task.batch.notify(); } } } } } // delete the task node in zk. It's an async // call and no one is blocked waiting for this node to be deleted. All // task names are unique (log.<timestamp>) there is no risk of deleting // a future task. // if a deletion fails, TimeoutMonitor will retry the same deletion later deleteNode(path, zkretries); return; }
/**
 * Looks up the task for {@code path} in the manager's task map, creating and registering a new
 * one when the path has no entry yet.
 *
 * @param path path of the task node
 * @return the task mapped to {@code path} after the call
 */
private Task findOrCreateOrphanTask(String path) {
  return slm.tasks.computeIfAbsent(path, missingPath -> {
    // Runs only when there is no mapping for the path yet.
    LOG.info("creating orphan task " + missingPath);
    SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    return new Task();
  });
}
@Test (timeout=180000) public void testUnassignedOrphan() throws Exception { LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName()); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); slm = new SplitLogManager(master, conf); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created waitForCounter(tot_mgr_rescan, 0, 1, to / 2); Task task2 = findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); assertEquals(1L, tot_mgr_resubmit.sum()); assertEquals(1, task.incarnation.get()); assertEquals(0, task.unforcedResubmits.get()); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); }