private void processTokenAddOrUpdate(ChildData data) throws IOException { ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); long renewDate = din.readLong(); int pwdLen = din.readInt(); byte[] password = new byte[pwdLen]; int numRead = din.read(password, 0, pwdLen); if (numRead > -1) { DelegationTokenInformation tokenInfo = new DelegationTokenInformation(renewDate, password); synchronized (this) { currentTokens.put(ident, tokenInfo); // The cancel task might be waiting notifyAll(); } } }
/** * add instance; * * @param data */ private void addChild(ChildData data) { if (data == null || ArrayUtils.isEmpty(data.getData())) { return; } try { ServiceInstance<RpcPayload> instance = serializer.deserialize(data .getData()); this.onInstanceAdded(instance); } catch (Exception ex) { LOG.error("Could not add zk node " + data.getPath() + " to pool.", ex); } }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: add(data.getPath().substring(data.getPath().lastIndexOf("/")+1),event.getData().getData()) ; break; case CHILD_REMOVED: delete(data.getPath().substring(data.getPath().lastIndexOf("/")+1),event.getData().getData()); ; break; case CHILD_UPDATED: add(data.getPath().substring(data.getPath().lastIndexOf("/")+1),event.getData().getData()) ; break; default: break; } }
private void startInitialTasks() { if( !startedInitialTasks ) { List<ChildData> currentData = taskCache.getCurrentData(); List<String> taskIds = Lists.newArrayList(); for( ChildData task : currentData ) { String taskId = ZKPaths.getNodeFromPath(task.getPath()); LOGGER.debug("Task added " + taskId); taskIds.add(taskId); addRunner(taskId); } requestFullStatus(taskIds); startedInitialTasks = true; } }
@Override public void onChanged(TreeCacheEvent event) { ChildData data = event.getData(); if (data == null) { return; } String path = data.getPath(); switch (event.getType()) { case NODE_ADDED: case NODE_REMOVED: case NODE_UPDATED: { boolean isFullHostPath = StringUtils.isNotBlank(path) && path.split(RedirectorConstants.DELIMETER).length == 7; if (isFullHostPath && path.contains(RedirectorConstants.DELIMETER + applicationName + RedirectorConstants.DELIMETER)) { snapshotNeeded = true; } break; } } }
@Override public byte[] getCurrentData() { byte[] result = null; if (isCacheUsageAllowed()) { ChildData data = cache.getCurrentData(); if (data != null) { result = data.getData(); } } else { try { result = dataIsCompressed ? connector.getDataDecompressed(path) : connector.getData(path); } catch (DataSourceConnectorException e) { log.warn("Failed to get data for zkPath={} {}", path); } } return result; }
@Override public int getCurrentDataVersion() { int result = RedirectorConstants.NO_MODEL_NODE_VERSION; if (isCacheUsageAllowed()) { ChildData data = cache.getCurrentData(); if (data != null && data.getStat() != null) { result = data.getStat().getVersion(); } } else { try { if (connector.isPathExists(path)) { result = connector.getNodeVersion(path); } else { log.warn("Node {} does not exist", path); } } catch (DataSourceConnectorException e) { log.error("Failed to get data for zkPath={} {}", path, e); } } return result; }
/** * Create a zookeeper-based namespace service * @param client the curator client * @param cache the treecache * @param filePath the file */ public Namespaces(final CuratorFramework client, final TreeCache cache, final String filePath) { requireNonNull(cache, "TreeCache may not be null!"); this.client = client; this.cache = cache; try { this.client.create().orSetData().forPath(ZNODE_NAMESPACES); this.cache.getListenable().addListener((c, e) -> { final Map<String, ChildData> tree = cache.getCurrentChildren(ZNODE_NAMESPACES); readTree(tree).forEach(data::put); }); init(filePath).forEach(data::put); } catch (final Exception ex) { LOGGER.error("Could not create a zk node cache: {}", ex); throw new RuntimeTrellisException(ex); } }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData childData = event.getData(); switch (event.getType()) { case CHILD_ADDED: try { lockTableByNewNode(childData); } catch (Exception e) { LOGGER.info("CHILD_ADDED error", e); } break; case CHILD_UPDATED: updateMeta(childData); break; case CHILD_REMOVED: deleteNode(childData); break; default: break; } }
private void lockTableByNewNode(ChildData childData) throws Exception { String data = new String(childData.getData(), StandardCharsets.UTF_8); LOGGER.info("DDL node " + childData.getPath() + " created , and data is " + data); DDLInfo ddlInfo = new DDLInfo(data); final String fromNode = ddlInfo.getFrom(); if (fromNode.equals(ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID))) { return; //self node } if (DDLStatus.INIT != ddlInfo.getStatus()) { return; } String nodeName = childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); String[] tableInfo = nodeName.split("\\."); final String schema = StringUtil.removeBackQuote(tableInfo[0]); final String table = StringUtil.removeBackQuote(tableInfo[1]); try { DbleServer.getInstance().getTmManager().addMetaLock(schema, table); } catch (Exception t) { DbleServer.getInstance().getTmManager().removeMetaLock(schema, table); throw t; } }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData childData = event.getData(); switch (event.getType()) { case CHILD_ADDED: createOrUpdateViewMeta(childData, false); break; case CHILD_UPDATED: createOrUpdateViewMeta(childData, true); break; case CHILD_REMOVED: deleteNode(childData); break; default: break; } }
/** * update the meta if the view updated */ private void createOrUpdateViewMeta(ChildData childData, boolean isReplace) throws Exception { String path = childData.getPath(); String[] paths = path.split("/"); String jsonValue = new String(childData.getData(), StandardCharsets.UTF_8); JSONObject obj = (JSONObject) JSONObject.parse(jsonValue); //if the view is create or replace by this server it self String serverId = obj.getString(SERVER_ID); if (serverId.equals(ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID))) { return; } String createSql = obj.getString(CREATE_SQL); String schema = paths[paths.length - 1].split(SCHEMA_VIEW_SPLIT)[0]; ViewMeta vm = new ViewMeta(createSql, schema, DbleServer.getInstance().getTmManager()); vm.initAndSet(isReplace); }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: add(data, false); break; case CHILD_UPDATED: add(data, true); break; case CHILD_REMOVED: delete(data); break; default: break; } }
@Override public void childEvent(CuratorFramework curator, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data == null) return; String dataPath = data.getPath(); if (Strings.isNullOrEmpty(dataPath)) return; if (dataPath.startsWith(watchPath)) { switch (event.getType()) { case NODE_ADDED: listener.onServiceAdded(dataPath, Jsons.fromJson(data.getData(), CommonServiceNode.class)); break; case NODE_REMOVED: listener.onServiceRemoved(dataPath, Jsons.fromJson(data.getData(), CommonServiceNode.class)); break; case NODE_UPDATED: listener.onServiceUpdated(dataPath, Jsons.fromJson(data.getData(), CommonServiceNode.class)); break; } Logs.RSD.info("ZK node data change={}, nodePath={}, watchPath={}, ns={}"); } }
@Override public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> asyncResultHandler) { if (!keyIsNull(k, asyncResultHandler)) { vertx.runOnContext(event -> { Map<String, ChildData> maps = curatorCache.getCurrentChildren(keyPath(k)); ChoosableSet<V> choosableSet = new ChoosableSet<>(0); if (maps != null) { for (ChildData childData : maps.values()) { try { if (childData != null && childData.getData() != null && childData.getData().length > 0) choosableSet.add(asObject(childData.getData())); } catch (Exception ex) { asyncResultHandler.handle(Future.failedFuture(ex)); } } } asyncResultHandler.handle(Future.succeededFuture(choosableSet)); }); } }
@Override public void start() throws Exception { // Create the zookeeper node createNode(); // Initial data load (avoid race conditions w/"NodeCache.start(true)") updateFromZkBytes(_curator.getData().forPath(_zkPath), _defaultValue); // Re-load the data and watch for changes. _nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = _nodeCache.getCurrentData(); if (childData != null) { updateFromZkBytes(childData.getData(), _defaultValue); } } }); _nodeCache.start(); }
private void dumpDirectly(final String path, final List<String> result) { for (String each : coordinatorRegistryCenter.getChildrenKeys(path)) { String zkPath = path + "/" + each; String zkValue = coordinatorRegistryCenter.get(zkPath); if (null == zkValue) { zkValue = ""; } TreeCache treeCache = (TreeCache) coordinatorRegistryCenter.getRawCache("/" + jobName); ChildData treeCacheData = treeCache.getCurrentData(zkPath); String treeCachePath = null == treeCacheData ? "" : treeCacheData.getPath(); String treeCacheValue = null == treeCacheData ? "" : new String(treeCacheData.getData()); if (zkValue.equals(treeCacheValue) && zkPath.equals(treeCachePath)) { result.add(Joiner.on(" | ").join(zkPath, zkValue)); } else { result.add(Joiner.on(" | ").join(zkPath, zkValue, treeCachePath, treeCacheValue)); } dumpDirectly(zkPath, result); } }
private ChildData getConfigNode() { return Iterables.find ( cache.getCurrentData(), new Predicate<ChildData>() { @Override public boolean apply(ChildData data) { return ZKPaths.getNodeFromPath(data.getPath()).equals(CONFIG_NODE_NAME); } }, null ); }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); if(data==null || data.getData()==null){ return; } SlaveNode slaveNode = SlaveNode.parse(JSON.parseObject(data.getData(),JSONObject.class)); if(slaveNode==null){ LOGGER.error("get a null slaveNode with eventType={},path={},data={}",event.getType(),data.getPath(),data.getData()); }else { switch (event.getType()) { case CHILD_ADDED: slaveNodeMap.put(slaveNode.getId(), slaveNode); LOGGER.info("CHILD_ADDED with path={},data={},current slaveNode size={}", data.getPath(), new String(data.getData(),CharsetUtil.UTF_8),slaveNodeMap.size()); break; case CHILD_REMOVED: slaveNodeMap.remove(slaveNode.getId()); LOGGER.info("CHILD_REMOVED with path={},data={},current slaveNode size={}", data.getPath(), new String(data.getData(),CharsetUtil.UTF_8),slaveNodeMap.size()); break; case CHILD_UPDATED: slaveNodeMap.replace(slaveNode.getId(), slaveNode); LOGGER.info("CHILD_UPDATED with path={},data={},current slaveNode size={}", data.getPath(), new String(data.getData(),CharsetUtil.UTF_8),slaveNodeMap.size()); break; default: break; } } }
private void processTokenRemoved(ChildData data) throws IOException { ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); synchronized (this) { currentTokens.remove(ident); // The cancel task might be waiting notifyAll(); } }
/** * remove instance; * * @param data */ private void removeChild(ChildData data) { if (data == null) { return; } try { ServiceInstance<RpcPayload> instance = serializer.deserialize(data .getData()); this.onInstanceRemoved(instance); } catch (Exception ex) { LOG.error("Could not remove zk node " + data.getPath() + " from pool.", ex); } }
private void _initWatch() throws Exception { PathChildrenCache watcher = new PathChildrenCache( client, STORE_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { try { rwlock.writeLock().lock(); ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED || event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { String path = data.getPath(); if (path.startsWith(STORE_PATH)) { String key = path.replace(STORE_PATH + "/", ""); String dataStr = new String(data.getData(), "utf-8"); storeMap.put(key, dataStr); } } } }finally { rwlock.writeLock().unlock(); } }); watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); }
private Map<String, ChildData> getPropertyMap(String propertyServicePath) { Map<String, ChildData> propertyMap = treeCache.getCurrentChildren(propertyServicePath); if (propertyMap == null) { String error = "We can't get properties from ZooKeeper! " + "Is the propertyServicePath configured correctly? " + "Is ZooKeeper available? " + "Has ZooKeeper been pre-loaded with the necessary properties?" + "Has the treeCache been initialised?"; LOGGER.error(error); throw new RuntimeException(error); } else { return propertyMap; } }
private String childDataToLogString(final ChildData childData) { return new StringBuilder() .append(childData.getPath()) .append(" - ") .append(Bytes.toString(childData.getData())) .toString(); }
private static void checkTreeCache(CuratorFramework curatorFramework, String path) throws Exception { final Semaphore semaphore = new Semaphore(0); TreeCache treeCache = TreeCache.newBuilder(curatorFramework, path) .setCacheData(true) .setMaxDepth(3) .build(); if (treeCache == null) { LOGGER.error("treeCache is null"); } treeCache.getListenable().addListener((client, event) -> { if (event.getType().equals(TreeCacheEvent.Type.INITIALIZED)) { semaphore.release(); } }); treeCache.start(); semaphore.tryAcquire(2, TimeUnit.SECONDS); Map<String, ChildData> map = treeCache.getCurrentChildren("/propertyService"); if (map == null) { LOGGER.error("map is null"); } map.entrySet().forEach(entry -> { LOGGER.info("{} - {}", entry.getKey(), Bytes.toString(entry.getValue().getData())); }); }
@Override public String get(final String key) { TreeCache cache = findTreeCache(key); if (null == cache) { return getDirectly(key); } ChildData resultInCache = cache.getCurrentData(key); if (null != resultInCache) { return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8); } return getDirectly(key); }
@Test public void assertChildEventWhenIsNotConfigPath() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/other/test_job", null, "".getBytes()))); verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any()); verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any()); }
@Test public void assertChildEventWhenIsRootConfigPath() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/config/job", null, "".getBytes()))); verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any()); verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any()); }
@Test public void assertChildEventWhenStateIsAddAndIsConfigPathAndInvalidData() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData("/config/job/test_job", null, "".getBytes()))); verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any()); verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any()); }
@Test public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndDaemonJob() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes()))); verify(readyService).remove(Collections.singletonList("test_job")); verify(producerManager).reschedule(ArgumentMatchers.<String>any()); }
@Test public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndMisfireDisabled() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(false).getBytes()))); verify(readyService).setMisfireDisabled("test_job"); verify(producerManager).reschedule(ArgumentMatchers.<String>any()); }
@Override public String get(final String key) { TreeCache cache = findTreeCache(key); if (null == cache) { return getDirectly(key); } ChildData resultInCache = cache.getCurrentData(key); if (null != resultInCache) { return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8); } return getDirectly(key); }
private void refreshConfiguration() throws IOException { LOGGER.info("Refreshing configuration from ZooKeeper"); byte[] data = null; ChildData childData = agentNodeCache.getCurrentData(); if (childData != null) { data = childData.getData(); } flumeConfiguration = configFromBytes(data); eventBus.post(getConfiguration()); }
@Override public String getRunningGlobalTask(String globalId) { String globalPath = zookeeperService.getFullPath(ZK_GLOBALTASKPATH, globalId); ChildData data = globalCache.getCurrentData(globalPath); if( data != null ) { return new String(data.getData()); } return null; }
@Override public boolean isClusterDebugging() { if( isCluster() ) { ChildData currentData = debugCache.getCurrentData(); return currentData != null && Boolean.parseBoolean(new String(currentData.getData())); } return false; }
@Override public List<String> getAppServers() { return Lists.transform(membersCache.getCurrentData(), new Function<ChildData, String>() { @Override public String apply(ChildData input) { return ZKPaths.getNodeFromPath(input.getPath()); } }); }