private ZooKeeperCommandExecutor(String replicaId, CommandExecutor delegate, CuratorFramework curator,
                                 String zkPath, boolean createPathIfNotExist, File revisionFile,
                                 int numWorkers, int maxLogCount, long minLogAgeMillis) {
    super(replicaId);
    this.delegate = delegate;
    this.revisionFile = revisionFile;
    this.curator = curator;
    this.zkPath = zkPath;
    this.createPathIfNotExist = createPathIfNotExist;
    this.maxLogCount = maxLogCount;
    this.minLogAgeMillis = minLogAgeMillis;

    final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            numWorkers, numWorkers, 60, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
            new DefaultThreadFactory("zookeeper-command-executor", true));
    executor.allowCoreThreadTimeOut(true);
    this.executor = executor;

    logWatcher = new PathChildrenCache(curator, absolutePath(LOG_PATH), true);
    logWatcher.getListenable().addListener(this, MoreExecutors.directExecutor());

    oldLogRemover = new OldLogRemover();
    leaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH), oldLogRemover);
    leaderSelector.autoRequeue();
}
@PostConstruct
public void init() {
    log.info("Initializing...");
    Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
    Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
    Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
    Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));
    log.info("Initializing discovery service using ZK connect string: {}", zkUrl);
    zkNodesDir = zkDir + "/nodes";
    try {
        client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout,
                new RetryForever(zkRetryInterval));
        client.start();
        client.blockUntilConnected();
        cache = new PathChildrenCache(client, zkNodesDir, true);
        cache.getListenable().addListener(this);
        cache.start();
    } catch (Exception e) {
        log.error("Failed to connect to ZK: {}", e.getMessage(), e);
        CloseableUtils.closeQuietly(client);
        throw new RuntimeException(e);
    }
}
@Override
public void watchSlave() {
    if (client == null) {
        throw new IllegalArgumentException("client must not be null");
    }
    try {
        initSlaveNode();
        PathChildrenCache watcher = new PathChildrenCache(client, ZkNode.ROOT_NODE_PATH, true);
        watcher.getListenable().addListener(new SlaveNodeWatcher());
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    } catch (Exception e) {
        LOGGER.error("watchSlave error cause:", e);
    }
}
public void start() throws Exception {
    if (this.cache == null) {
        if (this.executorService == null) {
            this.cache = new PathChildrenCache(client, path, cacheData);
        } else {
            this.cache = new PathChildrenCache(client, path, cacheData, dataIsCompressed, this.executorService);
        }
    }
    this.cache.getListenable().addListener(this);
    this.cache.start(StartMode.POST_INITIALIZED_EVENT);
    // this.prepareInstances();

    // call super to initialize the pool
    super.start();
    LOG.info("transport pooling factory started. ");
}
public static void addChildPathCache(String path, PathChildrenCacheListener listener) {
    NameableExecutor businessExecutor = MycatServer.getInstance().getBusinessExecutor();
    ExecutorService executor = businessExecutor == null ? Executors.newFixedThreadPool(5) : businessExecutor;
    try {
        // Watch for changes to the children of this path.
        final PathChildrenCache childrenCache = new PathChildrenCache(getConnection(), path, true);
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        childrenCache.getListenable().addListener(listener, executor);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
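// Hedged usage sketch (not from the original code base): shows how the helper above might be
// invoked. The ZK path "/mycat/schemas" and the logging body are illustrative assumptions;
// PathChildrenCacheListener has a single abstract method, so a lambda is sufficient.
public static void watchSchemaNodes() {
    addChildPathCache("/mycat/schemas", (client, event) -> {
        // Log every child event; real code would react to CHILD_ADDED/CHILD_UPDATED/CHILD_REMOVED.
        String childPath = event.getData() == null ? "<unknown>" : event.getData().getPath();
        System.out.println("child event " + event.getType() + " at " + childPath);
    });
}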
/**
 * Register a listener for changes to the children of the given path.
 *
 * @param path the node whose children should be watched
 */
public boolean listenerPathChildrenCache(String path, BiConsumer<CuratorFramework, PathChildrenCacheEvent> biConsumer) {
    if (!ObjectUtils.allNotNull(zkClient, path, biConsumer)) {
        return Boolean.FALSE;
    }
    try {
        Stat stat = exists(path);
        if (stat != null) {
            PathChildrenCache watcher = new PathChildrenCache(zkClient, path, true);
            // In this mode the watcher rebuilds itself automatically after a reconnect;
            // otherwise it would have to be rebuilt manually.
            watcher.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            watcher.getListenable().addListener(biConsumer::accept, pool);
            if (!pathChildrenCaches.contains(watcher)) {
                pathChildrenCaches.add(watcher);
            }
            // else {
            //     watcher.rebuild();
            // }
            return Boolean.TRUE;
        }
    } catch (Exception e) {
        log.error("listen path children cache fail! path:{}", path, e);
    }
    return Boolean.FALSE;
}
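// Hedged usage sketch for listenerPathChildrenCache(...) above; the path "/config/app" and the
// event handling are assumptions made purely for illustration.
public void watchConfigChildren() {
    boolean registered = listenerPathChildrenCache("/config/app", (client, event) -> {
        if (event.getData() != null) {
            log.info("child event {} on {}", event.getType(), event.getData().getPath());
        }
    });
    if (!registered) {
        log.warn("could not register child watcher; /config/app may not exist yet");
    }
}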
@Override
public PathChildrenCache createPathCache(String type, boolean cacheData, PathChildrenCacheListener listener,
                                         StartMode startMode) {
    try {
        PathChildrenCache cache = new PathChildrenCache(client, getParentPath(type), cacheData);
        if (listener != null) {
            cache.getListenable().addListener(listener);
        }
        cache.start(startMode);
        return cache;
    } catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
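// Hedged usage sketch for createPathCache(...) above. The "workers" type, the listener body and
// the LOG field are assumptions; only the call shape mirrors the method signature shown.
public PathChildrenCache watchWorkers() {
    return createPathCache("workers", true,
            (client, event) -> LOG.info("worker event: {}", event.getType()),
            PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}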
public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
    this.parent = "/" + config.getName();
    this.prefix = parent + "/";
    this.framework = framework;
    this.config = config;

    // Make sure the parent node exists.
    try {
        if (framework.checkExists().forPath(parent) == null) {
            framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
        }
        this.childrenCache = new PathChildrenCache(framework, parent, true);
        this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
    } catch (Exception e) {
        throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e);
    }
}
/**
 * Close the trigger.
 */
@Override
public void close() {
    // Close each of the caches that we originally opened.
    for (PathChildrenCache cache : caches) {
        try {
            cache.close();
        } catch (IOException ex) {
            logger.error("Unable to close cache", ex);
        }
    }
    if (curator != null) {
        curator.close();
        curator = null;
        curatorHelper = null;
    }
    isOpen = false;
}
public ServiceCacheImplProxy(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory) {
    this.serviceCacheImpl = new ServiceCacheImpl<T>(discovery, name, threadFactory);
    try {
        Field privateListenerContainerField = ServiceCacheImpl.class.getDeclaredField("listenerContainer");
        privateListenerContainerField.setAccessible(true);
        this.listenerContainer = (ListenerContainer) privateListenerContainerField.get(serviceCacheImpl);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        log.error("Failed to construct service cache: listener container is null.", e);
    }
    Preconditions.checkNotNull(discovery, "discovery cannot be null");
    Preconditions.checkNotNull(name, "name cannot be null");
    Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
    Preconditions.checkNotNull(this.listenerContainer, "container of listeners can not be null");
    this.discovery = discovery;
    this.cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory);
    this.cache.getListenable().addListener(this);
}
@Override
public void start(final PathChildrenCache.StartMode startMode) throws DataSourceConnectorException {
    if (useCache || useCacheWhenNotConnectedToDataSource) {
        if (!connector.isConnected()) {
            throw new DataSourceConnectorException("Failed to start cache for path=" + path + " due to no connection");
        }
        try {
            cache.start(startMode);
            allowUseCache();
            log.debug("Successfully started cache for path={}", path);
        } catch (Exception e) {
            log.error("Failed to start cache for path={}", path, e);
        }
    }
}
void applyChangesToHostCaches(Set<XreStackPath> allStackPaths) {
    List<XreStackPath> keysToRemove = hostCaches.keySet().stream()
            .filter(xreStackPath -> !allStackPaths.contains(xreStackPath))
            .collect(Collectors.toList());
    keysToRemove.forEach(key -> {
        closeCache(key);
        hostCaches.remove(key);
    });

    allStackPaths.stream()
            .filter(path -> !hostCaches.containsKey(path))
            .forEach(xreStackPath -> {
                PathChildrenCache cache = new PathChildrenCache(curator, getAbsolutePath(xreStackPath.getPath()), true);
                cache.getListenable().addListener((client, event) -> listenersNotifier.notifyListeners());
                try {
                    cache.start();
                } catch (Exception e) {
                    log.error("Failed to start cache for discovered xreStackPath=" + xreStackPath.getPath(), e);
                }
                hostCaches.put(xreStackPath, cache);
            });

    log.info("DiscoveredStacksCount=" + allStackPaths.size() + " and AppliedStacksCount=" + hostCaches.size());
}
@Override
public void watchService() {
    logger.info("watchService {}", path);
    cache = new PathChildrenCache(curatorFramework, path, true);
    cache.getListenable().addListener((client, event) -> {
        switch (event.getType()) {
            case CHILD_ADDED:
                dealAdd(event);
                break;
            case CHILD_REMOVED:
                dealRemove(event);
                break;
            default:
                break;
        }
    });
    try {
        // PathChildrenCache.StartMode.POST_INITIALIZED_EVENT
        cache.start();
        // countDownLatch.await();
    } catch (Exception e) {
        logger.error("Failed to start PathChildrenCache for path {}", path, e);
    }
}
@PostConstruct
public void listen() throws Exception {
    StandbyApiFactory standbyApiFactory = new StandbyApiFactoryImpl(client);
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, standbyApiFactory.pathApi().getJobPath(), true);
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception {
            if (!EventHelper.isChildUpdateEvent(event) && !EventHelper.isChildAddEvent(event)) {
                return;
            }
            StandbyJobData standbyJobData = new StandbyJobData(event.getData());
            if (!standbyJobData.getData().isOperated()) {
                return;
            }
            LoggerHelper.info("begin update standby job summary " + standbyJobData.getData());
            standbyJobSummaryService.updateJobSummary(standbyJobData.getData());
            standbyJobLogService.updateJobLog(standbyJobData.getData());
            LoggerHelper.info("update standby job summary successfully " + standbyJobData.getData());
        }
    });
    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
@PostConstruct
public void listen() throws Exception {
    MasterSlaveApiFactory masterSlaveApiFactory = new MasterSlaveApiFactoryImpl(client);
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, masterSlaveApiFactory.pathApi().getJobPath(), true);
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception {
            if (!EventHelper.isChildUpdateEvent(event) && !EventHelper.isChildAddEvent(event)) {
                return;
            }
            MasterSlaveJobData masterSlaveJobData = new MasterSlaveJobData(event.getData());
            if (!masterSlaveJobData.getData().isOperated()) {
                return;
            }
            LoggerHelper.info("begin update master-slave job summary " + masterSlaveJobData.getData());
            masterSlaveJobSummaryService.updateJobSummary(masterSlaveJobData.getData());
            masterSlaveJobLogService.updateJobLog(masterSlaveJobData.getData());
            LoggerHelper.info("update master-slave job summary successfully " + masterSlaveJobData.getData());
        }
    });
    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
SampleRateUpdater(CuratorFramework client,
                  GroupMember storeRateMember,
                  String storeRatePath,
                  String sampleRatePath,
                  Function<Map<String, Integer>, Optional<Float>> calculator,
                  Supplier<Boolean> guard) {
    this.storeRateMember = storeRateMember;
    this.sampleRatePath = sampleRatePath;
    this.calculator = calculator;
    this.guard = guard;

    // We don't need to cache the data as we can already access it from storeRateMember.
    this.dataWatcher = new PathChildrenCache(client, storeRatePath, false);
    try {
        this.dataWatcher.start();
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    dataWatcher.getListenable().addListener(this);
}
public void start() {
    Preconditions.checkNotNull(nameSpace, "nameSpace can not be null");
    Preconditions.checkNotNull(zkurl, "zkurl can not be null");
    zkClient = RegisterHolder.getClient(zkurl);
    pathChildrenCache = new PathChildrenCache(zkClient, "/" + nameSpace, true);
    pathChildrenCache.getListenable().addListener(this);
    try {
        pathChildrenCache.start();
        initDataFromZk();
    } catch (Exception e) {
        // When the ZooKeeper cluster is unavailable, fall back to loading the conf from the local file.
        if (e instanceof ConnectionLossException) {
            logger.error("ConnectionLossException has happened, start to load the local conf file into memory", e);
        }
    }
}
/**
 * This test ensures the store subscribes to receive events from the underlying client. The dispatcher tests ensure
 * listeners are fired on incoming events. Together these two sets of tests ensure the observer pattern in
 * {@code TransientStore} works fine.
 */
@Test
public void testStoreRegistersDispatcherAndStartsItsClient() throws Exception {
    final StoreWithMockClient<String> store = new StoreWithMockClient<>(config, curator);

    final PathChildrenCache cache = Mockito.mock(PathChildrenCache.class);
    final ZookeeperClient client = store.getClient();
    Mockito
        .when(client.getCache())
        .thenReturn(cache);

    final ListenerContainer<PathChildrenCacheListener> container = Mockito.mock(ListenerContainer.class);
    Mockito
        .when(cache.getListenable())
        .thenReturn(container);

    store.start();

    Mockito
        .verify(container)
        .addListener(store.dispatcher);

    Mockito
        .verify(client)
        .start();
}
@Override
protected void doStart() throws Exception {
    CuratorFramework client = zkClient.get();
    serversCache = new PathChildrenCache(client, MetaZkConfig.getMetaServerRegisterPath(), true,
            XpipeThreadFactory.create(String.format("PathChildrenCache(%d)", currentServer.getServerId())));
    serversCache.getListenable().addListener(new ChildrenChanged());
    serversCache.start();

    future = scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() {
        @Override
        public void doRun() {
            try {
                childrenChanged();
            } catch (Throwable th) {
                logger.error("[doStart]", th);
            }
        }
    }, 1000, metaServerConfig.getClusterServersRefreshMilli(), TimeUnit.MILLISECONDS);
}
@VisibleForTesting
EventTypeCache(final EventTypeRepository eventTypeRepository,
               final TimelineDbRepository timelineRepository,
               final ZooKeeperHolder zkClient,
               final PathChildrenCache cache,
               final TimelineSync timelineSync) {
    this.zkClient = zkClient;
    this.eventTypeCache = setupInMemoryEventTypeCache(eventTypeRepository, timelineRepository);
    this.cacheSync = cache;
    this.timelineSync = timelineSync;
    this.timelineRegistrations = new ConcurrentHashMap<>();
    if (null != cacheSync) {
        this.cacheSync.getListenable().addListener((curator, event) -> this.onZkEvent(event));
    }
    preloadEventTypes(eventTypeRepository, timelineRepository);
}
private static PathChildrenCache setupCacheSync(final CuratorFramework zkClient) throws Exception {
    try {
        zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(ZKNODE_PATH);
    } catch (final KeeperException.NodeExistsException expected) {
        // silently do nothing since it means that the node is already there
    }

    final PathChildrenCache cacheSync = new PathChildrenCache(zkClient, ZKNODE_PATH, false);

    // It is important to preload all data before specifying callback for updates, because otherwise preload won't
    // give any effect - all changes will be removed.
    cacheSync.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

    return cacheSync;
}
@BeforeMethod
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    servers = new HashMap<>();
    shardManagers = new HashMap<>();
    for (String hostId : config.getHostIds()) {
        Gondola gondola = mock(Gondola.class);
        when(gondola.getHostId()).thenReturn(hostId);
        when(gondola.getConfig()).thenReturn(config);
        ShardManager shardManager = mock(ShardManager.class);
        ZookeeperShardManagerServer server =
                new ZookeeperShardManagerServer("foo", zookeeperServer.getConnectString(), gondola, shardManager);
        shardManagers.put(hostId, shardManager);
        servers.put(hostId, server);
    }
    client = new ZookeeperShardManagerClient("foo", "fooClientName", zookeeperServer.getConnectString(), config);
    stats = (PathChildrenCache) Whitebox.getInternalState(client, "stats");
    CountDownLatch latch = new CountDownLatch(1);
    this.stats.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
        if (this.stats.getCurrentData().size() == config.getMembers().size()) {
            latch.countDown();
        }
    });
    latch.await();
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
    org.apache.log4j.BasicConfigurator.configure();
    MiniClusterController.Start(0);
    controller_ = MiniClusterController.instance();
    rand_ = new Random();
    cf_ = CuratorFrameworkFactory.builder()
            .connectString(testConnectionStr)
            .connectionTimeoutMs(30 * 1000)
            .aclProvider(new TestACLProvider())
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    cf_.start();
    cache_ = new PathChildrenCache(cf_, RecordServiceConfig.ZOOKEEPER_ZNODE_DEFAULT + "/planners", true);
    cache_.start();
}
private boolean taskIsComplete(PathChildrenCache completedTasksCache, RunId runId, ExecutableTask task) {
    if ((task == null) || !task.isExecutable()) {
        return true;
    }
    String completedTaskPath = ZooKeeperConstants.getCompletedTaskPath(runId, task.getTaskId());
    ChildData currentData = completedTasksCache.getCurrentData(completedTaskPath);
    if (currentData != null) {
        TaskExecutionResult result =
                workflowManager.getSerializer().deserialize(currentData.getData(), TaskExecutionResult.class);
        if (result.getSubTaskRunId().isPresent()) {
            RunnableTask runnableTask = getRunnableTask(result.getSubTaskRunId().get());
            return (runnableTask != null) && runnableTask.getCompletionTimeUtc().isPresent();
        }
        return true;
    }
    return false;
}
/**
 * Create mock {@link PathChildrenCache} using given controller ID and DPIDs.
 *
 * @param controllerId Controller ID to represent current data.
 * @param paths List of HexString indicating switch's DPID.
 * @param listener Callback object to be set as Listenable.
 * @return Mock PathChildrenCache object
 * @throws Exception
 */
private PathChildrenCache createPathChildrenCacheMock(final String controllerId, final String[] paths,
        ListenerContainer<PathChildrenCacheListener> listener) throws Exception {
    PathChildrenCache pathChildrenCache = createMock(PathChildrenCache.class);

    expect(pathChildrenCache.getListenable()).andReturn(listener).anyTimes();

    pathChildrenCache.start(anyObject(StartMode.class));
    expectLastCall().anyTimes();

    List<ChildData> children = new ArrayList<ChildData>();
    for (String path : paths) {
        children.add(createChildDataMockForCurrentData(controllerId, path));
    }
    expect(pathChildrenCache.getCurrentData()).andReturn(children).anyTimes();

    pathChildrenCache.rebuild();
    expectLastCall().anyTimes();

    replay(pathChildrenCache);
    return pathChildrenCache;
}
/**
 * Creates an instance of {@code NodeDiscovery}.
 *
 * @param curator  Curator framework reference.
 * @param nodePath The path in ZooKeeper to watch.
 * @param parser   The strategy to convert from ZooKeeper {@code byte[]} to {@code T}.
 */
public NodeDiscovery(CuratorFramework curator, String nodePath, NodeDataParser<T> parser) {
    checkNotNull(curator);
    checkNotNull(nodePath);
    checkNotNull(parser);
    checkArgument(curator.getState() == CuratorFrameworkState.STARTED);
    checkArgument(!"".equals(nodePath));

    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat(getClass().getSimpleName() + "(" + nodePath + ")-%d")
            .setDaemon(true)
            .build();

    _nodes = Maps.newConcurrentMap();
    _listeners = Sets.newSetFromMap(Maps.<NodeListener<T>, Boolean>newConcurrentMap());
    _curator = curator;
    _executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
    _pathCache = new PathChildrenCache(curator, nodePath, true, false, _executor);
    _nodeDataParser = parser;
    _closed = false;
}
public TransactorCache(Environment env) {
    final FluoConfiguration conf = env.getConfiguration();
    timeoutCache = CacheBuilder.newBuilder()
            .maximumSize(FluoConfigurationImpl.getTransactorMaxCacheSize(conf))
            .expireAfterAccess(FluoConfigurationImpl.getTransactorCacheTimeout(conf, TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS)
            .concurrencyLevel(10)
            .build();
    this.env = env;
    cache = new PathChildrenCache(env.getSharedResources().getCurator(), ZookeeperPath.TRANSACTOR_NODES, true);
    try {
        cache.start(StartMode.BUILD_INITIAL_CACHE);
        status = TcStatus.OPEN;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
@Override
public void shutdownGracefully() {
    for (PathChildrenCache childrenCache : services.values()) {
        try {
            childrenCache.close();
        } catch (IOException ignored) {
        }
    }
    client.close();
}
private void _initWatch() throws Exception {
    PathChildrenCache watcher = new PathChildrenCache(
            client,
            STORE_PATH,
            true // cache data
    );
    watcher.getListenable().addListener((client1, event) -> {
        // Acquire the lock before the try block so that unlock() in finally only runs after a successful lock().
        rwlock.writeLock().lock();
        try {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED
                        || event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                    String path = data.getPath();
                    if (path.startsWith(STORE_PATH)) {
                        String key = path.replace(STORE_PATH + "/", "");
                        String dataStr = new String(data.getData(), "utf-8");
                        storeMap.put(key, dataStr);
                    }
                }
            }
        } finally {
            rwlock.writeLock().unlock();
        }
    });
    watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
}
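// Hedged companion sketch: a read-side accessor for storeMap guarded by the same read-write lock
// used in _initWatch() above. The method name _getFromStore is an assumption; the original class
// may expose the cached data differently.
private String _getFromStore(String key) {
    rwlock.readLock().lock();
    try {
        return storeMap.get(key);
    } finally {
        rwlock.readLock().unlock();
    }
}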
@Override
public void addDataListener(String path, DataListener listener) {
    try {
        // Step 1: look up, validate, and create the listener.
        PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener);
        if (pathChildrenCacheListener != null) {
            // Already listening.
            return;
        } else {
            // Register the external listener.
            Set<DataListener> dataListenerSet = dataListenersMap.get(path);
            if (dataListenerSet == null) {
                dataListenersMap.put(path, dataListenerSet = new ConcurrentHashSet<DataListener>());
            }
            dataListenerSet.add(listener);
            dataListenerMap.put(listener, pathChildrenCacheListener = new PathChildrenCacheListenerImpl(path));
        }

        // Step 2: look up, validate, and create the path children cache.
        PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path);
        if (pathChildrenCache == null) {
            pathChildrenCacheMap.put(path, pathChildrenCache = new PathChildrenCache(client, path, true));
            // Step 3: start listening.
            pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT);
        }

        // Step 4: add the listener.
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
@Override
public void removeDataListener(String path, DataListener listener) {
    try {
        // Step 1: remove the entry from dataListenerMap.
        PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener);
        if (pathChildrenCacheListener == null) {
            return;
        } else {
            dataListenerMap.remove(listener);
            // Step 2: remove the listener from the Set<DataListener>.
            Set<DataListener> dataListenerSet = dataListenersMap.get(path);
            if (dataListenerSet != null && dataListenerSet.contains(listener)) {
                dataListenerSet.remove(listener);
            }
            // Step 3: remove the entries from dataListenersMap and childDataMap.
            if (dataListenerSet == null || dataListenerSet.isEmpty()) {
                dataListenersMap.remove(path);
                childDataMap.remove(path);
            }
        }

        // Step 4: stop listening and remove the entry from pathChildrenCacheMap.
        PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path);
        if (pathChildrenCache != null) {
            pathChildrenCache.getListenable().removeListener(pathChildrenCacheListener);
            ((PathChildrenCacheListenerImpl) pathChildrenCacheListener).unwatch();
            if (pathChildrenCache.getListenable().size() == 0) {
                pathChildrenCacheMap.remove(path);
                pathChildrenCache.close();
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
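// Hedged usage sketch for the addDataListener/removeDataListener pair above. The path is a
// placeholder and the DataListener instance is passed in because its contract is not shown in
// these snippets; only the registration flow is sketched.
public void exampleRegisterAndUnregister(DataListener listener) {
    String path = "/registry/services"; // assumed path, for illustration only
    addDataListener(path, listener);
    // ... later, once updates are no longer needed:
    removeDataListener(path, listener);
}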
@Override
public void start() throws Exception {
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    for (ChildData childData : cache.getCurrentData()) {
        addInstance(childData, true);
    }
    discovery.cacheOpened(this);
}
private static void startCache(IPathChildrenCacheWrapper cache, String path) {
    try {
        cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    } catch (DataSourceConnectorException e) {
        log.error("Failed to start path children cache for path=" + path, e);
    }
}
public ZkPathChildrenCacheWrapper(IDataSourceConnector connector, String path, boolean dataIsCompressed,
                                  PathChildrenCache cache) {
    super(connector);
    this.useCache = cache != null;
    this.path = path;
    this.dataIsCompressed = dataIsCompressed;
    this.cache = cache;
}
@Test(expected = DataSourceConnectorException.class)
public void startCache_WithoutPreloading_Fails_WhenNoConnected() throws DataSourceConnectorException {
    when(client.isConnected()).thenReturn(false);
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);
    PathChildrenCache.StartMode startMode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;

    testee.start(startMode);
}

@Test(expected = DataSourceConnectorException.class)
public void startCache_WithPreloading_Fails_WhenNoConnected() throws DataSourceConnectorException {
    when(client.isConnected()).thenReturn(false);
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);
    PathChildrenCache.StartMode startMode = PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;

    testee.start(startMode);
}

@Test
public void testStartWithUseCache() throws Exception {
    PathChildrenCache.StartMode startMode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(startMode);

    verify(cache, times(1)).start(startMode);
}

@Test
public void testStartWithUseCacheExceptionHappens() throws Exception {
    PathChildrenCache.StartMode startMode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;
    setupCacheThrowsExceptionOnStart(new Exception());
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(startMode);
}

@Test
public void testStartWithUseCacheNoConnection() throws Exception {
    PathChildrenCache.StartMode startMode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;
    setupCacheThrowsExceptionOnStart(new KeeperException.ConnectionLossException());
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(startMode);
}