protected void delete(String path, V v, Handler<AsyncResult<V>> asyncResultHandler) { try { curator.delete().deletingChildrenIfNeeded().inBackground((client, event) -> { if (event.getType() == CuratorEventType.DELETE) { curator.getChildren().inBackground((childClient, childEvent) -> { //clean parent node if doesn't have child node. if (childEvent.getChildren().size() == 0) { String[] paths = path.split("/"); String parentNodePath = Stream.of(paths).limit(paths.length - 1).reduce((previous, current) -> previous + "/" + current).get(); curator.delete().inBackground((deleteClient, deleteEvent) -> { if (deleteEvent.getType() == CuratorEventType.DELETE) vertx.runOnContext(ea -> asyncResultHandler.handle(Future.succeededFuture(v))); }).forPath(parentNodePath); } else { vertx.runOnContext(ea -> asyncResultHandler.handle(Future.succeededFuture(v))); } }).forPath(path); } }).forPath(path); } catch (Exception ex) { vertx.runOnContext(aVoid -> asyncResultHandler.handle(Future.failedFuture(ex))); } }
/**
 * Leaves the cluster: when this manager is active, marks it inactive and deletes this
 * node's entry under {@code ZK_PATH_CLUSTER_NODE} in the background. Once the delete event
 * arrives, this manager is removed from the {@code clusterNodes} listeners.
 *
 * <p>The work runs inside {@code vertx.executeBlocking}; the blocking future is always
 * completed, and a failure to issue the delete is only logged.
 *
 * @param resultHandler notified when the blocking section has finished
 */
@Override
public synchronized void leave(Handler<AsyncResult<Void>> resultHandler) {
    vertx.executeBlocking(future -> {
        if (!active) {
            // Nothing to do when we already left (or never joined).
            future.complete();
            return;
        }
        active = false;
        try {
            curator.delete()
                .deletingChildrenIfNeeded()
                .inBackground((client, event) -> {
                    if (event.getType() != CuratorEventType.DELETE) {
                        return;
                    }
                    clusterNodes.getListenable().removeListener(ZookeeperClusterManager.this);
                })
                .forPath(ZK_PATH_CLUSTER_NODE + nodeID);
        } catch (Exception e) {
            log.error(e);
        }
        future.complete();
    }, resultHandler);
}
/**
 * Submits the multi-operation transaction carried by {@code operationAndData} through the
 * asynchronous ZooKeeper {@code multi} call.
 *
 * <p>When the callback fires, the trace is committed, the raw results (if any) are wrapped
 * into Curator transaction results, and a {@code TRANSACTION} event is passed to
 * {@code client.processBackgroundOperation}. Any throwable raised while submitting is
 * forwarded to {@code backgrounding.checkError}.
 *
 * @param operationAndData the transaction record plus its background bookkeeping
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception {
    try {
        final TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        AsyncCallback.MultiCallback callback = new AsyncCallback.MultiCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) {
                trace.commit();

                // Results are only wrapped when ZooKeeper actually returned some.
                List<CuratorTransactionResult> wrapped = null;
                if (opResults != null) {
                    wrapped = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }

                CuratorEvent transactionEvent = new CuratorEventImpl(
                    client, CuratorEventType.TRANSACTION, rc, path, null, ctx,
                    null, null, null, null, null, wrapped);
                client.processBackgroundOperation(operationAndData, transactionEvent);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), callback, backgrounding.getContext());
    } catch (Throwable e) {
        backgrounding.checkError(e, null);
    }
}
/**
 * Issues an asynchronous ZooKeeper {@code sync} for the path carried by
 * {@code operationAndData}.
 *
 * <p>When the callback fires, the trace is committed with the return code and a
 * {@code SYNC} event is passed to {@code client.processBackgroundOperation}.
 *
 * @param operationAndData holds the path to sync plus background bookkeeping
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
    final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("BackgroundSyncImpl");
    final String data = operationAndData.getData();

    AsyncCallback.VoidCallback onSynced = new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            trace.setReturnCode(rc).setRequestBytesLength(data).commit();
            CuratorEventImpl syncEvent = new CuratorEventImpl(
                client, CuratorEventType.SYNC, rc, path, null, ctx,
                null, null, null, null, null, null);
            client.processBackgroundOperation(operationAndData, syncEvent);
        }
    };

    client.getZooKeeper().sync(data, onSynced, context);
}
/**
 * Requests the ACL of the path carried by {@code operationAndData} through the asynchronous
 * ZooKeeper {@code getACL} call.
 *
 * <p>When the callback fires, the trace is committed and a {@code GET_ACL} event carrying
 * the stat and ACL list is passed to {@code client.processBackgroundOperation}. Any
 * throwable raised while submitting is forwarded to {@code backgrounding.checkError}.
 *
 * @param operationAndData holds the path to query plus background bookkeeping
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
    try {
        final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetACLBuilderImpl-Background");
        AsyncCallback.ACLCallback onAcl = new AsyncCallback.ACLCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
                trace.setReturnCode(rc).setPath(path).setStat(stat).commit();
                CuratorEventImpl aclEvent = new CuratorEventImpl(
                    client, CuratorEventType.GET_ACL, rc, path, null, ctx,
                    stat, null, null, null, acl, null);
                client.processBackgroundOperation(operationAndData, aclEvent);
            }
        };
        client.getZooKeeper().getACL(operationAndData.getData(), responseStat, onAcl, backgrounding.getContext());
    } catch (Throwable e) {
        backgrounding.checkError(e, null);
    }
}
/**
 * Background callback: calls {@code attach()} when the event type is
 * {@code CHILDREN}, {@code CREATE} or {@code DELETE}; all other events are ignored.
 *
 * @param client the Curator client that produced the event
 * @param event  the background event to inspect
 */
@Override
public synchronized void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    switch (event.getType()) {
        case CHILDREN:
        case CREATE:
        case DELETE:
            attach();
            break;
        default:
            // Not an event this callback reacts to.
            break;
    }
}
/**
 * Checks in the background whether a node exists at {@code path}.
 *
 * <p>On an {@code EXISTS} event the handler receives {@code true} when the event carries a
 * stat and {@code false} otherwise. If the request cannot be issued, the handler receives
 * a failed result. The handler is always invoked on the Vert.x context.
 *
 * @param path    path of the node to look up
 * @param handler receives the existence flag or the failure cause
 */
protected void checkExists(String path, AsyncResultHandler<Boolean> handler) {
    try {
        curator.checkExists().inBackground((client, event) -> {
            if (event.getType() != CuratorEventType.EXISTS) {
                return;
            }
            // A node exists exactly when ZooKeeper returned a stat for it.
            final boolean exists = event.getStat() != null;
            vertx.runOnContext(aVoid -> handler.handle(Future.succeededFuture(exists)));
        }).forPath(path);
    } catch (Exception ex) {
        vertx.runOnContext(aVoid -> handler.handle(Future.failedFuture(ex)));
    }
}
protected void create(String path, V v, Handler<AsyncResult<Void>> completionHandler) { try { //there are two type of node - ephemeral and persistent. //if path is 'asyncMultiMap/subs/' which save the data of eventbus address and serverID we could using ephemeral, //since the lifecycle of this path as long as this verticle. CreateMode nodeMode = path.contains(EVENTBUS_PATH) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; curator.create().creatingParentsIfNeeded().withMode(nodeMode).inBackground((cl, el) -> { if (el.getType() == CuratorEventType.CREATE) { vertx.runOnContext(event -> completionHandler.handle(Future.succeededFuture())); } }).forPath(path, asByte(v)); } catch (Exception ex) { vertx.runOnContext(event -> completionHandler.handle(Future.failedFuture(ex))); } }
/**
 * Writes the bytes produced by {@code asByte(v)} to the node at {@code path} in the
 * background.
 *
 * <p>The completion handler is invoked on the Vert.x context once the {@code SET_DATA}
 * event arrives, or with a failed result if the request cannot be issued.
 *
 * @param path              path of the node to update
 * @param v                 value to store in the node
 * @param completionHandler notified when the set-data event arrives or issuing fails
 */
protected void setData(String path, V v, Handler<AsyncResult<Void>> completionHandler) {
    try {
        curator.setData()
            .inBackground((client, event) -> {
                if (event.getType() != CuratorEventType.SET_DATA) {
                    return;
                }
                vertx.runOnContext(e -> completionHandler.handle(Future.succeededFuture()));
            })
            .forPath(path, asByte(v));
    } catch (Exception ex) {
        vertx.runOnContext(event -> completionHandler.handle(Future.failedFuture(ex)));
    }
}
/**
 * Builds a Curator client via {@code Utils.newCurator}, wires up its listeners and starts it.
 *
 * <p>Two listeners are registered before the client is started: one forwards every
 * {@code WATCHED} event (state, type and path) to {@code watcher}; the other logs any
 * unhandled error and halts the process with exit status 1.
 *
 * @param conf    configuration handed to {@code Utils.newCurator}
 * @param servers ZooKeeper servers handed to {@code Utils.newCurator}
 * @param port    ZooKeeper port handed to {@code Utils.newCurator}
 * @param root    root path handed to {@code Utils.newCurator}
 * @param watcher callback that receives watched events
 * @return the started client
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
    final CuratorFramework framework = Utils.newCurator(conf, servers, port, root);

    CuratorListener watchForwarder = new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework source, CuratorEvent curatorEvent) throws Exception {
            if (!curatorEvent.getType().equals(CuratorEventType.WATCHED)) {
                return;
            }
            WatchedEvent watched = curatorEvent.getWatchedEvent();
            watcher.execute(watched.getState(), watched.getType(), watched.getPath());
        }
    };
    framework.getCuratorListenable().addListener(watchForwarder);

    UnhandledErrorListener fatalErrorHandler = new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            LOG.error("Unrecoverable Zookeeper error, halting process: " + msg, error);
            JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");
        }
    };
    framework.getUnhandledErrorListenable().addListener(fatalErrorHandler);

    framework.start();
    return framework;
}
/**
 * Builds a Curator client via {@code Utils.newCurator}, wires up its listeners and starts it.
 *
 * <p>Two listeners are registered before the client is started: one forwards every
 * {@code WATCHED} event (state, type and path) to {@code watcher}; the other logs any
 * unhandled error and halts the process with exit status 1.
 *
 * @param conf    configuration handed to {@code Utils.newCurator}
 * @param servers ZooKeeper servers handed to {@code Utils.newCurator}
 * @param port    ZooKeeper port handed to {@code Utils.newCurator}
 * @param root    root path handed to {@code Utils.newCurator}
 * @param watcher callback that receives watched events
 * @return the started client
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
    final CuratorFramework framework = Utils.newCurator(conf, servers, port, root);

    CuratorListener watchForwarder = new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework source, CuratorEvent curatorEvent) throws Exception {
            if (!curatorEvent.getType().equals(CuratorEventType.WATCHED)) {
                return;
            }
            WatchedEvent watched = curatorEvent.getWatchedEvent();
            watcher.execute(watched.getState(), watched.getType(), watched.getPath());
        }
    };
    framework.getCuratorListenable().addListener(watchForwarder);

    UnhandledErrorListener fatalErrorHandler = new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            LOG.error("Unrecoverable Zookeeper error, halting process: " + msg, error);
            JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");
        }
    };
    framework.getUnhandledErrorListenable().addListener(fatalErrorHandler);

    framework.start();
    return framework;
}
/**
 * {@inheritDoc}
 *
 * <p>{@code CREATE} and {@code DELETE} events invalidate the cache entry for the event
 * path; {@code WATCHED} events are handed to {@code process}; the other known event types
 * are ignored; anything else is logged at debug level.
 */
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent event) throws Exception {
    switch (event.getType()) {
        case CREATE:
        case DELETE:
            // The node set changed, so whatever is cached for this path is stale.
            _invalidateCache(event.getPath());
            break;
        case WATCHED:
            process(event.getWatchedEvent());
            break;
        case CHILDREN:
        case CLOSING:
        case EXISTS:
        case GET_ACL:
        case GET_DATA:
        case SET_ACL:
        case SET_DATA:
        case SYNC:
            // Known event types that need no action here.
            break;
        default:
            LOGGER.debug(event.toString());
            break;
    }
}
/**
 * Creates an event from the raw values of a completed operation.
 *
 * <p>The path is passed through {@code client.unfixForNamespace}; a non-null watched event
 * is wrapped in a {@code NamespaceWatchedEvent}; non-null ACL and transaction-result lists
 * are copied into immutable lists. All other arguments are stored as given.
 */
CuratorEventImpl(CuratorFrameworkImpl client, CuratorEventType type, int resultCode, String path, String name, Object context, Stat stat, byte[] data, List<String> children, WatchedEvent watchedEvent, List<ACL> aclList, List<CuratorTransactionResult> opResults) {
    // Values stored exactly as supplied.
    this.type = type;
    this.resultCode = resultCode;
    this.name = name;
    this.context = context;
    this.stat = stat;
    this.data = data;
    this.children = children;

    // Derived values, computed in the same relative order as before.
    if (opResults == null) {
        this.opResults = null;
    } else {
        this.opResults = ImmutableList.copyOf(opResults);
    }

    this.path = client.unfixForNamespace(path);

    if (watchedEvent == null) {
        this.watchedEvent = null;
    } else {
        this.watchedEvent = new NamespaceWatchedEvent(client, watchedEvent);
    }

    if (aclList == null) {
        this.aclList = null;
    } else {
        this.aclList = ImmutableList.copyOf(aclList);
    }
}
/**
 * Requests the children of the path carried by {@code operationAndData} through the
 * asynchronous ZooKeeper {@code getChildren} call.
 *
 * <p>When {@code watching.isWatched()} is true the boolean-watch overload is used with
 * {@code true}; otherwise the watcher returned by {@code watching.getWatcher} is passed.
 * In the callback the watcher is committed, the trace is committed, a missing child list
 * is replaced by an empty one, and a {@code CHILDREN} event is passed to
 * {@code client.processBackgroundOperation}. Any throwable raised while submitting is
 * forwarded to {@code backgrounding.checkError}.
 *
 * @param operationAndData holds the path to list plus background bookkeeping
 */
@Override
public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception {
    try {
        final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetChildrenBuilderImpl-Background");
        AsyncCallback.Children2Callback onChildren = new AsyncCallback.Children2Callback() {
            @Override
            public void processResult(int rc, String path, Object o, List<String> strings, Stat stat) {
                watching.commitWatcher(rc, false);
                trace.setReturnCode(rc).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(stat).commit();

                // Never hand a null child list to event consumers.
                List<String> childNames = strings;
                if (childNames == null) {
                    childNames = Lists.newArrayList();
                }

                CuratorEventImpl childrenEvent = new CuratorEventImpl(
                    client, CuratorEventType.CHILDREN, rc, path, null, o,
                    stat, null, childNames, null, null, null);
                client.processBackgroundOperation(operationAndData, childrenEvent);
            }
        };

        if (!watching.isWatched()) {
            client.getZooKeeper().getChildren(
                operationAndData.getData(),
                watching.getWatcher(operationAndData.getData()),
                onChildren,
                backgrounding.getContext());
        } else {
            client.getZooKeeper().getChildren(operationAndData.getData(), true, onChildren, backgrounding.getContext());
        }
    } catch (Throwable e) {
        backgrounding.checkError(e, watching);
    }
}
/**
 * Creates {@code /one/two/three/four} through the async API, deletes {@code /one/two}
 * with the delete-children option, and verifies that the delete completes without a value
 * or error and that {@code /one/two} no longer exists afterwards.
 */
@Test
public void testBackgroundDeleteWithChildren() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
    client.start();
    try {
        client.getCuratorListenable().addListener((listenerClient, event) -> {
            if (event.getType() != CuratorEventType.DELETE) {
                return;
            }
            Assert.assertEquals(event.getPath(), "/one/two");
            ((CountDownLatch) event.getContext()).countDown();
        });

        final CountDownLatch latch = new CountDownLatch(1);
        final AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);

        async.create()
            .withOptions(EnumSet.of(CreateOption.createParentsIfNeeded))
            .forPath("/one/two/three/four")
            .thenRun(() -> {
                // Only start the recursive delete once the full chain exists.
                async.delete()
                    .withOptions(EnumSet.of(DeleteOption.deletingChildrenIfNeeded))
                    .forPath("/one/two")
                    .handle((value, error) -> {
                        Assert.assertNull(value);
                        Assert.assertNull(error);
                        latch.countDown();
                        return null;
                    });
            });

        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
        Assert.assertNull(client.checkExists().forPath("/one/two"));
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Polls ZooKeeper until the first child node of the watched path reports a non-empty
 * transport list, then counts down the latch carried as the event context.
 *
 * <p>On a {@code CHILDREN} event: an empty child list re-issues the children request;
 * otherwise the data of the first child is requested. On a {@code GET_DATA} event: missing
 * data or an empty transport list re-issues the data request; otherwise the latch is
 * released. Other event types are ignored.
 *
 * @param client the Curator client used to issue follow-up requests
 * @param event  the event to react to; its context is passed along to every follow-up request
 */
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
    final CuratorEventType type = event.getType();

    if (type == CuratorEventType.CHILDREN) {
        if (!event.getChildren().isEmpty()) {
            String path = event.getPath() + "/" + event.getChildren().get(0);
            LOG.info("Operations Node registered in ZK. Waiting for transports configration");
            client.getData().inBackground(event.getContext()).forPath(path);
        } else {
            // No node registered yet: ask again.
            client.getChildren().inBackground(event.getContext()).forPath(event.getPath());
        }
        return;
    }

    if (type != CuratorEventType.GET_DATA) {
        return;
    }

    if (event.getData() != null) {
        OperationsNodeInfo nodeInfo = OPERATIONS_NODE_INFO_CONVERTER.fromByteArray(event.getData());
        if (!nodeInfo.getTransports().isEmpty()) {
            LOG.info("Operations Node updated tarnsports configuration in ZK");
            ((CountDownLatch) event.getContext()).countDown();
            return;
        }
    }

    // Either no data yet or transports not configured yet: ask again.
    client.getData().inBackground(event.getContext()).forPath(event.getPath());
}
/**
 * Builds a Curator client via {@code Utils.newCurator}, wires up its listeners and starts it.
 *
 * <p>Two listeners are registered before the client is started: one forwards every
 * {@code WATCHED} event (state, type and path) to {@code watcher}; the other logs any
 * unhandled error and halts the process with exit status 1.
 *
 * @param conf    configuration handed to {@code Utils.newCurator}
 * @param servers ZooKeeper servers handed to {@code Utils.newCurator}
 * @param port    ZooKeeper port handed to {@code Utils.newCurator}
 * @param root    root path handed to {@code Utils.newCurator}
 * @param watcher callback that receives watched events
 * @return the started client
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
    final CuratorFramework framework = Utils.newCurator(conf, servers, port, root);

    CuratorListener watchForwarder = new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework source, CuratorEvent curatorEvent) throws Exception {
            if (!curatorEvent.getType().equals(CuratorEventType.WATCHED)) {
                return;
            }
            WatchedEvent watched = curatorEvent.getWatchedEvent();
            watcher.execute(watched.getState(), watched.getType(), watched.getPath());
        }
    };
    framework.getCuratorListenable().addListener(watchForwarder);

    UnhandledErrorListener fatalErrorHandler = new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            LOG.error("Unrecoverable zookeeper error, halting process: " + msg, error);
            JStormUtils.halt_process(1, "Unrecoverable zookeeper error");
        }
    };
    framework.getUnhandledErrorListenable().addListener(fatalErrorHandler);

    framework.start();
    return framework;
}
@Override public void performBackgroundOperation(final OperationAndData<Void> operationAndData) throws Exception { final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("FindAndDeleteProtectedNodeInBackground"); AsyncCallback.Children2Callback callback = new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object o, List<String> strings, Stat stat) { trace.setReturnCode(rc).setPath(path).setStat(stat).commit(); if ( debugInsertError.compareAndSet(true, false) ) { rc = KeeperException.Code.CONNECTIONLOSS.intValue(); } if ( rc == KeeperException.Code.OK.intValue() ) { final String node = CreateBuilderImpl.findNode(strings, "/", protectedId); // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path if ( node != null ) { try { String deletePath = client.unfixForNamespace(ZKPaths.makePath(namespaceAdjustedParentPath, node)); client.delete().guaranteed().inBackground().forPath(deletePath); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Could not start guaranteed delete for node: " + node); rc = KeeperException.Code.CONNECTIONLOSS.intValue(); } } } if ( rc != KeeperException.Code.OK.intValue() ) { CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null, null); client.processBackgroundOperation(operationAndData, event); } } }; client.getZooKeeper().getChildren(namespaceAdjustedParentPath, false, callback, null); }
/**
 * Returns the event type held by this instance.
 *
 * @return the value of the {@code type} field
 */
@Override
public CuratorEventType getType() {
    return this.type;
}
/**
 * Creates the queue node at {@code path} via {@code internalCreateNode} with a callback
 * that reacts to the result.
 *
 * <p>The callback ignores any event whose result code is not OK. For a successful
 * {@code CREATE} event it decrements {@code putCount} and wakes threads waiting on it.
 * For every successful event it then notifies each put listener: with {@code item} when
 * that is non-null, otherwise with {@code givenMultiItem}.
 *
 * @param item           the single item being put, or {@code null} for a multi-item put
 * @param path           path of the node to create
 * @param givenMultiItem the multi-item reported to listeners when {@code item} is {@code null}
 * @param bytes          data to store in the node
 */
private void doPutInBackground(final T item, String path, final MultiItem<T> givenMultiItem, byte[] bytes) throws Exception {
    final Function<QueuePutListener<T>, Void> notifyListener = new Function<QueuePutListener<T>, Void>() {
        @Override
        public Void apply(QueuePutListener<T> listener) {
            if (item == null) {
                listener.putMultiCompleted(givenMultiItem);
            } else {
                listener.putCompleted(item);
            }
            return null;
        }
    };

    BackgroundCallback onCreated = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            boolean succeeded = event.getResultCode() == KeeperException.Code.OK.intValue();
            if (!succeeded) {
                return;
            }

            if (event.getType() == CuratorEventType.CREATE) {
                // Decrement and notify under the same monitor that waiters hold.
                synchronized (putCount) {
                    putCount.decrementAndGet();
                    putCount.notifyAll();
                }
            }

            putListenerContainer.forEach(notifyListener);
        }
    };

    internalCreateNode(path, bytes, onCreated);
}