/**
 * Validates {@code path}, builds the request for the given op code and queues
 * it on the client connection together with a {@link WatchDeregistration}.
 *
 * @param opCode      request type written into the request header
 * @param path        client-side path; validated before anything else happens
 * @param watcher     watcher passed to the {@link WatchDeregistration}
 * @param watcherType watcher type used for both the deregistration and the request
 * @param local       flag forwarded to the {@link WatchDeregistration}
 * @param cb          callback handed to the queued packet
 * @param ctx         context object handed to the queued packet
 */
private void removeWatches(int opCode, String path, Watcher watcher, WatcherType watcherType,
        boolean local, VoidCallback cb, Object ctx) {
    PathUtils.validatePath(path);

    // The server sees the chroot-prefixed path; the client path is kept for the callback.
    final String chrootedPath = prependChroot(path);

    final WatchDeregistration deregistration =
            new WatchDeregistration(path, watcher, watcherType, local, watchManager);

    final RequestHeader header = new RequestHeader();
    header.setType(opCode);

    final Record removeRequest = getRemoveWatchesRequest(opCode, watcherType, chrootedPath);

    cnxn.queuePacket(header, new ReplyHeader(), removeRequest, null, cb, path, chrootedPath,
            ctx, null, deregistration);
}
/**
 * Issues an asynchronous sync on {@code PARENT_PATH} and asserts that the
 * callback was invoked after waiting up to 30 seconds on the latch.
 */
@Test
public void testSync() throws Exception {
    complete = false;
    create2EmptyNode(zkClient, PARENT_PATH);

    // Record completion and release the latch once the sync callback fires.
    VoidCallback syncDone = new VoidCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            complete = true;
            callComplete.countDown();
        }
    };
    zkClient.sync(PARENT_PATH, syncDone, null);

    callComplete.await(30, TimeUnit.SECONDS);

    final String failureMessage = String.format("%s Sync completed", serverState);
    Assert.assertTrue(failureMessage, complete);
}
/**
 * Asynchronous delete, submitted through {@code retryOperation}.
 *
 * @param path    path of the node to delete
 * @param version version forwarded to the underlying delete call
 * @param cb      callback forwarded to the underlying delete call
 * @param ctx     context object forwarded to the underlying delete call
 * @see org.apache.zookeeper.ZooKeeper#delete(String path, int version , VoidCallback cb , Object ctx)
 */
public void delete(final String path, final int version, final VoidCallback cb, final Object ctx)
        throws InterruptedException, KeeperException {
    final ZooKeeperOperation deleteOperation = new ZooKeeperOperation() {
        public Object execute() throws KeeperException, InterruptedException {
            zookeeper.delete(path, version, cb, ctx);
            // The asynchronous delete yields no value; the outcome goes to the callback.
            return null;
        }
    };
    retryOperation(deleteOperation);
}
/** * The asynchronous version of delete. * * @see #delete(String, int) */ public void delete(final String path, int version, VoidCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); final String serverPath; // maintain semantics even in chroot case // specifically - root cannot be deleted // I think this makes sense even in chroot case. if (clientPath.equals("/")) { // a bit of a hack, but delete(/) will never succeed and ensures // that the same semantics are maintained serverPath = clientPath; } else { serverPath = prependChroot(clientPath); } RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.delete); DeleteRequest request = new DeleteRequest(); request.setPath(serverPath); request.setVersion(version); cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null); }
/**
 * Asynchronous sync. Flushes channel between process and leader.
 *
 * @param path client-side path; validated, then chroot-prefixed for the server
 * @param cb   a handler for the callback
 * @param ctx  context to be provided to the callback
 * @throws IllegalArgumentException if an invalid path is specified
 */
public void sync(final String path, VoidCallback cb, Object ctx) {
    PathUtils.validatePath(path);
    final String chrootedPath = prependChroot(path);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.sync);

    final SyncRequest syncRequest = new SyncRequest();
    final SyncResponse syncResponse = new SyncResponse();
    syncRequest.setPath(chrootedPath);

    cnxn.queuePacket(header, new ReplyHeader(), syncRequest, syncResponse, cb, path,
            chrootedPath, ctx, null);
}
/**
 * The asynchronous version of removeWatches. Validates the watcher and then
 * delegates to the op-code based overload using {@code OpCode.checkWatches}.
 *
 * @param path        path forwarded to the delegate
 * @param watcher     watcher to validate and forward
 * @param watcherType watcher type forwarded to the delegate
 * @param local       flag forwarded to the delegate
 * @param cb          callback forwarded to the delegate
 * @param ctx         context object forwarded to the delegate
 * @see #removeWatches
 */
public void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local,
        VoidCallback cb, Object ctx) {
    validateWatcher(watcher);

    final int opCode = ZooDefs.OpCode.checkWatches;
    removeWatches(opCode, path, watcher, watcherType, local, cb, ctx);
}
/**
 * The asynchronous version of removeAllWatches. Delegates to the op-code
 * based overload using {@code OpCode.removeWatches} and no specific watcher.
 *
 * @param path        path forwarded to the delegate
 * @param watcherType watcher type forwarded to the delegate
 * @param local       flag forwarded to the delegate
 * @param cb          callback forwarded to the delegate
 * @param ctx         context object forwarded to the delegate
 * @see #removeAllWatches
 */
public void removeAllWatches(String path, WatcherType watcherType, boolean local,
        VoidCallback cb, Object ctx) {
    // No individual watcher is named on this path, so a null watcher is passed on.
    final Watcher noSpecificWatcher = null;
    final int opCode = ZooDefs.OpCode.removeWatches;
    removeWatches(opCode, path, noSpecificWatcher, watcherType, local, cb, ctx);
}
/** * Recursively delete the node with the given path. (async version). * * <p> * Important: All versions, of all nodes, under the given node are deleted. * <p> * If there is an error with deleting one of the sub-nodes in the tree, * this operation would abort and would be the responsibility of the app to handle the same. * <p> * * @throws IllegalArgumentException if an invalid path is specified */ public void deleteRecursive(final String pathRoot, VoidCallback cb, Object ctx) throws InterruptedException, KeeperException { PathUtils.validatePath(pathRoot); List<String> tree = this.listSubTreeBFS(pathRoot); LOG.debug("Deleting " + tree); LOG.debug("Deleting " + tree.size() + " subnodes "); for (int i = tree.size() - 1; i >= 0 ; --i) { //Delete the leaves first and eventually get rid of the root this.delete(tree.get(i), -1, cb, ctx); //Delete all versions of the node with -1. } }
/** * The Asynchronous version of delete. The request doesn't actually until * the asynchronous callback is called. * * @see #delete(String, int) */ public void delete(final String path, int version, VoidCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); final String serverPath; // maintain semantics even in chroot case // specifically - root cannot be deleted // I think this makes sense even in chroot case. if (clientPath.equals("/")) { // a bit of a hack, but delete(/) will never succeed and ensures // that the same semantics are maintained serverPath = clientPath; } else { serverPath = prependChroot(clientPath); } RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.delete); DeleteRequest request = new DeleteRequest(); request.setPath(serverPath); request.setVersion(version); cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null); }
/**
 * Mock asynchronous sync: on the executor, picks a result code and reports it
 * to the callback.
 *
 * <p>The code is {@code failReturnCode} when a programmed failure is due,
 * {@code CONNECTIONLOSS} when the mock is stopped, and {@code 0} otherwise.
 *
 * @param path path echoed back to the callback
 * @param cb   callback receiving the result code
 * @param ctx  context object echoed back to the callback
 */
@Override
public void sync(String path, VoidCallback cb, Object ctx) {
    executor.execute(() -> {
        final int rc;
        if (getProgrammedFailStatus()) {
            rc = failReturnCode.intValue();
        } else if (stopped) {
            // Use the enum constant rather than the deprecated int constant
            // KeeperException.Code.ConnectionLoss; the numeric value is the same.
            rc = KeeperException.Code.CONNECTIONLOSS.intValue();
        } else {
            rc = 0;
        }
        cb.processResult(rc, path, ctx);
    });
}
/**
 * Mock asynchronous delete.
 *
 * <p>Under {@code mutex}, the watchers registered on {@code path} and on its
 * parent are snapshotted, the delete is scheduled on the executor, and the
 * watchers on {@code path} are removed. The scheduled task decides the result
 * code under {@code mutex}, then, with the lock released, invokes the callback
 * and (only after a successful removal) notifies the snapshotted watchers.
 *
 * <p>The lock is always released through {@code finally}. Previously the
 * BADVERSION branch returned from the task while still holding the lock, and
 * any exception thrown while the lock was held left it locked as well.
 *
 * @param path    path of the node to delete
 * @param version expected node version, or {@code -1} to skip the version check
 * @param cb      callback receiving the result code
 * @param ctx     context object echoed back to the callback
 */
@Override
public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
    boolean shutdown;
    mutex.lock();
    try {
        shutdown = executor.isShutdown();
        if (!shutdown) {
            // Snapshot watchers now; the ones on `path` are removed below, before the task runs.
            final Set<Watcher> toNotifyDelete = Sets.newHashSet();
            toNotifyDelete.addAll(watchers.get(path));

            final Set<Watcher> toNotifyParent = Sets.newHashSet();
            final String parent = path.substring(0, path.lastIndexOf("/"));
            if (!parent.isEmpty()) {
                toNotifyParent.addAll(watchers.get(parent));
            }

            executor.execute(() -> {
                int rc;
                boolean deleted = false;
                mutex.lock();
                try {
                    if (getProgrammedFailStatus()) {
                        rc = failReturnCode.intValue();
                    } else if (stopped) {
                        rc = KeeperException.Code.CONNECTIONLOSS.intValue();
                    } else if (!tree.containsKey(path)) {
                        rc = KeeperException.Code.NONODE.intValue();
                    } else if (hasChildren(path)) {
                        rc = KeeperException.Code.NOTEMPTY.intValue();
                    } else {
                        boolean versionMismatch = false;
                        if (version != -1) {
                            int currentVersion = tree.get(path).second;
                            versionMismatch = version != currentVersion;
                        }
                        if (versionMismatch) {
                            rc = KeeperException.Code.BADVERSION.intValue();
                        } else {
                            tree.remove(path);
                            deleted = true;
                            rc = 0;
                        }
                    }
                } finally {
                    // Released on every path, including BADVERSION and exceptions.
                    mutex.unlock();
                }

                // Callbacks and watchers are alien code: run them outside the lock.
                cb.processResult(rc, path, ctx);
                if (deleted) {
                    toNotifyDelete.forEach(watcher -> watcher
                            .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
                    toNotifyParent.forEach(watcher -> watcher
                            .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
                                    parent)));
                }
            });

            watchers.removeAll(path);
        }
    } finally {
        mutex.unlock();
    }

    if (shutdown) {
        // Executor already shut down: report session expiry outside the lock.
        cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx);
    }
}
/** * Recursively delete the node with the given path. (async version). * * <p> * Important: All versions, of all nodes, under the given node are deleted. * <p> * If there is an error with deleting one of the sub-nodes in the tree, * this operation would abort and would be the responsibility of the app to handle the same. * <p> * @param zk the zookeeper handle * @param pathRoot the path to be deleted * @param cb call back method * @param ctx the context the callback method is called with * @throws IllegalArgumentException if an invalid path is specified */ public static void deleteRecursive(ZooKeeper zk, final String pathRoot, VoidCallback cb, Object ctx) throws InterruptedException, KeeperException { PathUtils.validatePath(pathRoot); List<String> tree = listSubTreeBFS(zk, pathRoot); LOG.debug("Deleting " + tree); LOG.debug("Deleting " + tree.size() + " subnodes "); for (int i = tree.size() - 1; i >= 0 ; --i) { //Delete the leaves first and eventually get rid of the root zk.delete(tree.get(i), -1, cb, ctx); //Delete all versions of the node with -1. } }