/** * 读取指定节点的子菜单的值 * * @param path * @return */ public Map<String, String> readTargetChildsData(String path) { if (!ObjectUtils.allNotNull(zkClient, path)) { return null; } Map<String, String> map = null; try { Stat stat = exists(path); if (stat != null) { List<String> childrens = zkClient.getChildren().forPath(path); GetDataBuilder dataBuilder = zkClient.getData(); if (childrens != null) { map = childrens.stream().collect(Collectors.toMap(Function.identity(), (child) -> { try { return new String(dataBuilder.forPath(ZKPaths.makePath(path, child)), Charsets.UTF_8); } catch (Exception e1) { return null; } })); } } } catch (Exception e) { log.error("get target childs data fail!, path:{} , error:{}", path, e); } return map; }
/** * 获取指定节点数据 * @return * @throws Exception */ public Object load() throws Exception { String name = StringUtils.isBlank(nodeName) ? maker.make() : nodeName; if (name == null) return null; byte[] data; GetDataBuilder builder = client.getData(); if (watchObj != null) { builder.usingWatcher(watchObj); }else if (watchBool) { builder.watched(); } data = builder.storingStatIn(stat).forPath(name); if(debug) { log.info("ZooKeeper get data. path: {} --- data: {}", name, data); } if(deserializer != null) { return deserializer.deserialize(data); } return data; }
@Override public List<PropertyItem> findProperties(String node) { LOGGER.debug("Find properties in node: [{}]", node); List<PropertyItem> properties = Lists.newArrayList(); try { Stat stat = getClient().checkExists().forPath(node); if (stat != null) { GetChildrenBuilder childrenBuilder = getClient().getChildren(); List<String> children = childrenBuilder.forPath(node); GetDataBuilder dataBuilder = getClient().getData(); if (children != null) { for (String child : children) { String propPath = ZKPaths.makePath(node, child); PropertyItem item = new PropertyItem(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8)); properties.add(item); } } } } catch (Exception e) { throw Throwables.propagate(e); } return properties; }
@Test public void testShouldReturnActiveServerAddress() throws Exception { when(curatorFactory.clientInstance()).thenReturn(curatorFramework); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); when(curatorFramework.getData()).thenReturn(getDataBuilder); when(getDataBuilder.forPath(getPath())). thenReturn(SERVER_ADDRESS.getBytes(Charset.forName("UTF-8"))); ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); String actualServerAddress = activeInstanceState.getActiveServerAddress(); assertEquals(SERVER_ADDRESS, actualServerAddress); }
@Test public void testShouldHandleExceptionsInFetchingServerAddress() throws Exception { when(curatorFactory.clientInstance()).thenReturn(curatorFramework); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); when(curatorFramework.getData()).thenReturn(getDataBuilder); when(getDataBuilder.forPath(getPath())). thenThrow(new Exception()); ActiveInstanceState activeInstanceState = new ActiveInstanceState(configuration, curatorFactory); assertNull(activeInstanceState.getActiveServerAddress()); }
@Test public void testCanReadFromZookeeper() throws Exception { CuratorFramework curatorFramework = mock(CuratorFramework.class); ExistsBuilder existsBuilder = mock(ExistsBuilder.class); GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(mockGlobalData()); when(curatorFramework.checkExists()).thenReturn(existsBuilder); when(curatorFramework.getData()).thenReturn(getDataBuilder); when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); when(getChildrenBuilder.forPath(anyString())).thenReturn(Collections.<String> emptyList()); Configuration configuration = new Configuration(Paths.get("foo")); configuration.curatorFramework = curatorFramework; configuration.update(); checkResult(configuration); }
private Pair<String, String> loadKey(final String nodePath) throws Exception { final String nodeName = ZKPaths.getNodeFromPath(nodePath); final Set<String> keysSpecified = configProfile.getKeysSpecified(); switch (configProfile.getKeyLoadingMode()) { case INCLUDE: if (keysSpecified == null || !keysSpecified.contains(nodeName)) { return null; } break; case EXCLUDE: if (keysSpecified.contains(nodeName)) { return null; } break; case ALL: break; default: break; } final GetDataBuilder data = client.getData(); final String value = new String(data.watched().forPath(nodePath), Charsets.UTF_8); return new ImmutablePair<String, String>(nodeName, value); }
@Override protected byte[] query(final boolean setListener) throws NakadiRuntimeException { final GetDataBuilder builder = curatorFramework.getData(); if (setListener) { builder.usingWatcher(this); } try { return builder.forPath(key); } catch (final Exception ex) { throw new NakadiRuntimeException(ex); } }
@Test public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception { exception.expect(RestException.class); GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(Exception.class); when(curatorFramework.getData()).thenReturn(getDataBuilder); globalConfigService.get(); }
private <T> Optional<T> getData(String path, Optional<Stat> stat, Transcoder<T> transcoder, Optional<ZkCache<T>> zkCache, Optional<Boolean> shouldCheckExists) { if (!stat.isPresent() && zkCache.isPresent()) { Optional<T> cachedValue = zkCache.get().get(path); if (cachedValue.isPresent() && (!shouldCheckExists.isPresent() || (shouldCheckExists.get().booleanValue() && checkExists(path).isPresent()))) { return cachedValue; } } final long start = System.currentTimeMillis(); int bytes = 0; try { GetDataBuilder bldr = curator.getData(); if (stat.isPresent()) { bldr.storingStatIn(stat.get()); } byte[] data = bldr.forPath(path); if (data == null || data.length == 0) { LOG.trace("Empty data found for path {}", path); return Optional.absent(); } bytes = data.length; final T object = transcoder.fromBytes(data); if (zkCache.isPresent()) { zkCache.get().set(path, object); } return Optional.of(object); } catch (NoNodeException nne) { return Optional.absent(); } catch (Throwable t) { throw Throwables.propagate(t); } finally { log(OperationType.READ, Optional.<Integer> absent(), Optional.<Integer> of(bytes), start, path); } }
@Override public GetDataBuilder getData() { return new MockGetDataBuilder(); }
/** * Test getNodeData method * @throws Exception */ @Test public void testGetNodeData() throws Exception { CuratorStateManager spyStateManager = spy(new CuratorStateManager()); final CuratorFramework mockClient = mock(CuratorFramework.class); GetDataBuilder mockGetBuilder = mock(GetDataBuilder.class); // Mockito doesn't support mock type-parametrized class, thus suppress the warning @SuppressWarnings("rawtypes") BackgroundPathable mockBackPathable = mock(BackgroundPathable.class); final CuratorEvent mockEvent = mock(CuratorEvent.class); Message.Builder mockBuilder = mock(Message.Builder.class); Message mockMessage = mock(Message.class); final byte[] data = "wy_1989".getBytes(); doReturn(mockMessage) .when(mockBuilder).build(); doReturn(data) .when(mockEvent).getData(); doReturn(PATH) .when(mockEvent).getPath(); doReturn(mockClient) .when(spyStateManager).getCuratorClient(); doReturn(true) .when(mockClient).blockUntilConnected(anyInt(), any(TimeUnit.class)); doReturn(mockGetBuilder) .when(mockClient).getData(); doReturn(mockBackPathable) .when(mockGetBuilder).usingWatcher(any(Watcher.class)); doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Object[] objests = invocationOnMock.getArguments(); // the first object is the BackgroundCallback ((BackgroundCallback) objests[0]).processResult(mockClient, mockEvent); return null; } }).when(mockBackPathable).inBackground(any(BackgroundCallback.class)); spyStateManager.initialize(config); // Verify the data on node is fetched correctly ListenableFuture<Message> result = spyStateManager.getNodeData(null, PATH, mockBuilder); assertTrue(result.get().equals(mockMessage)); }
@Override public GetDataBuilder getData() { return namespaceDelegate().getData(); }
private <T> Optional<T> getData(String path, Optional<Stat> stat, Transcoder<T> transcoder, Optional<ZkCache<T>> zkCache, Optional<Boolean> shouldCheckExists) { if (!stat.isPresent() && zkCache.isPresent()) { Optional<T> cachedValue = zkCache.get().get(path); if (cachedValue.isPresent() && (!shouldCheckExists.isPresent() || (shouldCheckExists.get().booleanValue() && checkExists(path).isPresent()))) { return cachedValue; } } final long start = System.currentTimeMillis(); int bytes = 0; try { GetDataBuilder bldr = curator.getData(); if (stat.isPresent()) { bldr.storingStatIn(stat.get()); } byte[] data = bldr.forPath(path); if (data == null || data.length == 0) { LOG.trace("Empty data found for path {}", path); return Optional.absent(); } bytes = data.length; final T object = transcoder.fromBytes(data); if (zkCache.isPresent()) { zkCache.get().set(path, object); } return Optional.of(object); } catch (NoNodeException nne) { LOG.trace("No node found for path {}", path); return Optional.absent(); } catch (Throwable t) { throw Throwables.propagate(t); } finally { log(OperationType.GET, Optional.absent(), Optional.<Integer> of(bytes), start, path); } }