Java 类org.apache.curator.framework.api.BackgroundCallback 实例源码

项目:hadoop    文件:TestRegistryRMOperations.java   
/**
 *
 * trigger a purge operation
 * @param path pathn
 * @param id yarn ID
 * @param policyMatch policy to match ID on
 * @param purgePolicy policy when there are children under a match
 * @param callback optional callback
 * @return the number purged
 * @throws IOException
 */
public int purge(String path,
    String id,
    String policyMatch,
    RegistryAdminService.PurgePolicy purgePolicy,
    BackgroundCallback callback) throws
    IOException,
    ExecutionException,
    InterruptedException {

  Future<Integer> future = registry.purgeRecordsAsync(path,
      id, policyMatch, purgePolicy, callback);
  try {
    return future.get();
  } catch (ExecutionException e) {
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw e;
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:TestRegistryRMOperations.java   
/**
 *
 * trigger a purge operation
 * @param path pathn
 * @param id yarn ID
 * @param policyMatch policy to match ID on
 * @param purgePolicy policy when there are children under a match
 * @param callback optional callback
 * @return the number purged
 * @throws IOException
 */
public int purge(String path,
    String id,
    String policyMatch,
    RegistryAdminService.PurgePolicy purgePolicy,
    BackgroundCallback callback) throws
    IOException,
    ExecutionException,
    InterruptedException {

  Future<Integer> future = registry.purgeRecordsAsync(path,
      id, policyMatch, purgePolicy, callback);
  try {
    return future.get();
  } catch (ExecutionException e) {
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw e;
    }
  }
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> existsThrows(final String pathNameforLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) throws Exception {
  if (paths.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(paths.size());

  final CountDownLatch latch = new CountDownLatch(paths.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() != null) {
          objects.add(Transcoders.getFromStringFunction(idTranscoder).apply(ZKPaths.getNodeFromPath(event.getPath())));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_DATA);
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> notExistsThrows(final String pathNameforLogs, final Map<String, T> pathsMap) throws Exception {
  if (pathsMap.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(pathsMap.size());

  final CountDownLatch latch = new CountDownLatch(pathsMap.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() == null) {
          objects.add(pathsMap.get(event.getPath()));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T> T queryAndReturnResultsThrows(final T results, final Collection<String> paths, final BackgroundCallback callback, final CountDownLatch latch, final String pathNameForLogs, final AtomicInteger bytes, final CuratorQueryMethod method) throws Exception {
  final long start = System.currentTimeMillis();

  try {
    for (String path : paths) {
      switch (method) {
        case GET_DATA:
          curator.getData().inBackground(callback).forPath(path);
          break;
        case GET_CHILDREN:
          curator.getChildren().inBackground(callback).forPath(path);
          break;
        case CHECK_EXISTS:
        default:
          curator.checkExists().inBackground(callback).forPath(path);
          break;
      }
    }

    checkLatch(latch, pathNameForLogs);
  } finally {
    log(OperationType.READ, Optional.of(paths.size()), bytes.get() > 0 ? Optional.of(bytes.get()) : Optional.<Integer>absent(), start, pathNameForLogs);
  }

  return results;
}
项目:big-c    文件:TestRegistryRMOperations.java   
/**
 *
 * trigger a purge operation
 * @param path pathn
 * @param id yarn ID
 * @param policyMatch policy to match ID on
 * @param purgePolicy policy when there are children under a match
 * @param callback optional callback
 * @return the number purged
 * @throws IOException
 */
public int purge(String path,
    String id,
    String policyMatch,
    RegistryAdminService.PurgePolicy purgePolicy,
    BackgroundCallback callback) throws
    IOException,
    ExecutionException,
    InterruptedException {

  Future<Integer> future = registry.purgeRecordsAsync(path,
      id, policyMatch, purgePolicy, callback);
  try {
    return future.get();
  } catch (ExecutionException e) {
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw e;
    }
  }
}
项目:pravega    文件:ZKStoreHelper.java   
private BackgroundCallback callback(Consumer<CuratorEvent> result, Consumer<Throwable> exception, String path) {
    return (client, event) -> {
        if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
            result.accept(event);
        } else if (event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ||
                event.getResultCode() == KeeperException.Code.SESSIONEXPIRED.intValue() ||
                event.getResultCode() == KeeperException.Code.SESSIONMOVED.intValue() ||
                event.getResultCode() == KeeperException.Code.OPERATIONTIMEOUT.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.CONNECTION_ERROR, path));
        } else if (event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.DATA_EXISTS, path));
        } else if (event.getResultCode() == KeeperException.Code.BADVERSION.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.WRITE_CONFLICT, path));
        } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.DATA_NOT_FOUND, path));
        } else if (event.getResultCode() == KeeperException.Code.NOTEMPTY.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.DATA_CONTAINS_ELEMENTS, path));
        } else {
            exception.accept(StoreException.create(StoreException.Type.UNKNOWN,
                    KeeperException.create(KeeperException.Code.get(event.getResultCode()), path)));
        }
    };
}
项目:pravega    文件:ZKHostIndex.java   
private CompletableFuture<Void> createNode(CreateMode createMode, boolean createParents, String path, byte[] data) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    try {
        BackgroundCallback callback = (cli, event) -> {
            if (event.getResultCode() == KeeperException.Code.OK.intValue() ||
                    event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) {
                result.complete(null);
            } else {
                result.completeExceptionally(translateErrorCode(path, event));
            }
        };
        if (createParents) {
            client.create().creatingParentsIfNeeded().withMode(createMode).inBackground(callback, executor)
                    .forPath(path, data);
        } else {
            client.create().withMode(createMode).inBackground(callback, executor).forPath(path, data);
        }
    } catch (Exception e) {
        result.completeExceptionally(StoreException.create(StoreException.Type.UNKNOWN, e));
    }
    return result;
}
项目:flink    文件:ZooKeeperStateHandleStore.java   
/**
 * Releases the lock for the given state node and tries to remove the state node if it is no longer locked.
 * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given
 * callback is called with the {@link RetrievableStateHandle} of the deleted state node.
 *
 * <p><strong>Important</strong>: This also discards the stored state handle after the given action
 * has been executed.
 *
 * @param pathInZooKeeper Path of state handle to remove
 * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed.
 * @throws Exception If the ZooKeeper operation fails
 */
public void releaseAndTryRemove(
        String pathInZooKeeper,
        @Nullable final RemoveCallback<T> callback) throws Exception {
    checkNotNull(pathInZooKeeper, "Path in ZooKeeper");

    final String path = normalizePath(pathInZooKeeper);

    RetrievableStateHandle<T> stateHandle = null;

    try {
        stateHandle = get(path, false);
    } catch (Exception e) {
        LOG.warn("Could not retrieve the state handle from node " + path + '.', e);
    }

    release(pathInZooKeeper);

    final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path);

    client.delete().inBackground(backgroundCallback, executor).forPath(path);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestRegistryRMOperations.java   
/**
 *
 * trigger a purge operation
 * @param path pathn
 * @param id yarn ID
 * @param policyMatch policy to match ID on
 * @param purgePolicy policy when there are children under a match
 * @param callback optional callback
 * @return the number purged
 * @throws IOException
 */
public int purge(String path,
    String id,
    String policyMatch,
    RegistryAdminService.PurgePolicy purgePolicy,
    BackgroundCallback callback) throws
    IOException,
    ExecutionException,
    InterruptedException {

  Future<Integer> future = registry.purgeRecordsAsync(path,
      id, policyMatch, purgePolicy, callback);
  try {
    return future.get();
  } catch (ExecutionException e) {
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw e;
    }
  }
}
项目:hops    文件:TestRegistryRMOperations.java   
/**
 *
 * trigger a purge operation
 * @param path pathn
 * @param id yarn ID
 * @param policyMatch policy to match ID on
 * @param purgePolicy policy when there are children under a match
 * @param callback optional callback
 * @return the number purged
 * @throws IOException
 */
public int purge(String path,
    String id,
    String policyMatch,
    RegistryAdminService.PurgePolicy purgePolicy,
    BackgroundCallback callback) throws
    IOException,
    ExecutionException,
    InterruptedException {

  Future<Integer> future = registry.purgeRecordsAsync(path,
      id, policyMatch, purgePolicy, callback);
  try {
    return future.get();
  } catch (ExecutionException e) {
    if (e.getCause() instanceof IOException) {
      throw (IOException) e.getCause();
    } else {
      throw e;
    }
  }
}
项目:yuzhouwan    文件:CuratorInBackground.java   
public void createEphemeralNodeRecursionInBackground(String path) throws Exception {
    curatorFramework.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {

                    LOG.info("event's result code: {}, type: {}", event.getResultCode(), event.getType());

                    showCurrentThreadName();

                    countDownLatch.countDown();
                }
            }).forPath(path);
}
项目:Singularity    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> existsThrows(final String pathNameforLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) throws Exception {
  if (paths.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(paths.size());

  final CountDownLatch latch = new CountDownLatch(paths.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() != null) {
          objects.add(Transcoders.getFromStringFunction(idTranscoder).apply(ZKPaths.getNodeFromPath(event.getPath())));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_DATA);
}
项目:Singularity    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> notExistsThrows(final String pathNameforLogs, final Map<String, T> pathsMap) throws Exception {
  if (pathsMap.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(pathsMap.size());

  final CountDownLatch latch = new CountDownLatch(pathsMap.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() == null) {
          objects.add(pathsMap.get(event.getPath()));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}
项目:Singularity    文件:CuratorAsyncManager.java   
private <T> T queryAndReturnResultsThrows(final T results, final Collection<String> paths, final BackgroundCallback callback, final CountDownLatch latch, final String pathNameForLogs, final AtomicInteger bytes, final CuratorQueryMethod method) throws Exception {
  final long start = System.currentTimeMillis();

  try {
    for (String path : paths) {
      switch (method) {
        case GET_DATA:
          curator.getData().inBackground(callback).forPath(path);
          break;
        case GET_CHILDREN:
          curator.getChildren().inBackground(callback).forPath(path);
          break;
        case CHECK_EXISTS:
        default:
          curator.checkExists().inBackground(callback).forPath(path);
          break;
      }
    }

    checkLatch(latch, pathNameForLogs);
  } finally {
    log(method.operationType, Optional.of(paths.size()), bytes.get() > 0 ? Optional.of(bytes.get()) : Optional.<Integer>absent(), start, pathNameForLogs);
  }

  return results;
}
项目:hadoop    文件:RegistryAdminService.java   
public AsyncPurge(String path,
    NodeSelector selector,
    PurgePolicy purgePolicy,
    BackgroundCallback callback) {
  this.callback = callback;
  this.selector = selector;
  this.path = path;
  this.purgePolicy = purgePolicy;
}
项目:aliyun-oss-hadoop-fs    文件:RegistryAdminService.java   
public AsyncPurge(String path,
    NodeSelector selector,
    PurgePolicy purgePolicy,
    BackgroundCallback callback) {
  this.callback = callback;
  this.selector = selector;
  this.path = path;
  this.purgePolicy = purgePolicy;
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> getChildrenAsIdsForParentsThrows(final String pathNameforLogs, final Collection<String> parents, final IdTranscoder<T> idTranscoder) throws Exception {
  if (parents.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithExpectedSize(parents.size());
  final List<T> synchronizedObjects = Collections.synchronizedList(objects);

  final CountDownLatch latch = new CountDownLatch(parents.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getChildren() == null || event.getChildren().size() == 0) {
          LOG.trace("Expected children for node {} - but found none", event.getPath());
          return;
        }
        synchronizedObjects.addAll(Lists.transform(event.getChildren(), Transcoders.getFromStringFunction(idTranscoder)));
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(synchronizedObjects, parents, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_CHILDREN);
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMapThrows(final String pathNameForLogs, final Map<String, T> parentPathsMap, final String subpath, final Transcoder<Q> transcoder) throws Exception {
  final Map<String, T> allPathsMap = Maps.newHashMap();
  for (Map.Entry<String, T> entry : parentPathsMap.entrySet()) {
    for (String child : getChildren(ZKPaths.makePath(entry.getKey(), subpath))) {
      allPathsMap.put(ZKPaths.makePath(entry.getKey(), subpath, child), entry.getValue());
    }
  }

  final ConcurrentHashMap<T, List<Q>> resultsMap = new ConcurrentHashMap<>();
  final CountDownLatch latch = new CountDownLatch(allPathsMap.size());
  final AtomicInteger bytes = new AtomicInteger();
  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getData() == null || event.getData().length == 0) {
          LOG.trace("Expected active node {} but it wasn't there", event.getPath());
          return;
        }
        bytes.getAndAdd(event.getData().length);

        final Q object = transcoder.fromBytes(event.getData());

        if (allPathsMap.get(event.getPath()) != null) {
          resultsMap.putIfAbsent(allPathsMap.get(event.getPath()), new ArrayList<Q>());
          resultsMap.get(allPathsMap.get(event.getPath())).add(object);
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA);
}
项目:big-c    文件:RegistryAdminService.java   
public AsyncPurge(String path,
    NodeSelector selector,
    PurgePolicy purgePolicy,
    BackgroundCallback callback) {
  this.callback = callback;
  this.selector = selector;
  this.path = path;
  this.purgePolicy = purgePolicy;
}
项目:niubi-job-examples    文件:CurdTest.java   
public void testDelete() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181,localhost:3181,localhost:4181", new ExponentialBackoffRetry(1000, 4));
    client.start();
    client.delete().inBackground(new BackgroundCallback() {
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            System.out.println(JsonHelper.toJson(curatorEvent));
        }
    }).forPath("/curd-test/delete");

    Thread.sleep(10000);
}
项目:OSP-sample    文件:CurdTest.java   
public void testDelete() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181,localhost:3181,localhost:4181", new ExponentialBackoffRetry(1000, 4));
    client.start();
    client.delete().inBackground(new BackgroundCallback() {
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            System.out.println(JsonHelper.toJson(curatorEvent));
        }
    }).forPath("/curd-test/delete");

    Thread.sleep(10000);
}
项目:flink    文件:ZooKeeperStateHandleStoreITCase.java   
/**
 * Tests that state handles are correctly removed with a callback.
 */
@Test
public void testRemoveWithCallback() throws Exception {
    // Setup
    LongStateStorage stateHandleProvider = new LongStateStorage();

    ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
            ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());

    // Config
    final String pathInZooKeeper = "/testRemoveWithCallback";
    final Long state = 27255442L;

    store.add(pathInZooKeeper, state);

    final CountDownLatch sync = new CountDownLatch(1);
    BackgroundCallback callback = mock(BackgroundCallback.class);
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
            sync.countDown();
            return null;
        }
    }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));

    // Test
    store.remove(pathInZooKeeper, callback);

    // Verify discarded and callback called
    assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());

    sync.await();

    verify(callback, times(1))
            .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:RegistryAdminService.java   
public AsyncPurge(String path,
    NodeSelector selector,
    PurgePolicy purgePolicy,
    BackgroundCallback callback) {
  this.callback = callback;
  this.selector = selector;
  this.path = path;
  this.purgePolicy = purgePolicy;
}
项目:hops    文件:RegistryAdminService.java   
public AsyncPurge(String path,
    NodeSelector selector,
    PurgePolicy purgePolicy,
    BackgroundCallback callback) {
  this.callback = callback;
  this.selector = selector;
  this.path = path;
  this.purgePolicy = purgePolicy;
}
项目:heron    文件:CuratorStateManager.java   
@Override
protected <M extends Message> ListenableFuture<M> getNodeData(
    WatchCallback watcher,
    String path,
    final Message.Builder builder) {
  final SettableFuture<M> future = SettableFuture.create();

  Watcher wc = ZkWatcherCallback.makeZkWatcher(watcher);

  BackgroundCallback cb = new BackgroundCallback() {
    @Override
    @SuppressWarnings("unchecked") // we don't know what M is until runtime
    public void processResult(CuratorFramework aClient, CuratorEvent event) throws Exception {
      byte[] data;
      if (event != null & (data = event.getData()) != null) {
        builder.mergeFrom(data);
        safeSetFuture(future, (M) builder.build());
      } else {
        safeSetException(future, new RuntimeException("Failed to fetch data from path: "
            + event.getPath()));
      }
    }
  };

  try {
    client.getData().usingWatcher(wc).inBackground(cb).forPath(path);

    // Suppress it since forPath() throws Exception
    // SUPPRESS CHECKSTYLE IllegalCatch
  } catch (Exception e) {
    safeSetException(future, new RuntimeException("Could not getNodeData", e));
  }

  return future;
}
项目:curator    文件:Backgrounding.java   
Backgrounding(BackgroundCallback callback)
{
    this.inBackground = true;
    this.context = null;
    this.callback = callback;
    errorListener = null;
}
项目:curator    文件:Backgrounding.java   
Backgrounding(BackgroundCallback callback, Object context)
{
    this.inBackground = true;
    this.context = context;
    this.callback = callback;
    errorListener = null;
}
项目:curator    文件:Backgrounding.java   
public Backgrounding(BackgroundCallback callback, UnhandledErrorListener errorListener)
{
    this.callback = callback;
    this.errorListener = errorListener;
    inBackground = true;
    context = null;
}
项目:curator    文件:Backgrounding.java   
private static BackgroundCallback wrapCallback(final CuratorFrameworkImpl client, final BackgroundCallback callback, final Executor executor)
{
    return new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework dummy, final CuratorEvent event) throws Exception
        {
            executor.execute
                (
                    new Runnable()
                    {
                        @Override
                        public void run()
                        {
                            try
                            {
                                callback.processResult(client, event);
                            }
                            catch ( Exception e )
                            {
                                ThreadUtils.checkInterrupted(e);
                                if ( e instanceof KeeperException )
                                {
                                    client.validateConnection(client.codeToState(((KeeperException)e).code()));
                                }
                                client.logError("Background operation result handling threw exception", e);
                            }
                        }
                    }
                );
        }
    };
}
项目:curator    文件:OperationAndData.java   
OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
{
    this.operation = operation;
    this.data = data;
    this.callback = callback;
    this.errorCallback = errorCallback;
    this.context = context;
    this.connectionRequired = connectionRequired;
    reset();
}
项目:curator    文件:PersistentNode.java   
/**
 * @param givenClient        client instance
 * @param mode          creation mode
 * @param useProtection if true, call {@link CreateBuilder#withProtection()}
 * @param basePath the base path for the node
 * @param initData data for the node
 * @param ttl for ttl modes, the ttl to use
 */
public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData, long ttl)
{
    this.useProtection = useProtection;
    this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework();
    this.basePath = PathUtils.validatePath(basePath);
    this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
    this.ttl = ttl;
    final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");

    backgroundCallback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
        {
            if ( isActive() )
            {
                processBackgroundCallback(event);
            }
            else
            {
                processBackgroundCallbackClosedState(event);
            }
        }
    };

    this.data.set(Arrays.copyOf(data, data.length));
}
项目:curator    文件:PathChildrenCache.java   
void refresh(final RefreshMode mode) throws Exception
{
    ensurePath();

    final BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if (PathChildrenCache.this.state.get().equals(State.CLOSED)) {
                // This ship is closed, don't handle the callback
                PathChildrenCache.this.client.removeWatchers();
                return;
            }
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                processChildren(event.getChildren(), mode);
            }
            else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
            {
                if ( mode == RefreshMode.NO_NODE_EXCEPTION )
                {
                    log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);
                    ensureContainers.reset();
                }
                else
                {
                    log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);
                    ensureContainers.reset();
                    offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));
                }
            }
        }
    };

    client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
项目:curator    文件:PathChildrenCache.java   
void getDataAndStat(final String fullPath) throws Exception
{
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null);
        }
    };

    if ( USE_EXISTS && !cacheData )
    {
        client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
    }
    else
    {
        // always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
        if ( dataIsCompressed && cacheData )
        {
            client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
        }
        else
        {
            client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
        }
    }
}
项目:curator    文件:LeaderLatch.java   
@VisibleForTesting
void reset() throws Exception
{
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }

            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                setNode(event.getName());
                if ( state.get() == State.CLOSED )
                {
                    setNode(null);
                }
                else
                {
                    getChildren();
                }
            }
            else
            {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
项目:curator    文件:LeaderLatch.java   
private void getChildren() throws Exception
{
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                checkLeadership(event.getChildren());
            }
        }
    };
    client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
项目:curator    文件:TestPersistentEphemeralNode.java   
@Test
public void testSetUpdatedDataWhenReconnected() throws Exception
{
    CuratorFramework curator = newCurator();

    byte[] initialData = "Hello World".getBytes();
    byte[] updatedData = "Updated".getBytes();

    PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
    try
    {
        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
        node.start();
        node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
        assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));

        node.setData(updatedData);
        assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData));

        server.restart();

        final CountDownLatch dataUpdateLatch = new CountDownLatch(1);
        curator.getData().inBackground(new BackgroundCallback() {

            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                dataUpdateLatch.countDown();
            }
        }).forPath(node.getActualPath());

        assertTrue(timing.awaitLatch(dataUpdateLatch));

        assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData));
    }
    finally
    {
        CloseableUtils.closeQuietly(node);
    }
}
项目:Singularity    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> getChildrenAsIdsForParentsThrows(final String pathNameforLogs, final Collection<String> parents, final IdTranscoder<T> idTranscoder) throws Exception {
  if (parents.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithExpectedSize(parents.size());
  final List<T> synchronizedObjects = Collections.synchronizedList(objects);

  final CountDownLatch latch = new CountDownLatch(parents.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getChildren() == null || event.getChildren().size() == 0) {
          LOG.trace("Expected children for node {} - but found none", event.getPath());
          return;
        }
        synchronizedObjects.addAll(Lists.transform(event.getChildren(), Transcoders.getFromStringFunction(idTranscoder)));
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(synchronizedObjects, parents, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_CHILDREN);
}
项目:Singularity    文件:CuratorAsyncManager.java   
protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMapThrows(final String pathNameForLogs, final Map<String, T> parentPathsMap, final String subpath, final Transcoder<Q> transcoder) throws Exception {
  final Map<String, T> allPathsMap = Maps.newHashMap();
  for (Map.Entry<String, T> entry : parentPathsMap.entrySet()) {
    for (String child : getChildren(ZKPaths.makePath(entry.getKey(), subpath))) {
      allPathsMap.put(ZKPaths.makePath(entry.getKey(), subpath, child), entry.getValue());
    }
  }

  final ConcurrentHashMap<T, List<Q>> resultsMap = new ConcurrentHashMap<>();
  final CountDownLatch latch = new CountDownLatch(allPathsMap.size());
  final AtomicInteger bytes = new AtomicInteger();
  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getData() == null || event.getData().length == 0) {
          LOG.trace("Expected active node {} but it wasn't there", event.getPath());
          return;
        }
        bytes.getAndAdd(event.getData().length);

        final Q object = transcoder.fromBytes(event.getData());

        if (allPathsMap.get(event.getPath()) != null) {
          resultsMap.putIfAbsent(allPathsMap.get(event.getPath()), new ArrayList<Q>());
          resultsMap.get(allPathsMap.get(event.getPath())).add(object);
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA);
}
项目:Baragon    文件:DefaultBaragonStateFetcher.java   
private void syncStateNode() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  curatorReference.get().sync().inBackground(new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      latch.countDown();
    }
  }).forPath("/state");

  latch.await();
}