private static CuratorFramework createConnection() { String url = ZkConfig.getInstance().getZkURL(); CuratorFramework framework = CuratorFrameworkFactory.newClient(url, new ExponentialBackoffRetry(100, 6)); // start connection framework.start(); // wait 3 second to establish connect try { framework.blockUntilConnected(3, TimeUnit.SECONDS); if (framework.getZookeeperClient().isConnected()) { LOGGER.info("CuratorFramework createConnection success"); return framework; } } catch (InterruptedException ignored) { LOGGER.info("CuratorFramework createConnection error", ignored); Thread.currentThread().interrupt(); } // fail situation framework.close(); throw new RuntimeException("failed to connect to zookeeper service : " + url); }
private static CuratorFramework createConnection() { String url= ZkConfig.getInstance().getZkURL(); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(url, new ExponentialBackoffRetry(100, 6)); // start connection curatorFramework.start(); // wait 3 second to establish connect try { curatorFramework.blockUntilConnected(3, TimeUnit.SECONDS); if (curatorFramework.getZookeeperClient().isConnected()) { return curatorFramework; } } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } // fail situation curatorFramework.close(); throw new RuntimeException("failed to connect to zookeeper service : " + url); }
private ZooMap(Builder builder) { this.connectionString = builder.connectionString; ConnectStringParser connectStringParser = new ConnectStringParser(connectionString); if(connectStringParser.getChrootPath() != null) { final String connectionStringForChrootCreation = connectStringParser.getServerAddresses().stream().map(InetSocketAddress::toString).collect(Collectors.joining(",")); try(final CuratorFramework clientForChrootCreation = newCuratorFrameworkClient(builder, connectionStringForChrootCreation)) { startAndBlock(clientForChrootCreation); tryIt(() -> clientForChrootCreation.createContainers(connectStringParser.getChrootPath())); } } client = newCuratorFrameworkClient(builder, connectionString); this.root = builder.root; startAndBlock(client); if(!root.isEmpty()) { tryIt(() -> client.createContainers(root)); } }
@Before public void setUp() throws Exception { Configurator .initialize("FastMQ", Thread.currentThread().getContextClassLoader(), "log4j2.xml"); Log log = new Log(); LogSegment logSegment = new LogSegment(); logSegment.setLedgerId(ledgerId); logSegment.setTimestamp(System.currentTimeMillis()); log.setSegments(Collections.singletonList(logSegment)); when(logInfoStorage.getLogInfo(any())).thenReturn(log); CuratorFramework curatorFramework = CuratorFrameworkFactory .newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); curatorFramework.start(); asyncCuratorFramework = AsyncCuratorFramework.wrap(curatorFramework); offsetStorage = new ZkOffsetStorageImpl(logInfoStorage, asyncCuratorFramework); }
public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { this.parent = "/" + config.getName(); this.prefix = parent + "/"; this.framework = framework; this.config = config; // make sure the parent node exists. try { if (framework.checkExists().forPath(parent) == null) { framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); } this.childrenCache = new PathChildrenCache(framework, parent, true); this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e); } }
public void notifyClusterDDL(String schema, String table, String sql, DDLInfo.DDLStatus ddlStatus, boolean needNotifyOther) throws Exception { CuratorFramework zkConn = ZKUtils.getConnection(); DDLInfo ddlInfo = new DDLInfo(schema, sql, ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID), ddlStatus); String nodeName = StringUtil.getFullName(schema, table); String nodePath = ZKPaths.makePath(KVPathUtil.getDDLPath(), nodeName); if (zkConn.checkExists().forPath(nodePath) == null) { zkConn.create().forPath(nodePath, ddlInfo.toString().getBytes(StandardCharsets.UTF_8)); } else { String instancePath = ZKPaths.makePath(nodePath, KVPathUtil.DDL_INSTANCE); String thisNode = ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID); ZKUtils.createTempNode(instancePath, thisNode); if (needNotifyOther) { //this node is ddl sender zkConn.setData().forPath(nodePath, ddlInfo.toString().getBytes(StandardCharsets.UTF_8)); while (true) { List<String> preparedList = zkConn.getChildren().forPath(instancePath); List<String> onlineList = zkConn.getChildren().forPath(KVPathUtil.getOnlinePath()); if (preparedList.size() >= onlineList.size()) { zkConn.delete().deletingChildrenIfNeeded().forPath(nodePath); break; } LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); } } } }
public ZkRangeStore getZkRangeStore(int clientIndex) { String lockPath = "/snowflake/locks"; String storePath = "/snowflake/idstore"; CuratorFramework curatorFramework = ZkUtils.create("127.0.0.1:2181", 1000, 10000); curatorFramework.start(); for (String client : clients) { try { curatorFramework.setData().forPath(ZKPaths.makePath(storePath, client), "0".getBytes()); } catch (Exception e) { if (e instanceof KeeperException.NoNodeException) { try { curatorFramework.create().creatingParentsIfNeeded().forPath(storePath, "0".getBytes()); } catch (Exception e1) { e1.printStackTrace(); } } } } return new ZkRangeStore(clients.get(clientIndex), curatorFramework, lockPath, storePath, 1, TimeUnit.SECONDS, 0, 10); }
@Override protected FlumeConfiguration getFlumeConfiguration() { try { CuratorFramework cf = createClient(); cf.start(); try { byte[] data = cf.getData().forPath(basePath + "/" + getAgentName()); return configFromBytes(data); } finally { cf.close(); } } catch (Exception e) { LOGGER.error("Error getting configuration info from Zookeeper", e); throw new FlumeException(e); } }
/** * Create a File-based repository service * @param partitionData the partition data configuration * @param partitionUrls the partition URL configuration * @param curator the curator framework * @param producer the kafka producer * @param notifications the notification service * @param idSupplier an identifier supplier for new resources * @param async generate cached resources asynchronously if true, synchonously if false * @throws IOException if the directory is not writable */ public FileResourceService(final Map<String, String> partitionData, final Map<String, String> partitionUrls, final CuratorFramework curator, final Producer<String, String> producer, final EventService notifications, final Supplier<String> idSupplier, final Boolean async) throws IOException { super(partitionUrls, producer, curator, notifications, idSupplier, async); requireNonNull(partitionData, "partition data configuration may not be null!"); RESERVED_PARTITION_NAMES.stream().filter(partitionData::containsKey).findAny().ifPresent(name -> { throw new IllegalArgumentException("Invalid partition name: " + name); }); this.partitionData = partitionData; init(); }
@Bean public CuratorFramework curatorFramework() { ZKConfig config = config(); if (config.useZooKeeperWaitTimePolicy()) { return new RedirectorCuratorFramework(config); } CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(config.getZooKeeperConnection()) .connectionTimeoutMs(config.getZooKeeperConnectionTimeout()) .sessionTimeoutMs(config.getZooKeeperSessionTimeout()) .retryPolicy(new RetryNTimes(config.getZooKeeperRetryAttempts(), config.getZooKeeperRetryInterval())) .compressionProvider(new GzipCompressionProvider()); return builder.build(); }
@Bean(name = "curator-framework") public CuratorFramework curatorFramework() { return CuratorFrameworkFactory .builder() .connectString( env.getProperty("rpc.server.zookeeper.connect.string")) .sessionTimeoutMs( Integer.parseInt(env.getProperty( "rpc.server.zookeeper.session.timeout.ms", "10000"))) .connectionTimeoutMs( Integer.parseInt(env.getProperty( "rpc.server.zookeeper.connection.timeout.ms", "10000"))).retryPolicy(this.retryPolicy()) .aclProvider(this.aclProvider()).authorization(this.authInfo()) .build(); }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData childData = event.getData(); switch (event.getType()) { case CHILD_ADDED: createOrUpdateViewMeta(childData, false); break; case CHILD_UPDATED: createOrUpdateViewMeta(childData, true); break; case CHILD_REMOVED: deleteNode(childData); break; default: break; } }
/** * 设置子节点更改监听 * * @param path * @throws Exception */ public boolean listenerPathChildrenCache(String path, BiConsumer<CuratorFramework, PathChildrenCacheEvent> biConsumer) { if (!ObjectUtils.allNotNull(zkClient, path, biConsumer)) { return Boolean.FALSE; } try { Stat stat = exists(path); if (stat != null) { PathChildrenCache watcher = new PathChildrenCache(zkClient, path, true); watcher.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); //该模式下 watcher在重连的时候会自动 rebuild 否则需要重新rebuild watcher.getListenable().addListener(biConsumer::accept, pool); if (!pathChildrenCaches.contains(watcher)) { pathChildrenCaches.add(watcher); } // else{ // watcher.rebuild(); // } return Boolean.TRUE; } } catch (Exception e) { log.error("listen path children cache fail! path:{} , error:{}", path, e); } return Boolean.FALSE; }
public RulesxmlTozkLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase) { this.setCurator(curator); // 获得当前集群的名称 String schemaPath = zookeeperListen.getBasePath(); schemaPath = schemaPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_RULE.getKey(); currZkPath = schemaPath; // 将当前自己注册为事件接收对象 zookeeperListen.addListen(schemaPath, this); // 生成xml与类的转换信息 parseRulesXMl = new RuleParseXmlImpl(xmlParseBase); }
private void initZkDnindex() { //upload the dnindex data to zk try { if (dnIndexLock.acquire(30, TimeUnit.SECONDS)) { try { File file = new File(SystemConfig.getHomePath(), "conf" + File.separator + "dnindex.properties"); String path = KVPathUtil.getDnIndexNode(); CuratorFramework zk = ZKUtils.getConnection(); if (zk.checkExists().forPath(path) == null) { zk.create().creatingParentsIfNeeded().forPath(path, Files.toByteArray(file)); } } finally { dnIndexLock.release(); } } } catch (Exception e) { throw new RuntimeException(e); } }
/** * 在拉完全量后将此schema的kafka consumer的offset设置为最新 * @param dbSchema */ /*public void setKafkaOffsetToLargest(String targetTopic){ if(targetTopic==null) return; TopicPartition partition0 = new TopicPartition(targetTopic, 0); KafkaConsumerContainer.getInstances().getConsumer(targetTopic).seekToEnd(Arrays.asList(partition0)); }*/ protected <T> T deserialize(String path, Class<T> clazz) throws Exception { T packet = null; CuratorFramework curator = CuratorContainer.getInstance().getCurator(); if (curator.getState() == CuratorFrameworkState.STOPPED) { LOG.info("[EventContainer] CuratorFrameworkState:{}", CuratorFrameworkState.STOPPED.name()); } else { byte[] bytes = curator.getData().forPath(path); if (bytes != null && bytes.length != 0) { packet = JsonUtil.fromJson(new String(bytes, Charset.forName("UTF-8")), clazz); } } return packet; }
@Inject public ServiceDiscoveryManager(final @ServiceDiscoveryCuratorFramework CuratorFramework curatorFramework) throws Exception { LOGGER.info("ServiceDiscoveryManager starting..."); this.curatorFrameworkRef.set(curatorFramework); // // First register this service // serviceDiscovery = ServiceDiscoveryBuilder // .builder(String.class) // .client(curatorFramework) // .basePath("/") // .thisInstance(getThisServiceInstance(config)) // .build(); // serviceDiscovery.start(); // Then register services this service depends on // Arrays.stream(ExternalService.values()).forEach(externalService -> // serviceProviders.put(externalService, createProvider(externalService.get()))); }
@Override public CuratorFramework get() { String quorum = zookeeperConfig.getQuorum(); String serviceDiscoveryPath = zookeeperConfig.getServiceDiscoveryPath(); String connectionString = quorum + (serviceDiscoveryPath == null ? "" : serviceDiscoveryPath); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); LOGGER.info("Initiating Curator connection to Zookeeper using: [{}]", connectionString); // Use chroot so all subsequent paths are below /stroom-services to avoid conflicts with hbase/zookeeper/kafka etc. CuratorFramework client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); client.start(); try { //Ensure the chrooted path for stroom-services exists Stat stat = client.checkExists().forPath("/"); if (stat == null) { LOGGER.info("Creating chroot-ed root node inside " + serviceDiscoveryPath); client.create().forPath("/"); } } catch (Exception e) { throw new RuntimeException("Error connecting to zookeeper using connection String: " + connectionString, e); } return client; }
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED) { isConnected.set(true); if (!isFirstConnection.get()) { for (ConnectionStateListener listener : listenerStateProxy.getListeners()) { listener.stateChanged(client, ConnectionState.RECONNECTED); } } return; } if (newState == ConnectionState.LOST) { isConnected.set(false); isFirstConnection.set(false); retryConnection(); } }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: add(data.getPath().substring(data.getPath().lastIndexOf("/")+1),event.getData().getData()) ; break; case CHILD_REMOVED: delete(data.getPath().substring(data.getPath().lastIndexOf("/")+1),event.getData().getData()); ; break; case CHILD_UPDATED: add(data.getPath().substring(data.getPath().lastIndexOf("/")+1),event.getData().getData()) ; break; default: break; } }
public EcacheszkToxmlLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase) { this.setCurator(curator); this.zookeeperListen = zookeeperListen; // 获得当前集群的名称 String schemaPath = zookeeperListen.getBasePath(); schemaPath = schemaPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_CACHE.getKey(); currZkPath = schemaPath; // 将当前自己注册为事件接收对象 this.zookeeperListen.addListen(schemaPath, this); // 生成xml与类的转换信息 parseEcacheXMl = new EhcacheParseXmlImpl(xmlParseBase); }
public ServerzkToxmlLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase) { this.setCurator(curator); this.zookeeperListen = zookeeperListen; // 获得当前集群的名称 String serverPath = zookeeperListen.getBasePath(); serverPath = serverPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_SERVER.getKey(); currZkPath = serverPath; // 将当前自己注册为事件接收对象 this.zookeeperListen.addListen(serverPath, this); // 生成xml与类的转换信息 parseServerXMl = new ServerParseXmlImpl(xmlParseBase); }
private static CuratorFramework buildConnection(String url) { CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(url, new ExponentialBackoffRetry(100, 6)); // start connection curatorFramework.start(); // wait 3 second to establish connect try { curatorFramework.blockUntilConnected(3, TimeUnit.SECONDS); if (curatorFramework.getZookeeperClient().isConnected()) { return curatorFramework.usingNamespace(""); } } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } // fail situation curatorFramework.close(); throw new RuntimeException("failed to connect to zookeeper service : " + url); }
public EcachesxmlTozkLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase) { this.setCurator(curator); // 获得当前集群的名称 String schemaPath = zookeeperListen.getBasePath(); schemaPath = schemaPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_CACHE.getKey(); currZkPath = schemaPath; // 将当前自己注册为事件接收对象 zookeeperListen.addListen(schemaPath, this); // 生成xml与类的转换信息 parseEcacheXMl = new EhcacheParseXmlImpl(xmlParseBase); }
public ServerxmlTozkLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase) { this.setCurator(curator); // 获得当前集群的名称 String schemaPath = zookeeperListen.getBasePath(); schemaPath = schemaPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FLOW_ZK_PATH_SERVER.getKey(); currZkPath = schemaPath; // 将当前自己注册为事件接收对象 zookeeperListen.addListen(schemaPath, this); // 生成xml与类的转换信息 parseServerXMl = new ServerParseXmlImpl(xmlParseBase); }
public SchemasxmlTozkLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase) { this.setCurator(curator); // 获得当前集群的名称 String schemaPath = zookeeperListen.getBasePath(); schemaPath = schemaPath + ZookeeperPath.ZK_SEPARATOR.getKey() + ZookeeperPath.FOW_ZK_PATH_SCHEMA.getKey(); currZkPath = schemaPath; // 将当前自己注册为事件接收对象 zookeeperListen.addListen(schemaPath, this); // 生成xml与类的转换信息 this.parseSchemaXmlService = new SchemasParseXmlImpl(xmlParseBase); }
CuratorFramework makeStartedCuratorClient(String connectionString) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000, 6, 2000); CuratorFramework client = CuratorFrameworkFactory. builder().connectString(connectionString) .retryPolicy(retryPolicy) .build(); client.start(); return client; }
public LogManagerFactoryImpl(ClientConfiguration clientConfiguration, BookKeeperConfig config) throws Exception { bookKeeperConfig = config; checkNotNull(clientConfiguration); String servers = clientConfiguration.getZkServers(); checkNotNull(servers); final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(servers, clientConfiguration.getZkTimeout(), event -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { logger.info("Connected to zookeeper ,connectString = {}", servers); countDownLatch.countDown(); } else { logger.error("Failed to connect zookeeper,connectString = {}", servers); } }); if (!countDownLatch.await(clientConfiguration.getZkTimeout(), TimeUnit.MILLISECONDS) || zooKeeper.getState() != ZooKeeper.States.CONNECTED) { throw new LedgerStorageException( "Error connecting to zookeeper server ,connectString = " + servers + "."); } this.bookKeeper = new BookKeeper(clientConfiguration, zooKeeper); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(servers, retryPolicy); curatorFramework.start(); asyncCuratorFramework = AsyncCuratorFramework.wrap(curatorFramework); logInfoStorage = new LogInfoStorageImpl(asyncCuratorFramework); offsetStorage = new ZkOffsetStorageImpl(logInfoStorage, asyncCuratorFramework); }
private static CuratorFramework newCuratorFrameworkClient(Builder builder, String connectionString) { return CuratorFrameworkFactory.builder() .connectString(connectionString) .retryPolicy(builder.retryPolicy) .connectionTimeoutMs((int) builder.duration.toMillis()) .build(); }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { Type type = event.getType(); if( type.equals(Type.CHILD_ADDED) || type.equals(Type.CHILD_UPDATED) || type.equals(Type.CHILD_REMOVED) ) { String remoteId = ZKPaths.getNodeFromPath(event.getData().getPath()); String[] clientInfo = new String(event.getData().getData()).split(":"); if( !isThisNode(remoteId) && !hasSameInfo(clientInfo) ) { if( type.equals(Type.CHILD_ADDED) ) { senders.get(remoteId); addReceiver(remoteId, clientInfo); } else if( type.equals(Type.CHILD_UPDATED) ) { senders.get(remoteId); removeReceiver(remoteId); addReceiver(remoteId, clientInfo); } else { removeReceiver(remoteId); } } } }
public static CuratorFramework getCuratorClient(final TrellisConfiguration config) { final CuratorFramework curator = newClient(config.getZookeeper().getEnsembleServers(), new BoundedExponentialBackoffRetry(config.getZookeeper().getRetryMs(), config.getZookeeper().getRetryMaxMs(), config.getZookeeper().getRetryMax())); curator.start(); return curator; }
@Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); if(data==null || data.getData()==null){ return; } SlaveNode slaveNode = SlaveNode.parse(JSON.parseObject(data.getData(),JSONObject.class)); if(slaveNode==null){ LOGGER.error("get a null slaveNode with eventType={},path={},data={}",event.getType(),data.getPath(),data.getData()); }else { switch (event.getType()) { case CHILD_ADDED: slaveNodeMap.put(slaveNode.getId(), slaveNode); LOGGER.info("CHILD_ADDED with path={},data={},current slaveNode size={}", data.getPath(), new String(data.getData(),CharsetUtil.UTF_8),slaveNodeMap.size()); break; case CHILD_REMOVED: slaveNodeMap.remove(slaveNode.getId()); LOGGER.info("CHILD_REMOVED with path={},data={},current slaveNode size={}", data.getPath(), new String(data.getData(),CharsetUtil.UTF_8),slaveNodeMap.size()); break; case CHILD_UPDATED: slaveNodeMap.replace(slaveNode.getId(), slaveNode); LOGGER.info("CHILD_UPDATED with path={},data={},current slaveNode size={}", data.getPath(), new String(data.getData(),CharsetUtil.UTF_8),slaveNodeMap.size()); break; default: break; } } }
public void deletereRcursive(CuratorFramework zk , String path) throws Exception { String npath = PathUtils.normalize_path(path); if (existsNode(zk,npath,false)) { zk.delete().guaranteed().deletingChildrenIfNeeded().forPath(npath); } }
private void rulesInit() { final CuratorFramework zkClient = (CuratorFramework) RegistryContext.clientCache.get("curatorClient"); try { if (null != zkClient.checkExists().forPath(RULES_PATH)) { List<String> childList = zkClient.getChildren().forPath(RULES_PATH); for (String rule : childList) { StringToObjectParser stringParser = new StringToObjectParser(); rulesList.add(stringParser.parseRuleStringToRule(URLDecoder.decode(rule, "UTF-8"))); } } filterRulesForConsumer(); } catch (Exception e) { e.printStackTrace(); } }
private synchronized CuratorFramework buildCuratorFramework(final ZKConfig config) { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(config.getZooKeeperConnection()) .connectionTimeoutMs(config.getZooKeeperConnectionTimeout()) .sessionTimeoutMs(config.getZooKeeperSessionTimeout()) .retryPolicy(new RetryNTimes(config.getZooKeeperRetryAttempts(), config.getZooKeeperRetryInterval())) .compressionProvider(new GzipCompressionProvider()); CuratorFramework framework = builder.build(); listenerStateProxy.updateCurator(framework); listenerProxy.updateCurator(framework); return framework; }
public RuleszkToxmlLoader(ZookeeperProcessListen zookeeperListen, CuratorFramework curator, XmlProcessBase xmlParseBase, ConfigStatusListener confListener) { this.setCurator(curator); currZkPath = KVPathUtil.getConfRulePath(); zookeeperListen.addToInit(this); parseRulesXMl = new RuleParseXmlImpl(xmlParseBase); confListener.addChild(this); }
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) { switch (newState) { case LOST: logger.warn("zookeeper connection session lost, try to register new worker id."); doReconnecting(); break; case SUSPENDED: logger.warn("zookeeper suspended, try to register new worker id."); doReconnecting(); break; default: break; } }
private CuratorFramework obtainClient() { lock.lock(); try { if (client == null) { connect(); } return client; } finally { lock.unlock(); } }
@Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { if (curatorFramework.getConnectionStateErrorPolicy().isErrorState(connectionState)) { reset(); throw new CancelLeadershipException(); } }
@Test public void testNamespaces() throws Exception { final URL res = Namespaces.class.getResource(nsDoc); final CuratorFramework zk = newClient(curator.getConnectString(), new RetryNTimes(10, 1000)); zk.start(); final TreeCache cache = new TreeCache(zk, ZNODE_NAMESPACES); cache.start(); final NamespaceService svc1 = new Namespaces(zk, cache, res.getPath() + randomFilename()); assertEquals(0, svc1.getNamespaces().size()); final NamespaceService svc2 = new Namespaces(zk, cache, res.getPath()); assertEquals(2, svc2.getNamespaces().size()); assertEquals(LDP.URI, svc2.getNamespace("ldp").get()); assertEquals("ldp", svc2.getPrefix(LDP.URI).get()); assertFalse(svc2.getNamespace("jsonld").isPresent()); assertFalse(svc2.getPrefix(JSONLD.URI).isPresent()); assertTrue(svc2.setPrefix("jsonld", JSONLD.URI)); assertEquals(3, svc2.getNamespaces().size()); assertEquals(JSONLD.URI, svc2.getNamespace("jsonld").get()); assertEquals("jsonld", svc2.getPrefix(JSONLD.URI).get()); final Namespaces svc3 = new Namespaces(zk, cache); await().atMost(5, SECONDS).until(() -> 3 == svc3.getNamespaces().size()); assertEquals(JSONLD.URI, svc3.getNamespace("jsonld").get()); assertFalse(svc3.setPrefix("jsonld", JSONLD.URI)); }