Java 类org.apache.curator.framework.recipes.cache.NodeCache 实例源码

项目:Equella    文件:ZookeeperServiceImpl.java   
/**
 * Starts the service. In cluster mode this creates and starts the debug node cache, calls
 * {@code createNode} for the server path, builds the members path cache and prepares the
 * namespace-aware ensure-path helper. The started flag is set afterwards in either mode.
 */
@Override
public void startup()
{
    if( isCluster() )
    {
        final CuratorFramework client = getCurator();
        debugCache = new NodeCache(client, CLUSTER_DEBUG_FULL_PATH);
        try
        {
            debugCache.start();
            createNode(SERVER_REL_PATH, "");
            membersCache = createPathCache(SERVER_REL_PATH, false);
            ensureDebug = client.newNamespaceAwareEnsurePath(CLUSTER_DEBUG_FULL_PATH);
        }
        catch( Exception failure )
        {
            // Guava rethrows the failure (wrapped if it is checked), so the flag below is not reached.
            Throwables.propagate(failure);
        }
    }
    hasStarted = true;
}
项目:dble    文件:ZktoXmlMain.java   
/**
 * Registers a listener on the given node cache that forwards every change to the process listener.
 * Created 2016/9/20.
 *
 * @param cache    NodeCache to watch
 * @param zkListen listener that is notified with the path of the changed node
 * @throws Exception declared for callers; registration itself only touches the listener container
 */
private static void runWatch(final NodeCache cache, final ZookeeperProcessListen zkListen)
        throws Exception {
    cache.getListenable().addListener(new NodeCacheListener() {

        @Override
        public void nodeChanged() {
            LOGGER.info("ZktoxmlMain runWatch  process path  event start ");
            // getCurrentData() is null when the watched node does not exist (e.g. it was deleted).
            // Take one snapshot so the null check and the read cannot observe different states.
            final ChildData current = cache.getCurrentData();
            if (current == null) {
                LOGGER.warn("NodeCache changed, but the node does not exist any more, notify is skipped");
            } else {
                String notPath = current.getPath();
                LOGGER.info("NodeCache changed, path is: " + notPath);
                // notify
                zkListen.notify(notPath);
            }
            LOGGER.info("ZktoxmlMain runWatch  process path  event over");
        }
    });
}
项目:configcenter    文件:RefreshTrigger.java   
/**
 * Starts listening on one node per application code. All nodes share a single listener that
 * triggers a refresh whenever any of them changes.
 *
 * @param profileCode profile code used to build each node path
 * @param appCodes    application codes; one node is listened to per entry
 * @return the node caches returned by the ZK template, in the order of {@code appCodes}
 */
private List<NodeCache> listenNodes(String profileCode, String[] appCodes) {
    final ZkTemplate.NodeListener refreshOnChange = new ZkTemplate.NodeListener() {
        @Override
        public void nodeChanged() throws Exception {
            try {
                refresher.refresh();
            } catch (Throwable cause) {
                // A failed refresh is only logged; it is not rethrown to the caller of the listener.
                logger.error("触发刷新出错:", cause);
            }
        }
    };
    final List<NodeCache> result = new ArrayList<>(appCodes.length);
    for (int i = 0; i < appCodes.length; i++) {
        final String nodePath = ZkTemplate.buildPath(profileCode, appCodes[i]);
        result.add(zkTemplate.listenNode(nodePath, false, refreshOnChange));
    }
    return result;
}
项目:coco    文件:CuratorTest.java   
/**
 * Creates a child node below {@code /group-1}, watches {@code /group-1} with a NodeCache and
 * updates the watched node's data twice.
 */
@Test
public void test_zkNode() throws Exception {
    CuratorFramework zk = curator.usingNamespace("namespace-test");
    String groupPath = "/group-1/localhost:9001";
    String s = zk.create().creatingParentsIfNeeded().forPath(groupPath);
    Assert.assertEquals(s, "/group-1/localhost:9001");
    // try-with-resources: the cache holds a watcher and must be closed even if an update fails.
    try (NodeCache nodeCache = new NodeCache(zk, "/group-1", true)) {
        // Register the listener before starting so no early change event can be missed.
        nodeCache.getListenable().addListener(new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {
                logger.info("node cache change");
            }
        });
        nodeCache.start();
        zk.setData().forPath("/group-1", "test-0".getBytes());
        zk.setData().forPath("/group-1", "test-2".getBytes());
    }
}
项目:gondola    文件:ZookeeperConfigProvider.java   
/**
 * Initializes the provider: stores the client and service name, resolves the config file locations,
 * saves the current config content read from ZooKeeper, and starts a node cache that saves the file
 * again whenever the config node changes to non-empty data.
 *
 * @param client      Curator client used to read and watch the config node
 * @param serviceName service name used to resolve the config path and file names
 * @throws Exception if the initial read, the file save or the cache start fails
 */
private void initProvider(CuratorFramework client, String serviceName) throws Exception {
    this.client = client;
    this.serviceName = serviceName;
    tmpFile = confFile(true);
    confFile = confFile(false);
    String configPath = ZookeeperUtils.configPath(serviceName);

    byte[] bytes = client.getData().forPath(configPath);
    saveFile(bytes);
    cache = new NodeCache(client, configPath);
    cache.getListenable().addListener(() -> {
        try {
            // Single snapshot: the node may disappear between two getCurrentData() calls.
            ChildData current = cache.getCurrentData();
            byte[] data = current == null ? null : current.getData();
            if (data != null && data.length > 0) {
                saveFile(data);
            }
        } catch (Exception e) {
            // The trailing throwable keeps the stack trace in the log output.
            logger.warn("Error while processing config file change event. message={}", e.getMessage(), e);
        }
    });
    cache.start();
}
项目:yuzhouwan    文件:CuratorNodeCache.java   
/**
 * Ensures the node at {@code path} exists (creating it as a persistent node, with parents, when the
 * existence check finds nothing) and starts a NodeCache whose listener logs the node's new data.
 *
 * @param path ZooKeeper path of the node to watch
 * @throws Exception if the existence check, the creation or the cache start fails
 */
public void addNodeCacheListener(String path) throws Exception {
    Stat existStat = curatorFramework
            .checkExists()
            .forPath(path);
    if (existStat == null) {
        curatorFramework
                .create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path);
    }
    NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
    // Register before start(): the initial load is asynchronous and its event could otherwise be missed.
    nodeCache.getListenable().addListener(() -> {
                ChildData currentData = nodeCache.getCurrentData();
                // Both the node and its payload may be absent; new String(null) would throw.
                byte[] payload = currentData == null ? null : currentData.getData();
                LOG.info("New Cache Data: {}", payload == null ? "null" : new String(payload));
            }
    );
    nodeCache.start();
}
项目:ddth-zookeeper    文件:TestCuratorNodeWatch.java   
/**
 * Demo entry point: connects to a local ZooKeeper, watches {@code /test} for 30 seconds and prints
 * the node content on every change.
 *
 * @param args unused
 * @throws Exception if the client or the cache cannot be started, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    CuratorFramework framework = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181").retryPolicy(new RetryNTimes(3, 2000)).build();
    try {
        framework.start();
        final NodeCache nodeCache = new NodeCache(framework, "/test");
        try {
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData data = nodeCache.getCurrentData();
                    if (data == null) {
                        // The cache reports null when the node does not exist (e.g. after a delete).
                        System.out.println("null / node does not exist");
                        return;
                    }
                    byte[] payload = data.getData();
                    System.out.println((payload == null ? "null" : new String(payload)) + " / " + data);
                }
            });
            nodeCache.start();

            Thread.sleep(30000);
        } finally {
            // Close the cache even when start() fails or the sleep is interrupted.
            nodeCache.close();
        }
    } finally {
        framework.close();
    }
}
项目:kaa    文件:ControlNodeTracker.java   
/**
 * Starts the tracker: registers the unhandled-error listener and, if the ZK node could be created,
 * starts a cache on the control server node that reports master changes and master loss.
 *
 * @throws Exception the exception
 */
public void start() throws Exception { //NOSONAR
  LOG.info("Starting node tracker");
  zkClient.getUnhandledErrorListenable().addListener(errorsListener);
  if (!createZkNode()) {
    LOG.warn("Failed to create ZK node!");
    return;
  }

  controlCache = new NodeCache(zkClient, CONTROL_SERVER_NODE_PATH);
  final NodeCacheListener masterWatcher = new NodeCacheListener() {

    @Override
    public void nodeChanged() throws Exception {
      final ChildData snapshot = controlCache.getCurrentData();
      if (snapshot != null) {
        LOG.warn("Control service node changed!");
        onMasterChange(snapshot);
      } else {
        // No data means the control node is gone.
        LOG.warn("Control service node died!");
        onNoMaster();
      }
    }
  };
  controlCache.getListenable().addListener(masterWatcher);
  controlCache.start();
}
项目:interruptus    文件:AttachConfigurationListener.java   
/**
 * Starts a node cache on {@code path} (constructed with the flag {@code true}) and forwards every
 * change to the given configuration listener. The cache is remembered in {@code childrens}.
 *
 * @param curator  Curator client
 * @param path     node path to watch
 * @param listener callback that receives the client, the cache and the path on each change
 * @throws Exception if the cache cannot be started
 */
private void registerListener(final CuratorFramework curator, final String path, final ZookeeperConfigurationListener listener) throws Exception
{
    final NodeCache nodeCache = new NodeCache(curator, path, true);
    final NodeCacheListener forwarder = new NodeCacheListener()
    {
        @Override
        public void nodeChanged() throws Exception
        {
            listener.onChange(curator, nodeCache, path);
        }
    };

    nodeCache.start();
    nodeCache.getListenable().addListener(forwarder);

    childrens.add(nodeCache);
    logger.info(String.format("Add listener %s for %s", listener, path));
}
项目:redirector    文件:ZkNodeCacheWrapper.java   
/**
 * Creates a wrapper around an existing node cache. This constructor only stores its arguments.
 *
 * @param connector                            data source connector passed to the superclass
 * @param path                                 path of the wrapped node
 * @param cache                                node cache to wrap
 * @param dataIsCompressed                     stored as-is in the field of the same name
 * @param useCache                             stored as-is in the field of the same name
 * @param useCacheWhenNotConnectedToDataSource stored as-is in the field of the same name
 */
public ZkNodeCacheWrapper(IDataSourceConnector connector, String path, NodeCache cache, boolean dataIsCompressed, boolean useCache, boolean useCacheWhenNotConnectedToDataSource) {
    super(connector);
    // Node identity first, then the behaviour flags.
    this.path = path;
    this.cache = cache;
    this.dataIsCompressed = dataIsCompressed;
    this.useCache = useCache;
    this.useCacheWhenNotConnectedToDataSource = useCacheWhenNotConnectedToDataSource;
}
项目:dble    文件:ZktoXmlMain.java   
/**
 * Starts a node cache for every given path and hooks it up to the process listener.
 * Does nothing when the path set is null or empty.
 *
 * @param setPaths paths to watch
 * @param zkConn   Curator client
 * @param zkListen listener notified about changes
 * @throws Exception if a cache cannot be started or the watch cannot be registered
 */
private static void loadZkWatch(Set<String> setPaths, final CuratorFramework zkConn,
                                final ZookeeperProcessListen zkListen) throws Exception {
    if (setPaths == null || setPaths.isEmpty()) {
        return;
    }
    for (final String watchedPath : setPaths) {
        final NodeCache nodeCache = new NodeCache(zkConn, watchedPath);
        nodeCache.start(true);
        runWatch(nodeCache, zkListen);
        LOGGER.info("ZktoxmlMain loadZkWatch path:" + watchedPath + " regist success");
    }
}
项目:configcenter    文件:RefreshTrigger.java   
/**
 * Closes this trigger and releases its resources: every node cache is closed (a failure is logged
 * and the remaining caches are still closed), then the ZK template is closed.
 */
public void close() {
    for (NodeCache watched : nodeCaches) {
        try {
            watched.close();
        } catch (IOException closeFailure) {
            logger.error("关闭节点监听器出错:", closeFailure);
        }
    }
    zkTemplate.close();
}
项目:ice    文件:ZooKeeperDynamicConfigSource.java   
/**
 * Builds an observable that, when subscribed, starts a NodeCache for the given config descriptor,
 * emits the current value once, and creates this host's ephemeral child node under the config path.
 * If any step fails, everything that was started is closed again before the error is propagated.
 *
 * @param desc descriptor of the configuration value to watch
 * @return an observable emitting the started node cache
 */
private Observable<NodeCache> buildNodeCache(final ConfigDescriptor desc)
{
    return Observable.fromCallable(() -> {
        final String configPath = makePath(ROOT_ZK_PATH, desc.getConfigName());
        final NodeCache nc = new NodeCache(curator, configPath);
        nc.getListenable().addListener(() -> onNodeChanged(nc, desc));
        PersistentEphemeralNode en = null;
        try {
            nc.start(true);

            // Note that we have to force calling onNodeChanged() here since `nc.start(true)` will not emit an initial event.
            onNodeChanged(nc, desc);

            // Create the ephemeral node last, just in case something goes wrong with setting up the node cache
            // NOTE: This process is what actually creates the configuration node if it was missing.
            en = new PersistentEphemeralNode(curator, EPHEMERAL, makePath(configPath, localNodeName), new byte[0]);
            en.start();
            if (!en.waitForInitialCreate(getDefaultNodeCreationTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Timeout on creation of ephemeral node for " + makePath(configPath, localNodeName));
            }
            ephemeralNodes.put(desc, en);

            return nc;
        }
        catch (Exception ex) {
            log.warn("Failed to initialize for configPath {}", configPath, ex);
            // Release what was started so a failed initialization leaks neither the watcher nor
            // the ephemeral node; close failures are attached to the original error.
            if (en != null) {
                try {
                    en.close();
                }
                catch (Exception closeEx) {
                    ex.addSuppressed(closeEx);
                }
            }
            try {
                nc.close();
            }
            catch (Exception closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }
    });
}
项目:ice    文件:ZooKeeperDynamicConfigSource.java   
/**
 * Emits the current value of the watched node for the given descriptor. A missing node, missing
 * data or empty data is emitted as an empty optional; otherwise the bytes are decoded as UTF-8.
 * Failures while building or emitting the value are logged and not propagated.
 *
 * @param cache node cache to read the current data from
 * @param desc  descriptor whose config name is used as the event key
 */
public void onNodeChanged(final NodeCache cache, final ConfigDescriptor desc)
{
    final ChildData childData = cache.getCurrentData();
    try {
        final byte[] raw = (childData == null) ? null : childData.getData();
        final boolean hasContent = raw != null && raw.length > 0;
        final Optional<String> valueOpt = hasContent
            ? Optional.of(new String(raw, Charsets.UTF_8))
            : Optional.<String>empty();
        emitEvent(desc.getConfigName(), valueOpt);
    }
    catch (Exception ex) {
        log.warn("Failed to handle onNodeChanged w/ new data for config key {}, data {}", desc.getConfigName(), childData, ex);
    }
}
项目:nnproxy    文件:MountsManager.java   
/**
 * Starts the Curator framework and a node cache on the mount table path; every change of that node
 * is handed to {@code handleMountTableChange} as raw bytes.
 *
 * @throws Exception if the cache cannot be started
 */
@Override
protected void serviceStart() throws Exception {
    framework.start();
    nodeCache = new NodeCache(framework, zkMountTablePath, false);
    final NodeCacheListener mountTableWatcher = new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            // NOTE(review): getCurrentData() is null when the node does not exist, which would
            // throw here - confirm the mount table node can never be deleted.
            handleMountTableChange(nodeCache.getCurrentData().getData());
        }
    };
    nodeCache.getListenable().addListener(mountTableWatcher);
    nodeCache.start(false);
}
项目:incubator-omid    文件:TSOClient.java   
/**
 * Creates and starts the node cache that watches the current TSO server znode, with this object
 * registered as its listener.
 *
 * @param currentTsoPath ZooKeeper path of the current TSO server znode
 * @throws IllegalStateException if the cache cannot be created or started; the original failure is
 *                               kept as the cause
 */
private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
    try {
        currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
        currentTSOZNode.getListenable().addListener(this);
        currentTSOZNode.start(true);
    } catch (Exception e) {
        // Chain the cause so the original stack trace is not lost.
        throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage(), e);
    }
}
项目:emodb    文件:ZkValueStore.java   
/**
 * Creates a value store backed by a single ZooKeeper node and tries to create that node right away.
 *
 * @param curator      Curator client, must not be null
 * @param zkPath       path of the backing node, must not be null
 * @param serializer   serializer for the stored value, must not be null
 * @param defaultValue value stored in the default-value field; may be null
 */
public ZkValueStore(CuratorFramework curator, String zkPath, ZkValueSerializer<T> serializer, T defaultValue) {
    _curator = checkNotNull(curator, "curator");
    _zkPath = checkNotNull(zkPath, "zkPath");
    _serializer = checkNotNull(serializer, "serializer");
    _nodeCache = new NodeCache(curator, zkPath);
    _defaultValue = defaultValue;
    // Create node on instantiation
    // In practice, this creates a persistent zookeeper node without having to start the Managed resource.
    try {
        createNode();
    } catch (Exception e) {
        // Log a warning. We will try again when Managed is started.
        // The exception is passed along so the reason for the failure is visible in the log.
        _log.warn(format("Could not create node %s", _zkPath), e);
    }
}
项目:pravega    文件:ZKSegmentContainerMonitor.java   
/**
 * Creates an instance of ZKSegmentContainerMonitor.
 *
 * @param containerRegistry      The registry used to control the container state.
 * @param zkClient               The curator client.
 * @param pravegaServiceEndpoint The pravega endpoint for which we need to fetch the container assignment.
 * @param executor               The executor stored by this monitor.
 */
ZKSegmentContainerMonitor(SegmentContainerRegistry containerRegistry, CuratorFramework zkClient,
                          Host pravegaServiceEndpoint, ScheduledExecutorService executor) {
    // Argument validation, in the same order as the checks were originally written.
    Preconditions.checkNotNull(zkClient, "zkClient");
    this.registry = Preconditions.checkNotNull(containerRegistry, "containerRegistry");
    this.host = Preconditions.checkNotNull(pravegaServiceEndpoint, "pravegaServiceEndpoint");
    this.executor = Preconditions.checkNotNull(executor, "executor");

    // Cache on the node built from "cluster" and "segmentContainerHostMapping".
    this.hostContainerMapNode = new NodeCache(zkClient,
            ZKPaths.makePath("cluster", "segmentContainerHostMapping"));

    // Internal bookkeeping, all initially empty.
    this.assigmentTask = new AtomicReference<>();
    this.pendingTasks = new ConcurrentSkipListSet<>();
    this.handles = new ConcurrentHashMap<>();
}
项目:flink    文件:ZooKeeperLeaderElectionService.java   
/**
 * Creates a ZooKeeperLeaderElectionService object. The service is created in the not-running state
 * with no contender and no session ids.
 *
 * @param client Client which is connected to the ZooKeeper quorum
 * @param latchPath ZooKeeper node path for the leader election latch
 * @param leaderPath ZooKeeper node path for the node which stores the current leader information
 */
public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
    this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
    this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath");

    // Curator recipes: the latch and the cache on the leader node.
    this.leaderLatch = new LeaderLatch(client, latchPath);
    this.cache = new NodeCache(client, leaderPath);

    // Initial state.
    this.running = false;
    this.leaderContender = null;
    this.issuedLeaderSessionID = null;
    this.confirmedLeaderSessionID = null;
}
项目:flink    文件:ZooKeeperLeaderRetrievalService.java   
/**
 * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
 * The service is created in the not-running state without a listener or a known leader.
 *
 * @param client Client which constitutes the connection to the ZooKeeper quorum
 * @param retrievalPath Path of the ZooKeeper node which contains the leader information
 */
public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
    this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
    this.cache = new NodeCache(client, retrievalPath);

    // Initial state.
    this.running = false;
    this.leaderListener = null;
    this.lastLeaderSessionID = null;
    this.lastLeaderAddress = null;
}
项目:iConfig    文件:IConfig.java   
/**
 * Parses the current data of the node as a single {@code key=value} pair.
 * The text is split at the first '=' only, so the value itself may contain '=' characters.
 *
 * @param path      path of the node, used in error messages
 * @param nodeCache cache holding the node's current data
 * @return a mutable map containing exactly the parsed pair
 * @throws RuntimeException if the node does not exist, has no data, or its data contains no '='
 */
private Map<String, String> processNodeData(String path, NodeCache nodeCache) {
    ChildData currentData = nodeCache.getCurrentData();
    if (currentData == null) {
        throw new RuntimeException(String.format("node of path: %s do not exists", path));
    }
    byte[] raw = currentData.getData();
    if (raw == null) {
        throw new RuntimeException(String.format("node of path: %s has no data", path));
    }
    String currentDataStr = new String(raw);
    logger.info("current data is : {}", currentDataStr);
    int separator = currentDataStr.indexOf('=');
    if (separator < 0) {
        // Previously this surfaced as an IndexOutOfBoundsException without any context.
        throw new RuntimeException(
                String.format("data of path: %s is not in key=value format: %s", path, currentDataStr));
    }
    Map<String, String> map = Maps.newHashMap();
    map.put(currentDataStr.substring(0, separator), currentDataStr.substring(separator + 1));
    return map;
}
项目:watchconf    文件:DynamicConfigZKAdapter.java   
public DynamicConfigZKAdapter(final Class<T> clazz,
                              final String path,
                              final CuratorFramework curatorFramework,
                              Converter<T, byte[]> converter,
                              ChangeListener<T> changeListener) throws Exception {
    super(clazz, converter, Optional.fromNullable(changeListener));
    Preconditions.checkNotNull(curatorFramework, "CuratorFramework cannot be null");
    Preconditions.checkArgument(curatorFramework.getState() == CuratorFrameworkState.STARTED, "CuratorFramework must be started");
    Preconditions.checkArgument(path != null && !path.isEmpty(), "path cannot be null or blank");

    this.curatorFramework = curatorFramework;
    this.path = path;
    this.nodeCache = new NodeCache(curatorFramework, path);
}
项目:gondola    文件:ZookeeperShardManagerServer.java   
/**
 * Stops the shard manager server: shuts down the executor, closes every node cache (a close failure
 * is logged with the host id and the remaining caches are still closed) and stops the threads.
 */
@Override
public void stop() {
    singleThreadExecutor.shutdown();
    // Each cache is closed exactly once, here. An additional quiet close before this loop would
    // swallow any IOException and turn the logging below into dead code.
    for (NodeCache node : nodes) {
        try {
            node.close();
        } catch (IOException e) {
            // ignored.
            logger.warn("[{}] Close ZK node cache failed. message={}", gondola.getHostId(), e.getMessage());
        }
    }
    Utils.stopThreads(threads);
}
项目:stem    文件:ZookeeperClient.java   
/**
 * Closes every node cache in the pool, clears the pool and then closes the ZooKeeper client.
 * A cache that fails to close is logged and does not prevent the remaining caches from being closed.
 */
public synchronized void close() { // TODO: implemente with AtomicReference<CloseFuture>
    logger.info("Close zookeeper client");
    for (NodeCache cache : cachePool) {
        logger.info("Close cache {}", cache);
        cache.getListenable().clear();
        try {
            cache.close();
        } catch (IOException e) {
            // Handled per cache: one failing close must not leave the other caches open.
            logger.error("Error while closing zookeeper node listening caches", e);
        }
    }
    cachePool.clear();
    client.close();

}
项目:stem    文件:ZookeeperClient.java   
/**
 * Listen for a single node: starts a node cache on the path, attaches a listener built from the
 * event listener's handler and keeps the cache in the pool.
 *
 * @param path     path of the node to watch
 * @param listener event listener whose handler receives the changes
 * @throws Exception if initialization or the cache start fails
 */
public void listenForZNode(String path, ZookeeperEventListener listener) throws Exception {
    init();
    // TODO: simplify code of this method to:
    // ZNodeListener nodeListener = new ZNodeListener(path, listener, client);

    final NodeCache nodeCache = new NodeCache(client, path);
    nodeCache.start();

    final NodeCacheListener znodeListener = new ZNodeListener(listener.getHandler(), nodeCache);
    nodeCache.getListenable().addListener(znodeListener);

    cachePool.add(nodeCache);
}
项目:stem    文件:ZookeeperClient.java   
/**
 * Same as listening for a single node, but additionally invokes the listener once right after it
 * has been registered.
 *
 * @param path     path of the node to watch
 * @param listener event listener whose handler receives the changes
 * @throws Exception if initialization, the cache start or the forced listener call fails
 */
public void forcReadListenForZNode(String path, ZookeeperEventListener listener) throws Exception {
    init();
    final NodeCache nodeCache = new NodeCache(client, path);
    nodeCache.start();

    final NodeCacheListener znodeListener = new ZNodeListener(listener.getHandler(), nodeCache);
    nodeCache.getListenable().addListener(znodeListener);

    // Forced read: fire the listener manually instead of waiting for the first change event.
    znodeListener.nodeChanged();
    cachePool.add(nodeCache);
}
项目:ddth-zookeeper    文件:ZooKeeperClient.java   
/**
 * Builds the loading cache of per-path node watchers. A watcher is created and started on first
 * access of a path, invalidates the local cache entry on every node change, and is closed when it
 * is removed from the watcher cache.
 */
private void _initCacheWatcher() {
    // Closes the NodeCache of an entry that leaves the watcher cache.
    final RemovalListener<String, NodeCache> closeOnRemoval = new RemovalListener<String, NodeCache>() {
        @Override
        public void onRemoval(RemovalNotification<String, NodeCache> event) {
            try {
                event.getValue().close();
            } catch (IOException e) {
                LOGGER.warn(e.getMessage(), e);
            }
        }
    };

    // Creates and starts a NodeCache for a path that is requested for the first time.
    final CacheLoader<String, NodeCache> watcherLoader = new CacheLoader<String, NodeCache>() {
        @Override
        public NodeCache load(final String path) throws Exception {
            final NodeCache watcher = new NodeCache(curatorFramework, path);
            watcher.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    final ChildData snapshot = watcher.getCurrentData();
                    final byte[] payload = (snapshot == null) ? null : snapshot.getData();
                    _invalidateCache(path, payload);
                }
            });
            watcher.start();
            return watcher;
        }
    };

    cacheNodeWatcher = CacheBuilder.newBuilder()
            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
            .maximumSize(10000)
            .expireAfterAccess(3600, TimeUnit.SECONDS)
            .removalListener(closeOnRemoval)
            .build(watcherLoader);
}
项目:hbase    文件:ZooKeeperScanPolicyObserver.java   
/**
 * Creates and starts the Curator client (retrying forever with a 1000 ms pause, read-only servers
 * allowed) and a node cache on {@code NODE} that is started with {@code start(true)}.
 *
 * @throws Exception if the cache cannot be started
 */
private void create() throws Exception {
  client = CuratorFrameworkFactory.builder()
      .connectString(ensemble)
      .sessionTimeoutMs(sessionTimeout)
      .retryPolicy(new RetryForever(1000))
      .canBeReadOnly(true)
      .build();
  client.start();

  cache = new NodeCache(client, NODE);
  cache.start(true);
}
项目:hbase    文件:ZooKeeperScanPolicyObserver.java   
/**
 * Returns the shared node cache and increments the reference count. The client and cache are
 * created when the count is zero; if that creation fails, {@code close()} is called and the failure
 * is rethrown without incrementing the count.
 *
 * @return the shared node cache
 * @throws Exception if the cache has to be created and creation fails
 */
public synchronized NodeCache acquire() throws Exception {
  final boolean firstUser = (ref == 0);
  if (firstUser) {
    try {
      create();
    } catch (Exception failure) {
      close();
      throw failure;
    }
  }
  ref += 1;
  return cache;
}
项目:xio    文件:ZkClient.java   
/**
 * Starts the given node cache, converting any failure into an unchecked exception after logging it.
 *
 * @param cache node cache to start
 * @throws RuntimeException wrapping the original failure
 */
public void startNodeCache(NodeCache cache) {
  try {
    cache.start();
  } catch (Exception startFailure) {
    log.error("Error starting nodeCache {}", cache, startFailure);
    throw new RuntimeException(startFailure);
  }
}
项目:xio    文件:ZkClient.java   
/**
 * Closes the given node cache, converting an I/O failure into an unchecked exception after logging
 * it.
 *
 * @param cache node cache to close
 * @throws RuntimeException wrapping the original {@link IOException}
 */
public void stopNodeCache(NodeCache cache) {
  try {
    cache.close();
  } catch (IOException closeFailure) {
    log.error("Error stopping nodeCache {}", cache, closeFailure);
    throw new RuntimeException(closeFailure);
  }
}
项目:xio    文件:ZkClient.java   
/**
 * Registers the updater on the node cache of its path; the updater receives the node's raw data on
 * every change. Change events for a node that does not exist are logged and skipped.
 *
 * @param updater receiver of the changed node data
 */
public void registerUpdater(ConfigurationUpdater updater) {
  NodeCache cache = getOrCreateNodeCache(updater.getPath());

  cache
      .getListenable()
      .addListener(
          new NodeCacheListener() {
            @Override
            public void nodeChanged() {
              // getCurrentData() is null when the node does not exist (e.g. it was deleted).
              ChildData current = cache.getCurrentData();
              if (current == null) {
                log.warn("nodeCache {} has no current data, update skipped", cache);
                return;
              }
              updater.update(current.getData());
            }
          });
}
项目:Pistachio    文件:CustomizationRegistry.java   
/**
 * Connects to ZooKeeper, starts a node cache on the registry path with this object as listener and
 * processes the current node content once. Failures are logged and not propagated.
 */
public void init() {
    try
    {
        logger.info("init...");
        client = CuratorFrameworkFactory.newClient(
            ConfigurationManager.getConfiguration().getString(ZOOKEEPER_SERVER),
            new ExponentialBackoffRetry(1000, 3));
        client.start();

        // in this example we will cache data. Notice that this is optional.
        cache = new NodeCache(client, getZKPath());
        cache.start();

        cache.getListenable().addListener(this);

        nodeChanged();

    } catch (Exception e) {
        // A failed initialization is an error, not an informational message.
        logger.error("error ", e);
    }
    // The cache and the client are not closed here: they stay open after init() returns so the
    // listener registered above keeps receiving change events.
}
项目:zkconfig-resources    文件:ZkBasedNodeResource.java   
/**
 * Reads the private {@code path} field of a NodeCache via reflection, for logging purposes.
 *
 * @param nodeCache cache to inspect; may be null
 * @return {@code "n/a"} for a null cache, the cache's path otherwise, or {@code null} if the
 *         reflective access fails (the failure is logged)
 */
private static String path(NodeCache nodeCache) {
    if (nodeCache == null) {
        return "n/a";
    }
    try {
        final Field pathField = NodeCache.class.getDeclaredField("path");
        pathField.setAccessible(true);
        return (String) pathField.get(nodeCache);
    } catch (Throwable failure) {
        logger.error("Ops.fail to get path from node:{}, exception:{}", nodeCache,
                failure.toString());
        return null;
    }
}
项目:zkconfig-resources    文件:ZkBasedNodeResource.java   
/**
 * Returns the resource built from the ZooKeeper node, creating it on first use.
 *
 * <p>If a resource is already present it is returned directly. Otherwise, when the node is already
 * known not to exist, {@code emptyObject} is returned without taking the lock. In all other cases
 * the lock is taken, the listener is attached (once), and the node's current data is passed to
 * {@code factory} to build the resource; a missing node or missing data yields {@code emptyObject}.
 *
 * @return the current resource, or {@code emptyObject} when the node or its data is absent
 * @throws RuntimeException if the factory fails; unchecked failures are rethrown as-is, checked
 *                          ones are wrapped. {@code factoryFailedListener} is notified first.
 */
public T get() {
    checkClosed();
    if (resource == null) {
        if (zkNodeExists == NOT_EXISTS) { // for performance, short circuit it outside sync block.
            return emptyObject;
        }
        synchronized (lock) {
            checkClosed();
            // Re-check under the lock: the resource may have been set while waiting for it.
            if (resource == null) {
                NodeCache cache = nodeCache.get();
                tryAddListener(cache);
                ChildData currentData = cache.getCurrentData();
                if (currentData == null || currentData.getData() == null) {
                    zkNodeExists = NOT_EXISTS;
                    if (!emptyLogged) { // only print this log once (or a few times) at the beginning
                        logger.warn("found no zk path for:{}, using empty data:{}", path(cache),
                                emptyObject);
                        emptyLogged = true;
                    }
                    return emptyObject;
                }
                zkNodeExists = EXISTS;
                try {
                    resource = factory.apply(currentData.getData(), currentData.getStat());
                    // First build: the "old" value reported to the change callback is emptyObject.
                    if (onResourceChange != null) {
                        onResourceChange.accept(resource, emptyObject);
                    }
                } catch (Exception e) {
                    factoryFailedListener.accept(e);
                    throwIfUnchecked(e);
                    throw new RuntimeException(e);
                }
            }
        }
    }
    return resource;
}
项目:zkconfig-resources    文件:ZkBasedNodeResource.java   
/**
 * Attaches the node-change listener to the cache, unless {@code hasAddListener} shows that this has
 * already been done, and stores a handle in {@code nodeCacheRemoveListener} that removes it again.
 *
 * <p>On every change the listener, under the lock, either asks {@code refreshFactory} for a new
 * resource (node has data) or resets the resource to {@code null} (node or data absent). In both
 * cases the previous resource is handed to {@code cleanup}.
 *
 * @param cache node cache to listen on
 */
private void tryAddListener(NodeCache cache) {
    if (!hasAddListener) {
        NodeCacheListener nodeCacheListener = () -> {
            T oldResource;
            synchronized (lock) {
                ChildData data = cache.getCurrentData();
                // Remember the resource being replaced so it can be cleaned up afterwards.
                oldResource = resource;
                if (data != null && data.getData() != null) {
                    zkNodeExists = EXISTS;
                    ListenableFuture<T> future = refreshFactory.apply(data.getData(),
                            data.getStat());
                    // NOTE(review): with directExecutor() the callback runs on whichever thread
                    // completes the future, which may be outside this synchronized block - confirm
                    // that assigning `resource` there is safe.
                    addCallback(future, new FutureCallback<T>() {

                        @Override
                        public void onSuccess(@Nullable T result) {
                            resource = result;
                            cleanup(resource, oldResource, cache);
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            // The current resource is left untouched when the refresh fails.
                            factoryFailedListener.accept(t);
                            logger.error("", t);
                        }
                    }, directExecutor());
                } else {
                    // Node (or its data) is gone: drop the resource and allow the "no zk path"
                    // warning in get() to be logged again.
                    zkNodeExists = NOT_EXISTS;
                    resource = null;
                    emptyLogged = false;
                    cleanup(resource, oldResource, cache);
                }
            }
        };
        cache.getListenable().addListener(nodeCacheListener);
        nodeCacheRemoveListener = () -> cache.getListenable().removeListener(nodeCacheListener);
        hasAddListener = true;
    }
}
项目:zkconfig-resources    文件:ZkBasedNodeResource.java   
/**
 * Disposes of a replaced resource and reports the change to {@code onResourceChange}.
 *
 * <p>When there is a real old resource (non-null, not {@code emptyObject}) that differs from the
 * current one, a new daemon thread with minimum priority repeatedly applies the {@code cleanup}
 * predicate to it, sleeping {@code waitStopPeriod} ms before each attempt when that period is
 * positive, until the predicate returns true; the change callback is then invoked on that thread.
 * In every other case the change callback is invoked directly on the calling thread.
 *
 * @param currentResource the resource now in use; may be null
 * @param oldResource     the resource that was replaced; may be null
 * @param nodeCache       cache the resource belongs to, used only to resolve the path for logging
 */
private void cleanup(T currentResource, T oldResource, NodeCache nodeCache) {
    if (oldResource != null && oldResource != emptyObject) {
        if (currentResource == oldResource) {
            // Nothing to clean up; falls through to the synchronous change callback below.
            logger.warn(
                    "[BUG!!!!] should NOT occurred, old resource is same as current, path:{}, {}",
                    path(nodeCache), oldResource);
        } else {
            new ThreadFactoryBuilder() //
                    .setNameFormat("old [" + oldResource.getClass().getSimpleName()
                            + "] cleanup thread-[%d]")
                    .setUncaughtExceptionHandler(
                            (t, e) -> logger.error("fail to cleanup resource, path:{}, {}",
                                    path(nodeCache), oldResource.getClass().getSimpleName(), e)) //
                    .setPriority(MIN_PRIORITY) //
                    .setDaemon(true) //
                    .build() //
                    .newThread(() -> {
                        // Retry until the cleanup predicate reports success.
                        do {
                            if (waitStopPeriod > 0) {
                                sleepUninterruptibly(waitStopPeriod, MILLISECONDS);
                            }
                            if (cleanup.test(oldResource)) {
                                break;
                            }
                        } while (true);
                        if (onResourceChange != null) {
                            onResourceChange.accept(currentResource, oldResource);
                        }
                    }).start();
            // The change callback is invoked by the cleanup thread; do not invoke it twice.
            return;
        }
    }
    if (onResourceChange != null) {
        onResourceChange.accept(currentResource, oldResource);
    }
}
项目:ZKRecipesByExample    文件:NodeCacheExample.java   
/**
 * Adds a listener to the cache that prints the path and value of the node on every change.
 * Nothing is printed when the node does not exist.
 *
 * @param cache node cache to listen on
 */
private static void addListener(final NodeCache cache) {
    // a NodeCacheListener is optional. Here, it's used just to log
    // changes
    NodeCacheListener listener = new NodeCacheListener() {

        @Override
        public void nodeChanged() throws Exception {
            // Read the data once: separate getCurrentData() calls can race with a delete and
            // return null after the null check has already passed.
            ChildData current = cache.getCurrentData();
            if (current != null) {
                System.out.println("Node changed: " + current.getPath() + ", value: " + new String(current.getData()));
            }
        }
    };
    cache.getListenable().addListener(listener);
}
项目:ZKRecipesByExample    文件:NodeCacheExample.java   
/**
 * Runs the interactive command loop: reads commands from standard input until end of input or a
 * quit command, and dispatches {@code help}/{@code ?}, {@code set}, {@code remove} and {@code show}.
 * Any exception raised inside the loop is printed and ends the loop.
 *
 * @param client Curator client used by the set and remove commands
 * @param cache  node cache that is listened on and shown
 * @throws Exception declared for callers
 */
private static void processCommands(CuratorFramework client, NodeCache cache) throws Exception {
    printHelp();
    try {
        addListener(cache);
        final BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        boolean done = false;
        while (!done) {
            System.out.print("> ");
            final String line = console.readLine();
            if (line == null) {
                // End of input.
                break;
            }
            final String command = line.trim();
            final String[] parts = command.split("\\s");
            if (parts.length == 0) {
                continue;
            }
            final String operation = parts[0];
            final String[] args = Arrays.copyOfRange(parts, 1, parts.length);
            if ("help".equalsIgnoreCase(operation) || "?".equalsIgnoreCase(operation)) {
                printHelp();
            } else if ("q".equalsIgnoreCase(operation) || "quit".equalsIgnoreCase(operation)) {
                done = true;
            } else if ("set".equals(operation)) {
                setValue(client, command, args);
            } else if ("remove".equals(operation)) {
                remove(client);
            } else if ("show".equals(operation)) {
                show(cache);
            }
            // just to allow the console output to catch up
            Thread.sleep(1000);
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
项目:commons-configuration-zookeeper    文件:ZKNodeChangeEventReloadingStrategy.java   
/**
 * Registers a listener on the configuration's node that reloads the configuration on every node
 * change. Does nothing when the configuration has no node.
 */
@Override
public void init() {
    final NodeCache watched = _configuration.getNode();
    if (watched == null) {
        return;
    }
    final NodeCacheListener reloadOnChange = new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            _configuration.reload();
        }
    };
    watched.getListenable().addListener(reloadOnChange);
}