/*
 * Disabled helper, kept for reference: after a full pull has completed it moved the Kafka
 * consumer offset of the given topic to the end.
 *
 * public void setKafkaOffsetToLargest(String targetTopic){
 *     if(targetTopic==null)
 *         return;
 *     TopicPartition partition0 = new TopicPartition(targetTopic, 0);
 *     KafkaConsumerContainer.getInstances().getConsumer(targetTopic).seekToEnd(Arrays.asList(partition0));
 * }
 */

/**
 * Reads the znode at {@code path} and converts its UTF-8 JSON content into {@code clazz}.
 *
 * @param path  znode path to read
 * @param clazz target type handed to {@code JsonUtil.fromJson}
 * @return the decoded object, or {@code null} when the Curator client is stopped or the node
 *         holds no data
 * @throws Exception if reading the node or decoding its content fails
 */
protected <T> T deserialize(String path, Class<T> clazz) throws Exception {
    CuratorFramework curator = CuratorContainer.getInstance().getCurator();

    // A stopped client cannot serve reads; report it and give up.
    if (curator.getState() == CuratorFrameworkState.STOPPED) {
        LOG.info("[EventContainer] CuratorFrameworkState:{}", CuratorFrameworkState.STOPPED.name());
        return null;
    }

    byte[] raw = curator.getData().forPath(path);
    if (raw == null || raw.length == 0) {
        return null;
    }

    String json = new String(raw, Charset.forName("UTF-8"));
    return JsonUtil.fromJson(json, clazz);
}
/**
 * Initializes the ZooKeeper client from the dynamic configuration properties.
 *
 * <p>When no ensemble is configured, a warning is logged and the method returns without
 * creating a client. When the client cannot be created or is not in the {@code STARTED}
 * state, the failure is logged and the JVM exits with status 1.
 *
 * @author zhangshaobin
 * @created 2013-6-26 10:55:30 AM
 */
private void init() {
    applicationName = DynamicPropertyFactory.getInstance().getStringProperty(CommonConstant.CONFIG_APPLICATION_NAME_KEY, null).get();
    String zkConfigEnsemble = DynamicPropertyFactory.getInstance().getStringProperty(CommonConstant.ZK_ENSEMABLE_KEY, null).get();
    Integer zkConfigSessionTimeout = DynamicPropertyFactory.getInstance().getIntProperty(CommonConstant.ZK_SESSION_TIMEOUT_KEY, 15000).get();
    Integer zkConfigConnTimeout = DynamicPropertyFactory.getInstance().getIntProperty(CommonConstant.ZK_CONN_TIMEOUT_KEY, 5000).get();

    if (Check.NuNStr(zkConfigEnsemble)) {
        logger.warn("ZooKeeper configuration running in file mode, zk is not enabled since not configured");
        return;
    }

    try {
        client = createAndStartZKClient(zkConfigEnsemble, zkConfigSessionTimeout, zkConfigConnTimeout);

        if (client.getState() != CuratorFrameworkState.STARTED) {
            throw new RuntimeException("ZooKeeper located at " + zkConfigEnsemble + " is not started.");
        }
    } catch (Exception e) {
        // Log the exception itself rather than e.getCause(): the RuntimeException thrown above
        // has no cause, so logging the cause would drop both its message and its stack trace.
        logger.error("连接配置中心服务器超时,时间5000毫秒。", e);
        System.exit(1);
    }

    System.out.println(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss] ").format(new Date()) + applicationName + " connected to cofnig server(" + zkConfigEnsemble + ").");
    logger.info(applicationName + " connected to cofnig server(" + zkConfigEnsemble + ").");
}
@Override public void register(String service, String version, String address) { if(zkClient.getState() == CuratorFrameworkState.LATENT){ zkClient.start(); } if (version == null || version == ""){ version ="1.0.0"; } //创建临时节点 try { zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath("/"+service+"/"+version+"/"+address); } catch (Exception e) { logger.error("register service address to zookeeper exception:{}",e); throw new ThriftException("register service address to zookeeper exception:{}", e); } }
@Override public void register(String service, String version, String address) { if(curatorFramework.getState() == CuratorFrameworkState.LATENT){ curatorFramework.start(); } if (version == null || version == ""){ version = ThriftConstant.DEFAULT_VERSION; } //创建临时节点 try { String path = "/"+service+"/"+version+"/"+address; logger.info("register: {}", path); curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(path); } catch (Exception e) { logger.error("register service address to zookeeper exception:{}",e); throw new ThriftException("register service address to zookeeper exception:{}", e); } }
/**
 * Gives up leadership: closes the node cache (failures are only logged) and, while the client
 * is still started, releases this node's jobs and rewrites the node with state "Slave".
 */
@Override
public void relinquishLeadership() {
    // Step 1: close the cache; a failure here must not prevent the node update below.
    try {
        if (nodeCache != null) {
            nodeCache.close();
        }
        LoggerHelper.info("node cache has been closed.");
    } catch (Throwable closeFailure) {
        LoggerHelper.warn("node cache close failed.", closeFailure);
    }

    // Step 2: only touch ZooKeeper while the client is running.
    final boolean clientRunning = client.getState() == CuratorFrameworkState.STARTED;
    if (clientRunning) {
        MasterSlaveNodeData.Data slaveData = new MasterSlaveNodeData.Data(getNodeIp());
        releaseJobs(nodePath, slaveData);
        slaveData.setNodeState("Slave");
        masterSlaveApiFactory.nodeApi().updateNode(nodePath, slaveData);
    }

    LoggerHelper.info("clear node successfully.");
}
/**
 * Gives up leadership: closes the job cache (failures are only logged), shuts the scheduler
 * manager down and, while the client is still started, rewrites this node's data.
 */
@Override
public void relinquishLeadership() {
    // Step 1: close the cache; a failure here must not prevent the remaining cleanup.
    try {
        if (jobCache != null) {
            jobCache.close();
        }
        LoggerHelper.info("job cache has been closed.");
    } catch (Throwable closeFailure) {
        LoggerHelper.warn("job cache close failed.", closeFailure);
    }

    // Step 2: stop scheduling.
    LoggerHelper.info("begin stop scheduler manager.");
    schedulerManager.shutdown();

    // Step 3: only touch ZooKeeper while the client is running.
    final boolean clientRunning = client.getState() == CuratorFrameworkState.STARTED;
    if (clientRunning) {
        StandbyNodeData.Data standbyData = new StandbyNodeData.Data(getNodeIp());
        standbyApiFactory.nodeApi().updateNode(nodePath, standbyData);
        LoggerHelper.info(getNodeIp() + " has been shutdown. [" + standbyData + "]");
    }

    LoggerHelper.info("clear node successfully.");
}
/**
 * Stores the given host state in an ephemeral znode named after the host.
 *
 * <p>Nothing is written unless the Curator client is in the {@code STARTED} state. An existing
 * node is overwritten; otherwise the node is created together with any missing parents.
 *
 * @param hostState state to serialize and store
 * @throws InterruptedException declared by the signature
 */
public void save(SingularityHostState hostState) throws InterruptedException {
    final String path = ZKPaths.makePath(ROOT_PATH, hostState.getHostname());
    final byte[] data = hostStateTranscoder.toBytes(hostState);

    if (curator.getState() != CuratorFrameworkState.STARTED) {
        return;
    }

    try {
        if (!exists(path)) {
            curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data);
        } else {
            curator.setData().forPath(path, data);
        }
    } catch (Throwable t) {
        throw Throwables.propagate(t);
    }
}
/** * Test the timeout exception * * @throws IOException * @throws InterruptedException */ @Test public void testConnect() throws IOException, InterruptedException { // we expect a ConnectException to be thrown final ZConnection conn = new ZConnection("localhost:" + server.getPort(), 10000L); try { CuratorFramework zk = conn.get(); assertTrue(zk.getState().compareTo(CuratorFrameworkState.STARTED) == 0); } finally { conn.close(); } }
/**
 * Builds and starts a {@code ServiceDiscovery} on top of the given Curator client, starting
 * the client first when it is not already in the {@code STARTED} state.
 *
 * @param configuration supplies the base path via {@code root()}
 * @param curator       client to run the discovery on
 * @param serializer    instance serializer handed to the builder
 * @return the started service discovery
 * @throws ZookeeperServiceDiscoveryException if starting the client or the discovery fails
 */
private ServiceDiscovery<ZookeeperInstance> buildServiceDiscoveryWith(ZookeeperConfiguration configuration, CuratorFramework curator, InstanceSerializer<ZookeeperInstance> serializer) {
    try {
        final boolean alreadyStarted = CuratorFrameworkState.STARTED.equals(curator.getState());
        if (!alreadyStarted) {
            curator.start();
        }

        ServiceDiscovery<ZookeeperInstance> discovery = ServiceDiscoveryBuilder.builder(ZookeeperInstance.class)
                .client(curator)
                .basePath(configuration.root())
                .serializer(serializer)
                .build();
        discovery.start();
        return discovery;
    } catch (Exception e) {
        throw new ZookeeperServiceDiscoveryException("Error on create Zookeeper ServiceDiscovery", e);
    }
}
/**
 * Waits for the ZooKeeper port to open and then verifies that a Curator client can connect.
 *
 * <p>For the address reported by {@code PortWait}, a client is started and blocked until
 * connected. The result is a failure when the client is not in the {@code STARTED} state, and
 * a success otherwise. The client is always closed.
 *
 * @param dockerClient passed through to {@code PortWait}
 * @param container    passed through to {@code PortWait}
 * @return the wait result
 * @throws RuntimeException if the thread is interrupted while connecting; the interrupt flag is
 *                          restored before throwing
 */
@Override
public WaitResult wait(DockerFacade dockerClient, Container container) {
    return new PortWait().wait(ZOOKEEPER_PORT, dockerClient, container, (ipAddress, externalPort) -> {
        String zookeeperConnectionString = String.format("%s:%s", ipAddress, externalPort);
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);

        try {
            // Started inside the try so the finally block also covers a failing start().
            client.start();
            client.blockUntilConnected();
            CuratorFrameworkState status = client.getState();
            if (!status.equals(CuratorFrameworkState.STARTED)) {
                return WaitResult.failure("Status [%s] does not match expected [STARTED]", status);
            }
            return WaitResult.success();
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Could not connect to Zookeeper", e);
        } finally {
            client.close();
        }
    });
}
/**
 * Reads and deserializes the mapping stored at {@code MAPPING_PATH}, registering a watcher
 * that re-reads the mapping whenever it fires.
 *
 * @param stat receives the znode stat of the read
 * @return the mapping, or {@code null} when the client is stopped or the read fails (failures
 *         are logged, not propagated)
 */
private synchronized ZMapping readCurrentMapping(Stat stat) {
    try {
        if (client.getState().equals(CuratorFrameworkState.STOPPED)) {
            LOG.warn("Attempting to read state on stopped client.");
            return null;
        }

        // On every watch event, refresh the cached mapping but never replace a known mapping
        // with null.
        CuratorWatcher refreshOnChange = new CuratorWatcher() {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception {
                ZMapping refreshed = readCurrentMapping();
                if (refreshed != null || mapping == null) {
                    mapping = refreshed;
                } else {
                    LOG.warn("An attempt was made to overwrite current mapping with a null one.");
                }
                LOG.debug("Host {} just updated its mapping to version {}", hostPort, (mapping != null) ? mapping.getVersion() : -1);
            }
        };

        byte[] data = client.getData().storingStatIn(stat).usingWatcher(refreshOnChange).forPath(MAPPING_PATH);
        return ZMapping.deserialize(data);
    } catch (Exception e) {
        LOG.error("Error reading current mapping: ", e);
        return null;
    }
}
/**
 * Reports the health of the Curator client.
 *
 * <p>The status is up only when the client is {@code STARTED} and the path {@code "/"} exists.
 * The connection string and client state are attached as details. Any exception marks the
 * health as down with that exception.
 *
 * @param builder health builder to populate
 */
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
    try {
        if (this.curator.getState() == CuratorFrameworkState.STARTED) {
            if (this.curator.checkExists().forPath("/") != null) {
                builder.up();
            } else {
                builder.down().withDetail("error", "Root for namespace does not exist");
            }
        } else {
            builder.down().withDetail("error", "Client not started");
        }

        builder.withDetail("connectionString", this.curator.getZookeeperClient().getCurrentConnectionString())
                .withDetail("state", this.curator.getState());
    } catch (Exception e) {
        builder.down(e);
    }
}
private ZKUtils(String zookeeperCluster, String groupId) throws Exception { this.groupId = groupId; // ZOOKEPER CONNECTION client = CuratorFrameworkFactory.newClient(zookeeperCluster, 25 * 1000, 10 * 1000, new ExponentialBackoffRetry( 1000, 3)); client.start(); client.getZookeeperClient().blockUntilConnectedOrTimedOut(); if (client.getState().compareTo(CuratorFrameworkState.STARTED) != 0) { throw new Exception("Connection to Zookeeper timed out after seconds"); } else { backgroundZookeeperCleanerTasks = Executors.newFixedThreadPool(1); backgroundZookeeperCleanerTasks.submit(new ZookeeperBackgroundCleaner(client, groupId)); } }
/**
 * Starting the service with a payload must create the ephemeral node
 * {@code /services/myservice/nodes/foo:4321} carrying that payload.
 */
@SuppressWarnings("unchecked")
@Test
public void testStart() throws Exception {
    final byte[] payload = "foo bar baz bingo".getBytes();

    // Framework reports STARTED and the parent node as existing.
    CuratorFramework framework = mockFramework();
    when(framework.getState()).thenReturn(CuratorFrameworkState.STARTED);
    ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
    when(existsBuilder.forPath("/services/myservice/nodes")).thenReturn(mock(Stat.class));
    when(framework.checkExists()).thenReturn(existsBuilder);

    // Creation chain: create() -> withMode(EPHEMERAL) -> forPath(path, data).
    CreateBuilder createBuilder = mock(CreateBuilder.class);
    ACLBackgroundPathAndBytesable<String> ephemeralCreate = mock(ACLBackgroundPathAndBytesable.class);
    when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(ephemeralCreate);
    when(framework.create()).thenReturn(createBuilder);

    DiscoService service = new DiscoService(framework, "myservice");
    service.start("foo", 4321, true, payload);

    verify(ephemeralCreate).forPath(eq("/services/myservice/nodes/foo:4321"), eq(payload));
}
/**
 * When the service node already exists, starting the service must delete it and then create
 * the ephemeral node again with the given payload.
 */
@SuppressWarnings("unchecked")
@Test
public void testDeletesEphemeralNode() throws Exception {
    final byte[] payload = "foo bar baz bingo".getBytes();

    // Framework reports STARTED; both the parent node and the service node already exist.
    CuratorFramework framework = mockFramework();
    when(framework.getState()).thenReturn(CuratorFrameworkState.STARTED);
    ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
    when(existsBuilder.forPath("/services/myservice/nodes")).thenReturn(mock(Stat.class));
    when(existsBuilder.forPath("/services/myservice/nodes/foo:4321")).thenReturn(mock(Stat.class));
    when(framework.checkExists()).thenReturn(existsBuilder);

    // Deletion and creation chains.
    DeleteBuilder deleteBuilder = mock(DeleteBuilder.class);
    when(framework.delete()).thenReturn(deleteBuilder);
    CreateBuilder createBuilder = mock(CreateBuilder.class);
    ACLBackgroundPathAndBytesable<String> ephemeralCreate = mock(ACLBackgroundPathAndBytesable.class);
    when(createBuilder.withMode(CreateMode.EPHEMERAL)).thenReturn(ephemeralCreate);
    when(framework.create()).thenReturn(createBuilder);

    DiscoService service = new DiscoService(framework, "myservice");
    service.start("foo", 4321, true, payload);

    verify(deleteBuilder).forPath("/services/myservice/nodes/foo:4321");
    verify(ephemeralCreate).forPath(eq("/services/myservice/nodes/foo:4321"), eq(payload));
}
/** * Start the registration of the {@link #candidate} for leader election. */ @Override public synchronized void start() { if (!this.running) { if (this.client.getState() != CuratorFrameworkState.STARTED) { // we want to do curator start here because it needs to // be started before leader selector and it gets a little // complicated to control ordering via beans so that // curator is fully started. this.client.start(); } this.leaderSelector = new LeaderSelector(this.client, buildLeaderPath(), new LeaderListener()); this.leaderSelector.setId(this.candidate.getId()); this.leaderSelector.autoRequeue(); this.leaderSelector.start(); this.running = true; } }
/**
 * A managed Curator created from the configuration must be non-null, still {@code LATENT},
 * and registered with the lifecycle environment.
 */
@Test
public void testNewManagedCurator() {
    ZooKeeperConfiguration config = parse(
            ImmutableMap.of(
                    "retryPolicy",
                    ImmutableMap.builder()
                            .put("type", "untilElapsed")
                            .put("maxElapsedTimeMs", 1000)
                            .put("sleepMsBetweenRetries", 50)
                            .build()));
    LifecycleEnvironment lifecycle = mock(LifecycleEnvironment.class);

    CuratorFramework managed = config.newManagedCurator(lifecycle);

    assertNotNull(managed);
    assertEquals(CuratorFrameworkState.LATENT, managed.getState());
    verify(lifecycle).manage(any(ManagedCuratorFramework.class));
}
/** * Create the ephemeral node in ZooKeeper. If the node cannot be created in a timely fashion then an exception will * be thrown. */ public PersistentEphemeralNode(CuratorFramework curator, String basePath, byte[] data, CreateMode mode) { checkNotNull(curator); checkArgument(curator.getState() == CuratorFrameworkState.STARTED); checkNotNull(basePath); checkNotNull(data); checkNotNull(mode); checkArgument(mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL); // TODO: Share this executor across multiple persistent ephemeral nodes in a way that guarantees that it is a // TODO: single thread executor. _executor = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); _async = new Async(_executor, new Sync(curator, basePath, data, mode)); CountDownLatch latch = new CountDownLatch(1); _async.createNode(latch); await(latch, CREATION_WAIT_IN_SECONDS, TimeUnit.SECONDS); }
/**
 * Creates a {@code NodeDiscovery} that watches the children of a ZooKeeper path.
 *
 * @param curator  Curator framework reference; must already be started.
 * @param nodePath The path in ZooKeeper to watch; must not be empty.
 * @param parser   The strategy to convert from ZooKeeper {@code byte[]} to {@code T}.
 */
public NodeDiscovery(CuratorFramework curator, String nodePath, NodeDataParser<T> parser) {
    checkNotNull(curator);
    checkNotNull(nodePath);
    checkNotNull(parser);
    checkArgument(curator.getState() == CuratorFrameworkState.STARTED);
    checkArgument(!"".equals(nodePath));

    // Daemon threads named after the concrete class and the watched path.
    final String threadNameFormat = getClass().getSimpleName() + "(" + nodePath + ")-%d";
    ThreadFactory daemonThreads = new ThreadFactoryBuilder()
            .setNameFormat(threadNameFormat)
            .setDaemon(true)
            .build();

    _curator = curator;
    _nodeDataParser = parser;
    _closed = false;
    _nodes = Maps.newConcurrentMap();
    _listeners = Sets.newSetFromMap(Maps.<NodeListener<T>, Boolean>newConcurrentMap());
    _executor = Executors.newSingleThreadScheduledExecutor(daemonThreads);
    _pathCache = new PathChildrenCache(curator, nodePath, true, false, _executor);
}
/**
 * Checks whether this KijiClient is healthy.
 *
 * <p>Issues are collected from the ZooKeeper connection state and from every cached instance.
 * The result is healthy only when no issue was found; otherwise the issues are joined with
 * newlines into the unhealthy message.
 *
 * @return health status of the KijiClient.
 */
public HealthCheck.Result checkHealth() {
    final State currentState = mState.get();
    Preconditions.checkState(currentState == State.STARTED, "Can not check health while in state %s.", currentState);

    final List<String> problems = Lists.newArrayList();

    if (mZKFramework.getState() != CuratorFrameworkState.STARTED) {
        final String zkProblem = String.format("ZooKeeper connection in unhealthy state %s.", mZKFramework.getState());
        problems.add(zkProblem);
    }

    for (KijiInstanceCache cache : mInstanceCaches.asMap().values()) {
        problems.addAll(cache.checkHealth());
    }

    return problems.isEmpty()
        ? HealthCheck.Result.healthy()
        : HealthCheck.Result.unhealthy(Joiner.on('\n').join(problems));
}
/**
 * Starts the service: registers a listener that rebuilds on every node change, starts the
 * node cache and performs an initial build.
 *
 * @throws RuntimeException if the service was already started, or if the Curator framework is
 *                          not in the {@code STARTED} state
 * @throws Exception        if starting the node cache or the initial build fails
 */
@Override
public void start() throws Exception {
    if (!isStarted.compareAndSet(false, true)) {
        throw new RuntimeException("Service already started!");
    }

    if (framework.getState() != CuratorFrameworkState.STARTED) {
        // Roll the flag back: otherwise a later start(), after the framework has been started,
        // would be rejected with "Service already started!" although nothing ever ran.
        isStarted.set(false);
        throw new RuntimeException("CuratorFramework is not started!");
    }

    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            build();
        }
    });

    nodeCache.start(true);
    build();
}
/**
 * Stops the server if it is running: stops the server and joins its thread, stops the GC
 * timestamp tracker when present, clears the leader state and, while Curator is still started,
 * detaches the listeners and closes the Curator resources.
 *
 * @throws Exception if any of the stop or close steps fails
 */
public synchronized void stop() throws Exception {
    if (!started) {
        return;
    }

    server.stop();
    serverThread.join();

    if (gcTsTracker != null) {
        gcTsTracker.stop();
    }

    started = false;
    currentLeader = null;

    final boolean curatorRunning = curatorFramework.getState().equals(CuratorFrameworkState.STARTED);
    if (curatorRunning) {
        pathChildrenCache.getListenable().removeListener(this);
        pathChildrenCache.close();
        leaderSelector.close();
        curatorFramework.getConnectionStateListenable().removeListener(this);
        curatorFramework.close();
    }

    log.info("Oracle server has been stopped.");
}
/** * 创建Zookeeper连接 * * @throws InterruptedException * @throws UnsupportedEncodingException */ @PostConstruct public void connect() throws InterruptedException, UnsupportedEncodingException { client = CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(DEFAULT_BASE_SLEEP_TIME_MS, DEFAULT_MAX_RETRIES)) .namespace(DEFAULT_NAMESPACE) .authorization(DEFAULT_ACL_SCHEME, DEFAULT_ACL_AUTH.getBytes(DEFAULT_CHARSET)) // 设置权限 .aclProvider(new ACLProvider() { // 设置ACL规则 @Override public List<ACL> getDefaultAcl() { return DEFAULT_ACL_LIST; } @Override public List<ACL> getAclForPath(String path) { return DEFAULT_ACL_LIST; } }) .build(); if (CuratorFrameworkState.STARTED != client.getState()) { client.start(); } while (!client.blockUntilConnected(MAX_WAIT_SECONDS, TimeUnit.SECONDS)) { log.info("can not connect to zookeeper , retry again!!"); } }
/**
 * Reads the znode at {@code path} and converts its UTF-8 JSON content into {@code clazz}.
 *
 * @param path  znode path to read
 * @param clazz target type handed to {@code JsonUtil.fromJson}
 * @return the decoded object, or {@code null} when the Curator client is stopped or the node
 *         holds no data
 * @throws Exception if reading the node or decoding its content fails
 */
protected <T> T deserialize(String path, Class<T> clazz) throws Exception {
    CuratorFramework curator = CuratorContainer.getInstance().getCurator();

    // A stopped client cannot serve reads; report it and give up.
    if (curator.getState() == CuratorFrameworkState.STOPPED) {
        LOG.info("[control-event] CuratorFrameworkState:{}", CuratorFrameworkState.STOPPED.name());
        return null;
    }

    byte[] raw = curator.getData().forPath(path);
    if (raw == null || raw.length == 0) {
        return null;
    }

    String json = new String(raw, Charset.forName("UTF-8"));
    return JsonUtil.fromJson(json, clazz);
}
/**
 * Writes {@code packet} to the znode {@code node}, encoded as UTF-8.
 *
 * <p>Nothing is written when the Curator client is stopped. Failures are logged and not
 * propagated.
 *
 * @param node   znode path to write
 * @param packet content to store
 */
protected void saveZk(String node, String packet) {
    try {
        CuratorFramework curator = CuratorContainer.getInstance().getCurator();
        if (curator.getState() == CuratorFrameworkState.STOPPED) {
            LOG.info("[control-event] CuratorFrameworkState:{}", CuratorFrameworkState.STOPPED.name());
        } else {
            // Encode explicitly as UTF-8 so the bytes match what deserialize() decodes; the
            // no-arg getBytes() depends on the platform default charset.
            curator.setData().forPath(node, packet.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    } catch (Exception e) {
        LOG.error("[control-event] 报错znode: " + node + ",数据包:" + packet + "失败!", e);
    }
}
/**
 * Writes {@code packet} to the znode {@code node}, encoded as UTF-8.
 *
 * <p>Nothing is written when the Curator client is stopped. Failures are logged and not
 * propagated.
 *
 * @param node   znode path to write
 * @param packet content to store
 */
protected void saveZk(String node, String packet) {
    try {
        CuratorFramework curator = CuratorContainer.getInstance().getCurator();
        if (curator.getState() == CuratorFrameworkState.STOPPED) {
            LOG.info("[EventContainer] CuratorFrameworkState:{}", CuratorFrameworkState.STOPPED.name());
        } else {
            // Encode explicitly as UTF-8 so the bytes match what deserialize() decodes; the
            // no-arg getBytes() depends on the platform default charset.
            curator.setData().forPath(node, packet.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    } catch (Exception e) {
        LOG.error("[control-event] 报错znode: " + node + ",数据包:" + packet + "失败!", e);
    }
}
/** * 关闭注册 */ @Override public synchronized void logout() { CuratorFramework client = (CuratorFramework) regCenter.getRawClient(); if (client.getState() == CuratorFrameworkState.STARTED) { // 移除注册节点 regCenter.remove(nodePath.getWorkerIdPath()); // 关闭连接 regCenter.close(); } }
/**
 * Initializes the ZooKeeper connection from the dynamic configuration properties, wrapped in
 * a Cat transaction.
 *
 * <p>When no ensemble is configured, a warning is logged and the method returns without
 * creating a client. When the client cannot be created or is not in the {@code STARTED}
 * state, the failure is recorded and the JVM exits with status 1. The transaction is completed
 * on every path.
 *
 * @author zhangshaobin
 * @created 2013-6-26 10:21:34 AM
 */
private void init() {
    applicationName = DynamicPropertyFactory.getInstance().getStringProperty(CommonConstant.CONFIG_APPLICATION_NAME_KEY, null).get();
    String zkConfigEnsemble = DynamicPropertyFactory.getInstance().getStringProperty(CommonConstant.ZK_ENSEMABLE_KEY, null).get();
    Integer zkConfigSessionTimeout = DynamicPropertyFactory.getInstance().getIntProperty(CommonConstant.ZK_SESSION_TIMEOUT_KEY, 15000).get();
    Integer zkConfigConnTimeout = DynamicPropertyFactory.getInstance().getIntProperty(CommonConstant.ZK_CONN_TIMEOUT_KEY, 5000).get();

    Transaction tran = Cat.newTransaction("Asura Configuration init", applicationName + "_" + zkConfigEnsemble);
    boolean failed = false;

    try {

        if (Check.NuNStr(zkConfigEnsemble)) {
            logger.warn("ZooKeeper configuration running in file mode, zk is not enabled since not configured");
            Cat.logError("zk is not enabled since not configured", new RuntimeException("ZooKeeper located at " + zkConfigEnsemble + " is not started."));
            return;
        }

        client = createAndStartZKClient(zkConfigEnsemble, zkConfigSessionTimeout, zkConfigConnTimeout);

        if (client.getState() != CuratorFrameworkState.STARTED) {
            throw new RuntimeException("ZooKeeper located at " + zkConfigEnsemble + " is not started.");
        }

        tran.setStatus(Transaction.SUCCESS);
    } catch (Exception e) {
        logger.error("连接配置中心服务器超时,时间5000毫秒。", e);
        Cat.logError("asura configuration init exception", e);
        tran.setStatus(e);
        // Do not exit here: System.exit() never returns, so the finally block would be
        // skipped and the transaction would never be completed.
        failed = true;
    } finally {
        tran.complete();
    }

    if (failed) {
        System.exit(1);
    }

    logger.info(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss] ").format(new Date()) + applicationName + " connected to cofnig server(" + zkConfigEnsemble + ").");
    logger.info(applicationName + " connected to cofnig server(" + zkConfigEnsemble + ").");
}
/** * Quick check to ensure that Curator has been started. */ private void ensureCuratorHasStarted() { // If our client isn't started yet if (CuratorFrameworkState.STARTED != curator.getState()) { // Lets start it! logger.debug("Curator not started, starting it now!"); curator.start(); } }
/**
 * Closes the Curator client when it is started, then shuts the data store support down
 * regardless of the client state.
 */
void closeCurator() {
    final boolean curatorRunning = curatorFramework.getState() == CuratorFrameworkState.STARTED;
    if (curatorRunning) {
        log.info("Closing curator");
        curatorFramework.close();
    }

    dataStoreSupport.shutdown();
}
/**
 * Replaces the collaborators with fresh mocks. The mocked client reports {@code STARTED} and
 * returns the mocked exists-builder from {@code checkExists()}.
 */
protected void setupClient() {
    // Collaborators that need no stubbing.
    pathCreator = mock(ZookeeperConnector.IPathCreator.class);
    stackCache = mock(IStacksCache.class);
    nodeCacheFactory = mock(INodeCacheFactory.class);
    pathChildrenCacheFactory = mock(IPathChildrenCacheFactory.class);

    // Curator client and its exists-builder.
    existsBuilder = mock(ExistsBuilder.class);
    client = mock(CuratorFramework.class);
    when(client.getState()).thenReturn(CuratorFrameworkState.STARTED);
    when(client.checkExists()).thenReturn(existsBuilder);
}
/**
 * The Curator client built from the configuration must be in the {@code STARTED} state when
 * pointed at a running test server. Both the test server and the client are closed afterwards
 * so the test does not leak them.
 */
@Test
public void testGetCurator() throws Exception {
    try (TestingServer zk = new TestingServer(true)) {
        final TrellisConfiguration config = new YamlConfigurationFactory<>(TrellisConfiguration.class,
                Validators.newValidator(), Jackson.newObjectMapper(), "")
            .build(new File(getClass().getResource("/config1.yml").toURI()));

        config.getZookeeper().setEnsembleServers(zk.getConnectString());

        try (CuratorFramework curator = TrellisUtils.getCuratorClient(config)) {
            assertEquals(CuratorFrameworkState.STARTED, curator.getState());
        }
    }
}
/**
 * Checks that the Curator client of a map that is no longer referenced reaches the
 * {@code STOPPED} state within five seconds after a garbage collection has been requested.
 */
@Test
public void garbaged_map_should_close_curator_client() throws NamingException {
    withServer(srv -> {
        // The map is created inline and never stored in a variable; only its client is kept.
        // NOTE(review): presumably this is what leaves the map eligible for collection - keep
        // the expression inline when editing.
        final CuratorFramework client = ZooMap.Periscope.client(ZooMap.newMap(srv.getConnectString()));
        // System.gc() is only a request to the JVM, hence the polling wait below.
        System.gc();
        await().atMost(5, TimeUnit.SECONDS).until(client::getState, is(CuratorFrameworkState.STOPPED));
    });
}
@Override public void afterPropertiesSet() throws Exception { // 如果zk尚未启动,则启动 if (zkClient.getState() == CuratorFrameworkState.LATENT) { zkClient.start(); } buildPathChildrenCache(zkClient, getServicePath(), true); cachedPath.start(StartMode.POST_INITIALIZED_EVENT); countDownLatch.await(); }
/** * Getting the curator oject to start the zookeeper connection estabished * * @param curator * @return curator object */ public static CuratorFramework getCuratorFramework(CuratorFramework curator) { if (curator.getState() == CuratorFrameworkState.LATENT) { curator.start(); try { curator.blockUntilConnected(); } catch (InterruptedException e) { // Ignore log.error("error while setting curator framework :" + e.getMessage()); } } return curator; }
/**
 * Returns the Curator client for the given register URL, creating, starting and caching it
 * on first use. The URL is trimmed before it is used as the cache key.
 *
 * @param registerUrl registry connect string
 * @return the cached or newly created client
 */
public static CuratorFramework getClient(String registerUrl) {
    // Always initialise under the monitor. The previous unsynchronized fast-path check let a
    // second thread see a non-null registerHolder while curatorMap and lock, which were assigned
    // after it, were still null.
    synchronized (RegisterHolder.class) {
        if (registerHolder == null) {
            curatorMap = Maps.newHashMap();
            lock = new ReentrantLock();
            // Assigned last, so a non-null holder implies the map and the lock exist.
            registerHolder = new RegisterHolder();
        }
    }

    CuratorFramework client = null;
    lock.lock();
    try {
        String tempUrl = registerUrl.trim();
        if (!curatorMap.containsKey(tempUrl)) {
            ZkClientFactory zookeeperClientFactory = new ZkClientFactory().setAllAndGetClientFactory(tempUrl);
            client = zookeeperClientFactory.newCuratorFramework();
            CuratorFrameworkState state = client.getState();
            if (state != CuratorFrameworkState.STARTED) {
                client.start();
            }
            curatorMap.put(tempUrl, client);
        } else {
            client = curatorMap.get(tempUrl);
        }
    } finally {
        lock.unlock();
    }
    return client;
}
public ClusterZKImpl(CuratorFramework zkClient, String clusterName) { this.client = zkClient; this.clusterName = clusterName; if (client.getState().equals(CuratorFrameworkState.LATENT)) { client.start(); } }
/**
 * A Curator client connected to the ZooKeeper instance under test must report the
 * {@code STARTED} state. The client is closed even when connecting or the assertion fails.
 */
@Test
public void testStart() throws InterruptedException {
    String zookeeperConnectionString = zookeeper.format("%HOST%:%EPORT%", 2181);
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);

    try {
        client.start();
        client.blockUntilConnected();
        assertThat(client.getState(), is(CuratorFrameworkState.STARTED));
    } finally {
        // Previously skipped whenever the connect or the assertion threw.
        client.close();
    }
}
/**
 * Creates an adapter backed by the znode at {@code path}.
 *
 * @param clazz            type handed to the superclass
 * @param path             znode path; must be neither {@code null} nor empty
 * @param curatorFramework started Curator client; must not be {@code null}
 * @param converter        converter handed to the superclass
 * @param changeListener   optional change listener; may be {@code null}
 * @throws Exception declared by the signature
 */
public DynamicConfigZKAdapter(final Class<T> clazz, final String path, final CuratorFramework curatorFramework, Converter<T, byte[]> converter, ChangeListener<T> changeListener) throws Exception {
    super(clazz, converter, Optional.fromNullable(changeListener));

    Preconditions.checkNotNull(curatorFramework, "CuratorFramework cannot be null");
    Preconditions.checkArgument(
            curatorFramework.getState() == CuratorFrameworkState.STARTED,
            "CuratorFramework must be started");
    Preconditions.checkArgument(
            !(path == null || path.isEmpty()),
            "path cannot be null or blank");

    this.path = path;
    this.curatorFramework = curatorFramework;
    this.nodeCache = new NodeCache(curatorFramework, path);
}