@Inject public SingularityCuratorProvider(final SingularityConfiguration configuration, final Set<ConnectionStateListener> connectionStateListeners) { checkNotNull(configuration, "configuration is null"); checkNotNull(connectionStateListeners, "connectionStateListeners is null"); ZooKeeperConfiguration zookeeperConfig = configuration.getZooKeeperConfiguration(); this.curatorFramework = CuratorFrameworkFactory.builder() .defaultData(null) .sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMillis()) .connectionTimeoutMs(zookeeperConfig.getConnectTimeoutMillis()) .connectString(zookeeperConfig.getQuorum()) .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getRetryBaseSleepTimeMilliseconds(), zookeeperConfig.getRetryMaxTries())) .namespace(zookeeperConfig.getZkNamespace()).build(); for (ConnectionStateListener connectionStateListener : connectionStateListeners) { curatorFramework.getConnectionStateListenable().addListener(connectionStateListener); } }
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED) { isConnected.set(true); if (!isFirstConnection.get()) { for (ConnectionStateListener listener : listenerStateProxy.getListeners()) { listener.stateChanged(client, ConnectionState.RECONNECTED); } } return; } if (newState == ConnectionState.LOST) { isConnected.set(false); isFirstConnection.set(false); retryConnection(); } }
@Override public ListenableFuture<Boolean> setMetricsCacheLocation( TopologyMaster.MetricsCacheLocation location, String topologyName) { client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework arg0, ConnectionState arg1) { if (arg1 != ConnectionState.CONNECTED) { // if not the first time successful connection, fail fast throw new RuntimeException("Unexpected state change to: " + arg1.name()); } } }); return createNode( StateLocation.METRICSCACHE_LOCATION, topologyName, location.toByteArray(), true); }
@Override protected void before() throws Throwable { ts = new TestingServer(); curator = CuratorFrameworkFactory.builder() .namespace("ezkr") .connectString(ts.getConnectString()) .retryPolicy(new BoundedExponentialBackoffRetry(10, 100, 7)) .build(); curator.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(final CuratorFramework client, final ConnectionState newState) { } }); curator.start(); }
@Test public void testBasic() throws Exception { Timing timing = new Timing(); DistributedDelayQueue<Long> queue = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); client.start(); try { BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class)); queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test").buildDelayQueue(); queue.start(); queue.put(1L, System.currentTimeMillis() + 1000); Thread.sleep(100); Assert.assertEquals(consumer.size(), 0); // delay hasn't been reached Long value = consumer.take(timing.forWaiting().seconds(), TimeUnit.SECONDS); Assert.assertEquals(value, Long.valueOf(1)); } finally { CloseableUtils.closeQuietly(queue); CloseableUtils.closeQuietly(client); } }
private BlockingQueueConsumer<String> makeConsumer(final CountDownLatch latch) { ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }; return new BlockingQueueConsumer<String>(connectionStateListener) { @Override public void consumeMessage(String message) throws Exception { if ( latch != null ) { latch.await(); } super.consumeMessage(message); } }; }
private CuratorFramework buildCuratorWithZookeeperDirectly(Configuration configuration) { LOGGER.debug("configuring direct zookeeper connection."); CuratorFramework curator = CuratorFrameworkFactory.newClient( this.zookeeperConnectionString, configuration.getInt(ZOOKEEPER_SESSION_TIMEOUT_MILLIS.getPropertyName()), configuration.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS.getPropertyName()), buildZookeeperRetryPolicy(configuration)); curator.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { LOGGER.debug("Connection state to ZooKeeper changed: " + newState); } }); return curator; }
private ZookeeperClient(){ //1000 是重试间隔时间基数,3 是重试次数 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); zkClient = createWithOptions(zkConnectionString, retryPolicy, 2000, 10000); zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { logger.info("CuratorFramework state changed: {}", newState); if(newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED){ for (String key : listeners.keySet()) { System.out.println(key); IZoopkeeperListener listener = listeners.get(key); listener.execute(client); } } } }); zkClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { logger.info("CuratorFramework unhandledError: {}", message); } }); zkClient.start(); }
/** * Builds a {@link org.apache.curator.framework.CuratorFramework} from the specified {@link java.util.Map<String, ?>}. */ private synchronized CuratorFramework buildCuratorFramework(ZookeeperConfig curatorConfig) { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .ensembleProvider(new FixedEnsembleProvider(curatorConfig.getZookeeperUrl())) .connectionTimeoutMs(curatorConfig.getZookeeperConnectionTimeOut()) .sessionTimeoutMs(curatorConfig.getZookeeperSessionTimeout()) .retryPolicy(new RetryNTimes(curatorConfig.getZookeeperRetryMax(), curatorConfig.getZookeeperRetryInterval())); if (!StringUtils.isNullOrBlank(curatorConfig.getZookeeperPassword())) { String scheme = "digest"; byte[] auth = ("fabric:" + PasswordEncoder.decode(curatorConfig.getZookeeperPassword())).getBytes(); builder = builder.authorization(scheme, auth).aclProvider(aclProvider.get()); } CuratorFramework framework = builder.build(); for (ConnectionStateListener listener : connectionStateListeners) { framework.getConnectionStateListenable().addListener(listener); } return framework; }
public RestablishingKeeper(String hostList) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); client = CuratorFrameworkFactory.newClient(hostList, retryPolicy); client.getConnectionStateListenable().addListener(new ConnectionStateListener(){ @Override public void stateChanged(CuratorFramework framework, ConnectionState state) { LOGGER.debug("State change "+ state); if (state.equals(ConnectionState.CONNECTED) || state.equals(ConnectionState.RECONNECTED)){ reEstablished.incrementAndGet(); try { onReconnect(framework.getZookeeperClient().getZooKeeper(), framework); } catch (Exception e) { throw new RuntimeException(e); } } }}); }
@Override public void addListener(Object listener) { ConnectionStateListener stateListener = (ConnectionStateListener) listener; if (curator != null) { curator.getConnectionStateListenable().addListener(stateListener); } listeners.add(stateListener); }
@Override public void addListener(Object listener, Executor executor) { ConnectionStateListener stateListener = (ConnectionStateListener) listener; if (curator != null) { curator.getConnectionStateListenable().addListener(stateListener, executor); } listeners.add(stateListener); }
@Override public void removeListener(Object listener) { ConnectionStateListener stateListener = (ConnectionStateListener) listener; if (curator != null) { curator.getConnectionStateListenable().removeListener(stateListener); } listeners.remove(stateListener); }
public void clearCurator() { for (ConnectionStateListener stateListener : listeners) { if (curator != null) { curator.getConnectionStateListenable().removeListener(stateListener); } } }
public void updateCurator(CuratorFramework curator) { this.curator = curator; for (ConnectionStateListener stateListener : listeners) { if (this.curator != null) { this.curator.getConnectionStateListenable().addListener(stateListener); } } }
/** * Creates a KafkaConsumerCache object. Before it is used, you must call * startCache() * * @param apiId * @param s * @param metrics */ public KafkaConsumerCache(String apiId, MetricsSet metrics) { if (apiId == null) { throw new IllegalArgumentException("API Node ID must be specified."); } fApiId = apiId; // fSettings = s; fMetrics = metrics; String strkSetting_ZkBasePath= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_ZkBasePath); if(null==strkSetting_ZkBasePath)strkSetting_ZkBasePath = kDefault_ZkBasePath; fBaseZkPath = strkSetting_ZkBasePath; fConsumers = new ConcurrentHashMap<String, KafkaConsumer>(); fSweepScheduler = Executors.newScheduledThreadPool(1); curatorConsumerCache = null; status = Status.NOT_STARTED; listener = new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.LOST) { log.info("ZooKeeper connection expired"); handleConnectionLoss(); } else if (newState == ConnectionState.READ_ONLY) { log.warn("ZooKeeper connection set to read only mode."); } else if (newState == ConnectionState.RECONNECTED) { log.info("ZooKeeper connection re-established"); handleReconnection(); } else if (newState == ConnectionState.SUSPENDED) { log.warn("ZooKeeper connection has been suspended."); handleConnectionSuspended(); } } }; }
public CuratorClient(CuratorParam curatorParam) throws InterruptedException { CuratorFramework newClient = CuratorFrameworkFactory.builder() .sessionTimeoutMs(curatorParam.getSessionTimeOut()) .connectionTimeoutMs(curatorParam.getConnectTimeOut()) .connectString(curatorParam.getAddress()) .retryPolicy(new RetryNTimes(curatorParam.getRetries(), curatorParam.getRetryInterval())) .namespace(curatorParam.getNameSpace()) .build(); newClient.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if(LOGGER.isInfoEnabled()) LOGGER.info("zookeeper client status change to: " + newState.toString()); // TODO 连接状态变化更新 MONITOR.logEvent("Bee.registry", "zookeeper:" + newState.name().toLowerCase(), ""); } }); newClient.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventThreadPool); newClient.start(); boolean isConnect = newClient.getZookeeperClient().blockUntilConnectedOrTimedOut(); CuratorFramework oldClient = this.client; closeCuratorClient(oldClient); this.client = newClient; if(isConnect) { if(LOGGER.isInfoEnabled()) LOGGER.info("CuratorClient: already connected"); MONITOR.logEvent("Bee.registry", "zookeeper:rebuild_success", ""); } else { LOGGER.error("CuratorClient: failed to connect zookeeper server"); MONITOR.logEvent("Bee.registry", "zookeeper:rebuild_failure", ""); } }
public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType, final CuratorWatcher watcher) { synchronized (this) { if (!watchers.contains(watcher.toString()))// 不要添加重复的监听事件 { watchers.add(watcher.toString()); System.out.println("add new watcher " + watcher); zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { System.out.println(newState); if (newState == ConnectionState.LOST) {// 处理session过期 try { if (watcherType == ZookeeperWatcherType.EXITS) { zkTools.checkExists().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) { zkTools.getChildren().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_DATA) { zkTools.getData().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) { // ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中, // 会处理create事件,将路径值恢复到先前状态 Stat stat = zkTools.checkExists().usingWatcher(watcher).forPath(path); if (stat == null) { System.err.println("to create"); zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); } } } catch (Exception e) { e.printStackTrace(); } } } }); } } }
@Test public void assertAddConnectionStateListener() { CuratorFramework client = mock(CuratorFramework.class); @SuppressWarnings("unchecked") Listenable<ConnectionStateListener> listeners = mock(Listenable.class); ConnectionStateListener listener = mock(ConnectionStateListener.class); when(client.getConnectionStateListenable()).thenReturn(listeners); when(coordinatorRegistryCenter.getRawClient()).thenReturn(client); jobNodeStorage.addConnectionStateListener(listener); verify(listeners).addListener(listener); }
@Before public void setUp() throws Exception { Injector inj = Guice.createInjector(new CuratorModule(new AbstractModule() { @Override protected void configure() { bind(EnsembleProvider.class).annotatedWith(Curator.class).toInstance( new FixedEnsembleProvider(testingCluster.getConnectString())); bind(RetryPolicy.class).annotatedWith(Curator.class).toInstance(new ExponentialBackoffRetry(1000, 3)); } })); manager = inj.getInstance(CultivarStartStopManager.class); CuratorManagementService managementService = inj.getInstance(Key.get(CuratorManagementService.class, Curator.class)); managementService.addConnectionListener(new ConnectionStateListener() { @Override public void stateChanged(final CuratorFramework client, final ConnectionState newState) { if (ConnectionState.CONNECTED.equals(newState)) { connectionLatch.countDown(); } } }); state = inj.getInstance(LastKnownState.class); manager.startAsync().awaitRunning(); }
@Inject DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners) { this.framework = framework; this.connectionStateListeners = connectionStateListeners; this.curatorListeners = curatorListeners; this.unhandledErrorListeners = unhandledErrorListeners; }
@Test @SuppressWarnings("unchecked") public void getInstance_SetOfConnectionStateListeners_ContainsConnectionStateMetricsListener() { Injector inj = Guice.createInjector(new MetricsModule()); assertTrue(((Set<ConnectionStateListener>) inj.getInstance(Key.get(Types.setOf(ConnectionStateListener.class)))) .contains(inj.getInstance(ConnectionStateMetricsListener.class))); }
@Test public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception { final CountDownLatch gotSuspendEvent = new CountDownLatch(1); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000)); curatorFramework.start(); curatorFramework.blockUntilConnected(); SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10); sharedCount.start(); curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.SUSPENDED) { gotSuspendEvent.countDown(); } } }); try { server.stop(); // if watcher goes into 10second retry loop we won't get timely notification Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS)); } finally { CloseableUtils.closeQuietly(sharedCount); TestCleanState.closeAndTestClean(curatorFramework); } }
@Test public void testLateAddition() throws Exception { Timing timing = new Timing(); DistributedDelayQueue<Long> queue = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); client.start(); try { BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class)); queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test").buildDelayQueue(); queue.start(); queue.put(1L, System.currentTimeMillis() + Integer.MAX_VALUE); // never come out Long value = consumer.take(1, TimeUnit.SECONDS); Assert.assertNull(value); queue.put(2L, System.currentTimeMillis()); value = consumer.take(timing.seconds(), TimeUnit.SECONDS); Assert.assertEquals(value, Long.valueOf(2)); value = consumer.take(1, TimeUnit.SECONDS); Assert.assertNull(value); } finally { CloseableUtils.closeQuietly(queue); CloseableUtils.closeQuietly(client); } }
@Test public void testSimple() throws Exception { final int QTY = 10; Timing timing = new Timing(); DistributedDelayQueue<Long> queue = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); client.start(); try { BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class)); queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test").buildDelayQueue(); queue.start(); Random random = new Random(); for ( int i = 0; i < QTY; ++i ) { long delay = System.currentTimeMillis() + random.nextInt(100); queue.put(delay, delay); } long lastValue = -1; for ( int i = 0; i < QTY; ++i ) { Long value = consumer.take(timing.forWaiting().seconds(), TimeUnit.SECONDS); Assert.assertNotNull(value); Assert.assertTrue(value >= lastValue); lastValue = value; } } finally { CloseableUtils.closeQuietly(queue); CloseableUtils.closeQuietly(client); } }
@Test public void testMultiPutterSingleGetter() throws Exception { final int itemQty = 100; DistributedQueue<TestQueueItem> queue = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); client.start(); try { BlockingQueueConsumer<TestQueueItem> consumer = new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class)); queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue(); queue.start(); QueueTestProducer producer1 = new QueueTestProducer(queue, itemQty / 2, 0); QueueTestProducer producer2 = new QueueTestProducer(queue, ((itemQty + 1) / 2), itemQty / 2); ExecutorService service = Executors.newCachedThreadPool(); service.submit(producer1); service.submit(producer2); int iteration = 0; while ( consumer.size() < itemQty ) { Assert.assertTrue(++iteration < 10); Thread.sleep(1000); } List<TestQueueItem> items = consumer.getItems(); Assert.assertEquals(com.google.common.collect.Sets.<TestQueueItem>newHashSet(items).size(), items.size()); // check no duplicates } finally { CloseableUtils.closeQuietly(queue); CloseableUtils.closeQuietly(client); } }
@Test public void testSimple() throws Exception { final int itemQty = 10; DistributedQueue<TestQueueItem> queue = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); client.start(); try { BlockingQueueConsumer<TestQueueItem> consumer = new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class)); queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue(); queue.start(); QueueTestProducer producer = new QueueTestProducer(queue, itemQty, 0); ExecutorService service = Executors.newCachedThreadPool(); service.submit(producer); int iteration = 0; while ( consumer.size() < itemQty ) { Assert.assertTrue(++iteration < 10); Thread.sleep(1000); } int i = 0; for ( TestQueueItem item : consumer.getItems() ) { Assert.assertEquals(item.str, Integer.toString(i++)); } } finally { CloseableUtils.closeQuietly(queue); CloseableUtils.closeQuietly(client); } }
@Test public void testMinItemsBeforeRefresh() throws Exception { DistributedPriorityQueue<Integer> queue = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); client.start(); try { final int minItemsBeforeRefresh = 3; BlockingQueueConsumer<Integer> consumer = new BlockingQueueConsumer<Integer>(Mockito.mock(ConnectionStateListener.class)); queue = QueueBuilder.builder(client, consumer, new IntSerializer(), "/test").buildPriorityQueue(minItemsBeforeRefresh); queue.start(); for ( int i = 0; i < 10; ++i ) { queue.put(i, 10 + i); } Assert.assertEquals(consumer.take(1, TimeUnit.SECONDS), new Integer(0)); queue.put(1000, 1); // lower priority int count = 0; while ( consumer.take(1, TimeUnit.SECONDS) < 1000 ) { ++count; } Assert.assertTrue(Math.abs(minItemsBeforeRefresh - count) < minItemsBeforeRefresh, String.format("Diff: %d - min: %d", Math.abs(minItemsBeforeRefresh - count), minItemsBeforeRefresh)); // allows for some slack - testing that within a slop value the newly inserted item with lower priority comes out } finally { CloseableUtils.closeQuietly(queue); CloseableUtils.closeQuietly(client); } }
private CuratorFramework buildCuratorWithExhibitor(Configuration configuration) { LOGGER.debug("configuring zookeeper connection through Exhibitor..."); ExhibitorEnsembleProvider ensembleProvider = new KixeyeExhibitorEnsembleProvider( exhibitors, new KixeyeExhibitorRestClient(configuration.getBoolean(EXHIBITOR_USE_HTTPS.getPropertyName())), configuration.getString(EXHIBITOR_URI_PATH.getPropertyName()), configuration.getInt(EXHIBITOR_POLL_INTERVAL.getPropertyName()), new ExponentialBackoffRetry( configuration.getInt(EXHIBITOR_INITIAL_SLEEP_MILLIS.getPropertyName()), configuration.getInt(EXHIBITOR_MAX_RETRIES.getPropertyName()), configuration.getInt(EXHIBITOR_RETRIES_MAX_MILLIS.getPropertyName()))); //without this (undocumented) step, curator will attempt (and fail) to connect to a local instance of zookeeper (default behavior if no zookeeper connection string is provided) for //several seconds until the EnsembleProvider polls to get the SERVER list from Exhibitor. Polling before staring curator //ensures that the SERVER list from Exhibitor is already downloaded before curator attempts to connect to zookeeper. try { ensembleProvider.pollForInitialEnsemble(); } catch (Exception e) { try { Closeables.close(ensembleProvider, true); } catch (IOException e1) { } throw new BootstrapException("Failed to initialize Exhibitor with host(s) " + exhibitors.getHostnames(), e); } CuratorFramework curator = CuratorFrameworkFactory.builder().ensembleProvider(ensembleProvider).retryPolicy(buildZookeeperRetryPolicy(configuration)).build(); curator.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { LOGGER.debug("Connection state to ZooKeeper changed: " + newState); } }); return curator; }
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.CONNECTED) { if (registration == null) { registration = bundleContext.registerService(CuratorFramework.class, curator, null); } } for (ConnectionStateListener listener : connectionStateListeners) { listener.stateChanged(client, newState); } if (newState == ConnectionState.LOST) { run(); } }
void bindConnectionStateListener(ConnectionStateListener connectionStateListener) { connectionStateListeners.add(connectionStateListener); State curr = state.get(); CuratorFramework curator = curr != null ? curr.curator : null; if (curator != null && curator.getZookeeperClient().isConnected()) { connectionStateListener.stateChanged(curator, ConnectionState.CONNECTED); } }
@Inject DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners) { this(framework, connectionStateListeners, curatorListeners, unhandledErrorListeners, LoggerFactory .getLogger(DefaultCuratorManagementService.class)); }
DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners, final Logger log) { this.framework = framework; this.connectionStateListeners = connectionStateListeners; this.curatorListeners = curatorListeners; this.unhandledErrorListeners = unhandledErrorListeners; this.log = log; }
public void clearListener(){ ListenerContainer<ConnectionStateListener> list=(ListenerContainer<ConnectionStateListener>) client.getConnectionStateListenable(); list.clear(); }
/** * 添加连接监听 * @param listener zk状态监听listener */ public void addConnectionLJistener(ConnectionStateListener listener) { CuratorFramework client = (CuratorFramework) regCenter.getRawClient(); client.getConnectionStateListenable().addListener(listener); }