/**
 * Asynchronously probes the given ZooKeeper paths and returns ids for those that exist.
 *
 * <p>A path counts as existing when its background result carries a non-null {@code Stat}.
 * The id is built from the last segment of the path via {@code idTranscoder}.
 *
 * @param pathNameforLogs label handed through to {@code queryAndReturnResultsThrows}
 * @param paths full paths to probe; an empty collection yields an empty list
 * @param idTranscoder converts a node name into an id of type {@code T}
 * @return ids of the paths that were found
 * @throws Exception whatever {@code queryAndReturnResultsThrows} throws
 */
private <T extends SingularityId> List<T> existsThrows(final String pathNameforLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) throws Exception {
  if (paths.isEmpty()) {
    return Collections.emptyList();
  }

  // Guard the result list in case background callbacks are delivered from more than one thread.
  final List<T> objects = Collections.synchronizedList(Lists.<T>newArrayListWithCapacity(paths.size()));

  final CountDownLatch latch = new CountDownLatch(paths.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() != null) {
          objects.add(Transcoders.getFromStringFunction(idTranscoder).apply(ZKPaths.getNodeFromPath(event.getPath())));
        }
      } finally {
        // Always release the latch, even when transcoding fails.
        latch.countDown();
      }
    }
  };

  // Only the Stat is inspected, so an existence check is enough; no node payload is needed.
  return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}
/**
 * Asynchronously checks the given ZooKeeper paths and returns the values mapped to those that
 * do not exist.
 *
 * <p>A path counts as missing when its background result carries a {@code null} {@code Stat}.
 *
 * @param pathNameforLogs label handed through to {@code queryAndReturnResultsThrows}
 * @param pathsMap full path mapped to the id to report when that path is missing; an empty map
 *     yields an empty list
 * @return the mapped ids whose paths were not found
 * @throws Exception whatever {@code queryAndReturnResultsThrows} throws
 */
private <T extends SingularityId> List<T> notExistsThrows(final String pathNameforLogs, final Map<String, T> pathsMap) throws Exception {
  if (pathsMap.isEmpty()) {
    return Collections.emptyList();
  }

  // Guard the result list in case background callbacks are delivered from more than one thread.
  final List<T> objects = Collections.synchronizedList(Lists.<T>newArrayListWithCapacity(pathsMap.size()));

  final CountDownLatch latch = new CountDownLatch(pathsMap.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() == null) {
          objects.add(pathsMap.get(event.getPath()));
        }
      } finally {
        // Always release the latch so the waiting caller is not left hanging.
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}
/**
 * Builds a Curator background callback that dispatches on the event's result code.
 *
 * <p>An {@code OK} result is handed to {@code result}. Any other result code is translated
 * into a {@link StoreException} by {@link #translateErrorCode(String, CuratorEvent)} and
 * handed to {@code exception}.
 *
 * @param result receives the event when the operation succeeded
 * @param exception receives the translated failure otherwise
 * @param path path used when building the failure
 * @return the background callback
 */
private BackgroundCallback callback(Consumer<CuratorEvent> result, Consumer<Throwable> exception, String path) {
    return (client, event) -> {
        if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
            result.accept(event);
        } else {
            // Single source of truth for the result-code -> StoreException mapping.
            exception.accept(translateErrorCode(path, event));
        }
    };
}

/**
 * Maps the ZooKeeper result code carried by {@code event} to a {@link StoreException}.
 *
 * <ul>
 *   <li>CONNECTIONLOSS, SESSIONEXPIRED, SESSIONMOVED, OPERATIONTIMEOUT: CONNECTION_ERROR</li>
 *   <li>NODEEXISTS: DATA_EXISTS</li>
 *   <li>BADVERSION: WRITE_CONFLICT</li>
 *   <li>NONODE: DATA_NOT_FOUND</li>
 *   <li>NOTEMPTY: DATA_CONTAINS_ELEMENTS</li>
 *   <li>anything else: UNKNOWN, wrapping the matching {@link KeeperException}</li>
 * </ul>
 *
 * @param path path used when building the exception
 * @param event event whose result code is translated
 * @return the translated exception
 */
private StoreException translateErrorCode(String path, CuratorEvent event) {
    final int resultCode = event.getResultCode();

    if (resultCode == KeeperException.Code.CONNECTIONLOSS.intValue()
            || resultCode == KeeperException.Code.SESSIONEXPIRED.intValue()
            || resultCode == KeeperException.Code.SESSIONMOVED.intValue()
            || resultCode == KeeperException.Code.OPERATIONTIMEOUT.intValue()) {
        return StoreException.create(StoreException.Type.CONNECTION_ERROR, path);
    }
    if (resultCode == KeeperException.Code.NODEEXISTS.intValue()) {
        return StoreException.create(StoreException.Type.DATA_EXISTS, path);
    }
    if (resultCode == KeeperException.Code.BADVERSION.intValue()) {
        return StoreException.create(StoreException.Type.WRITE_CONFLICT, path);
    }
    if (resultCode == KeeperException.Code.NONODE.intValue()) {
        return StoreException.create(StoreException.Type.DATA_NOT_FOUND, path);
    }
    if (resultCode == KeeperException.Code.NOTEMPTY.intValue()) {
        return StoreException.create(StoreException.Type.DATA_CONTAINS_ELEMENTS, path);
    }
    return StoreException.create(StoreException.Type.UNKNOWN,
            KeeperException.create(KeeperException.Code.get(resultCode), path));
}
/**
 * Creates an ephemeral node at {@code path} in the background, creating missing parents.
 *
 * <p>When the background result arrives, the callback logs the result code and event type,
 * calls {@code showCurrentThreadName()} and counts down {@code countDownLatch}.
 *
 * @param path path of the node to create
 * @throws Exception if the create request cannot be submitted
 */
public void createEphemeralNodeRecursionInBackground(String path) throws Exception {
    curatorFramework.create()
            .creatingParentsIfNeeded()
            // The method promises an ephemeral node; the previous PERSISTENT mode contradicted that.
            .withMode(CreateMode.EPHEMERAL)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    LOG.info("event's result code: {}, type: {}", event.getResultCode(), event.getType());
                    showCurrentThreadName();
                    countDownLatch.countDown();
                }
            })
            .forPath(path);
}
/**
 * Submits the multi-transaction carried by {@code operationAndData} to ZooKeeper asynchronously.
 *
 * <p>When ZooKeeper answers, the tracer is committed, the raw results (if any) are wrapped
 * into Curator transaction results, and a {@code TRANSACTION} event is handed to
 * {@code client.processBackgroundOperation}. Any {@code Throwable} raised while submitting
 * is passed to {@code backgrounding.checkError}.
 */
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception {
    try {
        final TimeTrace tracer = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");

        final AsyncCallback.MultiCallback onMultiResult = new AsyncCallback.MultiCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults) {
                tracer.commit();

                // A null result list is forwarded as null rather than wrapped.
                final List<CuratorTransactionResult> wrapped;
                if (opResults == null) {
                    wrapped = null;
                } else {
                    wrapped = CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData());
                }

                final CuratorEvent transactionEvent = new CuratorEventImpl(
                        client, CuratorEventType.TRANSACTION, rc, path, null, ctx,
                        null, null, null, null, null, wrapped);
                client.processBackgroundOperation(operationAndData, transactionEvent);
            }
        };

        client.getZooKeeper().multi(operationAndData.getData(), onMultiResult, backgrounding.getContext());
    } catch (Throwable e) {
        backgrounding.checkError(e, null);
    }
}
/**
 * Validates the arguments and assembles a {@code ModeledFrameworkImpl}.
 *
 * <p>{@code client}, {@code model} and {@code modeledOptions} must be non-null. A null
 * {@code watchMode} means "not watched": {@code WatchMode.stateChangeAndSuccess} is then
 * used for the DSL client and the un-watched DSL client is used for watchable operations.
 *
 * @return the new framework instance
 * @throws NullPointerException if {@code client}, {@code model} or {@code modeledOptions} is null
 */
public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions) {
    // Checks stay in the original order so the first failing argument is reported the same way.
    Objects.requireNonNull(client, "client cannot be null");
    Objects.requireNonNull(model, "model cannot be null");
    Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null");
    final Set<ModeledOptions> optionsSnapshot = ImmutableSet.copyOf(modeledOptions);

    final boolean isWatched = watchMode != null;
    final WatchMode effectiveWatchMode = isWatched ? watchMode : WatchMode.stateChangeAndSuccess;

    final AsyncCuratorFrameworkDsl dslClient =
            client.with(effectiveWatchMode, unhandledErrorListener, resultFilter, watcherFilter);

    final WatchableAsyncCuratorFramework watchableClient;
    if (isWatched) {
        watchableClient = dslClient.watched();
    } else {
        watchableClient = dslClient;
    }

    return new ModeledFrameworkImpl<>(
            client,
            dslClient,
            watchableClient,
            model,
            effectiveWatchMode,
            watcherFilter,
            unhandledErrorListener,
            resultFilter,
            isWatched,
            optionsSnapshot);
}
/**
 * Issues a guaranteed background delete for the node referenced by {@code event}.
 *
 * <p>The path is taken from {@code event.getPath()} on NODEEXISTS and from
 * {@code event.getName()} on OK. Any other result code, or a null path, does nothing.
 * A failure to issue the delete is logged and not rethrown.
 */
private void processBackgroundCallbackClosedState(CuratorEvent event) {
    final int resultCode = event.getResultCode();

    final String pathToDelete;
    if (resultCode == KeeperException.Code.NODEEXISTS.intValue()) {
        pathToDelete = event.getPath();
    } else if (resultCode == KeeperException.Code.OK.intValue()) {
        pathToDelete = event.getName();
    } else {
        pathToDelete = null;
    }

    if (pathToDelete == null) {
        return;
    }

    try {
        client.delete().guaranteed().inBackground().forPath(pathToDelete);
    } catch (Exception e) {
        log.error("Could not delete node after close", e);
    }
}
@Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { KeeperException.Code code = KeeperException.Code.get(event.getResultCode()); switch (code) { case OK: T data = event.getData() == null ? null : transformFunction.apply(event.getData()); dataMap.put(ZKPaths.getNodeFromPath(event.getPath()), data); break; case NONODE: // In this case there was a race condition in which the child node was deleted before we asked for data. break; default: exceptions.add(KeeperException.create(code, event.getPath())); } } finally { countDownLatch.countDown(); } }
@Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { KeeperException.Code code = KeeperException.Code.get(event.getResultCode()); switch (code) { case OK: childMap.put(ZKPaths.getNodeFromPath(event.getPath()), new HashSet<>(event.getChildren())); break; case NONODE: // In this case there was a race condition in which the child node was deleted before we asked for data. break; default: exceptions.add(KeeperException.create(code, event.getPath())); } } finally { countDownLatch.countDown(); } }
/**
 * Logs the delete event at debug level and bumps the {@code events} counter.
 */
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    // The parameterized form only formats the event when debug logging is enabled.
    LOG.debug("Delete event {}", event);
    events.incrementAndGet();
}
/**
 * Logs the received event, increments {@code eventCounter}, then hands the event to
 * {@code events}.
 */
@Override
public void processResult(final CuratorFramework client, final CuratorEvent event) throws Exception {
    LOG.info("received {}", event);

    // Count first, publish second: the counter is already updated when the event becomes available.
    eventCounter.incrementAndGet();
    events.put(event);
}
/**
 * Deletes a node that has a child using a background callback and checks that exactly one
 * event was caught.
 */
@Test
public void testBackgroundDelete() throws Throwable {
    // Arrange: a parent node with one child.
    mkPath("/rm", CreateMode.PERSISTENT);
    mkPath("/rm/child", CreateMode.PERSISTENT);
    final CuratorEventCatcher catcher = new CuratorEventCatcher();

    // Act: delete with the catcher as callback (second argument is presumably the recursive flag).
    curatorService.zkDelete("/rm", true, catcher);
    final CuratorEvent received = catcher.take();
    LOG.info("took {}", received);

    // Assert: one callback invocation in total.
    assertEquals(1, catcher.getCount());
}
/**
 * Asynchronously lists the children of every parent path and returns them as ids.
 *
 * <p>Child names are converted with {@code idTranscoder}. Parents whose result has no
 * children are skipped with a trace message.
 *
 * @param pathNameforLogs label handed through to {@code queryAndReturnResultsThrows}
 * @param parents parent paths to list; an empty collection yields an empty list
 * @param idTranscoder converts a child node name into an id of type {@code T}
 * @return ids of all children found
 * @throws Exception whatever {@code queryAndReturnResultsThrows} throws
 */
private <T extends SingularityId> List<T> getChildrenAsIdsForParentsThrows(final String pathNameforLogs, final Collection<String> parents, final IdTranscoder<T> idTranscoder) throws Exception {
  if (parents.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> backingList = Lists.newArrayListWithExpectedSize(parents.size());
  final List<T> synchronizedObjects = Collections.synchronizedList(backingList);

  final CountDownLatch latch = new CountDownLatch(parents.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        final List<String> children = event.getChildren();
        if (children != null && !children.isEmpty()) {
          synchronizedObjects.addAll(Lists.transform(children, Transcoders.getFromStringFunction(idTranscoder)));
        } else {
          LOG.trace("Expected children for node {} - but found none", event.getPath());
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(synchronizedObjects, parents, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_CHILDREN);
}
/**
 * Asynchronously reads the data of every child under {@code <parent>/<subpath>} and groups the
 * decoded objects by the value mapped to the parent.
 *
 * <p>The child paths are listed first through {@code getChildren}. The data of each child is
 * then fetched in the background and decoded with {@code transcoder}. Results with null or
 * empty data are skipped with a trace message.
 *
 * @param pathNameForLogs label handed through to {@code queryAndReturnResultsThrows}
 * @param parentPathsMap parent path mapped to the key under which its children's data is grouped
 * @param subpath path segment appended to each parent before listing children
 * @param transcoder decodes node bytes into {@code Q}
 * @return decoded objects grouped by parent key
 * @throws Exception whatever {@code getChildren} or {@code queryAndReturnResultsThrows} throws
 */
protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMapThrows(final String pathNameForLogs, final Map<String, T> parentPathsMap, final String subpath, final Transcoder<Q> transcoder) throws Exception {
  final Map<String, T> allPathsMap = Maps.newHashMap();
  for (Map.Entry<String, T> entry : parentPathsMap.entrySet()) {
    for (String child : getChildren(ZKPaths.makePath(entry.getKey(), subpath))) {
      allPathsMap.put(ZKPaths.makePath(entry.getKey(), subpath, child), entry.getValue());
    }
  }

  final ConcurrentHashMap<T, List<Q>> resultsMap = new ConcurrentHashMap<>();
  final CountDownLatch latch = new CountDownLatch(allPathsMap.size());
  final AtomicInteger bytes = new AtomicInteger();

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        final byte[] data = event.getData();
        if (data == null || data.length == 0) {
          LOG.trace("Expected active node {} but it wasn't there", event.getPath());
          return;
        }

        bytes.getAndAdd(data.length);

        final Q object = transcoder.fromBytes(data);

        final T parentKey = allPathsMap.get(event.getPath());
        if (parentKey != null) {
          // The map is concurrent but its list values must be too: several callbacks may
          // append to the same parent's list.
          resultsMap.putIfAbsent(parentKey, Collections.synchronizedList(new ArrayList<Q>()));
          resultsMap.get(parentKey).add(object);
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA);
}
/**
 * Calls {@code attach()} when the event type is CHILDREN, CREATE or DELETE; other types are ignored.
 */
@Override
public synchronized void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    switch (event.getType()) {
        case CHILDREN:
        case CREATE:
        case DELETE:
            attach();
            break;
        default:
            // Not an event type this callback reacts to.
            break;
    }
}
@Override public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { WatchedEvent watchedEvent = (event == null) ? null : event.getWatchedEvent(); if (watchedEvent == null || Watcher.Event.EventType.None == watchedEvent.getType()) return; if (LOGGER.isInfoEnabled()) LOGGER.info(String.format("zookeeper event received, type: %s, path: %s", watchedEvent.getType(), watchedEvent.getPath())); try { EventInfo eventInfo = parseEventInfo(watchedEvent); if (eventInfo == null) { if (LOGGER.isDebugEnabled()) LOGGER.debug(String.format("no operate event, type: %s, path: %s", watchedEvent.getType(), watchedEvent.getPath())); return; } if (LOGGER.isInfoEnabled()) LOGGER.info(String.format("zookeeper parse type: %s, eventtype: %s", eventInfo.type, eventInfo.eventType)); if (eventInfo.type == ADDRESS) { // 监听服务提供的更节点 serviceAddressChange(eventInfo); } else if (eventInfo.type == WEIGHT && Watcher.Event.EventType.NodeDataChanged == eventInfo.eventType) { // 监听服务提供的权重更节点 serviceWeightChange(eventInfo); } else { if (LOGGER.isDebugEnabled()) LOGGER.debug(String.format("no operate event, eventType: %s, path: %s", watchedEvent.getType(), watchedEvent.getPath())); } } catch (Exception e) { LOGGER.error("Error in ZookeeperWatcher.process()", e); } // 无须再次对节点进行监听,在获取子节点信息时,自动添加监听 }
/**
 * Completes {@code resultHandler} on the given context (when possible) based on the event's
 * result code: success with {@code record} on OK, otherwise failure with the matching
 * {@link KeeperException}.
 */
private void callback(Context context, Record record, Handler<AsyncResult<Record>> resultHandler, CuratorEvent curatorEvent) {
    runOnContextIfPossible(context, () -> {
        final int resultCode = curatorEvent.getResultCode();
        if (resultCode != KeeperException.Code.OK.intValue()) {
            final KeeperException.Code failure = KeeperException.Code.get(resultCode);
            resultHandler.handle(Future.failedFuture(KeeperException.create(failure)));
            return;
        }
        resultHandler.handle(Future.succeededFuture(record));
    });
}
/**
 * Turns the outcome of a background read into a result for {@code completionHandler}.
 *
 * <p>OK succeeds with the event's data. NONODE succeeds with the literal buffer {@code "{}"}.
 * Any other code fails with the matching {@link KeeperException} for {@code path}.
 */
private void retrieve(CuratorEvent event, Handler<AsyncResult<Buffer>> completionHandler) {
    final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());

    if (resultCode == KeeperException.Code.OK) {
        completionHandler.handle(Future.succeededFuture(Buffer.buffer(event.getData())));
        return;
    }

    if (resultCode == KeeperException.Code.NONODE) {
        // A missing node is reported as an empty JSON object rather than an error.
        completionHandler.handle(Future.succeededFuture(Buffer.buffer("{}")));
        return;
    }

    completionHandler.handle(Future.failedFuture(KeeperException.create(resultCode, path)));
}
/**
 * Completes {@code handler} on {@code context} after a background write: success when the
 * result code is OK, otherwise failure with the matching {@link KeeperException}.
 */
private void dataWrittenCallback(Handler<AsyncResult<Void>> handler, Context context, CuratorEvent event) {
    context.runOnContext(ignored -> {
        final int resultCode = event.getResultCode();
        if (resultCode != KeeperException.Code.OK.intValue()) {
            handler.handle(Future.failedFuture(
                    KeeperException.create(KeeperException.Code.get(resultCode))));
            return;
        }
        handler.handle(Future.succeededFuture());
    });
}
/**
 * Deletes {@code /curd-test/delete} in the background and prints the resulting event as JSON.
 *
 * <p>The method sleeps for ten seconds to give the background callback time to run, then
 * closes the client.
 *
 * @throws Exception if the delete cannot be submitted or the sleep is interrupted
 */
public void testDelete() throws Exception {
    // try-with-resources: the client was previously started but never closed.
    try (CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181,localhost:3181,localhost:4181", new ExponentialBackoffRetry(1000, 4))) {
        client.start();
        client.delete().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println(JsonHelper.toJson(curatorEvent));
            }
        }).forPath("/curd-test/delete");
        // Keep the client open long enough for the background result to be delivered.
        Thread.sleep(10000);
    }
}
/** * Tests that state handles are correctly removed with a callback. */ @Test public void testRemoveWithCallback() throws Exception { // Setup LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testRemoveWithCallback"; final Long state = 27255442L; store.add(pathInZooKeeper, state); final CountDownLatch sync = new CountDownLatch(1); BackgroundCallback callback = mock(BackgroundCallback.class); doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { sync.countDown(); return null; } }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); // Test store.remove(pathInZooKeeper, callback); // Verify discarded and callback called assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); sync.await(); verify(callback, times(1)) .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); }
/**
 * Creates a Curator client, registers the watch and unhandled-error listeners, and starts it.
 *
 * <p>WATCHED events are forwarded to {@code watcher}. An unhandled error is logged and then
 * halts the process through {@code JStormUtils.halt_process}.
 *
 * @return the started client
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
    final CuratorFramework curator = Utils.newCurator(conf, servers, port, root);

    curator.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
            // Only watch notifications are relayed; every other event type is ignored.
            if (!e.getType().equals(CuratorEventType.WATCHED)) {
                return;
            }
            final WatchedEvent watched = e.getWatchedEvent();
            watcher.execute(watched.getState(), watched.getType(), watched.getPath());
        }
    });

    curator.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            LOG.error("Unrecoverable Zookeeper error, halting process: " + msg, error);
            JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");
        }
    });

    curator.start();
    return curator;
}
/**
 * Connects to ZooKeeper: builds the client, attaches a listener that relays WATCHED events to
 * {@code watcher}, attaches a listener that logs unhandled errors and halts the process, then
 * starts the client.
 *
 * @return the started client
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
    final CuratorListener watchRelay = new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                final WatchedEvent watchedEvent = e.getWatchedEvent();
                watcher.execute(watchedEvent.getState(), watchedEvent.getType(), watchedEvent.getPath());
            }
        }
    };

    final UnhandledErrorListener haltOnError = new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            final String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg;
            LOG.error(errmsg, error);
            JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");
        }
    };

    final CuratorFramework zkClient = Utils.newCurator(conf, servers, port, root);
    zkClient.getCuratorListenable().addListener(watchRelay);
    zkClient.getUnhandledErrorListenable().addListener(haltOnError);
    zkClient.start();
    return zkClient;
}