public boolean watchTree(String path, TreeCacheListener listener) { TreeCache tc = nodeTreeCache.get(path); if (tc != null) { return false; // node been listened } try { tc = TreeCache.newBuilder(client, path).build(); tc.start(); if (executor != null) { tc.getListenable().addListener(listener, executor); } else { tc.getListenable().addListener(listener); } nodeTreeCache.put(path, tc); return true; } catch (Throwable e) { throw checkException(String.format("Unable to watch tree for path: %s", path), e); } }
public ZKStacksCache(CuratorFramework curator, IDataSourceConnector connector, ServiceDiscoveryHostDeserializer hostsSerializer, boolean cacheHosts, String basePath) { log.info("starting stacks cache"); this.stackPathPrefix = getStackPathPrefix(basePath); this.hostGetter = cacheHosts ? new CachedHostsGetter() : new ZookeeperHostGetter(connector); this.hostsSerializer = hostsSerializer; stacksCache = TreeCache.newBuilder(curator, basePath + ZKPathHelperConstants.STACKS_PATH) .setCacheData(cacheHosts) .setMaxDepth(cacheHosts ? STACK_ELEMENTS_COUNT + 1 : STACK_ELEMENTS_COUNT) .build(); //(APPDS-1904-related) please be careful: listener somehow may be initialized after cache population, so the INITIALIZE event may be missed stacksCache.getListenable().addListener(new StackCacheListener());//this is a curator problem, so now ZooKeeper // is connected _after_ cache start, which prevents initialize to happen too early try { stacksCache.start(); } catch (Exception e) { log.error("Failed to start stacks cache", e); throw new RuntimeException(e); } }
/** * Create a zookeeper-based namespace service * @param client the curator client * @param cache the treecache * @param filePath the file */ public Namespaces(final CuratorFramework client, final TreeCache cache, final String filePath) { requireNonNull(cache, "TreeCache may not be null!"); this.client = client; this.cache = cache; try { this.client.create().orSetData().forPath(ZNODE_NAMESPACES); this.cache.getListenable().addListener((c, e) -> { final Map<String, ChildData> tree = cache.getCurrentChildren(ZNODE_NAMESPACES); readTree(tree).forEach(data::put); }); init(filePath).forEach(data::put); } catch (final Exception ex) { LOGGER.error("Could not create a zk node cache: {}", ex); throw new RuntimeTrellisException(ex); } }
@Override public void start() { try { // 1000ms - initial amount of time to wait between retries // 3 times - max number of times to retry client = CuratorFrameworkFactory.newClient(getIdentifier().getConnectionString(), new ExponentialBackoffRetry(1000, 3)); client.start(); client.getZookeeperClient().blockUntilConnectedOrTimedOut(); cache = TreeCache.newBuilder(client, buildBasePath()).build(); serializer = new JsonInstanceSerializer<HostMetadata>(HostMetadata.class); } catch (Exception e) { logger.error("Service registry start error for server identifier '{}' !", getIdentifier().getConnectionString(), e); throw new ServiceDiscoveryException(e); } }
/**
 * Reads the cached payload of {@code pPath} and decodes it.
 *
 * <p>The bytes are decoded with the charset named by {@code encode}. If the trimmed text
 * is wrapped in {@code {...}} it is parsed into a {@code Map}; if it is wrapped in
 * {@code [...]} it is parsed into a {@code Collection}; otherwise the decoded string is
 * returned as-is (untrimmed).
 *
 * @param treeCache cache to read from
 * @param pPath     full path of the node
 * @param encode    charset name used to decode the node data
 * @return the decoded value, or {@code null} when the path is not present in the cache
 *         or the node carries no data
 * @throws IOException          if {@code encode} is not a supported charset, or (presumably)
 *                              if JSON conversion fails
 * @throws KeeperException      declared for callers; not raised by the visible code
 * @throws InterruptedException declared for callers; not raised by the visible code
 */
public static Object getData(TreeCache treeCache, String pPath, String encode)
        throws IOException, KeeperException, InterruptedException {
    log.debug("pPath=" + pPath);
    ChildData cdata = treeCache.getCurrentData(pPath);
    log.debug("cdata=" + cdata);
    if (cdata == null) {
        // Path is not (yet) in the cache: previously this dereferenced null.
        return null;
    }
    byte[] data = cdata.getData();
    log.debug("data=" + data);
    if (data == null) {
        // Node exists but has no payload.
        return null;
    }

    String datas = new String(data, encode);
    Object ret = datas;
    String trimmed = datas.trim();
    if (!trimmed.isEmpty()) {
        if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
            Map<String, Object> map = JsonUtil.toJavaBean(trimmed, Map.class);
            ret = map;
        } else if (trimmed.startsWith("[") && trimmed.endsWith("]")) {
            Collection<Object> ocoll = JsonUtil.toJavaBean(trimmed, Collection.class);
            ret = ocoll;
        }
    }
    log.debug("ret=" + ret);
    return ret;
}
/**
 * Recursively dumps every node below {@code path}, comparing the value read through the
 * registry center with the value held in the job's tree cache.
 *
 * <p>When path and value agree, one line {@code "zkPath | zkValue"} is appended; otherwise
 * the cached path and value are appended as well so the difference is visible.
 *
 * @param path   node whose children are dumped
 * @param result list receiving one line per visited node
 */
private void dumpDirectly(final String path, final List<String> result) {
    for (String each : coordinatorRegistryCenter.getChildrenKeys(path)) {
        String zkPath = path + "/" + each;
        String zkValue = coordinatorRegistryCenter.get(zkPath);
        if (null == zkValue) {
            zkValue = "";
        }
        TreeCache treeCache = (TreeCache) coordinatorRegistryCenter.getRawCache("/" + jobName);
        ChildData treeCacheData = treeCache.getCurrentData(zkPath);
        String treeCachePath = null == treeCacheData ? "" : treeCacheData.getPath();
        // A cached node may carry no payload; treat that like "no value" instead of failing.
        // Decode explicitly as UTF-8 so the comparison does not depend on the platform charset.
        String treeCacheValue = "";
        if (null != treeCacheData && null != treeCacheData.getData()) {
            treeCacheValue = new String(treeCacheData.getData(), java.nio.charset.StandardCharsets.UTF_8);
        }
        if (zkValue.equals(treeCacheValue) && zkPath.equals(treeCachePath)) {
            result.add(Joiner.on(" | ").join(zkPath, zkValue));
        } else {
            result.add(Joiner.on(" | ").join(zkPath, zkValue, treeCachePath, treeCacheValue));
        }
        dumpDirectly(zkPath, result);
    }
}
/**
 * Watches the subtree at {@code path} and forwards events to {@code listener}.
 *
 * <p>Only events that carry data and a type, and whose node path splits into more than
 * three segments on {@code "/"}, are forwarded. The node payload is passed as a string
 * (platform default charset), or {@code null} when the node has no payload.
 *
 * <p>The cache is intentionally left open on success (hence the suppressed resource
 * warning). A start failure is reported best-effort via the stack trace and is not
 * propagated to the caller.
 *
 * @param path     root path to watch
 * @param listener callback receiving (path, event type name, payload)
 */
@SuppressWarnings("resource")
@Override
public void addChildNodeListener(String path, ChildNodeListener listener) {
    TreeCache cache = new TreeCache(client, path);
    cache.getListenable().addListener((curatorFramework, event) -> {
        if (null == event || null == event.getData() || null == event.getType()) {
            return;
        }
        String servicePath = event.getData().getPath();
        if (servicePath.split("/").length <= 3) {
            return;
        }
        byte[] payload = event.getData().getData();
        String content = null;
        if (null != payload) {
            content = new String(payload);
        }
        listener.childChanged(servicePath, event.getType().toString(), content);
    });
    try {
        cache.start();
    } catch (Exception e) {
        e.printStackTrace();
        // Nothing references the cache after this method returns, so a cache that failed
        // to start must be released here or it stays registered with the client.
        cache.close();
    }
}
/**
 * Recursively dumps every node below {@code path}, comparing the value read through the
 * elastic-config registry center with the value held in the profile's tree cache.
 *
 * <p>When path and value agree, one line {@code "zkPath | zkValue"} is appended; otherwise
 * the cached path and value are appended as well so the difference is visible.
 *
 * @param path   node whose children are dumped
 * @param result list receiving one line per visited node
 */
private void dumpDirectly(final String path, final List<String> result) {
    for (String each : coordinatorRegistryCenter.getElasticConfigRegistryCenter().getChildrenKeys(path)) {
        String zkPath = path + "/" + each;
        String zkValue = coordinatorRegistryCenter.getElasticConfigRegistryCenter().get(zkPath);
        if (null == zkValue) {
            zkValue = "";
        }
        TreeCache treeCache = (TreeCache) coordinatorRegistryCenter.getElasticConfigRegistryCenter().getRawCache(
                "/" + configProfile.getNode());
        ChildData treeCacheData = treeCache.getCurrentData(zkPath);
        String treeCachePath = null == treeCacheData ? "" : treeCacheData.getPath();
        // A cached node may carry no payload; treat that like "no value" instead of failing.
        // Decode explicitly as UTF-8 so the comparison does not depend on the platform charset.
        String treeCacheValue = "";
        if (null != treeCacheData && null != treeCacheData.getData()) {
            treeCacheValue = new String(treeCacheData.getData(), java.nio.charset.StandardCharsets.UTF_8);
        }
        if (zkValue.equals(treeCacheValue) && zkPath.equals(treeCachePath)) {
            result.add(Joiner.on(" | ").join(zkPath, zkValue));
        } else {
            result.add(Joiner.on(" | ").join(zkPath, zkValue, treeCachePath, treeCacheValue));
        }
        dumpDirectly(zkPath, result);
    }
}
public AccumuloGraphMetadataStore(CuratorFramework curatorFramework, String zkPath) { this.zkPath = zkPath; this.curatorFramework = curatorFramework; this.treeCache = new TreeCache(curatorFramework, zkPath); this.treeCache.getListenable().addListener((client, event) -> { if (LOGGER.isTraceEnabled()) { LOGGER.trace("treeCache event, clearing cache"); } synchronized (entries) { entries.clear(); } }); try { this.treeCache.start(); } catch (Exception e) { throw new VertexiumException("Could not start metadata sync", e); } }
/**
 * Starts the registry once: creates the Curator client, attaches a
 * {@code ProviderNodeEventListener} to a tree cache on
 * {@code Constants.SERVICE_ZK_PATH_PREFIX}, and waits up to one second for the connection.
 *
 * <p>{@code started} is checked both before and after taking the class lock, so concurrent
 * callers perform the set-up at most once. NOTE(review): this is only safe if
 * {@code started} is declared {@code volatile} — confirm at the field declaration.
 *
 * <p>If the connection is not established in time, or set-up throws, the client and cache
 * created by this attempt are closed before returning. {@code started} stays {@code false},
 * so a later call retries with fresh resources instead of leaking the previous ones.
 *
 * @throws Exception if the client or the tree cache fails to start, or the wait is interrupted
 */
public static void start() throws Exception {
    if (started) {
        return;
    }
    synchronized (RegistryManager.class) {
        if (started) {
            return;
        }
        String zookeeperAddress = ConfigManager.getInstance().getProperty(Constants.ZOOKEEPER_ADDRESS);
        // 1000 ms base sleep, at most 3 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperAddress, retryPolicy);

        TreeCache treeCache = null;
        boolean connected = false;
        try {
            client.start();
            treeCache = TreeCache.newBuilder(client, Constants.SERVICE_ZK_PATH_PREFIX).setCacheData(false).build();
            treeCache.getListenable().addListener(new ProviderNodeEventListener(), curatorEventThreadPool);
            treeCache.start();
            connected = client.blockUntilConnected(1000, TimeUnit.MILLISECONDS);
        } finally {
            if (!connected) {
                // This attempt did not succeed. The next start() call builds a new client and
                // cache, so release the ones created here.
                if (treeCache != null) {
                    treeCache.close();
                }
                client.close();
            }
        }
        started = connected;
    }
}
ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor) { if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() ) { modelSpec = modelSpec.parent(); // i.e. the last item is a parameter } basePath = modelSpec.path(); this.serializer = modelSpec.serializer(); cache = TreeCache.newBuilder(client, basePath.fullPath()) .setCacheData(false) .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress)) .setExecutor(executor) .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers)) .build(); }
/**
 * Diagnostic helper: starts a temporary {@link TreeCache} on {@code path} (data cached,
 * max depth 3), waits up to two seconds for its {@code INITIALIZED} event and logs the
 * children cached under {@code /propertyService}.
 *
 * <p>The cache is always closed before returning.
 *
 * @param curatorFramework Curator client to use
 * @param path             root path of the temporary cache
 * @throws Exception if the cache cannot be started or the wait is interrupted
 */
private static void checkTreeCache(CuratorFramework curatorFramework, String path) throws Exception {
    final Semaphore semaphore = new Semaphore(0);
    // Builder.build() always returns an instance, so no null check is needed here.
    TreeCache treeCache = TreeCache.newBuilder(curatorFramework, path)
            .setCacheData(true)
            .setMaxDepth(3)
            .build();
    try {
        treeCache.getListenable().addListener((client, event) -> {
            if (event.getType().equals(TreeCacheEvent.Type.INITIALIZED)) {
                semaphore.release();
            }
        });
        treeCache.start();
        if (!semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
            // Keep going: whatever has been cached so far is still worth reporting.
            LOGGER.error("treeCache was not initialized within 2 seconds");
        }

        Map<String, ChildData> map = treeCache.getCurrentChildren("/propertyService");
        if (map == null) {
            LOGGER.error("map is null");
            return; // nothing to list; previously this fell through to a NullPointerException
        }
        map.entrySet().forEach(entry -> {
            LOGGER.info("{} - {}", entry.getKey(), Bytes.toString(entry.getValue().getData()));
        });
    } finally {
        // The cache only exists for this check; release it.
        treeCache.close();
    }
}
/**
 * Closes every registered tree cache, calls {@code waitForCacheClose()} and then
 * quietly closes the Curator client.
 */
@Override
public void close() {
    for (TreeCache cache : caches.values()) {
        cache.close();
    }
    waitForCacheClose();
    CloseableUtils.closeQuietly(client);
}
/**
 * Returns the value stored at {@code key}, preferring the local tree cache.
 *
 * <p>Falls back to {@code getDirectly(key)} when no cache covers the key or the cache
 * holds no entry for it.
 *
 * @param key full node path
 * @return the node data decoded as UTF-8, or {@code null} if the cached node has no data
 */
@Override
public String get(final String key) {
    final TreeCache cache = findTreeCache(key);
    final ChildData cached = null == cache ? null : cache.getCurrentData(key);
    if (null == cached) {
        return getDirectly(key);
    }
    final byte[] payload = cached.getData();
    return null == payload ? null : new String(payload, StandardCharsets.UTF_8);
}
/**
 * Looks up the tree cache responsible for {@code key}.
 *
 * @param key full node path
 * @return the first cache (in the map's iteration order) whose registered prefix
 *         {@code key} starts with, or {@code null} if there is none
 */
private TreeCache findTreeCache(final String key) {
    TreeCache match = null;
    for (Entry<String, TreeCache> candidate : caches.entrySet()) {
        if (key.startsWith(candidate.getKey())) {
            match = candidate.getValue();
            break;
        }
    }
    return match;
}
@Override public void addCacheData(final String cachePath) { TreeCache cache = new TreeCache(client, cachePath); try { cache.start(); // CHECKSTYLE:OFF } catch (final Exception ex) { // CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } caches.put(cachePath + "/", cache); }
/**
 * Unregisters the tree cache stored under {@code cachePath + "/"} and closes it.
 * Does nothing when no such cache is registered.
 *
 * @param cachePath root path of the cache to evict
 */
@Override
public void evictCacheData(final String cachePath) {
    final TreeCache evicted = caches.remove(cachePath + "/");
    if (null == evicted) {
        return;
    }
    evicted.close();
}
/**
 * Returns the raw tree cache for {@code CloudJobConfigurationNode.ROOT}, asking the
 * registry center to create it first when it is not registered yet.
 *
 * @return whatever the registry center reports for the root path after the optional
 *         registration
 */
private TreeCache getCache() {
    TreeCache cache = (TreeCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
    if (null == cache) {
        regCenter.addCacheData(CloudJobConfigurationNode.ROOT);
        cache = (TreeCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
    }
    return cache;
}
/**
 * Returns the value stored at {@code key}, preferring the local tree cache.
 *
 * <p>Falls back to {@code getDirectly(key)} when no cache covers the key or the cache
 * holds no entry for it.
 *
 * @param key full node path
 * @return the node data decoded as UTF-8, or {@code null} if the cached node has no data
 */
@Override
public String get(final String key) {
    TreeCache cache = findTreeCache(key);
    if (null != cache) {
        ChildData hit = cache.getCurrentData(key);
        if (null != hit) {
            byte[] raw = hit.getData();
            if (null == raw) {
                return null;
            }
            return new String(raw, Charsets.UTF_8);
        }
    }
    return getDirectly(key);
}
@Override public void addCacheData(final String cachePath) { TreeCache cache = new TreeCache(client, cachePath); try { cache.start(); //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } caches.put(cachePath + "/", cache); }
/**
 * Exercises {@code Namespaces} against a live test ZooKeeper: loading from a missing and
 * an existing file, adding a prefix, and propagation of that prefix to a second service
 * instance sharing the same tree cache.
 *
 * <p>The Curator client and the tree cache are closed when the test ends, whether it
 * passes or fails.
 */
@Test
public void testNamespaces() throws Exception {
    final URL res = Namespaces.class.getResource(nsDoc);
    try (final CuratorFramework zk = newClient(curator.getConnectString(), new RetryNTimes(10, 1000))) {
        zk.start();
        try (final TreeCache cache = new TreeCache(zk, ZNODE_NAMESPACES)) {
            cache.start();

            // A file that does not exist yields an empty namespace set.
            final NamespaceService svc1 = new Namespaces(zk, cache, res.getPath() + randomFilename());
            assertEquals(0, svc1.getNamespaces().size());

            final NamespaceService svc2 = new Namespaces(zk, cache, res.getPath());
            assertEquals(2, svc2.getNamespaces().size());
            assertEquals(LDP.URI, svc2.getNamespace("ldp").get());
            assertEquals("ldp", svc2.getPrefix(LDP.URI).get());

            assertFalse(svc2.getNamespace("jsonld").isPresent());
            assertFalse(svc2.getPrefix(JSONLD.URI).isPresent());
            assertTrue(svc2.setPrefix("jsonld", JSONLD.URI));
            assertEquals(3, svc2.getNamespaces().size());
            assertEquals(JSONLD.URI, svc2.getNamespace("jsonld").get());
            assertEquals("jsonld", svc2.getPrefix(JSONLD.URI).get());

            // A fresh instance eventually sees the prefix written through svc2.
            final Namespaces svc3 = new Namespaces(zk, cache);
            await().atMost(5, SECONDS).until(() -> 3 == svc3.getNamespaces().size());
            assertEquals(JSONLD.URI, svc3.getNamespace("jsonld").get());
            assertFalse(svc3.setPrefix("jsonld", JSONLD.URI));
        }
    }
}
ZKAsyncMultiMap(Vertx vertx, CuratorFramework curator, String mapName) { super(curator, vertx, ZK_PATH_ASYNC_MULTI_MAP, mapName); // /io.vertx/asyncMultiMap/subs curatorCache = new TreeCache(curator, mapPath); try { curatorCache.start(); } catch (Exception e) { throw new VertxException(e); } }
/**
 * Builds and starts the tree cache on {@code rootPath()}, then calls {@code startListen()}.
 *
 * <p>The cache keeps node data, creates missing parent nodes, does not use compression,
 * dispatches on {@code eventExecutor} and is limited to a depth of 10.
 *
 * @throws Exception if the tree cache fails to start
 */
public void start() throws Exception {
    final TreeCache.Builder builder = TreeCache.newBuilder(curator, rootPath());
    builder.setExecutor(eventExecutor);
    builder.setCacheData(true);
    builder.setCreateParentNodes(true);
    builder.setDataIsCompressed(false);
    builder.setMaxDepth(10);

    this.treeCache = builder.build();
    treeCache.start();
    startListen();
}
/**
 * Creates a new EndpointListener.
 *
 * <p>Reads the ZooKeeper connect string, service type and connection/session timeouts
 * through {@code ConfigurationHelper}, starts a Curator client, builds a tree cache
 * on the service type path with this object registered as listener, registers the
 * management interface and finally calls {@code connectClients()}.
 *
 * <p>The tree cache is built here but not started in this constructor.
 *
 * @param listeners an optional array of endpoint listeners to add
 */
private EndpointListener(final AdvertisedEndpointListener...listeners) {
    super(SharedNotificationExecutor.getInstance(), NOTIF_INFOS);
    // NOTE(review): listeners are added through `instance`, not `this`. If `instance` is the
    // singleton field assigned from this very constructor it is presumably still null here,
    // which would fail for a non-empty `listeners` array — confirm against the field declaration.
    for(AdvertisedEndpointListener listener : listeners) {
        instance.addEndpointListener(listener);
    }
    CloseableService.getInstance().register(this);
    // Logs the raw system property only; the effective value is resolved on the next line.
    log.info("ZK_CONNECT_CONF: [{}]", System.getProperty(ZK_CONNECT_CONF, "undefined"));
    zkConnect = ConfigurationHelper.getSystemThenEnvProperty(ZK_CONNECT_CONF, ZK_CONNECT_DEFAULT);
    log.info("EndpointListener ZooKeep Connect: [{}]", zkConnect);
    serviceType = ConfigurationHelper.getSystemThenEnvProperty(SERVICE_TYPE_CONF, SERVICE_TYPE_DEFAULT);
    connectionTimeout = ConfigurationHelper.getIntSystemThenEnvProperty(DISC_CONN_TO_CONF, DISC_CONN_TO_DEFAULT);
    sessionTimeout = ConfigurationHelper.getIntSystemThenEnvProperty(DISC_SESS_TO_CONF, DISC_SESS_TO_DEFAULT);
    // Retry policy: 1000 ms base sleep, at most 3 retries.
    curator = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeout, connectionTimeout, new ExponentialBackoffRetry( 1000, 3 ));
    // Register for connection state changes before starting the client.
    curator.getConnectionStateListenable().addListener(this);
    curator.start();
    treeCache = TreeCache.newBuilder(curator, serviceType)
        .setCacheData(true)
        .setCreateParentNodes(true)
        .setExecutor(executor)
        .setMaxDepth(5)
        .build();
    treeCache.getListenable().addListener(this, executor);
    try {
        JMXHelper.registerMBean(OBJECT_NAME, this);
    } catch (Exception ex) {
        // Management registration is optional; construction continues without it.
        log.warn("Failed to register management interface. Will continue without.", ex);
    }
    connectClients();
}
/**
 * Closes every registered tree cache, calls {@code waitForCacheClose()}, quietly closes
 * the Curator client and, when the configuration uses a nested ZooKeeper, closes the
 * nested server on the configured port.
 */
@Override
public void close() {
    for (TreeCache cache : caches.values()) {
        cache.close();
    }
    waitForCacheClose();
    CloseableUtils.closeQuietly(client);

    final boolean usesNestedServer = zkConfig.isUseNestedZookeeper();
    if (usesNestedServer) {
        NestedZookeeperServers.getInstance().closeServer(zkConfig.getNestedPort());
    }
}
/**
 * Returns the value stored at {@code key}, preferring the local tree cache.
 *
 * <p>Falls back to {@code getDirectly(key)} when no cache covers the key or the cache
 * holds no entry for it.
 *
 * @param key full node path
 * @return the node data decoded as UTF-8, or {@code null} if the cached node has no data
 */
@Override
public String get(final String key) {
    final TreeCache owner = findTreeCache(key);
    if (null == owner) {
        return getDirectly(key);
    }
    final ChildData entry = owner.getCurrentData(key);
    if (null == entry) {
        return getDirectly(key);
    }
    final byte[] bytes = entry.getData();
    if (null == bytes) {
        return null;
    }
    return new String(bytes, Charset.forName("UTF-8"));
}
/**
 * Verifies that {@code jobNodeStorage.addDataListener} registers the given listener on the
 * listenable of the raw cache the registry center returns for {@code "/testJob"}.
 */
@Test
public void assertAddDataListener() {
    TreeCacheListener expectedListener = mock(TreeCacheListener.class);
    @SuppressWarnings("unchecked")
    Listenable<TreeCacheListener> listenable = mock(Listenable.class);
    TreeCache rawCache = mock(TreeCache.class);

    when(coordinatorRegistryCenter.getRawCache("/testJob")).thenReturn(rawCache);
    when(rawCache.getListenable()).thenReturn(listenable);

    jobNodeStorage.addDataListener(expectedListener);

    verify(listenable).addListener(expectedListener);
}
/**
 * Registers a data listener.
 *
 * @param listener  the listener to register
 * @param cachePath path under which the tree cache is looked up in the registry center
 * @return this instance, to allow call chaining
 */
public ConsoleRegistryCenter addDataListener(final TreeCacheListener listener, final String cachePath) {
    final Object rawCache = registryCenter.getRawCache(cachePath);
    ((TreeCache) rawCache).getListenable().addListener(listener);
    return this;
}