/**
 * Registers {@code dataListener} for {@code path} and reads the node's data with the
 * listener's watcher attached.
 *
 * <p>One {@link CuratorWatcher} is kept per (path, listener) pair in
 * {@code acitveDataListeners}. The value returned by {@code putIfAbsent} is used directly
 * instead of a second {@code get}, so a concurrent removal between the two calls can no
 * longer hand back {@code null}.
 *
 * @param path         node path to read with the watcher attached
 * @param dataListener listener to wrap; a {@code null} listener is ignored
 */
public void addDataListener(String path, ZkDataListener dataListener) {
    if (dataListener == null) {
        return;
    }

    ConcurrentMap<ZkDataListener, CuratorWatcher> listeners = acitveDataListeners.get(path);
    if (listeners == null) {
        ConcurrentMap<ZkDataListener, CuratorWatcher> created =
                new ConcurrentHashMap<ZkDataListener, CuratorWatcher>();
        ConcurrentMap<ZkDataListener, CuratorWatcher> existing =
                acitveDataListeners.putIfAbsent(path, created);
        // putIfAbsent returns the previous mapping, or null when ours was installed.
        listeners = (existing != null) ? existing : created;
    }

    CuratorWatcher curatorWatcher = listeners.get(dataListener);
    if (curatorWatcher == null) {
        CuratorWatcher created = new CuratorWatcherImpl(dataListener);
        CuratorWatcher existing = listeners.putIfAbsent(dataListener, created);
        curatorWatcher = (existing != null) ? existing : created;
    }

    try {
        curator.getData().usingWatcher(curatorWatcher).forPath(path);
    } catch (Exception e) {
        // Best effort: a failed read is logged, not propagated.
        LOGGER.warn("Failed to add data watcher for path " + path, e);
    }
}
/**
 * Places an existence watch on {@code /query-siddhi/update}.
 *
 * <p>When the watcher sees a {@code NodeCreated} event it shuts down the Siddhi manager,
 * calls {@code parserQuerysFile()} and deletes the signal node if it still exists. After
 * any event it calls this method again to register a fresh watch. Failures while
 * registering the watch are printed to stderr.
 */
private void checkSignal() {
    final String signalPath = "/query-siddhi/update";

    CuratorWatcher signalWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            boolean signalCreated =
                    watchedEvent.getType().equals(Watcher.Event.EventType.NodeCreated);
            if (signalCreated) {
                System.out.println("Updating queries!");
                _siddhiManager.shutdown();
                parserQuerysFile();
                // Consume the signal so that a later creation fires again.
                if (client.checkExists().forPath(signalPath) != null) {
                    client.delete().forPath(signalPath);
                }
            }
            // Register the watch again for the next event.
            checkSignal();
        }
    };

    try {
        client.checkExists().usingWatcher(signalWatcher).forPath(signalPath);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public void register() throws Exception { String ip = InetAddress.getLocalHost().getHostAddress(); String registeNode = "zk/register/" + ip;// 节点路径 byte[] data = "disable".getBytes(charset);// 节点值 CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 创建一个register // watcher Stat stat = zkTools.checkExists().forPath(registeNode); if (stat != null) { zkTools.delete().forPath(registeNode); } zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(registeNode, data);// 创建的路径和值 // 添加到session过期监控事件中 addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher); data = zkTools.getData().usingWatcher(watcher).forPath(registeNode); System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset)); }
public void start() throws Exception { CuratorFramework curatorFramework = serviceRegistry.getService().getCuratorFramework(); curatorFramework.getChildren().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { switch (event.getType()) { case NodeChildrenChanged: { checkForUpdate(); break; } case None: case NodeCreated: case NodeDeleted: case NodeDataChanged: break; default: break; } } }).forPath(PathBuilder.path(serviceRegistry.getService())); //Start watcher on service node serviceRegistry.nodes(checkForUpdateOnZookeeper()); logger.info("Started polling zookeeper for changes"); }
/**
 * Reads {@code ZK_TIMELINES_DEFAULT_STORAGE} with a watcher attached.
 *
 * <p>When the watcher fires it reads the node again, looks up the storage named by its
 * content and, if the lookup succeeds, installs it as the default storage. It then calls
 * this method again to register a new watch. A failure to register is logged as a warning.
 */
@PostConstruct
private void watchDefaultStorage() {
    // Typed as CuratorWatcher so the lambda selects the CuratorWatcher overload.
    final CuratorWatcher storageWatcher = event -> {
        final byte[] storageIdBytes = curator.getData().forPath(ZK_TIMELINES_DEFAULT_STORAGE);
        if (storageIdBytes != null) {
            final Result<Storage> lookup = getStorage(new String(storageIdBytes));
            if (lookup.isSuccessful()) {
                defaultStorage.setStorage(lookup.getValue());
            }
        }
        watchDefaultStorage();
    };

    try {
        curator.getData().usingWatcher(storageWatcher).forPath(ZK_TIMELINES_DEFAULT_STORAGE);
    } catch (final Exception e) {
        LOG.warn("Error while creating watcher for default storage updates {}", e.getMessage(), e);
    }
}
/**
 * Reads and deserializes the mapping stored at {@code MAPPING_PATH}, filling {@code stat}
 * with the node's stat and attaching a watcher.
 *
 * <p>When the watcher fires it calls the no-argument {@code readCurrentMapping()} and
 * replaces {@code mapping} with the result, unless the result is {@code null} while a
 * mapping is already held.
 *
 * @param stat receives the stat of the mapping node
 * @return the deserialized mapping, or {@code null} if the client is stopped or the read fails
 */
private synchronized ZMapping readCurrentMapping(Stat stat) {
    try {
        if (client.getState().equals(CuratorFrameworkState.STOPPED)) {
            LOG.warn("Attempting to read state on stopped client.");
            return null;
        }

        CuratorWatcher refreshWatcher = new CuratorWatcher() {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception {
                ZMapping refreshed = readCurrentMapping();
                if (refreshed != null || mapping == null) {
                    mapping = refreshed;
                } else {
                    // Keep the mapping we have rather than replacing it with null.
                    LOG.warn("An attempt was made to overwrite current mapping with a null one.");
                }
                LOG.debug("Host {} just updated its mapping to version {}", hostPort,
                        (mapping != null) ? mapping.getVersion() : -1);
            }
        };

        byte[] serialized = client.getData()
                .storingStatIn(stat)
                .usingWatcher(refreshWatcher)
                .forPath(MAPPING_PATH);
        return ZMapping.deserialize(serialized);
    } catch (Exception e) {
        LOG.error("Error reading current mapping: ", e);
        return null;
    }
}
/**
 * Ensures the node for {@code key} exists, reads it with a watcher attached, and passes
 * the current value to {@code listener}.
 *
 * <p>A missing node is created with {@code defaultValue}. When the watcher sees a
 * {@code NodeDataChanged} event it reads the node and passes the new value to
 * {@code listener}. Any failure is logged as an error and not propagated.
 *
 * @param key          configuration key, resolved under {@code PREFIX}
 * @param defaultValue value written when the node does not exist yet
 * @param listener     receives the current value and later changed values
 */
public void getAndWatch(String key, String defaultValue, Consumer<String> listener) {
    try {
        final String nodePath = PREFIX + key;

        CuratorWatcher changeWatcher = event -> {
            if (event.getType() != Watcher.Event.EventType.NodeDataChanged) {
                return;
            }
            byte[] raw = cf.getData().forPath(event.getPath());
            String strVal = new String(raw, CHARSET);
            Logger.getLogger(getClass()).debug("ZK key " + key + " changed value to " + strVal);
            listener.accept(strVal);
        };

        if (cf.checkExists().forPath(nodePath) == null) {
            cf.create().creatingParentsIfNeeded().forPath(nodePath, defaultValue.getBytes(CHARSET));
        }
        cf.getData().usingWatcher(changeWatcher).forPath(nodePath);
        listener.accept(get(key, defaultValue));
    } catch (Exception ex) {
        Logger.getLogger(getClass()).error("Failed to add config watcher", ex);
    }
}
/**
 * Ensures the node for {@code key} exists and reads it with a watcher attached.
 *
 * <p>A missing node is created with {@code null} data. When the watcher sees a
 * {@code NodeDataChanged} event it reads the node and passes the value to
 * {@code listener}. Any failure is logged as an error and not propagated.
 *
 * @param key      configuration key, resolved under {@code PREFIX}
 * @param listener receives the node's value when it changes
 */
public void watch(String key, Consumer<String> listener) {
    try {
        final String nodePath = PREFIX + key;

        CuratorWatcher changeWatcher = event -> {
            if (event.getType() != Watcher.Event.EventType.NodeDataChanged) {
                return;
            }
            byte[] raw = cf.getData().forPath(event.getPath());
            listener.accept(new String(raw, CHARSET));
        };

        if (cf.checkExists().forPath(nodePath) == null) {
            cf.create().creatingParentsIfNeeded().forPath(nodePath, null);
        }
        cf.getData().usingWatcher(changeWatcher).forPath(nodePath);
    } catch (Exception ex) {
        Logger.getLogger(getClass()).error("Failed to add config watcher", ex);
    }
}
/**
 * Attaches one shared watcher to the node of every key in {@code keys}.
 *
 * <p>Each missing node is created with {@code null} data before it is read with the
 * watcher attached. When the watcher sees a {@code NodeDataChanged} event it reads the
 * node and passes the value to {@code listener}. The first failure stops the loop and is
 * logged as an error.
 *
 * @param keys     configuration keys, each resolved under {@code PREFIX}
 * @param listener receives a node's value when it changes
 */
public void watch(String[] keys, Consumer<String> listener) {
    try {
        CuratorWatcher changeWatcher = event -> {
            if (event.getType() != Watcher.Event.EventType.NodeDataChanged) {
                return;
            }
            byte[] raw = cf.getData().forPath(event.getPath());
            listener.accept(new String(raw, CHARSET));
        };

        for (int i = 0; i < keys.length; i++) {
            String nodePath = PREFIX + keys[i];
            if (cf.checkExists().forPath(nodePath) == null) {
                cf.create().creatingParentsIfNeeded().forPath(nodePath, null);
            }
            cf.getData().usingWatcher(changeWatcher).forPath(nodePath);
        }
    } catch (Exception ex) {
        Logger.getLogger(getClass()).error("Failed to add config watcher", ex);
    }
}
private ZookeeperConfigService addWatcher(String path) throws Exception { zkClient.getChildren().usingWatcher(new CuratorWatcher() { public void process(WatchedEvent event) throws Exception { // updateLocalRules(); } }).forPath(path); return this; }
/**
 * Removes the watcher registered for {@code listener} on {@code path} and calls
 * {@code unwatch()} on it.
 *
 * <p>Does nothing if {@code listener} is {@code null}, if no listeners are recorded for
 * the path, or if this listener was not registered.
 *
 * @param path     node path the listener was registered on
 * @param listener listener to remove
 */
public void removeDataListener(String path, ZkDataListener listener) {
    if (listener == null) {
        return;
    }
    ConcurrentMap<ZkDataListener, CuratorWatcher> listeners = acitveDataListeners.get(path);
    if (listeners == null) {
        return;
    }
    CuratorWatcher removed = listeners.remove(listener);
    if (removed == null) {
        return;
    }
    ((CuratorWatcherImpl) removed).unwatch();
}
public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType, final CuratorWatcher watcher) { synchronized (this) { if (!watchers.contains(watcher.toString()))// 不要添加重复的监听事件 { watchers.add(watcher.toString()); System.out.println("add new watcher " + watcher); zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { System.out.println(newState); if (newState == ConnectionState.LOST) {// 处理session过期 try { if (watcherType == ZookeeperWatcherType.EXITS) { zkTools.checkExists().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) { zkTools.getChildren().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_DATA) { zkTools.getData().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) { // ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中, // 会处理create事件,将路径值恢复到先前状态 Stat stat = zkTools.checkExists().usingWatcher(watcher).forPath(path); if (stat == null) { System.err.println("to create"); zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); } } } catch (Exception e) { e.printStackTrace(); } } } }); } } }
/**
 * Checks whether {@code path} exists, optionally attaching a watcher to the check.
 *
 * <p>The raw {@code BackgroundPathable} cast was dropped: the value returned by
 * {@code usingWatcher} is already typed, so the cast only discarded generic information.
 *
 * @param client  Curator client used for the check
 * @param path    node path to test
 * @param watcher watcher to attach, or {@code null} for a plain check
 * @return {@code true} if the node exists; {@code false} if it does not or the check
 *         failed (the failure is logged)
 */
public static boolean exists(CuratorFramework client, String path, CuratorWatcher watcher) {
    try {
        if (watcher != null) {
            return client.checkExists().usingWatcher(watcher).forPath(path) != null;
        }
        return client.checkExists().forPath(path) != null;
    } catch (Exception e) {
        LOGGER.error("ZKUtil-->>exists(CuratorFramework client, String path, CuratorWatcher watcher) error, ", e);
    }
    return false;
}
/**
 * Reads the data of {@code path} as a string, optionally attaching a watcher to the read.
 *
 * <p>The raw {@code BackgroundPathable} cast and the {@code byte[]} casts were dropped:
 * the builder chain is already typed to return {@code byte[]}.
 *
 * @param client  Curator client used for the read
 * @param path    node path to read
 * @param watcher watcher to attach, or {@code null} for a plain read
 * @return the node data converted by {@code List2StringUtil.toString}, or {@code null} if
 *         the node does not exist or the read failed (the failure is logged)
 */
public static String getData(CuratorFramework client, String path, CuratorWatcher watcher) {
    try {
        if (client.checkExists().forPath(path) == null) {
            return null;
        }
        if (watcher != null) {
            return List2StringUtil.toString(client.getData().usingWatcher(watcher).forPath(path));
        }
        return List2StringUtil.toString(client.getData().forPath(path));
    } catch (Exception e) {
        LOGGER.error("ZKUtil-->>getData(CuratorFramework client, String path, CuratorWatcher watcher) error ", e);
    }
    return null;
}
/**
 * Lists the children of {@code path}, optionally attaching a watcher to the read.
 *
 * <p>The raw {@code BackgroundPathable} and raw {@code List} casts were dropped: the
 * builder chain is already typed to return {@code List<String>}, so no unchecked
 * conversion is needed.
 *
 * @param client  Curator client used for the read
 * @param path    parent node path
 * @param watcher watcher to attach, or {@code null} for a plain read
 * @return the child names, or {@code null} if the read failed (the failure is logged)
 */
public static List<String> getChilds(CuratorFramework client, String path, CuratorWatcher watcher) {
    try {
        if (watcher != null) {
            return client.getChildren().usingWatcher(watcher).forPath(path);
        }
        return client.getChildren().forPath(path);
    } catch (Exception e) {
        LOGGER.error("ZKUtil-->>getChilds(CuratorFramework client, String path, CuratorWatcher watcher) error,", e);
    }
    return null;
}
/**
 * Reads the size node of {@code hostId} with a watcher attached and stores the decoded
 * value in {@code sizes}.
 *
 * <p>When the watcher fires it calls this method again for the same host, which reads
 * the value again and attaches a new watcher. Failures are printed to stderr.
 *
 * @param hostId host whose size node is read
 */
private void readSize(final String hostId) {
    String nodePath = sizePath(hostId);

    CuratorWatcher rereadWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            readSize(hostId);
        }
    };

    try {
        byte[] raw = client.getData().usingWatcher(rereadWatcher).forPath(nodePath);
        this.sizes.put(hostId, byteToInt(raw));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Creates a watching descriptor backed by a {@link CuratorWatcher}.
 *
 * <p>The plain {@code watcher} field is set to {@code null} and {@code watched} to
 * {@code false}.
 *
 * @param client  owning client
 * @param watcher Curator watcher to store
 */
Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) {
    this.client = client;
    this.curatorWatcher = watcher;
    // This constructor leaves the other watcher field unset.
    this.watcher = null;
    this.watched = false;
}
/**
 * Creates a namespace watcher that wraps a {@link CuratorWatcher}.
 *
 * <p>{@code actualWatcher} is set to {@code null}.
 *
 * @param client         owning client
 * @param curatorWatcher Curator watcher to wrap
 * @param unfixedPath    path stored with the watcher; must not be {@code null}
 * @throws NullPointerException if {@code unfixedPath} is {@code null}
 */
NamespaceWatcher(CuratorFrameworkImpl client, CuratorWatcher curatorWatcher, String unfixedPath) {
    String checkedPath = Preconditions.checkNotNull(unfixedPath, "unfixedPath cannot be null");
    this.client = client;
    this.curatorWatcher = curatorWatcher;
    this.actualWatcher = null;
    this.unfixedPath = checkedPath;
}
/**
 * Verifies that removing a {@link CuratorWatcher} through {@code client.watches().remove(...)}
 * delivers a {@code DataWatchRemoved} event for the watched path to that watcher.
 *
 * @throws Exception if the client cannot be started or an operation fails
 */
@Test
public void testRemoveCuratorWatch() throws Exception {
    Timing timing = new Timing();
    CuratorFrameworkImpl client = (CuratorFrameworkImpl) CuratorFrameworkFactory.builder()
            .connectString(server.getConnectString())
            .retryPolicy(new RetryOneTime(1))
            .build();
    try {
        client.start();

        final CountDownLatch removedLatch = new CountDownLatch(1);
        final String path = "/";

        CuratorWatcher watcher = new CuratorWatcher() {
            @Override
            public void process(WatchedEvent event) throws Exception {
                // Compare from the constant side: an event without a path (for example a
                // session-state event) would otherwise throw a NullPointerException here.
                if (path.equals(event.getPath()) && event.getType() == EventType.DataWatchRemoved) {
                    removedLatch.countDown();
                }
            }
        };

        client.checkExists().usingWatcher(watcher).forPath(path);
        client.watches().remove(watcher).forPath(path);

        Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
    } finally {
        CloseableUtils.closeQuietly(client);
    }
}
/**
 * Selects the {@link CuratorWatcher} to remove and sets the plain {@code watcher} field
 * to {@code null}.
 *
 * @param watcher Curator watcher to remove; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if {@code watcher} is {@code null}
 */
@Override
public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher) {
    Objects.requireNonNull(watcher, "watcher cannot be null");
    this.curatorWatcher = watcher;
    this.watcher = null;
    return this;
}
/**
 * Selects the {@link CuratorWatcher} to remove together with removal options, and sets
 * the plain {@code watcher} field to {@code null}.
 *
 * @param watcher Curator watcher to remove; must not be {@code null}
 * @param options removal options; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if either argument is {@code null}
 */
@Override
public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Set<RemoveWatcherOption> options) {
    Objects.requireNonNull(watcher, "watcher cannot be null");
    this.curatorWatcher = watcher;

    Objects.requireNonNull(options, "options cannot be null");
    this.options = options;

    this.watcher = null;
    return this;
}
/**
 * Selects the {@link CuratorWatcher} to remove together with a watcher type and removal
 * options, and sets the plain {@code watcher} field to {@code null}.
 *
 * @param watcher     Curator watcher to remove; must not be {@code null}
 * @param watcherType watcher type to store; must not be {@code null}
 * @param options     removal options; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if any argument is {@code null}
 */
@Override
public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType,
        Set<RemoveWatcherOption> options) {
    Objects.requireNonNull(watcher, "watcher cannot be null");
    this.curatorWatcher = watcher;

    Objects.requireNonNull(options, "options cannot be null");
    this.options = options;

    Objects.requireNonNull(watcherType, "watcherType cannot be null");
    this.watcherType = watcherType;

    this.watcher = null;
    return this;
}
/**
 * Selects the {@link CuratorWatcher} to remove together with a watcher type, and sets
 * the plain {@code watcher} field to {@code null}.
 *
 * @param watcher     Curator watcher to remove; must not be {@code null}
 * @param watcherType watcher type to store; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if either argument is {@code null}
 */
@Override
public AsyncPathable<AsyncStage<Void>> removing(CuratorWatcher watcher, Watcher.WatcherType watcherType) {
    Objects.requireNonNull(watcher, "watcher cannot be null");
    this.curatorWatcher = watcher;

    Objects.requireNonNull(watcherType, "watcherType cannot be null");
    this.watcherType = watcherType;

    this.watcher = null;
    return this;
}
@VisibleForTesting protected SharedValue(WatcherRemoveCuratorFramework client, String path, byte[] seedValue, CuratorWatcher watcher) { this.client = client; this.path = PathUtils.validatePath(path); this.seedValue = Arrays.copyOf(seedValue, seedValue.length); // inject watcher for testing this.watcher = watcher; currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); }
/**
 * Wraps {@code listener} in a new {@code CuratorWatcherImpl}.
 *
 * @param path     node path; not used by this method
 * @param listener child listener to wrap
 * @return the newly created watcher
 */
public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
    CuratorWatcher targetListener = new CuratorWatcherImpl(listener);
    return targetListener;
}
/**
 * Calls {@code unwatch()} on the given watcher.
 *
 * @param path     node path; not used by this method
 * @param listener watcher to deactivate; it is cast to {@code CuratorWatcherImpl}
 */
public void removeTargetChildListener(String path, CuratorWatcher listener) {
    CuratorWatcherImpl target = (CuratorWatcherImpl) listener;
    target.unwatch();
}
/**
 * Wraps {@code listener} in a new {@code CuratorWatcherImpl}.
 *
 * @param path     node path; not used by this method
 * @param listener child listener to wrap
 * @return the newly created watcher
 */
@Override
public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
    CuratorWatcher targetListener = new CuratorWatcherImpl(listener);
    return targetListener;
}
/**
 * Calls {@code unwatch()} on the given watcher.
 *
 * @param path     node path; not used by this method
 * @param listener watcher to deactivate; it is cast to {@code CuratorWatcherImpl}
 */
@Override
public void removeTargetChildListener(String path, CuratorWatcher listener) {
    CuratorWatcherImpl target = (CuratorWatcherImpl) listener;
    target.unwatch();
}
/**
 * Not supported by this mock; always throws.
 *
 * @param curatorWatcher ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) {
    final String reason = "Not implemented in MockCurator";
    throw new UnsupportedOperationException(reason);
}
/**
 * Returns the value of the {@code watchObj} field.
 *
 * @return the stored watcher
 */
public CuratorWatcher getWatchObj() {
    return this.watchObj;
}