public static void addChildPathCache( String path ,PathChildrenCacheListener listener ) { NameableExecutor businessExecutor = MycatServer.getInstance().getBusinessExecutor(); ExecutorService executor = businessExecutor ==null?Executors.newFixedThreadPool(5): businessExecutor; try { /** * 监听子节点的变化情况 */ final PathChildrenCache childrenCache = new PathChildrenCache(getConnection(), path, true); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener(listener,executor); } catch (Exception e) { throw new RuntimeException(e); } }
@Override public PathChildrenCache createPathCache(String type, boolean cacheData, PathChildrenCacheListener listener, StartMode startMode) { try { PathChildrenCache cache = new PathChildrenCache(client, getParentPath(type), cacheData); if( listener != null ) { cache.getListenable().addListener(listener); } cache.start(startMode); return cache; } catch( Exception e ) { throw Throwables.propagate(e); } }
@PostConstruct public void listen() throws Exception { StandbyApiFactory standbyApiFactory = new StandbyApiFactoryImpl(client); PathChildrenCache pathChildrenCache = new PathChildrenCache(client, standbyApiFactory.pathApi().getJobPath(), true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception { if (!EventHelper.isChildUpdateEvent(event) && !EventHelper.isChildAddEvent(event)) { return; } StandbyJobData standbyJobData = new StandbyJobData(event.getData()); if (!standbyJobData.getData().isOperated()) { return; } LoggerHelper.info("begin update standby job summary " + standbyJobData.getData()); standbyJobSummaryService.updateJobSummary(standbyJobData.getData()); standbyJobLogService.updateJobLog(standbyJobData.getData()); LoggerHelper.info("update standby job summary successfully " + standbyJobData.getData()); } }); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); }
@PostConstruct public void listen() throws Exception { MasterSlaveApiFactory masterSlaveApiFactory = new MasterSlaveApiFactoryImpl(client); PathChildrenCache pathChildrenCache = new PathChildrenCache(client, masterSlaveApiFactory.pathApi().getJobPath(), true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception { if (!EventHelper.isChildUpdateEvent(event) && !EventHelper.isChildAddEvent(event)) { return; } MasterSlaveJobData masterSlaveJobData = new MasterSlaveJobData(event.getData()); if (!masterSlaveJobData.getData().isOperated()) { return; } LoggerHelper.info("begin update master-slave job summary " + masterSlaveJobData.getData()); masterSlaveJobSummaryService.updateJobSummary(masterSlaveJobData.getData()); masterSlaveJobLogService.updateJobLog(masterSlaveJobData.getData()); LoggerHelper.info("update master-slave job summary successfully " + masterSlaveJobData.getData()); } }); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); }
private PathChildrenCacheListener pathChildrenCacheListener(final ClusterListener listener) { return (client, event) -> { log.debug("Event {} generated on cluster", event); switch (event.getType()) { case CHILD_ADDED: log.info("Node {} added to cluster", getServerName(event)); listener.onEvent(HOST_ADDED, (Host) SerializationUtils.deserialize(event.getData().getData())); break; case CHILD_REMOVED: log.info("Node {} removed from cluster", getServerName(event)); listener.onEvent(HOST_REMOVED, (Host) SerializationUtils.deserialize(event.getData().getData())); break; case CHILD_UPDATED: log.warn("Invalid usage: Node {} updated externally for cluster", getServerName(event)); break; case CONNECTION_LOST: log.error("Connection lost with Zookeeper"); listener.onEvent(ERROR, null); break; //$CASES-OMITTED$ default: log.warn("Received unknown event {}", event.getType()); } }; }
private static PathChildrenCacheListener getDefaultListener() { return (client, event) -> { switch (event.getType()) { case CHILD_ADDED : { LOGGER.info("Node added: [{}]", ZKPaths.getNodeFromPath(event.getData().getPath())); break; } case CHILD_REMOVED : { LOGGER.info("Node removed: [{}]", ZKPaths.getNodeFromPath(event.getData().getPath())); break; } default : break; } }; }
private PathChildrenCacheListener newListener(final EventType eventType, final GroupChangedListener delegate) { return (client, event) -> { switch (event.getType()) { case CHILD_ADDED : { String addedMemberId = ZKPaths.getNodeFromPath(event.getData().getPath()); delegate.memberAdded(eventType, addedMemberId); break; } case CHILD_REMOVED : { final String removedMemberId = ZKPaths.getNodeFromPath(event.getData().getPath()); delegate.memberRemoved(eventType, removedMemberId); break; } default : break; } }; }
/** * This test ensures store subscribes to receive events from underlying client. Dispatcher tests ensures listeners * are fired on incoming events. These two sets of tests ensure observer pattern in {@code TransientStore} works fine. */ @Test public void testStoreRegistersDispatcherAndStartsItsClient() throws Exception { final StoreWithMockClient<String> store = new StoreWithMockClient<>(config, curator); final PathChildrenCache cache = Mockito.mock(PathChildrenCache.class); final ZookeeperClient client = store.getClient(); Mockito .when(client.getCache()) .thenReturn(cache); final ListenerContainer<PathChildrenCacheListener> container = Mockito.mock(ListenerContainer.class); Mockito .when(cache.getListenable()) .thenReturn(container); store.start(); Mockito .verify(container) .addListener(store.dispatcher); Mockito .verify(client) .start(); }
/** * Create mock {@link PathChildrenCache} using given controller ID and DPIDs. * * @param controllerId Controller ID to represent current data. * @param paths List of HexString indicating switch's DPID. * @param listener Callback object to be set as Listenable. * @return Mock PathChildrenCache object * @throws Exception */ private PathChildrenCache createPathChildrenCacheMock( final String controllerId, final String[] paths, ListenerContainer<PathChildrenCacheListener> listener) throws Exception { PathChildrenCache pathChildrenCache = createMock(PathChildrenCache.class); expect(pathChildrenCache.getListenable()).andReturn(listener).anyTimes(); pathChildrenCache.start(anyObject(StartMode.class)); expectLastCall().anyTimes(); List<ChildData> childs = new ArrayList<ChildData>(); for (String path : paths) { childs.add(createChildDataMockForCurrentData(controllerId, path)); } expect(pathChildrenCache.getCurrentData()).andReturn(childs).anyTimes(); pathChildrenCache.rebuild(); expectLastCall().anyTimes(); replay(pathChildrenCache); return pathChildrenCache; }
@Override public void addDataListener(String path, DataListener listener) { try { // 第一步:获取-校验-创建监听器 PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener); if(pathChildrenCacheListener != null){//已监听 return; } else { // 添加外部监听器 Set<DataListener> dataListenerSet = dataListenersMap.get(path); if(dataListenerSet == null){ dataListenersMap.put(path, dataListenerSet = new ConcurrentHashSet<DataListener>()); } dataListenerSet.add(listener); dataListenerMap.put(listener, pathChildrenCacheListener = new PathChildrenCacheListenerImpl(path)); } // 第二步:获取-校验-创建子节点缓存连接 PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path); if(pathChildrenCache == null){ pathChildrenCacheMap.put(path, pathChildrenCache = new PathChildrenCache(client, path, true)); // 第三步:启动监听 pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT); } // 第四步:添加监听器 pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
@Override public void removeDataListener(String path, DataListener listener) { try { // 第一步:移除dataListenerMap中的数据 PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener); if(pathChildrenCacheListener == null){ return; } else { dataListenerMap.remove(listener); // 第二步:移除Set<DataListener>中的数据 Set<DataListener> dataListenerSet = dataListenersMap.get(path); if(dataListenerSet != null && dataListenerSet.contains(listener)){ dataListenerSet.remove(listener); } // 第三步:移除dataListenersMap和childDataMap中的数据 if(dataListenerSet == null || dataListenerSet.isEmpty()){ dataListenersMap.remove(path); childDataMap.remove(path); } } // 第四步:取消监听,并移除pathChildrenCacheMap中的数据 PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path); if(pathChildrenCache != null){ pathChildrenCache.getListenable().removeListener(pathChildrenCacheListener); ((PathChildrenCacheListenerImpl)listener).unwatch(); if(pathChildrenCache.getListenable().size() == 0){ pathChildrenCacheMap.remove(path); pathChildrenCache.close(); } } } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } }
public static void addChildPathCache(String path, PathChildrenCacheListener listener) { try { //watch the child status final PathChildrenCache childrenCache = new PathChildrenCache(getConnection(), path, true); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener(listener); } catch (Exception e) { throw new RuntimeException(e); } }
public static void addViewPathCache(String path, PathChildrenCacheListener listener) { try { //watch the child status final PathChildrenCache childrenCache = new PathChildrenCache(getConnection(), path, true); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener(listener); } catch (Exception e) { throw new RuntimeException(e); } }
private static void addListener(PathChildrenCache cache) { // a PathChildrenCacheListener is optional. Here, it's used just to log changes PathChildrenCacheListener listener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch ( event.getType() ) { case CHILD_ADDED: { System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; } case CHILD_UPDATED: { System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; } case CHILD_REMOVED: { System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; } } } }; cache.getListenable().addListener(listener); }
@Override @SneakyThrows public void registerBucketOwnershipListener(BucketOwnershipListener listener) { Preconditions.checkNotNull(listener); PathChildrenCacheListener bucketListener = (client, event) -> { switch (event.getType()) { case CHILD_ADDED: // no action required break; case CHILD_REMOVED: int bucketId = Integer.parseInt(ZKPaths.getNodeFromPath(event.getData().getPath())); listener.notify(new BucketNotification(bucketId, BucketNotification.NotificationType.BucketAvailable)); break; case CONNECTION_LOST: listener.notify(new BucketNotification(Integer.MIN_VALUE, BucketNotification.NotificationType.ConnectivityError)); break; default: log.warn("Received unknown event {}", event.getType()); } }; bucketOwnershipCacheRef.compareAndSet(null, new PathChildrenCache(storeHelper.getClient(), ZKStoreHelper.BUCKET_OWNERSHIP_PATH, true)); bucketOwnershipCacheRef.get().getListenable().addListener(bucketListener); bucketOwnershipCacheRef.get().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); log.info("bucket ownership listener registered"); }
@Override @SneakyThrows public void registerBucketChangeListener(int bucket, BucketChangeListener listener) { Preconditions.checkNotNull(listener); PathChildrenCacheListener bucketListener = (client, event) -> { StreamImpl stream; switch (event.getType()) { case CHILD_ADDED: stream = getStreamFromPath(event.getData().getPath()); listener.notify(new StreamNotification(stream.getScope(), stream.getStreamName(), NotificationType.StreamAdded)); break; case CHILD_REMOVED: stream = getStreamFromPath(event.getData().getPath()); listener.notify(new StreamNotification(stream.getScope(), stream.getStreamName(), NotificationType.StreamRemoved)); break; case CHILD_UPDATED: stream = getStreamFromPath(event.getData().getPath()); listener.notify(new StreamNotification(stream.getScope(), stream.getStreamName(), NotificationType.StreamUpdated)); break; case CONNECTION_LOST: listener.notify(new StreamNotification(null, null, NotificationType.ConnectivityError)); break; default: log.warn("Received unknown event {} on bucket", event.getType(), bucket); } }; String bucketRoot = String.format(ZKStoreHelper.BUCKET_PATH, bucket); bucketCacheMap.put(bucket, new PathChildrenCache(storeHelper.getClient(), bucketRoot, true)); PathChildrenCache pathChildrenCache = bucketCacheMap.get(bucket); pathChildrenCache.getListenable().addListener(bucketListener); pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL); log.info("bucket {} change notification listener registered", bucket); }
@Override public void start() { super.start(); Preconditions.checkState(null != cache, "Implementation changed. Cache should be set - check super GroupMember"); final PathChildrenCacheListener cacheListener = null != listener ? listener : getDefaultListener(); cache.getListenable().addListener(cacheListener); }
@Override public List<String> listenChildrenPath(final String parent, final NodeListener listener, final boolean sync) throws Exception { PathChildrenCache cache = new PathChildrenCache(client, parent, false, false, EVENT_THREAD_POOL); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework c, PathChildrenCacheEvent e) throws Exception { if (e.getData() == null) { return; } switch (e.getType()) { case CHILD_ADDED: listener.nodeChanged(ZKClientImpl.this, new ChangedEvent(e.getData().getPath(), ChangedEvent.Type.CHILD_ADDED)); break; case CHILD_REMOVED: listener.nodeChanged(ZKClientImpl.this, new ChangedEvent(e.getData().getPath(), ChangedEvent.Type.CHILD_REMOVED)); break; case CHILD_UPDATED: listener.nodeChanged(ZKClientImpl.this, new ChangedEvent(e.getData().getPath(), ChangedEvent.Type.CHILD_UPDATED)); break; } } }, SAME_EXECUTOR); PathChildrenCache.StartMode mode = sync ? PathChildrenCache.StartMode.BUILD_INITIAL_CACHE : PathChildrenCache.StartMode.NORMAL; cache.start(mode); List<ChildData> children = cache.getCurrentData(); List<String> result = new ArrayList<String>(); for (ChildData child : children) { result.add(child.getPath()); } return result; }
/** * Returns a future that waits for the particular 'eventType' to happen in the * path tracked by 'cache_'. */ private Future<Void> createEventFuture(final Type eventType) throws InterruptedException { final Status status = new Status(); PathChildrenCacheListener listener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { if (event.getType() == eventType) { status.done = true; } } }; cache_.getListenable().clear(); cache_.getListenable().addListener(listener); return executor_.submit(new Callable<Void>() { @Override public Void call() throws Exception { // Wait infinitely until the event is triggered while (!status.done) { TimeUnit.SECONDS.sleep(1); } return null; } }); }
@Override public Closeable watchLocks(String lockPathRoot, Executor executor, final Watcher watcher) { lockPathRoot = norm(lockPathRoot); PathChildrenCache cache = new PathChildrenCache(curator, lockPathRoot, true); try { cache.start(); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: watcher.onLock(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); break; case CHILD_REMOVED: watcher.onUnlock(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); break; default: break; } } }, executor); } catch (Exception ex) { logger.error("Error to watch lock path " + lockPathRoot, ex); } return cache; }
private RoundRobinNedisClientPool(CuratorFramework curatorClient, boolean closeCurator, String zkProxyDir, NedisClientPoolBuilder poolBuilder) throws Exception { this.curatorClient = curatorClient; this.closeCurator = closeCurator; this.poolBuilder = poolBuilder; EventLoop eventLoop = poolBuilder.group().next(); this.closePromise = eventLoop.newPromise(); this.initPromise = eventLoop.newPromise(); watcher = new PathChildrenCache(curatorClient, zkProxyDir, true); watcher.getListenable().addListener(new PathChildrenCacheListener() { private boolean initialized = false; @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { StringBuilder sb = new StringBuilder("Zookeeper event received: type=") .append(event.getType()); if (event.getData() != null) { ChildData data = event.getData(); sb.append(", path=").append(data.getPath()).append(", stat=") .append(data.getStat()); } LOG.info(sb.toString()); if (!initialized) { if (event.getType() == INITIALIZED) { resetPools(); initPromise.trySuccess(RoundRobinNedisClientPool.this); initialized = true; } } else if (RESET_TYPES.contains(event.getType())) { resetPools(); } } }); watcher.start(StartMode.POST_INITIALIZED_EVENT); }
public void listenChildren(String path, ZNodeEventHandler handler) throws Exception { init(); PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(); PathChildrenCacheListener listener = new ZNodeListener(handler); cache.getListenable().addListener(listener); }
public void listenForChildren(String path, ZookeeperEventListener listener) throws Exception { init(); PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(); PathChildrenCacheListener cacheListener = new ZNodeListener( listener.getHandler()); cache.getListenable().addListener(cacheListener); }
private static void addListener(PathChildrenCache cache) { // a PathChildrenCacheListener is optional. Here, it's used just to log // changes PathChildrenCacheListener listener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: { System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()) + ", value: " + new String(event.getData().getData())); break; } case CHILD_UPDATED: { System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()) + ", value: " + new String(event.getData().getData())); break; } case CHILD_REMOVED: { System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; } default: break; } } }; cache.getListenable().addListener(listener); }
@SuppressWarnings("resource") @Override public void execute(CuratorFramework client) { final PathChildrenCache cache = new PathChildrenCache(client, serverPath, false); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework zkClient, PathChildrenCacheEvent event) throws Exception { int maxtried = loadMaxTried(zkClient, serverPath + "maxtried"); JedisPoolConfig poolConfig = loadConfig(zkClient); String changedPath = null; switch (event.getType()) { case CHILD_ADDED: changedPath = event.getData().getPath(); String server = ZKPaths.getNodeFromPath(changedPath); JedisClient c = createClient(server, poolConfig, maxtried); master = c; LOG.info("server added:" + c.toString()); break; case CHILD_REMOVED: changedPath = event.getData().getPath(); server = ZKPaths.getNodeFromPath(changedPath); c = createClient(server, poolConfig, maxtried); master = null; LOG.info("server removed:" + c.toString()); break; default: break; } LOG.info("servers:" + master); } }); try { cache.start(); } catch (Exception e) { LOG.error("Start PathChildrenCache error for path: {}, error info: {}", serverPath, e.getMessage()); } }
@Override public void init() { final PathChildrenCache pathChildren = _configuration.getPathChildren(); if (pathChildren != null) { pathChildren.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); String path = null; if (data != null) { path = data.getPath(); String key = path.replace(_configuration.getPath() + "/", ""); String value = new String(data.getData()); switch (event.getType()) { case CHILD_ADDED: _configuration.addProperty(key, value); break; case CHILD_UPDATED: _configuration.setProperty(key, value); break; case CHILD_REMOVED: _configuration.clearProperty(key); break; default: break; } } } }); } }
@Override public void start() throws Exception { starterStopper.start(); Preconditions.checkArgument(framework.getState() == CuratorFrameworkState.STARTED); if (framework.checkExists().forPath(serviceNode) == null) { framework.create().creatingParentsIfNeeded().forPath(serviceNode); } cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: log.info("`{}` service node added: {}", serviceName, event.getData().getPath()); break; case CHILD_UPDATED: break; case CHILD_REMOVED: log.info("`{}` service node removed: {}", serviceName, event.getData().getPath()); break; case CONNECTION_SUSPENDED: break; case CONNECTION_RECONNECTED: break; case CONNECTION_LOST: break; case INITIALIZED: break; } } }); cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); }
public AlarmEPLManager(Map conf, CountDownLatch latch) { this.conf = conf; this.latch = latch; this.configuration = new Configuration(); this.addMethods(configuration); this.addDbs(); this.epService = EPServiceProviderManager.getDefaultProvider(configuration); this.admin = epService.getEPAdministrator(); this.addSchemas(); this.addEpls(); this.start(); curator = CuratorFrameworkFactory .newClient(com.jzsec.rtc.config.Configuration.getConfig().getString("rtc.zk.hosts"), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), new RetryNTimes( Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)) )); curator.start(); createPath(); ExecutorService pool = Executors.newFixedThreadPool(2); final PathChildrenCache pathChildrenCache = new PathChildrenCache(curator, "/alarm", true); try { pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) throws Exception { List<ChildData> childDataList = pathChildrenCache.getCurrentData(); if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED || event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { if (childDataList != null && childDataList.size() > 0) { // update and reload single rule by businessScope in future List<Map<Object, Object>> zkDataList = new ArrayList<Map<Object, Object>>(); for (ChildData childData : childDataList) { LOG.info("==" + childData.getPath() + " changed," + new String(childData.getData(), "UTF-8")); String data = new String(childData.getData(), "UTF-8"); if(!StringUtils.isEmpty(data)) { System.out.println("==" + childData.getPath() + " changed," + new String(childData.getData(), "UTF-8")); Map<Object, Object> zkData = (Map<Object, Object>) JSONValue.parse(data); zkDataList.add(zkData); } } if(zkDataList.size() > 0) refresh(zkDataList); } } } }, pool); } catch (Exception e) { e.printStackTrace(); } }
public EPLManager(Map conf, OutputCollector collector, CountDownLatch latch) { if(collector != null) this.collector = collector; this.conf = conf; this.latch = latch; this.configuration = new com.espertech.esper.client.Configuration(); this.configuration.getEngineDefaults().getThreading().setInsertIntoDispatchPreserveOrder(false); this.addMethods(configuration); this.addDbs(); this.epService = EPServiceProviderManager.getDefaultProvider(configuration); this.admin = epService.getEPAdministrator(); this.addSchemas(null); this.addEpls(); this.start(); curator = CuratorFrameworkFactory .newClient(com.jzsec.rtc.config.Configuration.getConfig().getString("rtc.zk.hosts"), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), new RetryNTimes( Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)) )); curator.start(); createPath(); ExecutorService pool = Executors.newFixedThreadPool(2); final PathChildrenCache pathChildrenCache = new PathChildrenCache(curator, "/risk", true); try { pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) throws Exception { List<ChildData> childDataList = pathChildrenCache.getCurrentData(); if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED || event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { if (childDataList != null && childDataList.size() > 0) { // update and reload single rule by businessScope in future List<Map<Object, Object>> zkDataList = new ArrayList<Map<Object, Object>>(); for (ChildData childData : childDataList) { LOG.info("==" + childData.getPath() + " changed," + new String(childData.getData(), "UTF-8")); String data = new String(childData.getData(), "UTF-8"); if(!StringUtils.isEmpty(data)) { System.out.println("==" + childData.getPath() + " changed," + new String(childData.getData(), "UTF-8")); Map<Object, Object> zkData = (Map<Object, Object>) JSONValue.parse(data); if(!zkData.containsKey("type")) { String childPath = childData.getPath(); zkData.put("type", childPath.substring(childPath.lastIndexOf("/") + 1)); } zkDataList.add(zkData); } } if(zkDataList.size() > 0) refresh(zkDataList); } } } }, pool); } catch (Exception e) { e.printStackTrace(); } }
@Override public PathChildrenCache createPathCache(String type, boolean cacheData, PathChildrenCacheListener listener) { return createPathCache(type, cacheData, listener, StartMode.NORMAL); }
PathChildrenCache createPathCache(String type, boolean cacheData, PathChildrenCacheListener listener, StartMode startMode);
@Override public void addListener(PathChildrenCacheListener listener) { if (useCache || useCacheWhenNotConnectedToDataSource) { cache.getListenable().addListener(listener); } }
@Test public void test_path_cache() throws Exception { CuratorFramework zk = curator.usingNamespace("namespace-test"); String groupPath = "/group-1"; String s = zk.create().forPath(groupPath); Assert.assertEquals(groupPath, s); Stat stat = zk.checkExists().forPath("/group-1"); Assert.assertNotNull(stat); stat = zk.checkExists().forPath("/namespace-test/group-1"); Assert.assertNull(stat); final PathChildrenCacheEvent.Type[] saveEventType = new PathChildrenCacheEvent.Type[1]; final long[] saveTime = new long[1]; PathChildrenCache pcc = new PathChildrenCache(zk, groupPath, true); pcc.start(); pcc.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { logger.info("event type={}", event.getType()); switch (event.getType()) { case CHILD_ADDED: saveEventType[0] = PathChildrenCacheEvent.Type.CHILD_ADDED; saveTime[0] = System.currentTimeMillis(); logger.info("child[path={}, date={}] added", event.getData().getPath(), new String(event.getData().getData())); break; case CHILD_UPDATED: saveEventType[0] = PathChildrenCacheEvent.Type.CHILD_UPDATED; saveTime[0] = System.currentTimeMillis(); logger.info("child[path={}, date={}] updated", event.getData().getPath(), new String(event.getData().getData())); break; } } }); String hostPath = groupPath + "/localhost:8001"; zk.create().forPath(hostPath); long wtStart = System.currentTimeMillis(); Thread.sleep(30); // use 15 ms // System.out.println("listener wait time="+(saveTime[0] - wtStart)); Assert.assertEquals(PathChildrenCacheEvent.Type.CHILD_ADDED, saveEventType[0]); // System.out.println(new String(zk.getData().forPath(hostPath))); // create three node String threePath = groupPath + "/hosts/localhost:8001"; zk.create().creatingParentsIfNeeded().forPath(threePath); zk.setData().forPath(threePath, "{tree}".getBytes()); // test update zk.setData().forPath(hostPath, "{}".getBytes()); Thread.sleep(30); Assert.assertEquals(PathChildrenCacheEvent.Type.CHILD_UPDATED, saveEventType[0]); // test set parent node's data zk.setData().forPath(groupPath, "{grou-data}".getBytes()); Thread.sleep(30); Assert.assertEquals(PathChildrenCacheEvent.Type.CHILD_UPDATED, saveEventType[0]); }
public void add(Path path, PathChildrenCacheListener listener) { directoryListeners.put(path, listener); }
@Override public void addListener(PathChildrenCacheListener listener) { listeners.add(path, listener); }
@Override public void addListener(PathChildrenCacheListener listener) { wrapped.getListenable().addListener(listener); }
public ZKGroupMember(final CuratorFramework client, final String membershipPath, final String thisId, final byte[] payload, final PathChildrenCacheListener listener) { super(client, membershipPath, thisId, payload); this.listener = listener; }
/** * Creates a cache for the child nodes, and subscribes for any updates. * (add, update, delete). * * @param path Path of the parent node. * @param pathUpdateListener Floe's Path update listener. * @param cacheData If true, each child node's data is cached * along with the stat information. * @return Curator client's cache object. TODO: Change this to higher * level abstraction. */ public PathChildrenCache cacheAndSubscribeChildren( final String path, final PathChildrenUpdateListener pathUpdateListener, final boolean cacheData) { PathChildrenCache cache = new PathChildrenCache( curatorClient, path, cacheData ); if (pathUpdateListener != null) { PathChildrenCacheListener cacheListener = new PathChildrenCacheListener() { @Override public void childEvent( final CuratorFramework curatorFramework, final PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: pathUpdateListener.childAdded( pathChildrenCacheEvent.getData() ); break; case CHILD_UPDATED: pathUpdateListener.childUpdated( pathChildrenCacheEvent.getData() ); break; case CHILD_REMOVED: pathUpdateListener.childRemoved( pathChildrenCacheEvent.getData() ); break; case INITIALIZED: pathUpdateListener.childrenListInitialized( pathChildrenCacheEvent.getInitialData() ); break; default: /* Ignore other pathChildrenEvents. These are handled internally by the curator's zkcache framework. */ } } }; cache.getListenable().addListener(cacheListener); } try { cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) { e.printStackTrace(); LOGGER.error("Could not start cache client."); throw new RuntimeException(e); } return cache; }
@SuppressWarnings("resource") @Override public void execute(CuratorFramework client) { PathChildrenCache cache = new PathChildrenCache(client, serverPath, true); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework zkClient, PathChildrenCacheEvent event) throws Exception { int maxtried = loadMaxTried(zkClient, serverPath + "maxtried"); JedisPoolConfig poolConfig = loadConfig(zkClient); String changedPath = null; switch (event.getType()) { case CHILD_ADDED: changedPath = event.getData().getPath(); String server = ZKPaths.getNodeFromPath(changedPath); if (server.equals("config")) { break; } JedisClient c = createClient(server, poolConfig, maxtried); synchronized (clients) { clients.add(c); LOG.info("server added:" + c.toString()); } break; case CHILD_REMOVED: changedPath = event.getData().getPath(); server = ZKPaths.getNodeFromPath(changedPath); c = createClient(server, poolConfig, maxtried); synchronized (clients) { clients.remove(c); LOG.info("server removed:" + c.toString()); } break; default: break; } LOG.info("server size: "+ clients.size() +", servers:" + clients); } }); try { cache.start(); } catch (Exception e) { LOG.error("Start PathChildrenCache error for path: {}, error info: {}", serverPath, e.getMessage()); } }