public boolean watchTree(String path, TreeCacheListener listener) { TreeCache tc = nodeTreeCache.get(path); if (tc != null) { return false; // node been listened } try { tc = TreeCache.newBuilder(client, path).build(); tc.start(); if (executor != null) { tc.getListenable().addListener(listener, executor); } else { tc.getListenable().addListener(listener); } nodeTreeCache.put(path, tc); return true; } catch (Throwable e) { throw checkException(String.format("Unable to watch tree for path: %s", path), e); } }
/**
 * Checks that {@code jobNodeStorage.addDataListener} registers the given listener on the
 * listenable of the cache that the registry center returns for {@code "/testJob"}.
 */
@Test
public void assertAddDataListener() {
    TreeCacheListener dataListener = mock(TreeCacheListener.class);
    @SuppressWarnings("unchecked")
    Listenable<TreeCacheListener> listenable = mock(Listenable.class);
    TreeCache cache = mock(TreeCache.class);
    when(cache.getListenable()).thenReturn(listenable);
    when(coordinatorRegistryCenter.getRawCache("/testJob")).thenReturn(cache);

    jobNodeStorage.addDataListener(dataListener);

    verify(listenable).addListener(dataListener);
}
/**
 * Registers a data listener on the cache stored for the given path.
 *
 * @param listener  listener to register
 * @param cachePath path used to look up the raw cache in the registry center
 * @return this instance, so that calls can be chained
 */
public ConsoleRegistryCenter addDataListener(final TreeCacheListener listener, final String cachePath) {
    final Object rawCache = registryCenter.getRawCache(cachePath);
    ((TreeCache) rawCache).getListenable().addListener(listener);
    return this;
}
/** * listen the zookeeper event * * @param logIndex * @param cache */ private void addListener(final long logIndex, TreeCache cache) { final String flag = this.getClassName() + ".addListener"; TreeCacheListener plis = new TreeCacheListener() { public void childEvent(CuratorFramework client, TreeCacheEvent event) { switch (event.getType()) { case NODE_ADDED: { int resAdd = nodeAdd(logIndex, client, event); if (resAdd == DefaultValues.ZK_ADD_SUC) { rNodeNum++; } if (resAdd != DefaultValues.ZK_ADD_ROOT && rNodeNum == 0) { FRCLogger.getInstance().warn(logIndex, flag, "No redis exist!", null); } break; } case NODE_UPDATED: { updateNode(logIndex, event); break; } case NODE_REMOVED: { //removeNode(logIndex, event); break; } default: break; } } }; cache.getListenable().addListener(plis); }
/**
 * Subscribes {@code listener} to a new tree cache rooted at {@code location}.
 *
 * <p>The path is ensured to exist first; the listener is attached before the cache is
 * started, and the started cache is then stored in {@code caches} under {@code location}.
 *
 * @param location path to watch
 * @param listener listener to attach to the new cache
 * @throws Exception if ensuring the path or starting the cache fails
 */
public void addSubscription(String location, TreeCacheListener listener) throws Exception {
    CuratorFramework curatorClient = curator.getCurator();
    new EnsurePath(location).ensure(curatorClient.getZookeeperClient());

    TreeCache subscriptionCache = new TreeCache(curatorClient, location);
    subscriptionCache.getListenable().addListener(listener);
    subscriptionCache.start();
    caches.put(location, subscriptionCache);

    logger.info("Added ZooKeeper subscriber for " + location + " children.");
}
/**
 * Creates a cache wrapper; the underlying tree cache is not built here.
 *
 * @param client    Curator client to use
 * @param listeners listeners to keep for the cache
 * @param zkRoot    ZooKeeper root path; must not be {@code null}
 * @param ownClient value stored in the {@code ownClient} field
 * @throws IllegalArgumentException if {@code zkRoot} is {@code null}
 */
private ZKCache(CuratorFramework client, List<TreeCacheListener> listeners, String zkRoot, boolean ownClient) {
    if (null == zkRoot) {
        throw new IllegalArgumentException("Zookeeper root must not be null.");
    }
    this.zkRoot = zkRoot;
    this.ownClient = ownClient;
    this.listeners = listeners;
    this.client = client;
}
/**
 * Starts the cache if it has not been created yet; does nothing otherwise.
 *
 * <p>When {@code ownClient} is set, the client is started first. The tree cache is built on
 * {@code zkRoot} with data caching enabled, every configured listener is attached, and the
 * cache is then started.
 *
 * @throws Exception if the cache cannot be started
 */
public void start() throws Exception {
    if (cache != null) {
        return;
    }
    if (ownClient) {
        client.start();
    }
    cache = TreeCache.newBuilder(client, zkRoot)
            .setCacheData(true)
            .build();
    for (TreeCacheListener registered : listeners) {
        cache.getListenable().addListener(registered);
    }
    cache.start();
}
/**
 * Attaches a listener to {@code cache} that prints one line to standard output per event:
 * node name and value for additions and changes, node name for removals, and the event type
 * name for everything else.
 *
 * @param cache tree cache to listen on
 */
private static void addListener(final TreeCache cache) {
    TreeCacheListener printingListener = new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            final String message;
            switch (event.getType()) {
                case NODE_ADDED:
                    message = "TreeNode added: " + ZKPaths.getNodeFromPath(event.getData().getPath())
                            + ", value: " + new String(event.getData().getData());
                    break;
                case NODE_UPDATED:
                    message = "TreeNode changed: " + ZKPaths.getNodeFromPath(event.getData().getPath())
                            + ", value: " + new String(event.getData().getData());
                    break;
                case NODE_REMOVED:
                    message = "TreeNode removed: " + ZKPaths.getNodeFromPath(event.getData().getPath());
                    break;
                default:
                    // The event data is only read in the node cases above.
                    message = "Other event: " + event.getType().name();
                    break;
            }
            System.out.println(message);
        }
    };
    cache.getListenable().addListener(printingListener);
}
/**
 * Registers a listener on the listenable of {@code cache}.
 *
 * @param listener listener to add
 */
public void registerListener(TreeCacheListener listener) {
    cache.getListenable().addListener(listener);
}
@Test public void test_tree_cache() throws Exception { String rootPath = "/namespace-test"; CuratorFramework zk = curator.usingNamespace("namespace-test"); String groupPath = "/group-1"; String s = zk.create().forPath(groupPath,"group-1-data".getBytes()); Assert.assertEquals(groupPath, s); Stat stat = zk.checkExists().forPath("/group-1"); Assert.assertNotNull(stat); stat = zk.checkExists().forPath(rootPath); Assert.assertNull(stat); final TreeCacheEvent.Type[] saveEventType = new TreeCacheEvent.Type[1]; final long[] saveTime = new long[1]; TreeCache treeCache = new TreeCache(curator, rootPath); treeCache.start(); ChildData childData = treeCache.getCurrentData("/namespace-test/group-1"); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { logger.info("event type={}", event.getType()); switch (event.getType()) { case NODE_ADDED: saveEventType[0] = TreeCacheEvent.Type.NODE_ADDED; saveTime[0] = System.currentTimeMillis(); logger.info("child[path={}, date={}] added", event.getData().getPath(), new String(event.getData().getData())); break; case NODE_UPDATED: saveEventType[0] = TreeCacheEvent.Type.NODE_UPDATED; saveTime[0] = System.currentTimeMillis(); logger.info("child[path={}, date={}] updated", event.getData().getPath(), new String(event.getData().getData())); break; case NODE_REMOVED: saveEventType[0] = TreeCacheEvent.Type.NODE_REMOVED; saveTime[0] = System.currentTimeMillis(); logger.info("child[path={}, date={}] updated", event.getData().getPath(), new String(event.getData().getData())); break; case CONNECTION_SUSPENDED: break; case CONNECTION_RECONNECTED: break; case CONNECTION_LOST: break; case INITIALIZED: break; } } }); String hostPath = groupPath + "/localhost:8001"; zk.create().forPath(hostPath); long wtStart = System.currentTimeMillis(); Thread.sleep(300); // use 15 ms // System.out.println("listener wait time="+(saveTime[0] - wtStart)); 
Assert.assertEquals(TreeCacheEvent.Type.NODE_ADDED, saveEventType[0]); // System.out.println(new String(zk.getData().forPath(hostPath))); // create three node String threePath = groupPath + "/hosts/localhost:8001"; zk.create().creatingParentsIfNeeded().forPath(threePath); zk.setData().forPath(threePath, "{tree}".getBytes()); // test update zk.setData().forPath(hostPath, "{}".getBytes()); Thread.sleep(300); Assert.assertEquals(TreeCacheEvent.Type.NODE_UPDATED, saveEventType[0]); // test set parent node's data zk.setData().forPath("/group-1", "{grou-data}".getBytes()); Thread.sleep(300); Assert.assertEquals(TreeCacheEvent.Type.NODE_UPDATED, saveEventType[0]); }
/** * 监控指定节点和节点下的所有的节点的变化-无限监听. * <pre>Cache synchronization: * http://stackoverflow.com/questions/39557653/zookeeper-curator-cache-how-to-wait-for-synchronization * <pre>The problem of treecache-eventual-consistency: * http://stackoverflow.com/questions/41922928/curator-treecache-eventual-consistency * @param serviceName the service name to retrieve the cache * @param listener the service state listener */ @Override public void watchForCacheUpdates(String serviceName, ServiceStateListener listener) { cache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { //Filter the even coming from leases&locks if (event.getData() != null && !event.getData().getPath().contains("leases") && !event.getData().getPath().contains("locks")) { switch (event.getType()) { case NODE_ADDED: // update the all children - filter the node_added even is for the parent node creation for serviceName if (event.getData().getPath().contains(serviceName)) { listener.update(getServiceInstancesFromCache(serviceName)); if (logger.isInfoEnabled()) { logger.info("Service is added at path '{}'", event.getData().getPath()); } } break; case NODE_REMOVED: listener.update(getServiceInstancesFromCache(serviceName)); if (logger.isInfoEnabled()) { logger.info("Service is removed at path '{}'", event.getData().getPath()); } break; case NODE_UPDATED: listener.update(getServiceInstancesFromCache(serviceName)); if (logger.isInfoEnabled()) { logger.info("Service is updated at path '{}'", event.getData().getPath()); } default: break; } } } }); try { cache.start(); } catch (Exception e) { logger.error("Watch start error happened for path '{}'!", buildBasePath(), e); throw new ServiceDiscoveryException(e); } }
/** * Starts an AdminURL bind event loop */ protected void waitForAdminURLBind() { final TreeCache tc = TreeCache.newBuilder(cf, ZOOKEEP_URL) //.setExecutor(threadFactory) //.setExecutor((ExecutorService)threadPool) //.setCacheData(true) .build(); final AtomicBoolean waiting = new AtomicBoolean(true); final Thread waitForAdminURLThread = threadFactory.newThread(new Runnable(){ @Override public void run() { final Thread waitThread = Thread.currentThread(); try { final Listenable<TreeCacheListener> listen = tc.getListenable(); listen.addListener(new TreeCacheListener(){ @Override public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { ChildData cd = event.getData(); if(cd!=null) { log.info("TreeCache Bound [{}]", cd.getPath()); final String boundPath = cd.getPath(); if(ZOOKEEP_URL.equals(boundPath)) { updateAdminURL(cd.getData()); tc.close(); waiting.set(false); waitThread.interrupt(); } } } }); tc.start(); log.debug("AdminURL TreeCache Started"); // Check for the data one more time in case we missed // the bind event while setting up the listener final ZooKeeper z = cf.getZookeeperClient().getZooKeeper(); final Stat st = z.exists(ZOOKEEP_URL, false); if(st!=null) { updateAdminURL(z.getData(ZOOKEEP_URL, false, st)); tc.close(); } while(true) { try { Thread.currentThread().join(retryPauseTime); log.info("Still waiting for AdminURL...."); } catch (InterruptedException iex) { if(Thread.interrupted()) Thread.interrupted(); } if(!waiting.get()) break; } log.info("Ended wait for AdminURL"); } catch (Exception ex) { log.error("Failed to wait for AdminURL bind", ex); // FIXME: } finally { try { tc.close(); } catch (Exception x) {/* No Op */} } } }); waitForAdminURLThread.start(); }
/**
 * Registers a data listener by delegating to {@code globalNodeStorage}.
 *
 * @param listener listener to register
 */
protected void addDataListener(final TreeCacheListener listener) {
    globalNodeStorage.addDataListener(listener);
}
/**
 * Registers a data listener by delegating to {@code jobNodeStorage}.
 *
 * @param listener listener to register
 */
protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
}
/**
 * Registers a data listener on the cache of this job's node.
 *
 * <p>The cache is looked up in the registry center under {@code "/"} followed by the job name.
 *
 * @param listener listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final String jobRootPath = "/" + jobConfiguration.getJobName();
    final Object rawCache = coordinatorRegistryCenter.getRawCache(jobRootPath);
    ((TreeCache) rawCache).getListenable().addListener(listener);
}
/**
 * Adds a data node listener by delegating to the config node storage of
 * {@code zookeeperConfigGroup}.
 *
 * @param listener listener to add
 */
private void addDataListener(TreeCacheListener listener) {
    zookeeperConfigGroup.getConfigNodeStorage().addDataListener(listener);
}
/**
 * Registers a configuration listener.
 *
 * <p>The listener is attached to the cache that the registry center holds for the
 * profile's concurrent root node path.
 *
 * @param listener listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final String rootNodePath = configProfile.getConcurrentRootNodePath();
    final Object rawCache = elasticConfigRegistryCenter.getRawCache(rootNodePath);
    ((TreeCache) rawCache).getListenable().addListener(listener);
}
/**
 * Registers a data listener on the cache stored under {@code GlobalNodePath.ROOT}.
 *
 * @param listener listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final TreeCache globalCache = (TreeCache) coordinatorRegistryCenter.getRawCache(GlobalNodePath.ROOT);
    globalCache
            .getListenable()
            .addListener(listener);
}
/**
 * Registers a data listener on the cache stored under the root path {@code "/"}.
 *
 * @param listener listener to register
 */
public void addDataListener(final TreeCacheListener listener) {
    final Object rootCache = coordinatorRegistryCenter.getRawCache("/");
    ((TreeCache) rootCache).getListenable().addListener(listener);
}
/**
 * Removes a data listener from the cache stored under the root path {@code "/"}.
 *
 * @param listener listener to remove
 */
public void removeDataListener(final TreeCacheListener listener) {
    final Object rootCache = coordinatorRegistryCenter.getRawCache("/");
    ((TreeCache) rootCache).getListenable().removeListener(listener);
}
/**
 * Adds a tree cache listener, which will be called when changes happen to the zookeeper root.
 *
 * <p>The listener is appended to this builder's listener collection, so calling this method
 * several times accumulates listeners.
 *
 * @param listener The callback which is called when changes happen in zookeeper.
 * @return The Builder
 */
public Builder withListener(TreeCacheListener listener) {
    this.listener.add(listener);
    return this;
}