/**
 * Starts this component.
 *
 * <p>When {@code isCluster()} is true, a {@link NodeCache} on
 * {@code CLUSTER_DEBUG_FULL_PATH} is created and started, the node at
 * {@code SERVER_REL_PATH} is created, a path cache for it is built and an
 * ensure-path helper for the debug path is obtained. Any exception raised by
 * that setup is rethrown through {@code Throwables.propagate}, in which case
 * {@code hasStarted} is not set.
 */
@Override
public void startup()
{
    if( isCluster() )
    {
        final CuratorFramework client = getCurator();
        debugCache = new NodeCache(client, CLUSTER_DEBUG_FULL_PATH);
        try
        {
            debugCache.start();
            createNode(SERVER_REL_PATH, "");
            membersCache = createPathCache(SERVER_REL_PATH, false);
            ensureDebug = client.newNamespaceAwareEnsurePath(CLUSTER_DEBUG_FULL_PATH);
        }
        catch( Exception setupFailure )
        {
            // propagate() always throws; the explicit throw makes that visible to the reader.
            throw Throwables.propagate(setupFailure);
        }
    }
    hasStarted = true;
}
/** * @param cache NodeCache * @param zkListen * @throws Exception * @Created 2016/9/20 */ private static void runWatch(final NodeCache cache, final ZookeeperProcessListen zkListen) throws Exception { cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() { LOGGER.info("ZktoxmlMain runWatch process path event start "); String notPath = cache.getCurrentData().getPath(); LOGGER.info("NodeCache changed, path is: " + notPath); // notify zkListen.notify(notPath); LOGGER.info("ZktoxmlMain runWatch process path event over"); } }); }
/**
 * Starts listening on one node per application code and returns the
 * resulting node caches in the same order as {@code appCodes}.
 *
 * <p>All nodes share a single listener that calls {@code refresher.refresh()};
 * anything thrown by the refresh is logged and not rethrown.
 *
 * @param profileCode profile code used to build each node path
 * @param appCodes    application codes, one listened node per entry
 * @return the node caches returned by {@code zkTemplate.listenNode}
 */
private List<NodeCache> listenNodes(String profileCode, String[] appCodes) {
    final ZkTemplate.NodeListener refreshOnChange = new ZkTemplate.NodeListener() {
        @Override
        public void nodeChanged() throws Exception {
            try {
                refresher.refresh();
            } catch (Throwable failure) {
                logger.error("触发刷新出错:", failure);
            }
        }
    };

    final List<NodeCache> result = new ArrayList<>();
    for (int i = 0; i < appCodes.length; i++) {
        final String nodePath = ZkTemplate.buildPath(profileCode, appCodes[i]);
        result.add(zkTemplate.listenNode(nodePath, false, refreshOnChange));
    }
    return result;
}
/**
 * Creates {@code /group-1/localhost:9001} under the {@code namespace-test}
 * namespace, checks the returned path, then watches {@code /group-1} with a
 * node cache while its data is written twice.
 *
 * @throws Exception if any ZooKeeper operation fails
 */
@Test
public void test_zkNode() throws Exception {
    final CuratorFramework namespaced = curator.usingNamespace("namespace-test");

    final String groupPath = "/group-1/localhost:9001";
    final String createdPath = namespaced.create().creatingParentsIfNeeded().forPath(groupPath);
    Assert.assertEquals(createdPath, "/group-1/localhost:9001");

    final NodeCache groupCache = new NodeCache(namespaced, "/group-1", true);
    groupCache.start();
    // The listener only logs; nothing is asserted about the notifications.
    groupCache.getListenable().addListener(() -> logger.info("node cache change"));

    namespaced.setData().forPath("/group-1", "test-0".getBytes());
    namespaced.setData().forPath("/group-1", "test-2".getBytes());
}
/**
 * Initializes this provider for {@code serviceName}: resolves the temp and
 * final config file locations, saves the current content of the service's
 * config node, then starts a node cache that saves the file again whenever
 * the node reports non-empty data.
 *
 * @param client      Curator client used to read and watch the config node
 * @param serviceName service whose config path is resolved via {@code ZookeeperUtils.configPath}
 * @throws Exception if the initial read, the initial save or starting the cache fails
 */
private void initProvider(CuratorFramework client, String serviceName) throws Exception {
    this.client = client;
    this.serviceName = serviceName;
    tmpFile = confFile(true);
    confFile = confFile(false);

    String configPath = ZookeeperUtils.configPath(serviceName);
    byte[] bytes = client.getData().forPath(configPath);
    saveFile(bytes);

    cache = new NodeCache(client, configPath);
    cache.getListenable().addListener(() -> {
        try {
            if (cache.getCurrentData() != null) {
                byte[] data = cache.getCurrentData().getData();
                // A node can exist without a payload; treat that like empty data
                // instead of relying on the catch block to absorb the NPE.
                if (data != null && data.length > 0) {
                    saveFile(data);
                }
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.warn("Error while processing config file change event. message={}", e.getMessage(), e);
        }
    });
    cache.start();
}
/**
 * Makes sure a persistent node exists at {@code path} (creating it and any
 * missing parents), then starts a node cache on it whose listener logs the
 * node's new data on every change.
 *
 * @param path path of the node to watch
 * @throws Exception if the existence check, the creation or starting the cache fails
 */
public void addNodeCacheListener(String path) throws Exception {
    Stat existStat = curatorFramework
            .checkExists()
            .forPath(path);
    if (existStat == null) {
        curatorFramework
                .create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path);
    }

    NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
    nodeCache.start();
    nodeCache.getListenable().addListener(() -> {
        ChildData currentData = nodeCache.getCurrentData();
        // Both the snapshot and its payload may be null (node deleted / node without data).
        boolean hasPayload = currentData != null && currentData.getData() != null;
        LOG.info("New Cache Data: {}", hasPayload ? new String(currentData.getData()) : "null");
    });
}
/**
 * Demo entry point: connects to ZooKeeper at {@code localhost:2181}, watches
 * {@code /test} with a node cache for 30 seconds and prints every change.
 *
 * @param args unused
 * @throws Exception if the client or the cache fails to start, or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    CuratorFramework framework = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181").retryPolicy(new RetryNTimes(3, 2000)).build();
    try {
        framework.start();
        final NodeCache nodeCache = new NodeCache(framework, "/test");
        try {
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData data = nodeCache.getCurrentData();
                    if (data == null) {
                        // The node does not exist (never created, or deleted).
                        System.out.println("/test does not exist");
                        return;
                    }
                    System.out.println(new String(data.getData()) + " / " + data);
                }
            });
            nodeCache.start();
            Thread.sleep(30000);
        } finally {
            // Close the cache even when the sleep is interrupted.
            nodeCache.close();
        }
    } finally {
        framework.close();
    }
}
/** * Start. * * @throws Exception the exception */ public void start() throws Exception { //NOSONAR LOG.info("Starting node tracker"); zkClient.getUnhandledErrorListenable().addListener(errorsListener); if (createZkNode()) { controlCache = new NodeCache(zkClient, CONTROL_SERVER_NODE_PATH); controlCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData currentData = controlCache.getCurrentData(); if (currentData == null) { LOG.warn("Control service node died!"); onNoMaster(); } else { LOG.warn("Control service node changed!"); onMasterChange(currentData); } } }); controlCache.start(); } else { LOG.warn("Failed to create ZK node!"); } }
/**
 * Starts a node cache on {@code path} and forwards each change notification
 * to {@code listener}. The cache is remembered in {@code childrens}.
 *
 * @param curator  Curator client used to create the cache
 * @param path     node path to watch
 * @param listener callback invoked with the client, the cache and the path on every change
 * @throws Exception if the cache cannot be started
 */
private void registerListener(final CuratorFramework curator, final String path,
        final ZookeeperConfigurationListener listener) throws Exception {
    final NodeCache watched = new NodeCache(curator, path, true);
    watched.start();
    // NOTE(review): the listener is attached after start(), as in the original;
    // an event fired in between would presumably not be delivered — confirm.
    watched.getListenable().addListener(() -> listener.onChange(curator, watched, path));
    childrens.add(watched);

    final String message = String.format("Add listener %s for %s", listener, path);
    logger.info(message);
}
/**
 * Creates a wrapper around an existing node cache.
 *
 * @param connector                           data source connector passed to the superclass
 * @param path                                path of the wrapped node
 * @param cache                               node cache being wrapped
 * @param dataIsCompressed                    stored in the field of the same name
 * @param useCache                            stored in the field of the same name
 * @param useCacheWhenNotConnectedToDataSource stored in the field of the same name
 */
public ZkNodeCacheWrapper(IDataSourceConnector connector, String path, NodeCache cache,
        boolean dataIsCompressed, boolean useCache, boolean useCacheWhenNotConnectedToDataSource) {
    super(connector);

    // What is wrapped.
    this.path = path;
    this.cache = cache;

    // How it is read.
    this.dataIsCompressed = dataIsCompressed;
    this.useCache = useCache;
    this.useCacheWhenNotConnectedToDataSource = useCacheWhenNotConnectedToDataSource;
}
/**
 * Starts a node cache for every path in {@code setPaths} and attaches the
 * change-forwarding watch to each of them via {@code runWatch}.
 * A null or empty set is a no-op.
 *
 * @param setPaths paths to watch; may be null
 * @param zkConn   Curator client used to create the caches
 * @param zkListen receiver handed to {@code runWatch} for every cache
 * @throws Exception if a cache fails to start or the watch cannot be attached
 */
private static void loadZkWatch(Set<String> setPaths, final CuratorFramework zkConn,
        final ZookeeperProcessListen zkListen) throws Exception {
    if (setPaths == null || setPaths.isEmpty()) {
        return;
    }

    for (final String watchedPath : setPaths) {
        final NodeCache watchedNode = new NodeCache(zkConn, watchedPath);
        // start(true) is called before the listener is attached, as in the original.
        watchedNode.start(true);
        runWatch(watchedNode, zkListen);
        LOGGER.info("ZktoxmlMain loadZkWatch path:" + watchedPath + " regist success");
    }
}
/**
 * Closes this component and releases its resources: every node cache in
 * {@code nodeCaches} is closed (a failure is logged and the remaining caches
 * are still closed), then {@code zkTemplate} is closed.
 */
public void close() {
    for (NodeCache watched : nodeCaches) {
        closeLoggingFailure(watched);
    }
    zkTemplate.close();
}

/** Closes one node cache, logging an I/O failure instead of propagating it. */
private void closeLoggingFailure(NodeCache watched) {
    try {
        watched.close();
    } catch (IOException closeFailure) {
        logger.error("关闭节点监听器出错:", closeFailure);
    }
}
private Observable<NodeCache> buildNodeCache(final ConfigDescriptor desc) { return Observable.fromCallable(() -> { final String configPath = makePath(ROOT_ZK_PATH, desc.getConfigName()); final NodeCache nc = new NodeCache(curator, configPath); nc.getListenable().addListener(() -> onNodeChanged(nc, desc)); try { nc.start(true); // Note that we have to force calling onNodeChanged() here since `nc.start(true)` will not emit an initial event. onNodeChanged(nc, desc); // Create the ephemeral node last, just in case something goes wrong with setting up the node cache // NOTE: This process is what actually creates the configuration node if it was missing. PersistentEphemeralNode en = new PersistentEphemeralNode(curator, EPHEMERAL, makePath(configPath, localNodeName), new byte[0]); en.start(); if (!en.waitForInitialCreate(getDefaultNodeCreationTimeout().toMillis(), TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timeout on creation of ephemeral node for " + makePath(configPath, localNodeName)); } ephemeralNodes.put(desc, en); return nc; } catch (Exception ex) { log.warn("Failed to initialize for configPath {}", configPath, ex); throw ex; } }); }
/**
 * Emits an event for the configuration described by {@code desc} carrying the
 * node's current content: the UTF-8 decoded data when the node has non-empty
 * data, otherwise an empty {@link Optional}. Failures while building or
 * emitting the event are logged and not rethrown.
 *
 * @param cache node cache whose current data is read
 * @param desc  descriptor supplying the configuration name
 */
public void onNodeChanged(final NodeCache cache, final ConfigDescriptor desc) {
    final ChildData snapshot = cache.getCurrentData();
    try {
        final boolean hasPayload = snapshot != null
                && snapshot.getData() != null
                && snapshot.getData().length > 0;
        final Optional<String> value = hasPayload
                ? Optional.of(new String(snapshot.getData(), Charsets.UTF_8))
                : Optional.empty();
        emitEvent(desc.getConfigName(), value);
    } catch (Exception failure) {
        log.warn("Failed to handle onNodeChanged w/ new data for config key {}, data {}",
                desc.getConfigName(), snapshot, failure);
    }
}
/**
 * Starts the Curator framework and a node cache on {@code zkMountTablePath};
 * each change of that node is passed to {@code handleMountTableChange}.
 *
 * <p>Change events that arrive while the node does not exist are ignored
 * rather than failing inside the listener with a {@link NullPointerException}.
 *
 * @throws Exception if the node cache cannot be started
 */
@Override
protected void serviceStart() throws Exception {
    framework.start();
    nodeCache = new NodeCache(framework, zkMountTablePath, false);
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            // getCurrentData() is null when the watched node does not exist.
            if (nodeCache.getCurrentData() == null) {
                return;
            }
            handleMountTableChange(nodeCache.getCurrentData().getData());
        }
    });
    nodeCache.start(false);
}
/**
 * Creates and starts the node cache on {@code currentTsoPath}, with this
 * object registered as its listener.
 *
 * @param currentTsoPath path of the node to watch
 * @throws IllegalStateException if the cache cannot be created or started;
 *         the original exception is attached as the cause
 */
private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
    try {
        currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
        currentTSOZNode.getListenable().addListener(this);
        currentTSOZNode.start(true);
    } catch (Exception e) {
        // Keep the cause so the underlying failure is not reduced to its message.
        throw new IllegalStateException(
                "Cannot start watcher on current TSO Server ZNode: " + e.getMessage(), e);
    }
}
/**
 * Creates a value store backed by the node at {@code zkPath}.
 *
 * <p>The node is created eagerly on instantiation. If that fails, a warning
 * (including the failure) is logged and construction still succeeds.
 *
 * @param curator      Curator client; must not be null
 * @param zkPath       path of the backing node; must not be null
 * @param serializer   serializer for the stored value; must not be null
 * @param defaultValue stored as the default value; may be null
 */
public ZkValueStore(CuratorFramework curator, String zkPath, ZkValueSerializer<T> serializer, T defaultValue) {
    _curator = checkNotNull(curator, "curator");
    _zkPath = checkNotNull(zkPath, "zkPath");
    _serializer = checkNotNull(serializer, "serializer");
    _nodeCache = new NodeCache(curator, zkPath);
    _defaultValue = defaultValue;
    // Create node on instantiation
    // In practice, this creates a persistent zookeeper node without having to start the Managed resource.
    try {
        createNode();
    } catch (Exception e) {
        // Log a warning. We will try again when Managed is started
        // The exception is passed along so the reason for the failure is visible.
        _log.warn(format("Could not create node %s", _zkPath), e);
    }
}
/** * Creates an instance of ZKSegmentContainerMonitor. * * @param containerRegistry The registry used to control the container state. * @param zkClient The curator client. * @param pravegaServiceEndpoint The pravega endpoint for which we need to fetch the container assignment. */ ZKSegmentContainerMonitor(SegmentContainerRegistry containerRegistry, CuratorFramework zkClient, Host pravegaServiceEndpoint, ScheduledExecutorService executor) { Preconditions.checkNotNull(zkClient, "zkClient"); this.registry = Preconditions.checkNotNull(containerRegistry, "containerRegistry"); this.host = Preconditions.checkNotNull(pravegaServiceEndpoint, "pravegaServiceEndpoint"); this.executor = Preconditions.checkNotNull(executor, "executor"); this.handles = new ConcurrentHashMap<>(); this.pendingTasks = new ConcurrentSkipListSet<>(); String clusterPath = ZKPaths.makePath("cluster", "segmentContainerHostMapping"); this.hostContainerMapNode = new NodeCache(zkClient, clusterPath); this.assigmentTask = new AtomicReference<>(); }
/**
 * Creates a ZooKeeperLeaderElectionService object.
 *
 * <p>The leader latch and the node cache are only constructed here; neither
 * is started by this constructor.
 *
 * @param client     Client which is connected to the ZooKeeper quorum; must not be null
 * @param latchPath  ZooKeeper node path for the leader election latch
 * @param leaderPath ZooKeeper node path for the node which stores the current leader information;
 *                   must not be null
 */
public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
    this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
    this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath");

    this.leaderLatch = new LeaderLatch(client, latchPath);
    this.cache = new NodeCache(client, leaderPath);

    // No contender and no session IDs until the service is started.
    this.leaderContender = null;
    this.issuedLeaderSessionID = null;
    this.confirmedLeaderSessionID = null;
    this.running = false;
}
/**
 * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
 *
 * <p>The node cache on {@code retrievalPath} is only constructed here, not started.
 *
 * @param client        Client which constitutes the connection to the ZooKeeper quorum; must not be null
 * @param retrievalPath Path of the ZooKeeper node which contains the leader information
 */
public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
    this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
    this.cache = new NodeCache(client, retrievalPath);

    // Nothing is known about the leader, and no listener is attached, until start.
    this.lastLeaderSessionID = null;
    this.lastLeaderAddress = null;
    this.leaderListener = null;
    this.running = false;
}
/**
 * Reads the current data of the node and parses it as a single
 * {@code key=value} pair.
 *
 * <p>The data is split on {@code '='}; the first token becomes the key and
 * the second the value (any further tokens are ignored, as before).
 *
 * @param path      node path, used only in error messages
 * @param nodeCache cache holding the node's current data
 * @return a map with exactly one entry
 * @throws RuntimeException if the node does not exist, has no data, or its
 *         data does not contain a {@code '='} separated pair
 */
private Map<String, String> processNodeData(String path, NodeCache nodeCache) {
    ChildData currentData = nodeCache.getCurrentData();
    if (currentData == null) {
        throw new RuntimeException(String.format("node of path: %s do not exists", path));
    }
    // A node can exist without a payload; fail with a clear message instead of an NPE.
    if (currentData.getData() == null) {
        throw new RuntimeException(String.format("node of path: %s has no data", path));
    }

    String currentDataStr = new String(currentData.getData());
    logger.info("current data is : {}", currentDataStr);

    List<String> strings = Splitter.on("=").splitToList(currentDataStr);
    // Without a separator there is only one token; report that explicitly
    // rather than letting strings.get(1) throw IndexOutOfBoundsException.
    if (strings.size() < 2) {
        throw new RuntimeException(String.format(
                "node of path: %s has malformed data, expected key=value but got: %s", path, currentDataStr));
    }

    Map<String, String> map = Maps.newHashMap();
    map.put(strings.get(0), strings.get(1));
    return map;
}
public DynamicConfigZKAdapter(final Class<T> clazz, final String path, final CuratorFramework curatorFramework, Converter<T, byte[]> converter, ChangeListener<T> changeListener) throws Exception { super(clazz, converter, Optional.fromNullable(changeListener)); Preconditions.checkNotNull(curatorFramework, "CuratorFramework cannot be null"); Preconditions.checkArgument(curatorFramework.getState() == CuratorFrameworkState.STARTED, "CuratorFramework must be started"); Preconditions.checkArgument(path != null && !path.isEmpty(), "path cannot be null or blank"); this.curatorFramework = curatorFramework; this.path = path; this.nodeCache = new NodeCache(curatorFramework, path); }
@Override public void stop() { singleThreadExecutor.shutdown(); nodes.forEach(CloseableUtils::closeQuietly); for (NodeCache node : nodes) { try { node.close(); } catch (IOException e) { // ignored. logger.warn("[{}] Close ZK node cache failed. message={}", gondola.getHostId(), e.getMessage()); } } Utils.stopThreads(threads); }
public synchronized void close() { // TODO: implemente with AtomicReference<CloseFuture> logger.info("Close zookeeper client"); try { for (NodeCache cache : cachePool) { logger.info("Close cache {}", cache); cache.getListenable().clear(); cache.close(); } cachePool.clear(); } catch (IOException e) { logger.error("Error while closing zookeeper node listening caches", e); } client.close(); }
/** * Listen for a single node * * @param path * @param listener * @throws Exception */ public void listenForZNode(String path, ZookeeperEventListener listener) throws Exception { init(); // TODO: simplify code of this method to: // ZNodeListener nodeListener = new ZNodeListener(path, listener, client); NodeCache cache = new NodeCache(client, path); cache.start(); NodeCacheListener cacheListener = new ZNodeListener( listener.getHandler(), cache); cache.getListenable().addListener(cacheListener); cachePool.add(cache); }
/**
 * Listens for changes of a single node and additionally invokes the listener
 * once right after registration, so the handler runs without waiting for a
 * change event.
 *
 * @param path     path of the node to watch
 * @param listener supplies the handler invoked on node changes
 * @throws Exception if initialization, starting the cache or the forced
 *         listener invocation fails
 */
public void forcReadListenForZNode(String path, ZookeeperEventListener listener) throws Exception {
    init();

    final NodeCache watched = new NodeCache(client, path);
    watched.start();

    final NodeCacheListener forwarder = new ZNodeListener(listener.getHandler(), watched);
    watched.getListenable().addListener(forwarder);

    // Forced read: run the listener once by hand.
    forwarder.nodeChanged();

    cachePool.add(watched);
}
/**
 * Builds {@code cacheNodeWatcher}, a loading cache from node path to a
 * started {@link NodeCache}.
 *
 * <p>Loading a path creates and starts a node cache whose listener calls
 * {@code _invalidateCache} with the node's current data (or null when the
 * node has none). Entries expire 3600 seconds after last access, at most
 * 10000 are kept, and a removed entry's node cache is closed.
 */
private void _initCacheWatcher() {
    // Close the watcher when its entry leaves the cache.
    final RemovalListener<String, NodeCache> closeOnRemoval = removed -> {
        try {
            removed.getValue().close();
        } catch (IOException closeFailure) {
            LOGGER.warn(closeFailure.getMessage(), closeFailure);
        }
    };

    // Create and start a watcher on first access to a path.
    final CacheLoader<String, NodeCache> watcherLoader = new CacheLoader<String, NodeCache>() {
        @Override
        public NodeCache load(final String path) throws Exception {
            final NodeCache watched = new NodeCache(curatorFramework, path);
            watched.getListenable().addListener(() -> {
                final ChildData current = watched.getCurrentData();
                _invalidateCache(path, current == null ? null : current.getData());
            });
            watched.start();
            return watched;
        }
    };

    cacheNodeWatcher = CacheBuilder.newBuilder()
            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
            .maximumSize(10000)
            .expireAfterAccess(3600, TimeUnit.SECONDS)
            .removalListener(closeOnRemoval)
            .build(watcherLoader);
}
/**
 * Creates and starts the Curator client (read-only capable, retrying forever
 * at 1000 ms intervals) and a node cache on {@code NODE}, started with
 * {@code start(true)}.
 *
 * @throws Exception if the node cache cannot be started
 */
private void create() throws Exception {
    final RetryForever retryEverySecond = new RetryForever(1000);

    client = CuratorFrameworkFactory.builder()
            .connectString(ensemble)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(retryEverySecond)
            .canBeReadOnly(true)
            .build();
    client.start();

    cache = new NodeCache(client, NODE);
    cache.start(true);
}
/**
 * Acquires a reference to the shared node cache, creating it on first use.
 *
 * <p>When no reference is currently held, {@code create()} is called; if it
 * fails, {@code close()} is called and the failure is rethrown without
 * incrementing the reference count.
 *
 * @return the shared node cache
 * @throws Exception if creating the cache fails
 */
public synchronized NodeCache acquire() throws Exception {
    final boolean firstReference = (ref == 0);
    if (firstReference) {
        try {
            create();
        } catch (Exception creationFailure) {
            close();
            throw creationFailure;
        }
    }
    ref += 1;
    return cache;
}
/**
 * Starts the given node cache, converting any failure into an unchecked exception.
 *
 * @param cache node cache to start
 * @throws RuntimeException wrapping the failure, after it has been logged
 */
public void startNodeCache(NodeCache cache) {
    try {
        cache.start();
    } catch (Exception startFailure) {
        log.error("Error starting nodeCache {}", cache, startFailure);
        final RuntimeException wrapped = new RuntimeException(startFailure);
        throw wrapped;
    }
}
/**
 * Closes the given node cache, converting an I/O failure into an unchecked exception.
 *
 * @param cache node cache to close
 * @throws RuntimeException wrapping the {@link IOException}, after it has been logged
 */
public void stopNodeCache(NodeCache cache) {
    try {
        cache.close();
    } catch (IOException closeFailure) {
        log.error("Error stopping nodeCache {}", cache, closeFailure);
        final RuntimeException wrapped = new RuntimeException(closeFailure);
        throw wrapped;
    }
}
/**
 * Registers {@code updater} on the node cache for its path, so that every
 * change of the node pushes the node's current data to the updater.
 *
 * <p>Change events that arrive while the node does not exist are skipped
 * instead of failing inside the listener with a {@link NullPointerException}.
 *
 * @param updater receiver of the node data; also supplies the node path
 */
public void registerUpdater(ConfigurationUpdater updater) {
    NodeCache cache = getOrCreateNodeCache(updater.getPath());
    cache
        .getListenable()
        .addListener(
            new NodeCacheListener() {
              @Override
              public void nodeChanged() {
                // getCurrentData() is null when the watched node does not exist.
                if (cache.getCurrentData() == null) {
                  return;
                }
                updater.update(cache.getCurrentData().getData());
              }
            });
}
public void init() { try { logger.info("init..."); client = CuratorFrameworkFactory.newClient( ConfigurationManager.getConfiguration().getString(ZOOKEEPER_SERVER), new ExponentialBackoffRetry(1000, 3)); client.start(); // in this example we will cache data. Notice that this is optional. cache = new NodeCache(client, getZKPath()); cache.start(); cache.getListenable().addListener(this); nodeChanged(); } catch (Exception e) { logger.info("error ", e); } /* finally { CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(client); } */ }
/**
 * Reads the private {@code path} field of a node cache via reflection.
 *
 * @param nodeCache cache to inspect; may be null
 * @return the cache's path, {@code "n/a"} when {@code nodeCache} is null, or
 *         {@code null} when the reflective access fails (the failure is logged)
 */
private static String path(NodeCache nodeCache) {
    if (nodeCache == null) {
        return "n/a";
    }
    try {
        final Field pathField = NodeCache.class.getDeclaredField("path");
        pathField.setAccessible(true);
        final Object value = pathField.get(nodeCache);
        return (String) value;
    } catch (Throwable failure) {
        logger.error("Ops.fail to get path from node:{}, exception:{}", nodeCache, failure.toString());
        return null;
    }
}
/**
 * Returns the resource built from the watched node's data, building it on
 * first use.
 *
 * <p>If {@code resource} is already set it is returned directly. Otherwise,
 * under {@code lock}, the node cache is obtained, the change listener is
 * attached (once), and the node's current data is passed to {@code factory}.
 * When the node has no data, {@code emptyObject} is returned and
 * {@code zkNodeExists} is set to {@code NOT_EXISTS}, which lets later calls
 * return {@code emptyObject} without taking the lock.
 *
 * @return the built resource, or {@code emptyObject} when the node has no data
 * @throws RuntimeException if {@code factory} fails; unchecked exceptions are
 *         rethrown as-is, checked ones are wrapped. {@code factoryFailedListener}
 *         is notified first in either case.
 */
public T get() {
    checkClosed();
    if (resource == null) {
        if (zkNodeExists == NOT_EXISTS) { // for performance, short circuit it outside sync block.
            return emptyObject;
        }
        synchronized (lock) {
            checkClosed();
            // Re-check under the lock: the resource may have been set while waiting for it.
            if (resource == null) {
                NodeCache cache = nodeCache.get();
                tryAddListener(cache);
                ChildData currentData = cache.getCurrentData();
                if (currentData == null || currentData.getData() == null) {
                    zkNodeExists = NOT_EXISTS;
                    if (!emptyLogged) { // only log this the first time (or first few times)
                        logger.warn("found no zk path for:{}, using empty data:{}", path(cache), emptyObject);
                        emptyLogged = true;
                    }
                    return emptyObject;
                }
                zkNodeExists = EXISTS;
                try {
                    resource = factory.apply(currentData.getData(), currentData.getStat());
                    // First build: report the change with emptyObject as the previous value.
                    if (onResourceChange != null) {
                        onResourceChange.accept(resource, emptyObject);
                    }
                } catch (Exception e) {
                    factoryFailedListener.accept(e);
                    throwIfUnchecked(e);
                    throw new RuntimeException(e);
                }
            }
        }
    }
    return resource;
}
/**
 * Attaches the node-change listener to {@code cache} if that has not been
 * done yet (tracked by {@code hasAddListener}), and remembers in
 * {@code nodeCacheRemoveListener} how to detach it again.
 *
 * <p>On each change the listener, under {@code lock}, either rebuilds the
 * resource from the node's data through {@code refreshFactory} (asynchronously,
 * via a future) or, when the node has no data, resets {@code resource} to
 * null. In both cases the previous resource is handed to {@code cleanup}.
 *
 * @param cache node cache to attach the listener to
 */
private void tryAddListener(NodeCache cache) {
    if (!hasAddListener) {
        NodeCacheListener nodeCacheListener = () -> {
            T oldResource;
            synchronized (lock) {
                ChildData data = cache.getCurrentData();
                // Remember the resource being replaced so it can be cleaned up afterwards.
                oldResource = resource;
                if (data != null && data.getData() != null) {
                    zkNodeExists = EXISTS;
                    ListenableFuture<T> future = refreshFactory.apply(data.getData(), data.getStat());
                    // NOTE(review): with directExecutor() the callback presumably runs on whichever
                    // thread completes the future, possibly outside this synchronized block — confirm.
                    addCallback(future, new FutureCallback<T>() {

                        @Override
                        public void onSuccess(@Nullable T result) {
                            resource = result;
                            cleanup(resource, oldResource, cache);
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            // The current resource is left untouched when the refresh fails.
                            factoryFailedListener.accept(t);
                            logger.error("", t);
                        }
                    }, directExecutor());
                } else {
                    // Node has no data: drop the resource and allow the "empty" warning to be logged again.
                    zkNodeExists = NOT_EXISTS;
                    resource = null;
                    emptyLogged = false;
                    cleanup(resource, oldResource, cache);
                }
            }
        };
        cache.getListenable().addListener(nodeCacheListener);
        nodeCacheRemoveListener = () -> cache.getListenable().removeListener(nodeCacheListener);
        hasAddListener = true;
    }
}
private void cleanup(T currentResource, T oldResource, NodeCache nodeCache) { if (oldResource != null && oldResource != emptyObject) { if (currentResource == oldResource) { logger.warn( "[BUG!!!!] should NOT occurred, old resource is same as current, path:{}, {}", path(nodeCache), oldResource); } else { new ThreadFactoryBuilder() // .setNameFormat("old [" + oldResource.getClass().getSimpleName() + "] cleanup thread-[%d]") .setUncaughtExceptionHandler( (t, e) -> logger.error("fail to cleanup resource, path:{}, {}", path(nodeCache), oldResource.getClass().getSimpleName(), e)) // .setPriority(MIN_PRIORITY) // .setDaemon(true) // .build() // .newThread(() -> { do { if (waitStopPeriod > 0) { sleepUninterruptibly(waitStopPeriod, MILLISECONDS); } if (cleanup.test(oldResource)) { break; } } while (true); if (onResourceChange != null) { onResourceChange.accept(currentResource, oldResource); } }).start(); return; } } if (onResourceChange != null) { onResourceChange.accept(currentResource, oldResource); } }
private static void addListener(final NodeCache cache) { // a PathChildrenCacheListener is optional. Here, it's used just to log // changes NodeCacheListener listener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { if (cache.getCurrentData() != null) System.out.println("Node changed: " + cache.getCurrentData().getPath() + ", value: " + new String(cache.getCurrentData().getData())); } }; cache.getListenable().addListener(listener); }
private static void processCommands(CuratorFramework client, NodeCache cache) throws Exception { printHelp(); try { addListener(cache); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); boolean done = false; while (!done) { System.out.print("> "); String line = in.readLine(); if (line == null) { break; } String command = line.trim(); String[] parts = command.split("\\s"); if (parts.length == 0) { continue; } String operation = parts[0]; String args[] = Arrays.copyOfRange(parts, 1, parts.length); if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) { printHelp(); } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) { done = true; } else if (operation.equals("set")) { setValue(client, command, args); } else if (operation.equals("remove")) { remove(client); } else if (operation.equals("show")) { show(cache); } Thread.sleep(1000); // just to allow the console output to catch // up } } catch (Exception ex) { ex.printStackTrace(); } finally { } }
/**
 * Hooks configuration reloading to the configuration's node: when the
 * configuration exposes a node cache, every change of that node calls
 * {@code _configuration.reload()}. Does nothing when there is no node.
 */
@Override
public void init() {
    final NodeCache watched = _configuration.getNode();
    if (watched == null) {
        return;
    }
    watched.getListenable().addListener(() -> _configuration.reload());
}