/** * * trigger a purge operation * @param path pathn * @param id yarn ID * @param policyMatch policy to match ID on * @param purgePolicy policy when there are children under a match * @param callback optional callback * @return the number purged * @throws IOException */ public int purge(String path, String id, String policyMatch, RegistryAdminService.PurgePolicy purgePolicy, BackgroundCallback callback) throws IOException, ExecutionException, InterruptedException { Future<Integer> future = registry.purgeRecordsAsync(path, id, policyMatch, purgePolicy, callback); try { return future.get(); } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw e; } } }
private <T extends SingularityId> List<T> existsThrows(final String pathNameforLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) throws Exception { if (paths.isEmpty()) { return Collections.emptyList(); } final List<T> objects = Lists.newArrayListWithCapacity(paths.size()); final CountDownLatch latch = new CountDownLatch(paths.size()); final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { if (event.getStat() != null) { objects.add(Transcoders.getFromStringFunction(idTranscoder).apply(ZKPaths.getNodeFromPath(event.getPath()))); } } finally { latch.countDown(); } } }; return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_DATA); }
private <T extends SingularityId> List<T> notExistsThrows(final String pathNameforLogs, final Map<String, T> pathsMap) throws Exception { if (pathsMap.isEmpty()) { return Collections.emptyList(); } final List<T> objects = Lists.newArrayListWithCapacity(pathsMap.size()); final CountDownLatch latch = new CountDownLatch(pathsMap.size()); final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { if (event.getStat() == null) { objects.add(pathsMap.get(event.getPath())); } } finally { latch.countDown(); } } }; return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS); }
private <T> T queryAndReturnResultsThrows(final T results, final Collection<String> paths, final BackgroundCallback callback, final CountDownLatch latch, final String pathNameForLogs, final AtomicInteger bytes, final CuratorQueryMethod method) throws Exception { final long start = System.currentTimeMillis(); try { for (String path : paths) { switch (method) { case GET_DATA: curator.getData().inBackground(callback).forPath(path); break; case GET_CHILDREN: curator.getChildren().inBackground(callback).forPath(path); break; case CHECK_EXISTS: default: curator.checkExists().inBackground(callback).forPath(path); break; } } checkLatch(latch, pathNameForLogs); } finally { log(OperationType.READ, Optional.of(paths.size()), bytes.get() > 0 ? Optional.of(bytes.get()) : Optional.<Integer>absent(), start, pathNameForLogs); } return results; }
private BackgroundCallback callback(Consumer<CuratorEvent> result, Consumer<Throwable> exception, String path) { return (client, event) -> { if (event.getResultCode() == KeeperException.Code.OK.intValue()) { result.accept(event); } else if (event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() || event.getResultCode() == KeeperException.Code.SESSIONEXPIRED.intValue() || event.getResultCode() == KeeperException.Code.SESSIONMOVED.intValue() || event.getResultCode() == KeeperException.Code.OPERATIONTIMEOUT.intValue()) { exception.accept(StoreException.create(StoreException.Type.CONNECTION_ERROR, path)); } else if (event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) { exception.accept(StoreException.create(StoreException.Type.DATA_EXISTS, path)); } else if (event.getResultCode() == KeeperException.Code.BADVERSION.intValue()) { exception.accept(StoreException.create(StoreException.Type.WRITE_CONFLICT, path)); } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { exception.accept(StoreException.create(StoreException.Type.DATA_NOT_FOUND, path)); } else if (event.getResultCode() == KeeperException.Code.NOTEMPTY.intValue()) { exception.accept(StoreException.create(StoreException.Type.DATA_CONTAINS_ELEMENTS, path)); } else { exception.accept(StoreException.create(StoreException.Type.UNKNOWN, KeeperException.create(KeeperException.Code.get(event.getResultCode()), path))); } }; }
private CompletableFuture<Void> createNode(CreateMode createMode, boolean createParents, String path, byte[] data) { CompletableFuture<Void> result = new CompletableFuture<>(); try { BackgroundCallback callback = (cli, event) -> { if (event.getResultCode() == KeeperException.Code.OK.intValue() || event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) { result.complete(null); } else { result.completeExceptionally(translateErrorCode(path, event)); } }; if (createParents) { client.create().creatingParentsIfNeeded().withMode(createMode).inBackground(callback, executor) .forPath(path, data); } else { client.create().withMode(createMode).inBackground(callback, executor).forPath(path, data); } } catch (Exception e) { result.completeExceptionally(StoreException.create(StoreException.Type.UNKNOWN, e)); } return result; }
/** * Releases the lock for the given state node and tries to remove the state node if it is no longer locked. * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given * callback is called with the {@link RetrievableStateHandle} of the deleted state node. * * <p><strong>Important</strong>: This also discards the stored state handle after the given action * has been executed. * * @param pathInZooKeeper Path of state handle to remove * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed. * @throws Exception If the ZooKeeper operation fails */ public void releaseAndTryRemove( String pathInZooKeeper, @Nullable final RemoveCallback<T> callback) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); final String path = normalizePath(pathInZooKeeper); RetrievableStateHandle<T> stateHandle = null; try { stateHandle = get(path, false); } catch (Exception e) { LOG.warn("Could not retrieve the state handle from node " + path + '.', e); } release(pathInZooKeeper); final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path); client.delete().inBackground(backgroundCallback, executor).forPath(path); }
public void createEphemeralNodeRecursionInBackground(String path) throws Exception { curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { LOG.info("event's result code: {}, type: {}", event.getResultCode(), event.getType()); showCurrentThreadName(); countDownLatch.countDown(); } }).forPath(path); }
private <T> T queryAndReturnResultsThrows(final T results, final Collection<String> paths, final BackgroundCallback callback, final CountDownLatch latch, final String pathNameForLogs, final AtomicInteger bytes, final CuratorQueryMethod method) throws Exception { final long start = System.currentTimeMillis(); try { for (String path : paths) { switch (method) { case GET_DATA: curator.getData().inBackground(callback).forPath(path); break; case GET_CHILDREN: curator.getChildren().inBackground(callback).forPath(path); break; case CHECK_EXISTS: default: curator.checkExists().inBackground(callback).forPath(path); break; } } checkLatch(latch, pathNameForLogs); } finally { log(method.operationType, Optional.of(paths.size()), bytes.get() > 0 ? Optional.of(bytes.get()) : Optional.<Integer>absent(), start, pathNameForLogs); } return results; }
public AsyncPurge(String path, NodeSelector selector, PurgePolicy purgePolicy, BackgroundCallback callback) { this.callback = callback; this.selector = selector; this.path = path; this.purgePolicy = purgePolicy; }
private <T extends SingularityId> List<T> getChildrenAsIdsForParentsThrows(final String pathNameforLogs, final Collection<String> parents, final IdTranscoder<T> idTranscoder) throws Exception { if (parents.isEmpty()) { return Collections.emptyList(); } final List<T> objects = Lists.newArrayListWithExpectedSize(parents.size()); final List<T> synchronizedObjects = Collections.synchronizedList(objects); final CountDownLatch latch = new CountDownLatch(parents.size()); final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { if (event.getChildren() == null || event.getChildren().size() == 0) { LOG.trace("Expected children for node {} - but found none", event.getPath()); return; } synchronizedObjects.addAll(Lists.transform(event.getChildren(), Transcoders.getFromStringFunction(idTranscoder))); } finally { latch.countDown(); } } }; return queryAndReturnResultsThrows(synchronizedObjects, parents, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_CHILDREN); }
protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMapThrows(final String pathNameForLogs, final Map<String, T> parentPathsMap, final String subpath, final Transcoder<Q> transcoder) throws Exception { final Map<String, T> allPathsMap = Maps.newHashMap(); for (Map.Entry<String, T> entry : parentPathsMap.entrySet()) { for (String child : getChildren(ZKPaths.makePath(entry.getKey(), subpath))) { allPathsMap.put(ZKPaths.makePath(entry.getKey(), subpath, child), entry.getValue()); } } final ConcurrentHashMap<T, List<Q>> resultsMap = new ConcurrentHashMap<>(); final CountDownLatch latch = new CountDownLatch(allPathsMap.size()); final AtomicInteger bytes = new AtomicInteger(); final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { if (event.getData() == null || event.getData().length == 0) { LOG.trace("Expected active node {} but it wasn't there", event.getPath()); return; } bytes.getAndAdd(event.getData().length); final Q object = transcoder.fromBytes(event.getData()); if (allPathsMap.get(event.getPath()) != null) { resultsMap.putIfAbsent(allPathsMap.get(event.getPath()), new ArrayList<Q>()); resultsMap.get(allPathsMap.get(event.getPath())).add(object); } } finally { latch.countDown(); } } }; return queryAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA); }
public void testDelete() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181,localhost:3181,localhost:4181", new ExponentialBackoffRetry(1000, 4)); client.start(); client.delete().inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(JsonHelper.toJson(curatorEvent)); } }).forPath("/curd-test/delete"); Thread.sleep(10000); }
/** * Tests that state handles are correctly removed with a callback. */ @Test public void testRemoveWithCallback() throws Exception { // Setup LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testRemoveWithCallback"; final Long state = 27255442L; store.add(pathInZooKeeper, state); final CountDownLatch sync = new CountDownLatch(1); BackgroundCallback callback = mock(BackgroundCallback.class); doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { sync.countDown(); return null; } }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); // Test store.remove(pathInZooKeeper, callback); // Verify discarded and callback called assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); sync.await(); verify(callback, times(1)) .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); }
@Override protected <M extends Message> ListenableFuture<M> getNodeData( WatchCallback watcher, String path, final Message.Builder builder) { final SettableFuture<M> future = SettableFuture.create(); Watcher wc = ZkWatcherCallback.makeZkWatcher(watcher); BackgroundCallback cb = new BackgroundCallback() { @Override @SuppressWarnings("unchecked") // we don't know what M is until runtime public void processResult(CuratorFramework aClient, CuratorEvent event) throws Exception { byte[] data; if (event != null & (data = event.getData()) != null) { builder.mergeFrom(data); safeSetFuture(future, (M) builder.build()); } else { safeSetException(future, new RuntimeException("Failed to fetch data from path: " + event.getPath())); } } }; try { client.getData().usingWatcher(wc).inBackground(cb).forPath(path); // Suppress it since forPath() throws Exception // SUPPRESS CHECKSTYLE IllegalCatch } catch (Exception e) { safeSetException(future, new RuntimeException("Could not getNodeData", e)); } return future; }
Backgrounding(BackgroundCallback callback) { this.inBackground = true; this.context = null; this.callback = callback; errorListener = null; }
Backgrounding(BackgroundCallback callback, Object context) { this.inBackground = true; this.context = context; this.callback = callback; errorListener = null; }
public Backgrounding(BackgroundCallback callback, UnhandledErrorListener errorListener) { this.callback = callback; this.errorListener = errorListener; inBackground = true; context = null; }
private static BackgroundCallback wrapCallback(final CuratorFrameworkImpl client, final BackgroundCallback callback, final Executor executor) { return new BackgroundCallback() { @Override public void processResult(CuratorFramework dummy, final CuratorEvent event) throws Exception { executor.execute ( new Runnable() { @Override public void run() { try { callback.processResult(client, event); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); if ( e instanceof KeeperException ) { client.validateConnection(client.codeToState(((KeeperException)e).code())); } client.logError("Background operation result handling threw exception", e); } } } ); } }; }
OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired) { this.operation = operation; this.data = data; this.callback = callback; this.errorCallback = errorCallback; this.context = context; this.connectionRequired = connectionRequired; reset(); }
/** * @param givenClient client instance * @param mode creation mode * @param useProtection if true, call {@link CreateBuilder#withProtection()} * @param basePath the base path for the node * @param initData data for the node * @param ttl for ttl modes, the ttl to use */ public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData, long ttl) { this.useProtection = useProtection; this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework(); this.basePath = PathUtils.validatePath(basePath); this.mode = Preconditions.checkNotNull(mode, "mode cannot be null"); this.ttl = ttl; final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null"); backgroundCallback = new BackgroundCallback() { @Override public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception { if ( isActive() ) { processBackgroundCallback(event); } else { processBackgroundCallbackClosedState(event); } } }; this.data.set(Arrays.copyOf(data, data.length)); }
void refresh(final RefreshMode mode) throws Exception { ensurePath(); final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if (PathChildrenCache.this.state.get().equals(State.CLOSED)) { // This ship is closed, don't handle the callback PathChildrenCache.this.client.removeWatchers(); return; } if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { processChildren(event.getChildren(), mode); } else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) { if ( mode == RefreshMode.NO_NODE_EXCEPTION ) { log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path); ensureContainers.reset(); } else { log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path); ensureContainers.reset(); offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION)); } } } }; client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path); }
void getDataAndStat(final String fullPath) throws Exception { BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null); } }; if ( USE_EXISTS && !cacheData ) { client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath); } else { // always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak if ( dataIsCompressed && cacheData ) { client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath); } else { client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath); } } }
@VisibleForTesting void reset() throws Exception { setLeadership(false); setNode(null); BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( debugResetWaitLatch != null ) { debugResetWaitLatch.await(); debugResetWaitLatch = null; } if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { setNode(event.getName()); if ( state.get() == State.CLOSED ) { setNode(null); } else { getChildren(); } } else { log.error("getChildren() failed. rc = " + event.getResultCode()); } } }; client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); }
private void getChildren() throws Exception { BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { checkLeadership(event.getChildren()); } } }; client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); }
@Test public void testSetUpdatedDataWhenReconnected() throws Exception { CuratorFramework curator = newCurator(); byte[] initialData = "Hello World".getBytes(); byte[] updatedData = "Updated".getBytes(); PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData); try { node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds()); node.start(); node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS); assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData)); node.setData(updatedData); assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData)); server.restart(); final CountDownLatch dataUpdateLatch = new CountDownLatch(1); curator.getData().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { dataUpdateLatch.countDown(); } }).forPath(node.getActualPath()); assertTrue(timing.awaitLatch(dataUpdateLatch)); assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData)); } finally { CloseableUtils.closeQuietly(node); } }
private void syncStateNode() throws Exception { final CountDownLatch latch = new CountDownLatch(1); curatorReference.get().sync().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { latch.countDown(); } }).forPath("/state"); latch.await(); }