@Override public void addListener(Object listener) { CuratorListener curatorListener = (CuratorListener) listener; if (curator != null) { curator.getCuratorListenable().addListener(curatorListener); } listeners.add(curatorListener); }
@Override public void addListener(Object listener, Executor executor) { CuratorListener curatorListener = (CuratorListener) listener; if (curator != null) { curator.getCuratorListenable().addListener(curatorListener, executor); } listeners.add(curatorListener); }
@Override public void removeListener(Object listener) { CuratorListener curatorListener = (CuratorListener) listener; if (curator != null) { curator.getCuratorListenable().removeListener(curatorListener); } listeners.remove(curatorListener); }
public void clearCurator() { for (CuratorListener curatorListener : listeners) { if (curator != null) { curator.getCuratorListenable().removeListener(curatorListener); } } }
public void updateCurator(CuratorFramework curator) { this.curator = curator; for (CuratorListener curatorListener : listeners) { if (this.curator != null) { this.curator.getCuratorListenable().addListener(curatorListener); } } }
/** * connect ZK, register Watch/unhandle Watch * * @return */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.getUnhandledErrorListenable().addListener( new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable Zookeeper error"); } }); fk.start(); return fk; }
/** * connect ZK, register Watch/unhandle Watch * * @return */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable Zookeeper error"); } }); fk.start(); return fk; }
@Inject DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners) { this.framework = framework; this.connectionStateListeners = connectionStateListeners; this.curatorListeners = curatorListeners; this.unhandledErrorListeners = unhandledErrorListeners; }
@Inject DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners) { this(framework, connectionStateListeners, curatorListeners, unhandledErrorListeners, LoggerFactory .getLogger(DefaultCuratorManagementService.class)); }
DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners, final Logger log) { this.framework = framework; this.connectionStateListeners = connectionStateListeners; this.curatorListeners = curatorListeners; this.unhandledErrorListeners = unhandledErrorListeners; this.log = log; }
/** * connect ZK, register watchers */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable zookeeper error"); } }); fk.start(); return fk; }
@Override public Listenable<CuratorListener> getCuratorListenable() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
@Override protected void configure() { install(new CuratorInnerModule(dependencies)); install(new BarrierModule()); install(new LastKnownStateModule()); Multibinder.newSetBinder(binder(), CuratorService.class); Multibinder.newSetBinder(binder(), UnhandledErrorListener.class); Multibinder.newSetBinder(binder(), CuratorListener.class); Multibinder<ConnectionStateListener> connectionStateListeners = Multibinder.newSetBinder(binder(), ConnectionStateListener.class); connectionStateListeners.addBinding().to(ConnectionStateLogger.class).in(Singleton.class); bind(CultivarStartStopManager.class).to(DefaultCultivarStartStopManager.class).in(Singleton.class); }
@Override public void addCuratorListener(final CuratorListener listener) { framework.getCuratorListenable().addListener(listener); }
@Override public Listenable<CuratorListener> getCuratorListenable() { throw new UnsupportedOperationException("getCuratorListenable() is only available from a non-namespaced CuratorFramework instance"); }
@Override public Listenable<CuratorListener> getCuratorListenable() { return client.getCuratorListenable(); }
@Override protected void configure() { install(new CuratorInnerModule(dependencies)); install(new BarrierModule()); install(new LastKnownStateModule()); bind(BlankCuratorService.class).annotatedWith(Private.class).to(BlankCuratorService.class).in(Scopes.SINGLETON); Multibinder.newSetBinder(binder(), CuratorService.class).addBinding() .to(Key.get(BlankCuratorService.class, Private.class)); Multibinder.newSetBinder(binder(), UnhandledErrorListener.class); Multibinder.newSetBinder(binder(), CuratorListener.class); Multibinder.newSetBinder(binder(), ServiceManager.class, Cultivar.class); Multibinder<ConnectionStateListener> connectionStateListeners = Multibinder.newSetBinder(binder(), ConnectionStateListener.class); connectionStateListeners.addBinding().to(ConnectionStateLogger.class).in(Singleton.class); bind(CultivarStartStopManager.class).to(DefaultCultivarStartStopManager.class).in(Singleton.class); }
@Override public Listenable<CuratorListener> getCuratorListenable() { return delegateFramework.getCuratorListenable(); }
/** * Adds a curator listener to Curator. */ void addCuratorListener(CuratorListener listener);