Java 类org.apache.curator.framework.api.CuratorEvent 实例源码

项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> existsThrows(final String pathNameforLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) throws Exception {
  if (paths.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(paths.size());

  final CountDownLatch latch = new CountDownLatch(paths.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() != null) {
          objects.add(Transcoders.getFromStringFunction(idTranscoder).apply(ZKPaths.getNodeFromPath(event.getPath())));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_DATA);
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> notExistsThrows(final String pathNameforLogs, final Map<String, T> pathsMap) throws Exception {
  if (pathsMap.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(pathsMap.size());

  final CountDownLatch latch = new CountDownLatch(pathsMap.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() == null) {
          objects.add(pathsMap.get(event.getPath()));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}
项目:pravega    文件:ZKStoreHelper.java   
private BackgroundCallback callback(Consumer<CuratorEvent> result, Consumer<Throwable> exception, String path) {
    return (client, event) -> {
        if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
            result.accept(event);
        } else if (event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ||
                event.getResultCode() == KeeperException.Code.SESSIONEXPIRED.intValue() ||
                event.getResultCode() == KeeperException.Code.SESSIONMOVED.intValue() ||
                event.getResultCode() == KeeperException.Code.OPERATIONTIMEOUT.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.CONNECTION_ERROR, path));
        } else if (event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.DATA_EXISTS, path));
        } else if (event.getResultCode() == KeeperException.Code.BADVERSION.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.WRITE_CONFLICT, path));
        } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.DATA_NOT_FOUND, path));
        } else if (event.getResultCode() == KeeperException.Code.NOTEMPTY.intValue()) {
            exception.accept(StoreException.create(StoreException.Type.DATA_CONTAINS_ELEMENTS, path));
        } else {
            exception.accept(StoreException.create(StoreException.Type.UNKNOWN,
                    KeeperException.create(KeeperException.Code.get(event.getResultCode()), path)));
        }
    };
}
项目:pravega    文件:ZKHostIndex.java   
private StoreException translateErrorCode(String path, CuratorEvent event) {
    StoreException ex;
    if (event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ||
            event.getResultCode() == KeeperException.Code.SESSIONEXPIRED.intValue() ||
            event.getResultCode() == KeeperException.Code.SESSIONMOVED.intValue() ||
            event.getResultCode() == KeeperException.Code.OPERATIONTIMEOUT.intValue()) {
        ex = StoreException.create(StoreException.Type.CONNECTION_ERROR, path);
    } else if (event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()) {
        ex = StoreException.create(StoreException.Type.DATA_EXISTS, path);
    } else if (event.getResultCode() == KeeperException.Code.BADVERSION.intValue()) {
        ex = StoreException.create(StoreException.Type.WRITE_CONFLICT, path);
    } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
        ex = StoreException.create(StoreException.Type.DATA_NOT_FOUND, path);
    } else if (event.getResultCode() == KeeperException.Code.NOTEMPTY.intValue()) {
        ex = StoreException.create(StoreException.Type.DATA_CONTAINS_ELEMENTS, path);
    } else {
        ex = StoreException.create(StoreException.Type.UNKNOWN,
                KeeperException.create(KeeperException.Code.get(event.getResultCode()), path));
    }
    return ex;
}
项目:yuzhouwan    文件:CuratorInBackground.java   
public void createEphemeralNodeRecursionInBackground(String path) throws Exception {
    curatorFramework.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {

                    LOG.info("event's result code: {}, type: {}", event.getResultCode(), event.getType());

                    showCurrentThreadName();

                    countDownLatch.countDown();
                }
            }).forPath(path);
}
项目:curator    文件:CuratorMultiTransactionImpl.java   
@Override
public void performBackgroundOperation(final OperationAndData<CuratorMultiTransactionRecord> operationAndData) throws Exception
{
    try
    {
        final TimeTrace trace = client.getZookeeperClient().startTracer("CuratorMultiTransactionImpl-Background");
        AsyncCallback.MultiCallback callback = new AsyncCallback.MultiCallback()
        {
            @Override
            public void processResult(int rc, String path, Object ctx, List<OpResult> opResults)
            {
                trace.commit();
                List<CuratorTransactionResult> curatorResults = (opResults != null) ? CuratorTransactionImpl.wrapResults(client, opResults, operationAndData.getData()) : null;
                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.TRANSACTION, rc, path, null, ctx, null, null, null, null, null, curatorResults);
                client.processBackgroundOperation(operationAndData, event);
            }
        };
        client.getZooKeeper().multi(operationAndData.getData(), callback, backgrounding.getContext());
    }
    catch ( Throwable e )
    {
        backgrounding.checkError(e, null);
    }
}
项目:curator    文件:ModeledFrameworkImpl.java   
public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions)
{
    boolean isWatched = (watchMode != null);

    Objects.requireNonNull(client, "client cannot be null");
    Objects.requireNonNull(model, "model cannot be null");
    modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));

    watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;

    AsyncCuratorFrameworkDsl dslClient = client.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter);
    WatchableAsyncCuratorFramework watchableClient = isWatched ? dslClient.watched() : dslClient;

    return new ModeledFrameworkImpl<>(
        client,
        dslClient,
        watchableClient,
        model,
        watchMode,
        watcherFilter,
        unhandledErrorListener,
        resultFilter,
        isWatched,
        modeledOptions
    );
}
项目:curator    文件:PersistentNode.java   
private void processBackgroundCallbackClosedState(CuratorEvent event)
{
    String path = null;
    if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
    {
        path = event.getPath();
    }
    else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    {
        path = event.getName();
    }

    if ( path != null )
    {
        try
        {
            client.delete().guaranteed().inBackground().forPath(path);
        }
        catch ( Exception e )
        {
            log.error("Could not delete node after close", e);
        }
    }
}
项目:Singularity    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> existsThrows(final String pathNameforLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) throws Exception {
  if (paths.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(paths.size());

  final CountDownLatch latch = new CountDownLatch(paths.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() != null) {
          objects.add(Transcoders.getFromStringFunction(idTranscoder).apply(ZKPaths.getNodeFromPath(event.getPath())));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_DATA);
}
项目:Singularity    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> notExistsThrows(final String pathNameforLogs, final Map<String, T> pathsMap) throws Exception {
  if (pathsMap.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithCapacity(pathsMap.size());

  final CountDownLatch latch = new CountDownLatch(pathsMap.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getStat() == null) {
          objects.add(pathsMap.get(event.getPath()));
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}
项目:Baragon    文件:ZkParallelFetcher.java   
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  try {
    KeeperException.Code code = KeeperException.Code.get(event.getResultCode());

    switch (code) {
      case OK:
        T data = event.getData() == null ? null : transformFunction.apply(event.getData());
        dataMap.put(ZKPaths.getNodeFromPath(event.getPath()), data);
        break;
      case NONODE:
        // In this case there was a race condition in which the child node was deleted before we asked for data.
        break;
      default:
        exceptions.add(KeeperException.create(code, event.getPath()));
    }
  } finally {
    countDownLatch.countDown();
  }
}
项目:Baragon    文件:ZkParallelFetcher.java   
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  try {
    KeeperException.Code code = KeeperException.Code.get(event.getResultCode());

    switch (code) {
      case OK:
        childMap.put(ZKPaths.getNodeFromPath(event.getPath()), new HashSet<>(event.getChildren()));
        break;
      case NONODE:
        // In this case there was a race condition in which the child node was deleted before we asked for data.
        break;
      default:
        exceptions.add(KeeperException.create(code, event.getPath()));
    }
  } finally {
    countDownLatch.countDown();
  }
}
项目:hadoop    文件:DeleteCompletionCallback.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Delete event {}", event);
  }
  events.incrementAndGet();
}
项目:hadoop    文件:CuratorEventCatcher.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  LOG.info("received {}", event);
  eventCounter.incrementAndGet();
  events.put(event);
}
项目:hadoop    文件:TestCuratorService.java   
@Test
public void testBackgroundDelete() throws Throwable {
  mkPath("/rm", CreateMode.PERSISTENT);
  mkPath("/rm/child", CreateMode.PERSISTENT);
  CuratorEventCatcher events = new CuratorEventCatcher();
  curatorService.zkDelete("/rm", true, events);
  CuratorEvent taken = events.take();
  LOG.info("took {}", taken);
  assertEquals(1, events.getCount());
}
项目:aliyun-oss-hadoop-fs    文件:DeleteCompletionCallback.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Delete event {}", event);
  }
  events.incrementAndGet();
}
项目:aliyun-oss-hadoop-fs    文件:CuratorEventCatcher.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  LOG.info("received {}", event);
  eventCounter.incrementAndGet();
  events.put(event);
}
项目:aliyun-oss-hadoop-fs    文件:TestCuratorService.java   
@Test
public void testBackgroundDelete() throws Throwable {
  mkPath("/rm", CreateMode.PERSISTENT);
  mkPath("/rm/child", CreateMode.PERSISTENT);
  CuratorEventCatcher events = new CuratorEventCatcher();
  curatorService.zkDelete("/rm", true, events);
  CuratorEvent taken = events.take();
  LOG.info("took {}", taken);
  assertEquals(1, events.getCount());
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
private <T extends SingularityId> List<T> getChildrenAsIdsForParentsThrows(final String pathNameforLogs, final Collection<String> parents, final IdTranscoder<T> idTranscoder) throws Exception {
  if (parents.isEmpty()) {
    return Collections.emptyList();
  }

  final List<T> objects = Lists.newArrayListWithExpectedSize(parents.size());
  final List<T> synchronizedObjects = Collections.synchronizedList(objects);

  final CountDownLatch latch = new CountDownLatch(parents.size());

  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getChildren() == null || event.getChildren().size() == 0) {
          LOG.trace("Expected children for node {} - but found none", event.getPath());
          return;
        }
        synchronizedObjects.addAll(Lists.transform(event.getChildren(), Transcoders.getFromStringFunction(idTranscoder)));
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(synchronizedObjects, parents, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_CHILDREN);
}
项目:Mastering-Mesos    文件:CuratorAsyncManager.java   
protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMapThrows(final String pathNameForLogs, final Map<String, T> parentPathsMap, final String subpath, final Transcoder<Q> transcoder) throws Exception {
  final Map<String, T> allPathsMap = Maps.newHashMap();
  for (Map.Entry<String, T> entry : parentPathsMap.entrySet()) {
    for (String child : getChildren(ZKPaths.makePath(entry.getKey(), subpath))) {
      allPathsMap.put(ZKPaths.makePath(entry.getKey(), subpath, child), entry.getValue());
    }
  }

  final ConcurrentHashMap<T, List<Q>> resultsMap = new ConcurrentHashMap<>();
  final CountDownLatch latch = new CountDownLatch(allPathsMap.size());
  final AtomicInteger bytes = new AtomicInteger();
  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        if (event.getData() == null || event.getData().length == 0) {
          LOG.trace("Expected active node {} but it wasn't there", event.getPath());
          return;
        }
        bytes.getAndAdd(event.getData().length);

        final Q object = transcoder.fromBytes(event.getData());

        if (allPathsMap.get(event.getPath()) != null) {
          resultsMap.putIfAbsent(allPathsMap.get(event.getPath()), new ArrayList<Q>());
          resultsMap.get(allPathsMap.get(event.getPath())).add(object);
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA);
}
项目:bigstreams    文件:ZGroup.java   
@Override
public synchronized void processResult(CuratorFramework client,
        CuratorEvent event) throws Exception {
    if (event.getType().equals(CuratorEventType.CHILDREN)
            || event.getType().equals(CuratorEventType.CREATE)
            || event.getType().equals(CuratorEventType.DELETE)) {
        attach();
    }

}
项目:bee    文件:CuratorEventListener.java   
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
    WatchedEvent watchedEvent = (event == null) ? null : event.getWatchedEvent();
    if (watchedEvent == null
            || Watcher.Event.EventType.None == watchedEvent.getType())
        return;
    if (LOGGER.isInfoEnabled())
        LOGGER.info(String.format("zookeeper event received, type: %s, path: %s", watchedEvent.getType(), watchedEvent.getPath()));
    try {
        EventInfo eventInfo = parseEventInfo(watchedEvent);
        if (eventInfo == null) {
            if (LOGGER.isDebugEnabled())
                LOGGER.debug(String.format("no operate event, type: %s, path: %s", watchedEvent.getType(), watchedEvent.getPath()));
            return;
        }
        if (LOGGER.isInfoEnabled())
            LOGGER.info(String.format("zookeeper parse type: %s, eventtype: %s", eventInfo.type, eventInfo.eventType));

        if (eventInfo.type == ADDRESS) {
            // 监听服务提供的更节点
            serviceAddressChange(eventInfo);
        } else if (eventInfo.type == WEIGHT && Watcher.Event.EventType.NodeDataChanged == eventInfo.eventType) {
            // 监听服务提供的权重更节点
            serviceWeightChange(eventInfo);
        } else {
            if (LOGGER.isDebugEnabled())
                LOGGER.debug(String.format("no operate event, eventType: %s, path: %s", watchedEvent.getType(), watchedEvent.getPath()));
        }
    } catch (Exception e) {
        LOGGER.error("Error in ZookeeperWatcher.process()", e);
    }
    // 无须再次对节点进行监听,在获取子节点信息时,自动添加监听
}
项目:big-c    文件:DeleteCompletionCallback.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Delete event {}", event);
  }
  events.incrementAndGet();
}
项目:big-c    文件:CuratorEventCatcher.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  LOG.info("received {}", event);
  eventCounter.incrementAndGet();
  events.put(event);
}
项目:big-c    文件:TestCuratorService.java   
@Test
public void testBackgroundDelete() throws Throwable {
  mkPath("/rm", CreateMode.PERSISTENT);
  mkPath("/rm/child", CreateMode.PERSISTENT);
  CuratorEventCatcher events = new CuratorEventCatcher();
  curatorService.zkDelete("/rm", true, events);
  CuratorEvent taken = events.take();
  LOG.info("took {}", taken);
  assertEquals(1, events.getCount());
}
项目:vertx-service-discovery    文件:ZookeeperBackendService.java   
private void callback(Context context, Record record, Handler<AsyncResult<Record>> resultHandler, CuratorEvent curatorEvent) {
  runOnContextIfPossible(context, () -> {
    if (curatorEvent.getResultCode() == KeeperException.Code.OK.intValue()) {
      resultHandler.handle(Future.succeededFuture(record));
    } else {
      KeeperException.Code code =
          KeeperException.Code.get(curatorEvent.getResultCode());
      resultHandler.handle(Future.failedFuture(KeeperException.create(code)));
    }
  });
}
项目:vertx-configuration-service    文件:ZookeeperConfigurationStore.java   
private void retrieve(CuratorEvent event, Handler<AsyncResult<Buffer>> completionHandler) {
  KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
  if (code == KeeperException.Code.OK) {
    completionHandler.handle(Future.succeededFuture(Buffer.buffer(event.getData())));
  } else if (code == KeeperException.Code.NONODE) {
    completionHandler.handle(Future.succeededFuture(Buffer.buffer("{}")));
  } else {
    completionHandler.handle(Future.failedFuture(KeeperException.create(code, path)));
  }
}
项目:vertx-configuration-service    文件:ZookeeperConfigurationStoreTest.java   
private void dataWrittenCallback(Handler<AsyncResult<Void>> handler, Context context, CuratorEvent event) {
  context.runOnContext((x) -> {
        if (event.getResultCode() == 0) {
          handler.handle(Future.succeededFuture());
        } else {
          handler.handle(Future.failedFuture(KeeperException
              .create(KeeperException.Code.get(event.getResultCode()))));
        }
      }
  );
}
项目:niubi-job-examples    文件:CurdTest.java   
public void testDelete() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181,localhost:3181,localhost:4181", new ExponentialBackoffRetry(1000, 4));
    client.start();
    client.delete().inBackground(new BackgroundCallback() {
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            System.out.println(JsonHelper.toJson(curatorEvent));
        }
    }).forPath("/curd-test/delete");

    Thread.sleep(10000);
}
项目:OSP-sample    文件:CurdTest.java   
public void testDelete() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181,localhost:3181,localhost:4181", new ExponentialBackoffRetry(1000, 4));
    client.start();
    client.delete().inBackground(new BackgroundCallback() {
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            System.out.println(JsonHelper.toJson(curatorEvent));
        }
    }).forPath("/curd-test/delete");

    Thread.sleep(10000);
}
项目:vertx-config    文件:ZookeeperConfigStore.java   
private void retrieve(CuratorEvent event, Handler<AsyncResult<Buffer>> completionHandler) {
  KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
  if (code == KeeperException.Code.OK) {
    completionHandler.handle(Future.succeededFuture(Buffer.buffer(event.getData())));
  } else if (code == KeeperException.Code.NONODE) {
    completionHandler.handle(Future.succeededFuture(Buffer.buffer("{}")));
  } else {
    completionHandler.handle(Future.failedFuture(KeeperException.create(code, path)));
  }
}
项目:vertx-config    文件:ZookeeperConfigStoreTest.java   
private void dataWrittenCallback(Handler<AsyncResult<Void>> handler, Context context, CuratorEvent event) {
  context.runOnContext((x) -> {
        if (event.getResultCode() == 0) {
          handler.handle(Future.succeededFuture());
        } else {
          handler.handle(Future.failedFuture(KeeperException
              .create(KeeperException.Code.get(event.getResultCode()))));
        }
      }
  );
}
项目:flink    文件:ZooKeeperStateHandleStoreITCase.java   
/**
 * Tests that state handles are correctly removed with a callback.
 */
@Test
public void testRemoveWithCallback() throws Exception {
    // Setup
    LongStateStorage stateHandleProvider = new LongStateStorage();

    ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
            ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor());

    // Config
    final String pathInZooKeeper = "/testRemoveWithCallback";
    final Long state = 27255442L;

    store.add(pathInZooKeeper, state);

    final CountDownLatch sync = new CountDownLatch(1);
    BackgroundCallback callback = mock(BackgroundCallback.class);
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
            sync.countDown();
            return null;
        }
    }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));

    // Test
    store.remove(pathInZooKeeper, callback);

    // Verify discarded and callback called
    assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size());

    sync.await();

    verify(callback, times(1))
            .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DeleteCompletionCallback.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Delete event {}", event);
  }
  events.incrementAndGet();
}
项目:hadoop-2.6.0-cdh5.4.3    文件:CuratorEventCatcher.java   
@Override
public void processResult(CuratorFramework client,
    CuratorEvent event) throws
    Exception {
  LOG.info("received {}", event);
  eventCounter.incrementAndGet();
  events.put(event);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestCuratorService.java   
@Test
public void testBackgroundDelete() throws Throwable {
  mkPath("/rm", CreateMode.PERSISTENT);
  mkPath("/rm/child", CreateMode.PERSISTENT);
  CuratorEventCatcher events = new CuratorEventCatcher();
  curatorService.zkDelete("/rm", true, events);
  CuratorEvent taken = events.take();
  LOG.info("took {}", taken);
  assertEquals(1, events.getCount());
}
项目:jstorm-0.9.6.3-    文件:Zookeeper.java   
/**
 * connect ZK, register Watch/unhandle Watch
 * 
 * @return
 */
public CuratorFramework mkClient(Map conf, List<String> servers,
        Object port, String root, final WatcherCallBack watcher) {

    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e)
                throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                WatchedEvent event = e.getWatchedEvent();

                watcher.execute(event.getState(), event.getType(),
                        event.getPath());
            }

        }
    });

    fk.getUnhandledErrorListenable().addListener(
            new UnhandledErrorListener() {
                @Override
                public void unhandledError(String msg, Throwable error) {
                    String errmsg = "Unrecoverable Zookeeper error, halting process: "
                            + msg;
                    LOG.error(errmsg, error);
                    JStormUtils.halt_process(1,
                            "Unrecoverable Zookeeper error");

                }
            });
    fk.start();
    return fk;
}
项目:learn_jstorm    文件:Zookeeper.java   
/**
 * connect ZK, register Watch/unhandle Watch
 * 
 * @return
 */
public CuratorFramework mkClient(Map conf, List<String> servers,
        Object port, String root, final WatcherCallBack watcher) {

    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e)
                throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                WatchedEvent event = e.getWatchedEvent();

                watcher.execute(event.getState(), event.getType(),
                        event.getPath());
            }

        }
    });

    fk.getUnhandledErrorListenable().addListener(
            new UnhandledErrorListener() {
                @Override
                public void unhandledError(String msg, Throwable error) {
                    String errmsg = "Unrecoverable Zookeeper error, halting process: "
                            + msg;
                    LOG.error(errmsg, error);
                    JStormUtils.halt_process(1,
                            "Unrecoverable Zookeeper error");

                }
            });
    fk.start();
    return fk;
}
项目:jstrom    文件:Zookeeper.java   
/**
 * connect ZK, register Watch/unhandle Watch
 * 
 * @return
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {

    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                WatchedEvent event = e.getWatchedEvent();

                watcher.execute(event.getState(), event.getType(), event.getPath());
            }

        }
    });

    fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg;
            LOG.error(errmsg, error);
            JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");

        }
    });
    fk.start();
    return fk;
}
项目:Tstream    文件:Zookeeper.java   
/**
 * connect ZK, register Watch/unhandle Watch
 * 
 * @return
 */
public CuratorFramework mkClient(Map conf, List<String> servers,
        Object port, String root, final WatcherCallBack watcher) {

    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e)
                throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                WatchedEvent event = e.getWatchedEvent();

                watcher.execute(event.getState(), event.getType(),
                        event.getPath());
            }

        }
    });

    fk.getUnhandledErrorListenable().addListener(
            new UnhandledErrorListener() {
                @Override
                public void unhandledError(String msg, Throwable error) {
                    String errmsg = "Unrecoverable Zookeeper error, halting process: "
                            + msg;
                    LOG.error(errmsg, error);
                    JStormUtils.halt_process(1,
                            "Unrecoverable Zookeeper error");

                }
            });
    fk.start();
    return fk;
}