public ServiceCacheImplProxy(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory) { this.serviceCacheImpl = new ServiceCacheImpl<T>(discovery, name, threadFactory); try { Field privateListenerContainerField = ServiceCacheImpl.class.getDeclaredField("listenerContainer"); privateListenerContainerField.setAccessible(true); this.listenerContainer = (ListenerContainer)privateListenerContainerField.get(serviceCacheImpl); } catch (NoSuchFieldException | IllegalAccessException e) { log.error("Failed to construct Service Cache. Container listeners is null."); } Preconditions.checkNotNull(discovery, "discovery cannot be null"); Preconditions.checkNotNull(name, "name cannot be null"); Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); Preconditions.checkNotNull(this.listenerContainer, "container of listeners can not be null"); this.discovery = discovery; this.cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory); this.cache.getListenable().addListener(this); }
/** * This test ensures store subscribes to receive events from underlying client. Dispatcher tests ensures listeners * are fired on incoming events. These two sets of tests ensure observer pattern in {@code TransientStore} works fine. */ @Test public void testStoreRegistersDispatcherAndStartsItsClient() throws Exception { final StoreWithMockClient<String> store = new StoreWithMockClient<>(config, curator); final PathChildrenCache cache = Mockito.mock(PathChildrenCache.class); final ZookeeperClient client = store.getClient(); Mockito .when(client.getCache()) .thenReturn(cache); final ListenerContainer<PathChildrenCacheListener> container = Mockito.mock(ListenerContainer.class); Mockito .when(cache.getListenable()) .thenReturn(container); store.start(); Mockito .verify(container) .addListener(store.dispatcher); Mockito .verify(client) .start(); }
/** * Create mock {@link PathChildrenCache} using given controller ID and DPIDs. * * @param controllerId Controller ID to represent current data. * @param paths List of HexString indicating switch's DPID. * @param listener Callback object to be set as Listenable. * @return Mock PathChildrenCache object * @throws Exception */ private PathChildrenCache createPathChildrenCacheMock( final String controllerId, final String[] paths, ListenerContainer<PathChildrenCacheListener> listener) throws Exception { PathChildrenCache pathChildrenCache = createMock(PathChildrenCache.class); expect(pathChildrenCache.getListenable()).andReturn(listener).anyTimes(); pathChildrenCache.start(anyObject(StartMode.class)); expectLastCall().anyTimes(); List<ChildData> childs = new ArrayList<ChildData>(); for (String path : paths) { childs.add(createChildDataMockForCurrentData(controllerId, path)); } expect(pathChildrenCache.getCurrentData()).andReturn(childs).anyTimes(); pathChildrenCache.rebuild(); expectLastCall().anyTimes(); replay(pathChildrenCache); return pathChildrenCache; }
/** * Return the cache listenable * * @return listenable */ public ListenerContainer<NodeCacheListener> getListenable() { Preconditions.checkState(state.get() != State.CLOSED, "Closed"); return listeners; }
@Inject public BaragonStateWatcher(final BaragonStateFetcher stateFetcher, Set<BaragonStateListener> listeners, @Baragon PersistentWatcher watcher) { this.stateFetcher = stateFetcher; this.listenerContainer = new ListenerContainer<>(); this.executor = newExecutor(); this.versionQueue = new LinkedTransferQueue<>(); for (BaragonStateListener listener : listeners) { listenerContainer.addListener(listener); } watcher.getEventListenable().addListener(new EventListener() { @Override public void newEvent(Event event) { switch (event.getType()) { case NODE_UPDATED: int version = event.getStat().getVersion(); versionQueue.add(version); executor.submit(new Runnable() { @Override public void run() { updateToLatestVersion(); } }); break; case NODE_DELETED: LOG.warn("Baragon state node was deleted"); break; default: LOG.warn("Unrecognized event type {}", event.getType()); break; } } }, executor); watcher.start(); }
public void clearListener(){ ListenerContainer<ConnectionStateListener> list=(ListenerContainer<ConnectionStateListener>) client.getConnectionStateListenable(); list.clear(); }
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) { ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory()); this.client = new CuratorZookeeperClient ( localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher() { @Override public void process(WatchedEvent watchedEvent) { CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); processEvent(event); } }, builder.getRetryPolicy(), builder.canBeReadOnly(), builder.getConnectionHandlingPolicy() ); internalConnectionHandler = new StandardInternalConnectionHandler(); listeners = new ListenerContainer<CuratorListener>(); unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>(); backgroundOperations = new DelayQueue<OperationAndData<?>>(); forcedSleepOperations = new LinkedBlockingQueue<>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null"); schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null"); zk34CompatibilityMode = builder.isZk34CompatibilityMode(); byte[] builderDefaultData = builder.getDefaultData(); defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0]; authInfos = buildAuths(builder); failedDeleteManager = new FailedDeleteManager(this); failedRemoveWatcherManager = new FailedRemoveWatchManager(this); namespaceFacadeCache = new NamespaceFacadeCache(this); ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider()); }
@Override public ListenerContainer<QueuePutListener<T>> getPutListenerContainer() { return queue.getPutListenerContainer(); }
/** * Return the manager for put listeners * * @return put listener container */ @Override public ListenerContainer<QueuePutListener<T>> getPutListenerContainer() { return queue.getPutListenerContainer(); }
/** * Return the manager for put listeners * * @return put listener container */ @Override public ListenerContainer<QueuePutListener<T>> getPutListenerContainer() { return putListenerContainer; }
public ListenerContainer<NodeCacheListener> getListenable() { return delegate().getListenable(); }
/** * Return the listenable * * @return listenable */ public ListenerContainer<ConnectionStateListener> getListenable() { return listeners; }
/** * Returns the listenable * * @return listenable */ public ListenerContainer<SharedValueListener> getListenable() { return listeners; }
/** * Returns the listenable * * @return listenable */ public ListenerContainer<SharedValueListener> getListenable();
/** * Returns the listenable * * @return listenable */ public ListenerContainer<PersistentNodeListener> getListenable() { return listeners; }
/** * Return the cache listenable * * @return listenable */ public ListenerContainer<PathChildrenCacheListener> getListenable() { return listeners; }
/** * Return the manager for put listeners * * @return put listener container */ ListenerContainer<QueuePutListener<T>> getPutListenerContainer();