public void start() throws Exception { if (this.cache == null) { if (this.executorService == null) { this.cache = new PathChildrenCache(client, path, cacheData); } else { this.cache = new PathChildrenCache(client, path, cacheData, dataIsCompressed, this.executorService); } } this.cache.getListenable().addListener(this); this.cache.start(StartMode.POST_INITIALIZED_EVENT); //this.prepareInstances(); // call super to initialize the pool; super.start(); LOG.info("transport pooling factory started. "); }
/**
 * Creates and starts a {@link PathChildrenCache} on the parent path of the given type.
 *
 * @param type      key passed to {@code getParentPath} to resolve the watched path
 * @param cacheData whether the cache should also hold node data
 * @param listener  optional listener registered before the cache is started; may be {@code null}
 * @param startMode mode handed to {@link PathChildrenCache#start(StartMode)}
 * @return the started cache
 * @throws RuntimeException (via {@code Throwables.propagate}) if the cache cannot be created or started
 */
@Override
public PathChildrenCache createPathCache(String type, boolean cacheData,
        PathChildrenCacheListener listener, StartMode startMode) {
    PathChildrenCache cache = null;
    try {
        cache = new PathChildrenCache(client, getParentPath(type), cacheData);
        if (listener != null) {
            cache.getListenable().addListener(listener);
        }
        cache.start(startMode);
        return cache;
    } catch (Exception e) {
        // A cache whose start() failed is never handed to the caller, so nobody else
        // can close it; release it here before reporting the original failure.
        if (cache != null) {
            try {
                cache.close();
            } catch (Exception ignored) {
                // Best effort only: the start failure below is the error worth reporting.
            }
        }
        throw Throwables.propagate(e);
    }
}
public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { this.parent = "/" + config.getName(); this.prefix = parent + "/"; this.framework = framework; this.config = config; // make sure the parent node exists. try { if (framework.checkExists().forPath(parent) == null) { framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); } this.childrenCache = new PathChildrenCache(framework, parent, true); this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e); } }
/**
 * Builds a replayed EasyMock {@link PathChildrenCache} whose current data is
 * derived from the given controller ID and DPIDs.
 *
 * @param controllerId Controller ID to represent current data.
 * @param paths        HexStrings indicating each switch's DPID.
 * @param listener     Callback container returned from {@code getListenable()}.
 * @return Mock PathChildrenCache object, already in replay state
 * @throws Exception if recording the expectations fails
 */
private PathChildrenCache createPathChildrenCacheMock(
        final String controllerId,
        final String[] paths,
        ListenerContainer<PathChildrenCacheListener> listener) throws Exception {
    final PathChildrenCache cacheMock = createMock(PathChildrenCache.class);

    // The listenable may be requested any number of times.
    expect(cacheMock.getListenable()).andReturn(listener).anyTimes();

    // start(...) is accepted with any StartMode, any number of times.
    cacheMock.start(anyObject(StartMode.class));
    expectLastCall().anyTimes();

    // One ChildData mock per DPID makes up the reported current data.
    final List<ChildData> currentData = new ArrayList<ChildData>();
    for (final String dpid : paths) {
        currentData.add(createChildDataMockForCurrentData(controllerId, dpid));
    }
    expect(cacheMock.getCurrentData()).andReturn(currentData).anyTimes();

    // rebuild() is tolerated as well.
    cacheMock.rebuild();
    expectLastCall().anyTimes();

    replay(cacheMock);
    return cacheMock;
}
/**
 * Creates a transactor cache backed by a ZooKeeper children cache on
 * {@code ZookeeperPath.TRANSACTOR_NODES} plus a bounded, access-expiring timeout cache.
 *
 * @param env environment supplying the configuration and the shared Curator client
 * @throws RuntimeException if the children cache cannot be started
 */
public TransactorCache(Environment env) {
    final FluoConfiguration conf = env.getConfiguration();

    timeoutCache = CacheBuilder.newBuilder()
            .maximumSize(FluoConfigurationImpl.getTransactorMaxCacheSize(conf))
            .expireAfterAccess(
                    FluoConfigurationImpl.getTransactorCacheTimeout(conf, TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS)
            .concurrencyLevel(10)
            .build();

    this.env = env;

    cache = new PathChildrenCache(env.getSharedResources().getCurator(),
            ZookeeperPath.TRANSACTOR_NODES, true);
    try {
        cache.start(StartMode.BUILD_INITIAL_CACHE);
        status = TcStatus.OPEN;
    } catch (Exception e) {
        // The constructor is about to fail, so no caller will ever be able to close
        // this cache; release whatever start() managed to acquire.
        try {
            cache.close();
        } catch (Exception ignored) {
            // Best effort only: the start failure is the error worth reporting.
        }
        throw new RuntimeException(e);
    }
}
/**
 * Post-construction setup: creates the task executors, prefixes the ZooKeeper
 * task paths with a version-specific segment, opens the path caches, starts a
 * {@code Reaper} and launches the two task-finisher watch threads.
 */
@PostConstruct
public void init() {
    taskRunnerExecutor = new ExecutorCompletionService<TaskResult>(createTaskExecutor());
    priorityTaskRunnerExecutor = new ExecutorCompletionService<TaskResult>(createPriorityTaskExecutor());

    // Development builds share one prefix; releases get one per version.
    final Version version = ApplicationVersion.get();
    final String prefix = version.isDevelopment()
            ? "tasksdev/"
            : "tasks-" + version.getMmr() + '/';

    ZK_TASKPATH = prefix + ZK_TASKPATH;
    ZK_GLOBALTASKPATH = prefix + ZK_GLOBALTASKPATH;
    ZK_TASKOWNERPATH = prefix + ZK_TASKOWNERPATH;

    globalCache = zookeeperService.createPathCache(ZK_GLOBALTASKPATH, true);
    taskCache = zookeeperService.createPathCache(ZK_TASKPATH, false, this, StartMode.POST_INITIALIZED_EVENT);
    curator = zookeeperService.getCurator();
    ourNodeId = zookeeperService.getNodeId();

    final Reaper reaper = new Reaper(curator, 10000);
    try {
        reaper.start();
    } catch (Exception startFailure) {
        throw Throwables.propagate(startFailure);
    }

    final TaskWatchThread finisher =
            new TaskWatchThread("Task Finisher listener", taskRunnerExecutor, reaper);
    finisher.start();

    final TaskWatchThread priorityFinisher =
            new TaskWatchThread("Priority Task Finisher listener", priorityTaskRunnerExecutor, reaper);
    priorityFinisher.start();
}
@Override public void addDataListener(String path, DataListener listener) { try { // 第一步:获取-校验-创建监听器 PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener); if(pathChildrenCacheListener != null){//已监听 return; } else { // 添加外部监听器 Set<DataListener> dataListenerSet = dataListenersMap.get(path); if(dataListenerSet == null){ dataListenersMap.put(path, dataListenerSet = new ConcurrentHashSet<DataListener>()); } dataListenerSet.add(listener); dataListenerMap.put(listener, pathChildrenCacheListener = new PathChildrenCacheListenerImpl(path)); } // 第二步:获取-校验-创建子节点缓存连接 PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path); if(pathChildrenCache == null){ pathChildrenCacheMap.put(path, pathChildrenCache = new PathChildrenCache(client, path, true)); // 第三步:启动监听 pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT); } // 第四步:添加监听器 pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
@Override public void afterPropertiesSet() throws Exception { // 如果zk尚未启动,则启动 if (zkClient.getState() == CuratorFrameworkState.LATENT) { zkClient.start(); } buildPathChildrenCache(zkClient, getServicePath(), true); cachedPath.start(StartMode.POST_INITIALIZED_EVENT); countDownLatch.await(); }
public ServiceCache build() throws Exception { cache.start(StartMode.BUILD_INITIAL_CACHE); // init cache data for (ChildData childData : cache.getCurrentData()) { addProvider(childData, true); } return this; }
/**
 * Creates and starts the children cache over {@code /brokers/ids} and marks this
 * object as initialised.
 *
 * @throws NoNodeException if {@code /brokers/ids} does not exist
 * @throws Exception       if the existence check or the cache start fails
 */
private void initialisePathChildrenCache() throws Exception {
    // Fail fast when the broker registry node is absent.
    if (curatorFramework.checkExists().forPath("/brokers/ids") == null) {
        throw new NoNodeException("There is no node at /brokers/ids");
    }

    pathChildrenCache = makePathChildrenCache();
    pathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE);
    initialised = true;
}
/**
 * Verifies that the first {@code fetchBrokerInfo()} call obtains the Curator
 * framework and starts the children cache in {@code BUILD_INITIAL_CACHE} mode.
 */
@Test
public void whenItIsNotInitialised_fetchBrokerInfo_initialisesZookeeperConnection() throws Exception {
    // given: the node exists and the cache holds no children
    Mockito.when(existsBuilder.forPath(Mockito.anyString()))
            .thenReturn(new Stat());
    Mockito.when(pathChildrenCache.getCurrentData())
            .thenReturn(new ArrayList<ChildData>());

    // when
    brokerInfoFetcher.fetchBrokerInfo();

    // then
    Mockito.verify(zookeeperConnector).getCuratorFramework();
    Mockito.verify(pathChildrenCache).start(StartMode.BUILD_INITIAL_CACHE);

    brokerInfoFetcher.close();
}
/**
 * Creates the pool and starts watching {@code zkProxyDir} for child changes.
 *
 * <p>The registered listener logs every event. Until the cache reports
 * {@code INITIALIZED} all other events are ignored; on {@code INITIALIZED} the pools
 * are reset and {@code initPromise} is completed. After that, any event whose type
 * is in {@code RESET_TYPES} resets the pools again.
 *
 * @param curatorClient Curator client used by the watcher
 * @param closeCurator  stored as-is in the {@code closeCurator} field
 * @param zkProxyDir    ZooKeeper directory whose children are watched
 * @param poolBuilder   builder stored for later use; also supplies the event loop for the promises
 * @throws Exception if the watcher fails to start
 */
private RoundRobinNedisClientPool(CuratorFramework curatorClient, boolean closeCurator,
        String zkProxyDir, NedisClientPoolBuilder poolBuilder) throws Exception {
    this.curatorClient = curatorClient;
    this.closeCurator = closeCurator;
    this.poolBuilder = poolBuilder;

    final EventLoop eventLoop = poolBuilder.group().next();
    this.closePromise = eventLoop.newPromise();
    this.initPromise = eventLoop.newPromise();

    watcher = new PathChildrenCache(curatorClient, zkProxyDir, true);
    watcher.getListenable().addListener(new PathChildrenCacheListener() {

        // Flips to true once the INITIALIZED event has been handled.
        private boolean initialized = false;

        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                throws Exception {
            final PathChildrenCacheEvent.Type type = event.getType();
            final ChildData data = event.getData();

            final StringBuilder message = new StringBuilder("Zookeeper event received: type=");
            message.append(type);
            if (data != null) {
                message.append(", path=").append(data.getPath());
                message.append(", stat=").append(data.getStat());
            }
            LOG.info(message.toString());

            if (initialized) {
                if (RESET_TYPES.contains(type)) {
                    resetPools();
                }
                return;
            }

            if (type == INITIALIZED) {
                resetPools();
                initPromise.trySuccess(RoundRobinNedisClientPool.this);
                initialized = true;
            }
        }
    });
    watcher.start(StartMode.POST_INITIALIZED_EVENT);
}
/**
 * Starts a children cache on {@code path}, remembers it in {@code _pathChildren}
 * and loads from it when it reports current data.
 *
 * @param path ZooKeeper path to load from
 * @throws ConfigurationException if anything inside the start/load sequence fails
 */
protected void load(final String path) throws ConfigurationException {
    final PathChildrenCache childrenCache = new PathChildrenCache(_client, path, true);
    try {
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
        this._pathChildren = childrenCache;

        // Nothing to load when the cache reports no data at all.
        if (childrenCache.getCurrentData() == null) {
            return;
        }
        load(childrenCache);
    } catch (Exception e) {
        throw new ConfigurationException("Unable to start path children cache for path " + path, e);
    }
}
/**
 * Creates the partition manager: registers this worker under
 * {@code ZookeeperPath.FINDERS} with an ephemeral sequential node carrying the
 * group size, starts a children cache over the same path, and schedules the
 * periodic tablet check and the first update.
 *
 * @param env          environment supplying the Curator client and configuration
 * @param minSleepTime minimum sleep time; also the initial retry sleep time
 * @param maxSleepTime maximum sleep time; also the delay (ms) between tablet checks
 * @throws RuntimeException if any step of the setup fails or the thread is interrupted
 */
PartitionManager(Environment env, long minSleepTime, long maxSleepTime) {
    try {
        this.curator = env.getSharedResources().getCurator();
        this.env = env;

        this.minSleepTime = minSleepTime;
        this.maxSleepTime = maxSleepTime;
        this.retrySleepTime = minSleepTime;

        groupSize = env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE,
                FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT);

        // Announce this worker; the node payload is the group size as text.
        myESNode = new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL,
                ZookeeperPath.FINDERS + "/" + ZK_FINDER_PREFIX, ("" + groupSize).getBytes(UTF_8));
        myESNode.start();
        // The result is deliberately not checked: setup continues even if the node
        // has not been created within a minute.
        myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);

        // Listener is attached before start so no event is missed.
        childrenCache = new PathChildrenCache(curator, ZookeeperPath.FINDERS, true);
        childrenCache.getListenable().addListener(new FindersListener());
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

        schedExecutor = Executors.newScheduledThreadPool(1,
                new FluoThreadFactory("Fluo worker partition manager"));
        schedExecutor.scheduleWithFixedDelay(new CheckTabletsTask(), 0, maxSleepTime,
                TimeUnit.MILLISECONDS);

        scheduleUpdate();
    } catch (Exception e) {
        // Wrapping an InterruptedException would otherwise lose the interrupt;
        // restore the flag so callers further up can still observe it.
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException(e);
    }
}
/**
 * Convenience overload: creates a path cache with no listener, started in
 * {@code StartMode.NORMAL}.
 *
 * @param type      forwarded unchanged to the four-argument overload
 * @param cacheData forwarded unchanged to the four-argument overload
 * @return the cache returned by the four-argument overload
 */
@Override
public PathChildrenCache createPathCache(String type, boolean cacheData) {
    final PathChildrenCacheListener noListener = null;
    return createPathCache(type, cacheData, noListener, StartMode.NORMAL);
}
/**
 * Convenience overload: creates a path cache with the given listener, started in
 * {@code StartMode.NORMAL}.
 *
 * @param type      forwarded unchanged to the four-argument overload
 * @param cacheData forwarded unchanged to the four-argument overload
 * @param listener  forwarded unchanged to the four-argument overload
 * @return the cache returned by the four-argument overload
 */
@Override
public PathChildrenCache createPathCache(String type, boolean cacheData,
        PathChildrenCacheListener listener) {
    final StartMode defaultMode = StartMode.NORMAL;
    return createPathCache(type, cacheData, listener, defaultMode);
}
/**
 * Creates a {@link PathChildrenCache} for the given type.
 *
 * <p>NOTE(review): the contract below is inferred from the one implementation
 * visible alongside this declaration; confirm it holds for all implementers.
 *
 * @param type      identifies which path the cache watches (the visible
 *                  implementation resolves it through {@code getParentPath(type)})
 * @param cacheData whether the cache should hold node data as well as child names
 * @param listener  listener to register on the cache; the visible implementation
 *                  skips registration when this is {@code null}
 * @param startMode mode the cache is started with
 * @return the created cache (already started in the visible implementation)
 */
PathChildrenCache createPathCache(String type, boolean cacheData, PathChildrenCacheListener listener, StartMode startMode);
/**
 * Post-construction startup.
 *
 * <p>Parses the configured Kafka version, creates the thread pool and retry
 * template, then starts the ZooKeeper-backed caches for brokers, topic
 * configuration, the topic tree, the consumer tree and the controller node.
 * {@code cacheInitCounter} is set to 4 and decremented once by each of the first
 * four caches when it reports {@code INITIALIZED}.
 *
 * @throws IllegalStateException if the configured Kafka version cannot be parsed
 * @throws Exception             if any cache fails to start
 */
@PostConstruct
public void start() throws Exception {
    try {
        kafkaVersion = new Version(properties.getKafkaVersion());
    } catch (Exception ex) {
        throw new IllegalStateException("Invalid kafka version: " + properties.getKafkaVersion(), ex);
    }

    threadPool = new ForkJoinPool(properties.getThreadPoolSize());

    // Retry everything except interruption, with a fixed pause between attempts.
    final FixedBackOffPolicy fixedBackOff = new FixedBackOffPolicy();
    fixedBackOff.setBackOffPeriod(properties.getRetry().getBackoffMillis());

    final SimpleRetryPolicy simpleRetry =
            new SimpleRetryPolicy(properties.getRetry().getMaxAttempts(),
                    ImmutableMap.of(InterruptedException.class, false,
                            Exception.class, true));

    retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(fixedBackOff);
    retryTemplate.setRetryPolicy(simpleRetry);

    // One count per cache that reports INITIALIZED below.
    cacheInitCounter.set(4);

    // Broker ids
    brokerPathCache = new PathChildrenCache(curatorFramework, ZkUtils.BrokerIdsPath(), true);
    brokerPathCache.getListenable().addListener(new BrokerListener());
    brokerPathCache.getListenable().addListener((client, event) -> {
        if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
            cacheInitCounter.decrementAndGet();
            LOG.info("Broker cache initialized");
        }
    });
    brokerPathCache.start(StartMode.POST_INITIALIZED_EVENT);

    // Topic configuration
    topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.TopicConfigPath(), true);
    topicConfigPathCache.getListenable().addListener((client, event) -> {
        if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
            cacheInitCounter.decrementAndGet();
            LOG.info("Topic configuration cache initialized");
        }
    });
    topicConfigPathCache.start(StartMode.POST_INITIALIZED_EVENT);

    // Topic tree
    topicTreeCache = new TreeCache(curatorFramework, ZkUtils.BrokerTopicsPath());
    topicTreeCache.getListenable().addListener((client, treeEvent) -> {
        if (treeEvent.getType() == TreeCacheEvent.Type.INITIALIZED) {
            cacheInitCounter.decrementAndGet();
            LOG.info("Topic tree cache initialized");
        }
    });
    topicTreeCache.start();

    // Consumer tree
    consumerTreeCache = new TreeCache(curatorFramework, ZkUtils.ConsumersPath());
    consumerTreeCache.getListenable().addListener((client, treeEvent) -> {
        if (treeEvent.getType() == TreeCacheEvent.Type.INITIALIZED) {
            cacheInitCounter.decrementAndGet();
            LOG.info("Consumer tree cache initialized");
        }
    });
    consumerTreeCache.start();

    // Controller node: listen for changes, then read the current controller once.
    controllerNodeCache = new NodeCache(curatorFramework, ZkUtils.ControllerPath());
    controllerNodeCache.getListenable().addListener(this::updateController);
    controllerNodeCache.start(true);
    updateController();
}
@Override public void init(FloodlightModuleContext context) throws FloodlightModuleException { // Read the Zookeeper connection string from the config Map<String, String> configParams = context.getConfigParams(this); String connectionStringParam = configParams.get("connectionString"); if (connectionStringParam != null) { connectionString = connectionStringParam; } else { connectionString = System.getProperty( "net.onrc.onos.core.registry.ZookeeperRegistry.connectionString", DEFAULT_CONNECTION_STRING); } // Remove spaces from connection string otherwise Zookeeper complains connectionString = connectionString.replaceAll("\\s", ""); log.info("Setting Zookeeper connection string to {}", this.connectionString); namespace = System.getProperty(ZK_NAMESPACE_KEY, DEFAULT_NAMESPACE).trim(); if (namespace.isEmpty()) { namespace = DEFAULT_NAMESPACE; } log.info("Setting Zookeeper namespace to {}", namespace); restApi = context.getServiceImpl(IRestApiService.class); switches = new ConcurrentHashMap<String, SwitchLeadershipData>(); switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>(); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); curatorFrameworkClient = CuratorFrameworkFactory.newClient(this.connectionString, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy); curatorFrameworkClient.start(); curatorFrameworkClient = curatorFrameworkClient.usingNamespace(namespace); distributedIdCounter = new DistributedAtomicLong( curatorFrameworkClient, ID_COUNTER_PATH, new RetryOneTime(100)); rootSwitchCache = new PathChildrenCache( curatorFrameworkClient, SWITCH_LATCHES_PATH, true); rootSwitchCache.getListenable().addListener(switchPathCacheListener); // Build the service discovery object serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class) .client(curatorFrameworkClient).basePath(SERVICES_PATH).build(); // We read the list of services very frequently (GUI periodically // queries them) so we'll cache them to cut down on Zookeeper 
queries. serviceCache = serviceDiscovery.serviceCacheBuilder() .name(CONTROLLER_SERVICE_NAME).build(); try { serviceDiscovery.start(); serviceCache.start(); // Don't prime the cache, we want a notification for each child // node in the path rootSwitchCache.start(StartMode.NORMAL); } catch (Exception e) { throw new FloodlightModuleException( "Error initialising ZookeeperRegistry", e); } ExecutorService eventThreadExecutorService = Executors.newSingleThreadExecutor(); eventThreadExecutorService.execute( new Runnable() { @Override public void run() { dispatchEvents(); } }); }
/**
 * Registers this object as a listener on the children cache and then starts the
 * cache in {@code BUILD_INITIAL_CACHE} mode.
 *
 * <p>The listener is attached before the cache is started, so a change the cache
 * reports right after starting cannot arrive before the listener is in place.
 *
 * @throws Exception if the cache fails to start
 */
@Override
public void start() throws Exception {
    pathChildrenCache.getListenable().addListener(this);
    pathChildrenCache.start(StartMode.BUILD_INITIAL_CACHE);
}