public boolean removeWatch(String path, WatcherType type, Watcher watcher) { boolean removed = false; switch (type) { case Children: removed = this.childWatches.removeWatcher(path, watcher); break; case Data: removed = this.dataWatches.removeWatcher(path, watcher); break; case Any: if (this.childWatches.removeWatcher(path, watcher)) { removed = true; } if (this.dataWatches.removeWatcher(path, watcher)) { removed = true; } break; } return removed; }
private void removeWatches(int opCode, String path, Watcher watcher, WatcherType watcherType, boolean local) throws InterruptedException, KeeperException { PathUtils.validatePath(path); final String clientPath = path; final String serverPath = prependChroot(clientPath); WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager); RequestHeader h = new RequestHeader(); h.setType(opCode); Record request = getRemoveWatchesRequest(opCode, watcherType, serverPath); ReplyHeader r = cnxn.submitRequest(h, request, null, null, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } }
private void removeWatches(int opCode, String path, Watcher watcher, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) { PathUtils.validatePath(path); final String clientPath = path; final String serverPath = prependChroot(clientPath); WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager); RequestHeader h = new RequestHeader(); h.setType(opCode); Record request = getRemoveWatchesRequest(opCode, watcherType, serverPath); cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null, wcb); }
private Record getRemoveWatchesRequest(int opCode, WatcherType watcherType, final String serverPath) { Record request = null; switch (opCode) { case ZooDefs.OpCode.checkWatches: CheckWatchesRequest chkReq = new CheckWatchesRequest(); chkReq.setPath(serverPath); chkReq.setType(watcherType.getIntValue()); request = chkReq; break; case ZooDefs.OpCode.removeWatches: RemoveWatchesRequest rmReq = new RemoveWatchesRequest(); rmReq.setPath(serverPath); rmReq.setType(watcherType.getIntValue()); request = rmReq; break; default: LOG.warn("unknown type " + opCode); break; } return request; }
private void removeWatches(ZooKeeper zk, String path, Watcher watcher, WatcherType watcherType, boolean local, KeeperException.Code rc) throws InterruptedException, KeeperException { LOG.info( "Sending removeWatches req using zk {} path: {} type: {} watcher: {} ", new Object[] { zk, path, watcherType, watcher }); if (useAsync) { MyCallback c1 = new MyCallback(rc.intValue(), path); zk.removeWatches(path, watcher, watcherType, local, c1, null); Assert.assertTrue("Didn't succeeds removeWatch operation", c1.matches()); if (KeeperException.Code.OK.intValue() != c1.rc) { KeeperException ke = KeeperException .create(KeeperException.Code.get(c1.rc)); throw ke; } } else { zk.removeWatches(path, watcher, watcherType, local); } }
private void removeAllWatches(ZooKeeper zk, String path, WatcherType watcherType, boolean local, KeeperException.Code rc) throws InterruptedException, KeeperException { LOG.info("Sending removeWatches req using zk {} path: {} type: {} ", new Object[] { zk, path, watcherType }); if (useAsync) { MyCallback c1 = new MyCallback(rc.intValue(), path); zk.removeAllWatches(path, watcherType, local, c1, null); Assert.assertTrue("Didn't succeeds removeWatch operation", c1.matches()); if (KeeperException.Code.OK.intValue() != c1.rc) { KeeperException ke = KeeperException .create(KeeperException.Code.get(c1.rc)); throw ke; } } else { zk.removeAllWatches(path, watcherType, local); } }
/** * Test verifies WatcherType.Any - removes only the configured child watcher * function */ @Test(timeout = 90000) public void testRemoveAnyChildWatcher() throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher w1 = new MyWatcher("/node1", 2); MyWatcher w2 = new MyWatcher("/node1", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); // Add multiple child watches LOG.info("Adding child watcher {} on path {}", new Object[] { w1, "/node1" }); zk2.getChildren("/node1", w2); LOG.info("Adding child watcher {} on path {}", new Object[] { w2, "/node1" }); zk2.getChildren("/node1", w1); removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove child watcher", w2.matches()); Assert.assertEquals("Didn't find child watcher", 1, zk2 .getChildWatches().size()); Assert.assertEquals("Didn't find data watcher", 1, zk2 .getDataWatches().size()); removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove watchers", w1.matches()); }
/** * Verify that if a given watcher doesn't exist, the server properly * returns an error code for it. * * In our Java client implementation, we check that a given watch exists at * two points: * * 1) before submitting the RemoveWatches request * 2) after a successful server response, when the watcher needs to be * removed * * Since this can be racy (i.e. a watch can fire while a RemoveWatches * request is in-flight), we need to verify that the watch was actually * removed (i.e. from ZKDatabase and DataTree) and return NOWATCHER if * needed. * * Also, other implementations might not do a client side check before * submitting a RemoveWatches request. If we don't do a server side check, * we would just return ZOK even if no watch was removed. * */ @Test(timeout = 90000) public void testNoWatcherServerException() throws InterruptedException, IOException, TimeoutException { CountdownWatcher watcher = new CountdownWatcher(); MyZooKeeper zk = new MyZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher); boolean nw = false; watcher.waitForConnected(CONNECTION_TIMEOUT); try { zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false); } catch (KeeperException nwe) { if (nwe.code().intValue() == Code.NOWATCHER.intValue()) { nw = true; } } Assert.assertTrue("Server didn't return NOWATCHER", zk.getRemoveWatchesRC() == Code.NOWATCHER.intValue()); Assert.assertTrue("NoWatcherException didn't happen", nw); }
/** * Test verifies null watcher */ @Test(timeout = 30000) public void testNullWatcherReference() throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); try { if (useAsync) { zk1.removeWatches("/node1", null, WatcherType.Data, false, null, null); } else { zk1.removeWatches("/node1", null, WatcherType.Data, false); } Assert.fail("Must throw IllegalArgumentException as watcher is null!"); } catch (IllegalArgumentException iae) { // expected } }
@Override public boolean exec() throws KeeperException, InterruptedException { String path = args[1]; WatcherType wtype = WatcherType.Any; // if no matching option -c or -d or -a is specified, we remove // the watches of the given node by choosing WatcherType.Any if (cl.hasOption("c")) { wtype = WatcherType.Children; } else if (cl.hasOption("d")) { wtype = WatcherType.Data; } else if (cl.hasOption("a")) { wtype = WatcherType.Any; } // whether to remove the watches locally boolean local = cl.hasOption("l"); try { zk.removeAllWatches(path, wtype, local); } catch (KeeperException.NoWatcherException ex) { err.println(ex.getMessage()); return false; } return true; }
void internalRemoval(Watcher watcher, String path) throws Exception { this.watcher = watcher; watcherType = WatcherType.Any; quietly = true; guaranteed = true; if ( Boolean.getBoolean(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND) ) { this.backgrounding = new Backgrounding(); pathInForeground(path); } else { this.backgrounding = new Backgrounding(true); pathInBackground(path); } }
public WatchDeregistration(String clientPath, Watcher watcher, WatcherType watcherType, boolean local, ZKWatchManager zkManager) { this.clientPath = clientPath; this.watcher = watcher; this.watcherType = watcherType; this.local = local; this.zkManager = zkManager; }
/** * The asynchronous version of removeWatches. * * @see #removeWatches */ public void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) { validateWatcher(watcher); removeWatches(ZooDefs.OpCode.checkWatches, path, watcher, watcherType, local, cb, ctx); }
/** * The asynchronous version of removeAllWatches. * * @see #removeAllWatches */ public void removeAllWatches(String path, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) { removeWatches(ZooDefs.OpCode.removeWatches, path, null, watcherType, local, cb, ctx); }
/** * Test verifies removal of single watcher when there is server connection */ @Test(timeout = 90000) public void testRemoveSingleWatcher() throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zk1.create("/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); MyWatcher w1 = new MyWatcher("/node1", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); MyWatcher w2 = new MyWatcher("/node2", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w2, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node2", w2)); removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK); Assert.assertEquals("Didn't find data watcher", 1, zk2.getDataWatches().size()); Assert.assertEquals("Didn't find data watcher", "/node2", zk2.getDataWatches().get(0)); removeWatches(zk2, "/node2", w2, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove data watcher", w2.matches()); // closing session should remove ephemeral nodes and trigger data // watches if any if (zk1 != null) { zk1.close(); zk1 = null; } List<EventType> events = w1.getEventsAfterWatchRemoval(); Assert.assertFalse( "Shouldn't get NodeDeletedEvent after watch removal", events.contains(EventType.NodeDeleted)); Assert.assertEquals( "Shouldn't get NodeDeletedEvent after watch removal", 0, events.size()); }
/** * Test verifies removal of multiple data watchers when there is server * connection */ @Test(timeout = 90000) public void testMultipleDataWatchers() throws IOException, InterruptedException, KeeperException { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); MyWatcher w1 = new MyWatcher("/node1", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); MyWatcher w2 = new MyWatcher("/node1", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w2, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w2)); removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK); Assert.assertEquals("Didn't find data watcher", 1, zk2.getDataWatches().size()); Assert.assertEquals("Didn't find data watcher", "/node1", zk2.getDataWatches().get(0)); removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove data watcher", w2.matches()); // closing session should remove ephemeral nodes and trigger data // watches if any if (zk1 != null) { zk1.close(); zk1 = null; } List<EventType> events = w2.getEventsAfterWatchRemoval(); Assert.assertEquals( "Shouldn't get NodeDeletedEvent after watch removal", 0, events.size()); }
/** * Test verifies removal of multiple child watchers when there is server * connection */ @Test(timeout = 90000) public void testMultipleChildWatchers() throws IOException, InterruptedException, KeeperException { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher w1 = new MyWatcher("/node1", 1); LOG.info("Adding child watcher {} on path {}", new Object[] { w1, "/node1" }); zk2.getChildren("/node1", w1); MyWatcher w2 = new MyWatcher("/node1", 1); LOG.info("Adding child watcher {} on path {}", new Object[] { w2, "/node1" }); zk2.getChildren("/node1", w2); removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK); Assert.assertTrue("Didn't remove child watcher", w2.matches()); Assert.assertEquals("Didn't find child watcher", 1, zk2 .getChildWatches().size()); removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove child watcher", w1.matches()); // create child to see NodeChildren notification zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // waiting for child watchers to be notified int count = 30; while (count > 0) { if (w1.getEventsAfterWatchRemoval().size() > 0) { break; } count--; Thread.sleep(100); } // watcher2 List<EventType> events = w2.getEventsAfterWatchRemoval(); Assert.assertEquals("Shouldn't get NodeChildrenChanged event", 0, events.size()); }
/** * Test verifies null watcher with WatcherType.Any - remove all the watchers * data, child, exists */ @Test(timeout = 90000) public void testRemoveAllWatchers() throws IOException, InterruptedException, KeeperException { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher w1 = new MyWatcher("/node1", 2); MyWatcher w2 = new MyWatcher("/node1", 2); LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); LOG.info("Adding data watcher {} on path {}", new Object[] { w2, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w2)); LOG.info("Adding child watcher {} on path {}", new Object[] { w1, "/node1" }); zk2.getChildren("/node1", w1); LOG.info("Adding child watcher {} on path {}", new Object[] { w2, "/node1" }); zk2.getChildren("/node1", w2); removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK); removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK); zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Assert.assertTrue("Didn't remove data watcher", w1.matches()); Assert.assertTrue("Didn't remove child watcher", w2.matches()); }
/** * Test verifies WatcherType.Any - removes only the configured data watcher * function */ @Test(timeout = 90000) public void testRemoveAnyDataWatcher() throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher w1 = new MyWatcher("/node1", 1); MyWatcher w2 = new MyWatcher("/node1", 2); // Add multiple data watches LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); LOG.info("Adding data watcher {} on path {}", new Object[] { w2, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w2)); // Add child watch LOG.info("Adding child watcher {} on path {}", new Object[] { w2, "/node1" }); zk2.getChildren("/node1", w2); removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove data watcher", w1.matches()); Assert.assertEquals("Didn't find child watcher", 1, zk2 .getChildWatches().size()); Assert.assertEquals("Didn't find data watcher", 1, zk2 .getDataWatches().size()); removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK); Assert.assertTrue("Didn't remove child watcher", w2.matches()); }
/** * Test verifies when there is no server connection. Remove watches when * local=true, otw should retain it */ @Test(timeout = 90000) public void testRemoveWatcherWhenNoConnection() throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); MyWatcher w1 = new MyWatcher("/node1", 2); MyWatcher w2 = new MyWatcher("/node1", 1); LOG.info("Adding data watcher {} on path {}", new Object[] { w1, "/node1" }); Assert.assertNotNull("Didn't set data watches", zk2.exists("/node1", w1)); // Add multiple child watches LOG.info("Adding child watcher {} on path {}", new Object[] { w1, "/node1" }); zk2.getChildren("/node1", w1); LOG.info("Adding child watcher {} on path {}", new Object[] { w1, "/node1" }); zk2.getChildren("/node1", w2); stopServer(); removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK); Assert.assertTrue("Didn't remove child watcher", w2.matches()); Assert.assertFalse("Shouldn't remove data watcher", w1.matches()); try { removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS); Assert.fail("Should throw exception as last watch removal requires server connection"); } catch (KeeperException.ConnectionLossException nwe) { // expected } Assert.assertFalse("Shouldn't remove data watcher", w1.matches()); // when local=true, here if connection not available, simply removes // from local session removeWatches(zk2, "/node1", w1, WatcherType.Any, true, Code.OK); Assert.assertTrue("Didn't remove data watcher", w1.matches()); }
/** * Test verifies many pre-node watchers. Also, verifies internal * datastructure 'watchManager.existWatches' */ @Test(timeout = 90000) public void testManyPreNodeWatchers() throws Exception { int count = 50; List<MyWatcher> wList = new ArrayList<MyWatcher>(count); MyWatcher w; String path = "/node"; // Exists watcher for (int i = 0; i < count; i++) { final String nodePath = path + i; w = new MyWatcher(nodePath, 1); wList.add(w); LOG.info("Adding pre node watcher {} on path {}", new Object[] { w, nodePath }); zk1.exists(nodePath, w); } Assert.assertEquals("Failed to add watchers!", count, zk1 .getExistWatches().size()); for (int i = 0; i < count; i++) { final MyWatcher watcher = wList.get(i); removeWatches(zk1, path + i, watcher, WatcherType.Data, false, Code.OK); Assert.assertTrue("Didn't remove data watcher", watcher.matches()); } Assert.assertEquals("Didn't remove watch references!", 0, zk1 .getExistWatches().size()); }