/**
 * Prepares the mocks used by the tests: a {@code CuratorFramework} mock whose
 * {@code getConnectionStateListenable()} is stubbed to hand back a {@code Listenable} mock.
 */
@Before
public void before() {
    // The two mocks are independent of each other; only the stubbing needs both.
    connectionListenable = mock(Listenable.class);
    curator = mock(CuratorFramework.class);
    when(curator.getConnectionStateListenable()).thenReturn(connectionListenable);
}
/**
 * Verifies that {@code jobNodeStorage.addConnectionStateListener} adds the listener to the
 * connection-state listenable of the raw client returned by the registry center.
 */
@Test
public void assertAddConnectionStateListener() {
    // Arrange: raw client mock exposing a mocked connection-state listenable.
    ConnectionStateListener stateListener = mock(ConnectionStateListener.class);
    @SuppressWarnings("unchecked")
    Listenable<ConnectionStateListener> listenable = mock(Listenable.class);
    CuratorFramework rawClient = mock(CuratorFramework.class);
    when(rawClient.getConnectionStateListenable()).thenReturn(listenable);
    when(coordinatorRegistryCenter.getRawClient()).thenReturn(rawClient);

    // Act
    jobNodeStorage.addConnectionStateListener(stateListener);

    // Assert
    verify(listenable).addListener(stateListener);
}
/**
 * Verifies that {@code jobNodeStorage.addDataListener} adds the listener to the listenable
 * of the tree cache the registry center returns for {@code "/testJob"}.
 */
@Test
public void assertAddDataListener() {
    // Arrange: tree cache mock exposing a mocked listenable.
    TreeCacheListener dataListener = mock(TreeCacheListener.class);
    @SuppressWarnings("unchecked")
    Listenable<TreeCacheListener> listenable = mock(Listenable.class);
    TreeCache rawCache = mock(TreeCache.class);
    when(rawCache.getListenable()).thenReturn(listenable);
    when(coordinatorRegistryCenter.getRawCache("/testJob")).thenReturn(rawCache);

    // Act
    jobNodeStorage.addDataListener(dataListener);

    // Assert
    verify(listenable).addListener(dataListener);
}
/**
 * Returns {@code listenerStateProxy} as the connection-state listenable.
 *
 * @return the connection-state listenable
 */
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
    final Listenable<ConnectionStateListener> stateListenable = listenerStateProxy;
    return stateListenable;
}
/**
 * Returns {@code listenerProxy} as the curator-event listenable.
 *
 * @return the curator-event listenable
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
    final Listenable<CuratorListener> eventListenable = listenerProxy;
    return eventListenable;
}
/**
 * Delegates to {@code curator.getUnhandledErrorListenable()}.
 *
 * @return the unhandled-error listenable reported by {@code curator}
 */
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
    final Listenable<UnhandledErrorListener> errorListenable = curator.getUnhandledErrorListenable();
    return errorListenable;
}
/** * Starts an AdminURL bind event loop */ protected void waitForAdminURLBind() { final TreeCache tc = TreeCache.newBuilder(cf, ZOOKEEP_URL) //.setExecutor(threadFactory) //.setExecutor((ExecutorService)threadPool) //.setCacheData(true) .build(); final AtomicBoolean waiting = new AtomicBoolean(true); final Thread waitForAdminURLThread = threadFactory.newThread(new Runnable(){ @Override public void run() { final Thread waitThread = Thread.currentThread(); try { final Listenable<TreeCacheListener> listen = tc.getListenable(); listen.addListener(new TreeCacheListener(){ @Override public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { ChildData cd = event.getData(); if(cd!=null) { log.info("TreeCache Bound [{}]", cd.getPath()); final String boundPath = cd.getPath(); if(ZOOKEEP_URL.equals(boundPath)) { updateAdminURL(cd.getData()); tc.close(); waiting.set(false); waitThread.interrupt(); } } } }); tc.start(); log.debug("AdminURL TreeCache Started"); // Check for the data one more time in case we missed // the bind event while setting up the listener final ZooKeeper z = cf.getZookeeperClient().getZooKeeper(); final Stat st = z.exists(ZOOKEEP_URL, false); if(st!=null) { updateAdminURL(z.getData(ZOOKEEP_URL, false, st)); tc.close(); } while(true) { try { Thread.currentThread().join(retryPauseTime); log.info("Still waiting for AdminURL...."); } catch (InterruptedException iex) { if(Thread.interrupted()) Thread.interrupted(); } if(!waiting.get()) break; } log.info("Ended wait for AdminURL"); } catch (Exception ex) { log.error("Failed to wait for AdminURL bind", ex); // FIXME: } finally { try { tc.close(); } catch (Exception x) {/* No Op */} } } }); waitForAdminURLThread.start(); }
/**
 * Returns a newly created {@code MockListenable} on every call.
 *
 * @return a fresh {@code MockListenable} instance
 */
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
    final Listenable<ConnectionStateListener> freshListenable = new MockListenable<>();
    return freshListenable;
}
/**
 * Not supported by this implementation.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
    final String reason = "Not implemented in MockCurator";
    throw new UnsupportedOperationException(reason);
}
/**
 * Not supported by this implementation.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
    final String reason = "Not implemented in MockCurator";
    throw new UnsupportedOperationException(reason);
}
/**
 * Creates a {@code CuratorFramework} mock whose {@code getConnectionStateListenable()}
 * is stubbed to return a {@code Listenable} mock.
 *
 * @return the stubbed framework mock
 */
@SuppressWarnings("unchecked") // mock(Listenable.class) is a raw Listenable passed where a parameterized one is expected
private CuratorFramework mockFramework() {
    CuratorFramework framework = mock(CuratorFramework.class);
    when(framework.getConnectionStateListenable()).thenReturn(mock(Listenable.class));
    return framework;
}
/**
 * Delegates to {@code client.getConnectionStateListenable()}.
 *
 * @return the connection-state listenable reported by {@code client}
 */
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
    final Listenable<ConnectionStateListener> stateListenable = client.getConnectionStateListenable();
    return stateListenable;
}
/**
 * Not supported by this implementation; see the exception message for the reason.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
    final String reason = "getCuratorListenable() is only available from a non-namespaced CuratorFramework instance";
    throw new UnsupportedOperationException(reason);
}
/**
 * Delegates to {@code client.getUnhandledErrorListenable()}.
 *
 * @return the unhandled-error listenable reported by {@code client}
 */
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
    final Listenable<UnhandledErrorListener> errorListenable = client.getUnhandledErrorListenable();
    return errorListenable;
}
/**
 * Delegates to {@code connectionStateManager.getListenable()}.
 *
 * @return the listenable reported by {@code connectionStateManager}
 */
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
    final Listenable<ConnectionStateListener> stateListenable = connectionStateManager.getListenable();
    return stateListenable;
}
/**
 * Returns {@code listeners} as the curator-event listenable.
 *
 * @return the curator-event listenable
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
    final Listenable<CuratorListener> eventListenable = listeners;
    return eventListenable;
}
/**
 * Returns {@code unhandledErrorListeners} as the unhandled-error listenable.
 *
 * @return the unhandled-error listenable
 */
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
    final Listenable<UnhandledErrorListener> errorListenable = unhandledErrorListeners;
    return errorListenable;
}
/**
 * Delegates to {@code client.getCuratorListenable()}.
 *
 * @return the curator-event listenable reported by {@code client}
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
    final Listenable<CuratorListener> eventListenable = client.getCuratorListenable();
    return eventListenable;
}
/**
 * Returns {@code listenerContainer} as the listenable for cache listeners.
 *
 * @return the listener container
 */
public Listenable<ModeledCacheListener<T>> listenable() {
    final Listenable<ModeledCacheListener<T>> container = listenerContainer;
    return container;
}
/**
 * Delegates to {@code cache.listenable()}.
 *
 * @return the listenable reported by {@code cache}
 */
@Override
public Listenable<ModeledCacheListener<T>> listenable() {
    final Listenable<ModeledCacheListener<T>> cacheListenable = cache.listenable();
    return cacheListenable;
}
/**
 * Allows catching unhandled errors in asynchronous operations.
 *
 * TODO: consider making public.
 *
 * @return {@code errorListeners}, the listenable for unhandled-error listeners
 */
@VisibleForTesting
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
    final Listenable<UnhandledErrorListener> errorListenable = errorListeners;
    return errorListenable;
}
/**
 * Returns {@code listenerContainer} as the listenable for workflow listeners.
 *
 * @return the listener container
 */
@Override
public Listenable<WorkflowListener> getListenable() {
    final Listenable<WorkflowListener> container = listenerContainer;
    return container;
}
/**
 * Creates a {@code CuratorFramework} mock whose {@code getConnectionStateListenable()}
 * is stubbed to return a {@code Listenable} mock.
 *
 * @return the stubbed framework mock
 */
@SuppressWarnings("unchecked")
private CuratorFramework mockFramework() {
    final CuratorFramework stubbedFramework = mock(CuratorFramework.class);
    // The raw Listenable mock is why the unchecked warning is suppressed on this method.
    when(stubbedFramework.getConnectionStateListenable())
        .thenReturn(mock(Listenable.class));
    return stubbedFramework;
}
/**
 * Delegates to {@code delegateFramework.getConnectionStateListenable()}.
 *
 * @return the connection-state listenable reported by {@code delegateFramework}
 */
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
    final Listenable<ConnectionStateListener> stateListenable =
        delegateFramework.getConnectionStateListenable();
    return stateListenable;
}
/**
 * Delegates to {@code delegateFramework.getUnhandledErrorListenable()}.
 *
 * @return the unhandled-error listenable reported by {@code delegateFramework}
 */
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
    final Listenable<UnhandledErrorListener> errorListenable =
        delegateFramework.getUnhandledErrorListenable();
    return errorListenable;
}
/**
 * Delegates to {@code delegateFramework.getCuratorListenable()}.
 *
 * @return the curator-event listenable reported by {@code delegateFramework}
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
    final Listenable<CuratorListener> eventListenable =
        delegateFramework.getCuratorListenable();
    return eventListenable;
}
/**
 * Returns the listenable interface for the connection state.
 *
 * @return the listenable for {@code ConnectionStateListener} instances
 */
public Listenable<ConnectionStateListener> getConnectionStateListenable();
/**
 * Returns the listenable interface for events.
 *
 * @return the listenable for {@code CuratorListener} instances
 */
public Listenable<CuratorListener> getCuratorListenable();
/**
 * Returns the listenable interface for unhandled errors.
 *
 * @return the listenable for {@code UnhandledErrorListener} instances
 */
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
/**
 * Returns the listener container so that listeners can be added or removed.
 *
 * @return the listener container for {@code ModeledCacheListener} instances
 */
Listenable<ModeledCacheListener<T>> listenable();
/**
 * Returns the cache listenable.
 *
 * @return {@code listeners}, the listenable for {@code TreeCacheListener} instances
 */
public Listenable<TreeCacheListener> getListenable() {
    final Listenable<TreeCacheListener> cacheListenable = listeners;
    return cacheListenable;
}
/**
 * Returns the container used to add or remove event listeners.
 *
 * @return the container for {@code WorkflowListener} instances
 */
Listenable<WorkflowListener> getListenable();