Java 类org.apache.curator.framework.recipes.cache.TreeCacheListener 实例源码

项目:flow-platform    文件:ZKClient.java   
/**
 * Starts watching the subtree rooted at {@code path} with a new {@link TreeCache}.
 *
 * <p>The listener is registered before the cache is started so that events produced
 * while the cache performs its initial load are not missed. If any step of the setup
 * fails, the partially built cache is closed before the failure is reported.
 *
 * @param path     root path of the subtree to watch
 * @param listener listener to register on the new cache; it is registered with
 *                 {@code executor} when that field is non-null
 * @return {@code true} if a new cache was created and stored in {@code nodeTreeCache},
 *         {@code false} if {@code nodeTreeCache} already holds a cache for {@code path}
 */
public boolean watchTree(String path, TreeCacheListener listener) {
    if (nodeTreeCache.get(path) != null) {
        return false; // node been listened
    }

    TreeCache tc = null;
    try {
        tc = TreeCache.newBuilder(client, path).build();

        // Register first: a listener added after start() can miss early events.
        if (executor != null) {
            tc.getListenable().addListener(listener, executor);
        } else {
            tc.getListenable().addListener(listener);
        }

        tc.start();
        nodeTreeCache.put(path, tc);
        return true;
    } catch (Throwable e) {
        if (tc != null) {
            try {
                tc.close();
            } catch (Exception ignored) {
                // Best-effort cleanup; the original failure is the one reported below.
            }
        }
        throw checkException(String.format("Unable to watch tree for path: %s", path), e);
    }
}
项目:elastic-jobx    文件:JobNodeStorageTest.java   
/**
 * Checks that adding a data listener through {@code jobNodeStorage} registers it on the
 * listenable of the cache returned for {@code "/testJob"}.
 */
@Test
public void assertAddDataListener() {
    TreeCacheListener dataListener = mock(TreeCacheListener.class);
    @SuppressWarnings("unchecked")
    Listenable<TreeCacheListener> listenable = mock(Listenable.class);
    TreeCache cache = mock(TreeCache.class);
    when(coordinatorRegistryCenter.getRawCache("/testJob")).thenReturn(cache);
    when(cache.getListenable()).thenReturn(listenable);

    jobNodeStorage.addDataListener(dataListener);

    verify(listenable).addListener(dataListener);
}
项目:elastic-jobx    文件:ConsoleRegistryCenter.java   
/**
 * Registers a data listener on the cache looked up for the given path.
 *
 * @param listener  the listener to register
 * @param cachePath the path passed to {@code registryCenter.getRawCache}
 * @return this instance
 */
public ConsoleRegistryCenter addDataListener(final TreeCacheListener listener, final String cachePath) {
    final Object rawCache = registryCenter.getRawCache(cachePath);
    ((TreeCache) rawCache).getListenable().addListener(listener);
    return this;
}
项目:ElasticJob    文件:JobNodeStorageTest.java   
/**
 * Checks that adding a data listener through {@code jobNodeStorage} registers it on the
 * listenable of the cache returned for {@code "/testJob"}.
 */
@Test
public void assertAddDataListener() {
    TreeCacheListener dataListener = mock(TreeCacheListener.class);
    @SuppressWarnings("unchecked")
    Listenable<TreeCacheListener> listenable = mock(Listenable.class);
    TreeCache cache = mock(TreeCache.class);
    when(coordinatorRegistryCenter.getRawCache("/testJob")).thenReturn(cache);
    when(cache.getListenable()).thenReturn(listenable);

    jobNodeStorage.addDataListener(dataListener);

    verify(listenable).addListener(dataListener);
}
项目:frc    文件:ZkClient.java   
/**
 * Registers a listener on the given tree cache that reacts to ZooKeeper node events.
 *
 * <p>On {@code NODE_ADDED} the event is passed to {@code nodeAdd}; {@code rNodeNum} is
 * incremented when the result is {@code DefaultValues.ZK_ADD_SUC}, and a warning is logged
 * when the result is not {@code DefaultValues.ZK_ADD_ROOT} while {@code rNodeNum} is zero.
 * On {@code NODE_UPDATED} the event is passed to {@code updateNode}. All other event
 * types, including {@code NODE_REMOVED}, are ignored.
 *
 * @param logIndex value forwarded to {@code nodeAdd}, {@code updateNode} and the logger
 * @param cache    the tree cache to listen on
 */
private void addListener(final long logIndex, TreeCache cache) {
    final String flag = this.getClassName() + ".addListener";
    TreeCacheListener nodeListener = new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) {
            switch (event.getType()) {
                case NODE_ADDED:
                    int addResult = nodeAdd(logIndex, client, event);
                    if (addResult == DefaultValues.ZK_ADD_SUC) {
                        rNodeNum++;
                    }
                    boolean rootAdded = addResult == DefaultValues.ZK_ADD_ROOT;
                    if (!rootAdded && rNodeNum == 0) {
                        FRCLogger.getInstance().warn(logIndex, flag, "No redis exist!", null);
                    }
                    break;

                case NODE_UPDATED:
                    updateNode(logIndex, event);
                    break;

                case NODE_REMOVED: // removal handling (removeNode) is currently disabled
                default:
                    break;
            }
        }
    };

    cache.getListenable().addListener(nodeListener);
}
项目:seldon-server    文件:ZkSubscriptionHandler.java   
/**
 * Subscribes a listener to a ZooKeeper location.
 *
 * <p>Ensures the path via {@code EnsurePath}, creates a {@link TreeCache} for it, registers
 * the listener, starts the cache and stores it in {@code caches} under the location.
 *
 * @param location the ZooKeeper path to subscribe to
 * @param listener the listener to register on the new cache
 * @throws Exception if ensuring the path or starting the cache fails
 */
public void addSubscription(String location, TreeCacheListener listener) throws Exception {
    final CuratorFramework zkClient = curator.getCurator();
    new EnsurePath(location).ensure(zkClient.getZookeeperClient());

    final TreeCache treeCache = new TreeCache(zkClient, location);
    treeCache.getListenable().addListener(listener);
    treeCache.start();

    caches.put(location, treeCache);
    logger.info("Added ZooKeeper subscriber for " + location + " children.");
}
项目:metron    文件:ZKCache.java   
/**
 * Creates the cache wrapper; the underlying tree cache is not built here.
 *
 * @param client    the Curator client to use
 * @param listeners the listeners to keep for later registration
 * @param zkRoot    the Zookeeper root path; must not be null
 * @param ownClient flag stored in the {@code ownClient} field
 * @throws IllegalArgumentException if {@code zkRoot} is null
 */
private ZKCache(CuratorFramework client, List<TreeCacheListener> listeners, String zkRoot, boolean ownClient) {
  if(zkRoot == null) {
    throw new IllegalArgumentException("Zookeeper root must not be null.");
  }
  this.zkRoot = zkRoot;
  this.ownClient = ownClient;
  this.listeners = listeners;
  this.client = client;
}
项目:metron    文件:ZKCache.java   
/**
 * Starts the cache; does nothing when a cache has already been built.
 *
 * <p>Starts the client first when {@code ownClient} is set, then builds a data-caching
 * {@link TreeCache} on {@code zkRoot}, registers every configured listener and starts it.
 *
 * @throws Exception If unable to be started.
 */
public void start() throws Exception {
  if(cache != null) {
    return;
  }
  if(ownClient) {
    client.start();
  }
  cache = TreeCache.newBuilder(client, zkRoot)
                   .setCacheData(true)
                   .build();
  for(TreeCacheListener registered : listeners) {
    cache.getListenable().addListener(registered);
  }
  cache.start();
}
项目:ZKRecipesByExample    文件:TreeCacheExample.java   
/**
 * Registers a listener on the cache that prints each tree event to standard output.
 *
 * <p>Added and updated nodes are printed with their node name and value, removed nodes
 * with their node name only, and any other event with its type name.
 *
 * @param cache the tree cache to listen on
 */
private static void addListener(final TreeCache cache) {
    TreeCacheListener printingListener = new TreeCacheListener() {

        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            switch (event.getType()) {
            case NODE_ADDED: {
                String addedNode = ZKPaths.getNodeFromPath(event.getData().getPath());
                String addedValue = new String(event.getData().getData());
                System.out.println("TreeNode added: " + addedNode + ", value: " + addedValue);
                break;
            }
            case NODE_UPDATED: {
                String changedNode = ZKPaths.getNodeFromPath(event.getData().getPath());
                String changedValue = new String(event.getData().getData());
                System.out.println("TreeNode changed: " + changedNode + ", value: " + changedValue);
                break;
            }
            case NODE_REMOVED: {
                String removedNode = ZKPaths.getNodeFromPath(event.getData().getPath());
                System.out.println("TreeNode removed: " + removedNode);
                break;
            }
            default: {
                // Event data is not read here; only the event type is printed.
                System.out.println("Other event: " + event.getType().name());
                break;
            }
            }
        }

    };

    cache.getListenable().addListener(printingListener);
}
项目:mpush    文件:ZKClient.java   
/**
 * Registers a listener on the {@code cache} field's listenable.
 *
 * @param listener the tree cache listener to add
 */
public void registerListener(TreeCacheListener listener) {
    cache.getListenable().addListener(listener);
}
项目:coco    文件:CuratorTest.java   
/**
 * Exercises a {@link TreeCache} rooted at {@code /namespace-test}: creates and updates
 * nodes through a namespaced client and checks the type of the last event the listener saw.
 *
 * <p>The cache is closed in a {@code finally} block so it is released even when an
 * assertion fails.
 *
 * @throws Exception if a ZooKeeper operation fails or the thread is interrupted
 */
@Test
public void test_tree_cache() throws Exception {
    String rootPath = "/namespace-test";
    CuratorFramework zk = curator.usingNamespace("namespace-test");
    String groupPath = "/group-1";
    String s = zk.create().forPath(groupPath,"group-1-data".getBytes());
    Assert.assertEquals(groupPath, s);
    Stat stat = zk.checkExists().forPath("/group-1");
    Assert.assertNotNull(stat);
    // Inside the namespace the root path itself must not exist as a child node.
    stat = zk.checkExists().forPath(rootPath);
    Assert.assertNull(stat);

    final TreeCacheEvent.Type[] saveEventType = new TreeCacheEvent.Type[1];
    final long[] saveTime = new long[1];
    TreeCache treeCache = new TreeCache(curator, rootPath);
    try {
        treeCache.start();
        ChildData childData = treeCache.getCurrentData("/namespace-test/group-1");

        treeCache.getListenable().addListener(new TreeCacheListener() {

            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                logger.info("event type={}", event.getType());
                switch (event.getType()) {
                    case NODE_ADDED:
                        saveEventType[0] = TreeCacheEvent.Type.NODE_ADDED;
                        saveTime[0] = System.currentTimeMillis();
                        logger.info("child[path={}, date={}] added", event.getData().getPath(), new String(event.getData().getData()));
                        break;
                    case NODE_UPDATED:
                        saveEventType[0] = TreeCacheEvent.Type.NODE_UPDATED;
                        saveTime[0] = System.currentTimeMillis();
                        logger.info("child[path={}, date={}] updated", event.getData().getPath(), new String(event.getData().getData()));
                        break;
                    case NODE_REMOVED:
                        saveEventType[0] = TreeCacheEvent.Type.NODE_REMOVED;
                        saveTime[0] = System.currentTimeMillis();
                        // Previously logged "updated" here, which mislabelled removals.
                        logger.info("child[path={}, date={}] removed", event.getData().getPath(), new String(event.getData().getData()));
                        break;
                    case CONNECTION_SUSPENDED:
                        break;
                    case CONNECTION_RECONNECTED:
                        break;
                    case CONNECTION_LOST:
                        break;
                    case INITIALIZED:
                        break;
                }
            }
        });
        String hostPath = groupPath + "/localhost:8001";
        zk.create().forPath(hostPath);
        long wtStart = System.currentTimeMillis();
        // Give the cache time to deliver the event to the listener.
        Thread.sleep(300);
        // use 15 ms
        // System.out.println("listener wait time="+(saveTime[0] - wtStart));
        Assert.assertEquals(TreeCacheEvent.Type.NODE_ADDED, saveEventType[0]);
        // System.out.println(new String(zk.getData().forPath(hostPath)));

        // create three node
        String threePath = groupPath + "/hosts/localhost:8001";
        zk.create().creatingParentsIfNeeded().forPath(threePath);
        zk.setData().forPath(threePath, "{tree}".getBytes());
        // test update
        zk.setData().forPath(hostPath, "{}".getBytes());
        Thread.sleep(300);
        Assert.assertEquals(TreeCacheEvent.Type.NODE_UPDATED, saveEventType[0]);

        // test set parent node's data
        zk.setData().forPath("/group-1", "{grou-data}".getBytes());
        Thread.sleep(300);
        Assert.assertEquals(TreeCacheEvent.Type.NODE_UPDATED, saveEventType[0]);
    } finally {
        // Release the cache even when an assertion above fails.
        treeCache.close();
    }
}
项目:ibole-microservice    文件:ZkServiceDiscovery.java   
/**
 * Watches the cached node tree and pushes the refreshed service instances to the
 * listener whenever a relevant node changes, then starts the cache.
 *
 * <p>Events without data, and events whose path contains {@code "leases"} or
 * {@code "locks"}, are ignored. For {@code NODE_ADDED} the listener is only updated when
 * the event path contains the service name; for {@code NODE_REMOVED} and
 * {@code NODE_UPDATED} it is always updated.
 *
 * <p>Cache synchronization:
 * http://stackoverflow.com/questions/39557653/zookeeper-curator-cache-how-to-wait-for-synchronization
 *
 * <p>The problem of treecache-eventual-consistency:
 * http://stackoverflow.com/questions/41922928/curator-treecache-eventual-consistency
 *
 * @param serviceName the service name to retrieve the cache
 * @param listener the service state listener
 * @throws ServiceDiscoveryException if starting the cache fails
 */
@Override
public void watchForCacheUpdates(String serviceName, ServiceStateListener listener) {
  cache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event)
        throws Exception {
      if (event.getData() == null) {
        return;
      }
      final String eventPath = event.getData().getPath();
      // Filter the events coming from leases & locks
      if (eventPath.contains("leases") || eventPath.contains("locks")) {
        return;
      }

      switch (event.getType()) {
        case NODE_ADDED:
          // Skip additions whose path does not mention the service name.
          if (!eventPath.contains(serviceName)) {
            break;
          }
          listener.update(getServiceInstancesFromCache(serviceName));
          if (logger.isInfoEnabled()) {
            logger.info("Service is added at path '{}'", eventPath);
          }
          break;
        case NODE_REMOVED:
          listener.update(getServiceInstancesFromCache(serviceName));
          if (logger.isInfoEnabled()) {
            logger.info("Service is removed at path '{}'", eventPath);
          }
          break;
        case NODE_UPDATED:
          listener.update(getServiceInstancesFromCache(serviceName));
          if (logger.isInfoEnabled()) {
            logger.info("Service is updated at path '{}'", eventPath);
          }
          break;
        default:
          break;
      }
    }

  });

  try {
    cache.start();
  } catch (Exception e) {
    logger.error("Watch start error happened for path '{}'!", buildBasePath(), e);
    throw new ServiceDiscoveryException(e);
  }
}
项目:HeliosStreams    文件:AdminFinder.java   
/**
 * Starts an AdminURL bind event loop.
 *
 * <p>A thread from {@code threadFactory} registers a {@link TreeCacheListener} on
 * {@code ZOOKEEP_URL}, starts the cache, and then waits until the AdminURL data has been
 * seen. The data can arrive in two ways: through a cache event on {@code ZOOKEEP_URL},
 * or through a direct ZooKeeper read made right after the cache is started. Either way
 * {@code updateAdminURL} is called, the wait flag is cleared and the loop ends. The cache
 * is always closed when the thread finishes.
 */
protected void waitForAdminURLBind() {
    final TreeCache tc = TreeCache.newBuilder(cf, ZOOKEEP_URL)
            //.setExecutor(threadFactory)
            //.setExecutor((ExecutorService)threadPool)
            //.setCacheData(true)
            .build();
    final AtomicBoolean waiting = new AtomicBoolean(true);
    final Thread waitForAdminURLThread = threadFactory.newThread(new Runnable(){
        @Override
        public void run() {
            final Thread waitThread = Thread.currentThread();
            try {
                final Listenable<TreeCacheListener> listen = tc.getListenable();
                listen.addListener(new TreeCacheListener(){
                    @Override
                    public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
                        ChildData cd = event.getData();
                        if(cd!=null) {
                            log.info("TreeCache Bound [{}]", cd.getPath());
                            final String boundPath = cd.getPath();
                            if(ZOOKEEP_URL.equals(boundPath)) {
                                updateAdminURL(cd.getData());
                                tc.close();
                                waiting.set(false);
                                // Wake the waiting thread so it notices the cleared flag.
                                waitThread.interrupt();
                            }
                        }
                    }
                });
                tc.start();
                log.debug("AdminURL TreeCache Started");
                // Check for the data one more time in case we missed
                // the bind event while setting up the listener
                final ZooKeeper z = cf.getZookeeperClient().getZooKeeper();
                final Stat st = z.exists(ZOOKEEP_URL, false);
                if(st!=null) {
                    updateAdminURL(z.getData(ZOOKEEP_URL, false, st));
                    tc.close();
                    // The closed cache will deliver no further events, so the flag must be
                    // cleared here; otherwise the loop below would never terminate.
                    waiting.set(false);
                }
                while(waiting.get()) {
                    try {
                        // Joining the current thread with a timeout acts as an interruptible pause.
                        Thread.currentThread().join(retryPauseTime);
                        log.info("Still waiting for AdminURL....");
                    } catch (InterruptedException iex) {
                        // Expected when the listener interrupts the wait; clear any leftover flag.
                        Thread.interrupted();
                    }
                }
                log.info("Ended wait for AdminURL");
            } catch (Exception ex) {
                log.error("Failed to wait for AdminURL bind",  ex);
                // FIXME:
            } finally {
                try { tc.close(); } catch (Exception x) {/* No Op */}
            }
        }
    });
    waitForAdminURLThread.start();
}
项目:elastic-jobx    文件:AbstractGlobalListenerManager.java   
/**
 * Registers a data listener by delegating to {@code globalNodeStorage}.
 *
 * @param listener the tree cache listener to register
 */
protected void addDataListener(final TreeCacheListener listener) {
    globalNodeStorage.addDataListener(listener);
}
项目:elastic-jobx    文件:AbstractListenerManager.java   
/**
 * Registers a data listener by delegating to {@code jobNodeStorage}.
 *
 * @param listener the tree cache listener to register
 */
protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
}
项目:elastic-jobx    文件:JobNodeStorage.java   
/**
 * Registers a data listener on the cache of the job's root node.
 *
 * <p>The cache is looked up under {@code "/" + jobName} in the coordinator registry center.
 *
 * @param listener the tree cache listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final String jobRootPath = "/" + jobConfiguration.getJobName();
    final TreeCache jobCache = (TreeCache) coordinatorRegistryCenter.getRawCache(jobRootPath);
    jobCache.getListenable().addListener(listener);
}
项目:ElasticJob    文件:AbstractListenerManager.java   
/**
 * Registers a data listener by delegating to {@code jobNodeStorage}.
 *
 * @param listener the tree cache listener to register
 */
protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
}
项目:ElasticJob    文件:JobNodeStorage.java   
/**
 * Registers a data listener on the cache of the job's root node.
 *
 * <p>The cache is looked up under {@code "/" + jobName} in the coordinator registry center.
 *
 * @param listener the tree cache listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final String jobRootPath = "/" + jobConfiguration.getJobName();
    final TreeCache jobCache = (TreeCache) coordinatorRegistryCenter.getRawCache(jobRootPath);
    jobCache.getListenable().addListener(listener);
}
项目:elastic-config    文件:ZookeeperListenerManager.java   
/**
 * Adds a data node listener by delegating to the config node storage obtained from
 * {@code zookeeperConfigGroup}.
 *
 * @param listener the tree cache listener to register
 */
private void addDataListener(TreeCacheListener listener) {
    zookeeperConfigGroup.getConfigNodeStorage().addDataListener(listener);
}
项目:elastic-config    文件:ConfigNodeStorage.java   
/**
 * Registers a configuration listener on the cache of the profile's root node.
 *
 * <p>The cache is looked up in the registry center under the path returned by
 * {@code configProfile.getConcurrentRootNodePath()}.
 *
 * @param listener the tree cache listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final Object rawCache = elasticConfigRegistryCenter.getRawCache(configProfile.getConcurrentRootNodePath());
    ((TreeCache) rawCache).getListenable().addListener(listener);
}
项目:elastic-jobx    文件:GlobalNodeStorage.java   
/**
 * Registers a data listener on the cache looked up under {@code GlobalNodePath.ROOT}.
 *
 * @param listener the listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final Object rawCache = coordinatorRegistryCenter.getRawCache(GlobalNodePath.ROOT);
    ((TreeCache) rawCache).getListenable().addListener(listener);
}
项目:elastic-jobx    文件:NamespaceNodeStorage.java   
/**
 * Registers a data listener on the cache looked up under {@code "/"}.
 *
 * @param listener the listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final Object rawCache = coordinatorRegistryCenter.getRawCache("/");
    ((TreeCache) rawCache).getListenable().addListener(listener);
}
项目:elastic-jobx    文件:NamespaceNodeStorage.java   
/**
 * Removes a data listener from the cache looked up under {@code "/"}.
 *
 * @param listener the listener to remove
 */
public void removeDataListener(final TreeCacheListener listener) {
    final Object rawCache = coordinatorRegistryCenter.getRawCache("/");
    ((TreeCache) rawCache).getListenable().removeListener(listener);
}
项目:metron    文件:ZKCache.java   
/**
 * Specify the treecache listener, which will be called when changes happen to the zookeeper root.
 * The listener is added to this builder's {@code listener} collection, so repeated calls
 * add further listeners rather than replacing earlier ones.
 *
 * @param listener The callback which is called when changes happen in zookeeper.
 * @return The Builder
 */
public Builder withListener(TreeCacheListener listener) {
  // "this." is required: the parameter shadows the field of the same name.
  this.listener.add(listener);
  return this;
}