public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions) { boolean isWatched = (watchMode != null); Objects.requireNonNull(client, "client cannot be null"); Objects.requireNonNull(model, "model cannot be null"); modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null")); watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess; AsyncCuratorFrameworkDsl dslClient = client.with(watchMode, unhandledErrorListener, resultFilter, watcherFilter); WatchableAsyncCuratorFramework watchableClient = isWatched ? dslClient.watched() : dslClient; return new ModeledFrameworkImpl<>( client, dslClient, watchableClient, model, watchMode, watcherFilter, unhandledErrorListener, resultFilter, isWatched, modeledOptions ); }
private ZookeeperClient(){ //1000 是重试间隔时间基数,3 是重试次数 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); zkClient = createWithOptions(zkConnectionString, retryPolicy, 2000, 10000); zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { logger.info("CuratorFramework state changed: {}", newState); if(newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED){ for (String key : listeners.keySet()) { System.out.println(key); IZoopkeeperListener listener = listeners.get(key); listener.execute(client); } } } }); zkClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { logger.info("CuratorFramework unhandledError: {}", message); } }); zkClient.start(); }
/** * connect ZK, register Watch/unhandle Watch * * @return */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.getUnhandledErrorListenable().addListener( new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable Zookeeper error"); } }); fk.start(); return fk; }
/** * connect ZK, register Watch/unhandle Watch * * @return */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable Zookeeper error"); } }); fk.start(); return fk; }
@Inject DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners) { this.framework = framework; this.connectionStateListeners = connectionStateListeners; this.curatorListeners = curatorListeners; this.unhandledErrorListeners = unhandledErrorListeners; }
Backgrounding(Backgrounding rhs, UnhandledErrorListener errorListener) { if ( rhs == null ) { rhs = new Backgrounding(); } this.inBackground = rhs.inBackground; this.context = rhs.context; this.callback = rhs.callback; this.errorListener = errorListener; }
public Backgrounding(BackgroundCallback callback, UnhandledErrorListener errorListener) { this.callback = callback; this.errorListener = errorListener; inBackground = true; context = null; }
private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched, Set<ModeledOptions> modeledOptions) { this.client = client; this.dslClient = dslClient; this.watchableClient = watchableClient; this.modelSpec = modelSpec; this.watchMode = watchMode; this.watcherFilter = watcherFilter; this.unhandledErrorListener = unhandledErrorListener; this.resultFilter = resultFilter; this.isWatched = isWatched; this.modeledOptions = modeledOptions; }
@Inject DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners) { this(framework, connectionStateListeners, curatorListeners, unhandledErrorListeners, LoggerFactory .getLogger(DefaultCuratorManagementService.class)); }
DefaultCuratorManagementService(@Curator final CuratorFramework framework, final Set<ConnectionStateListener> connectionStateListeners, final Set<CuratorListener> curatorListeners, final Set<UnhandledErrorListener> unhandledErrorListeners, final Logger log) { this.framework = framework; this.connectionStateListeners = connectionStateListeners; this.curatorListeners = curatorListeners; this.unhandledErrorListeners = unhandledErrorListeners; this.log = log; }
/** * connect ZK, register watchers */ public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String msg, Throwable error) { String errmsg = "Unrecoverable zookeeper error, halting process: " + msg; LOG.error(errmsg, error); JStormUtils.halt_process(1, "Unrecoverable zookeeper error"); } }); fk.start(); return fk; }
@Override public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
@Override protected void configure() { install(new CuratorInnerModule(dependencies)); install(new BarrierModule()); install(new LastKnownStateModule()); Multibinder.newSetBinder(binder(), CuratorService.class); Multibinder.newSetBinder(binder(), UnhandledErrorListener.class); Multibinder.newSetBinder(binder(), CuratorListener.class); Multibinder<ConnectionStateListener> connectionStateListeners = Multibinder.newSetBinder(binder(), ConnectionStateListener.class); connectionStateListeners.addBinding().to(ConnectionStateLogger.class).in(Singleton.class); bind(CultivarStartStopManager.class).to(DefaultCultivarStartStopManager.class).in(Singleton.class); }
@Override public void addUnhandledErrorListener(final UnhandledErrorListener listener) { framework.getUnhandledErrorListenable().addListener(listener); }
@Override public CuratorMultiTransactionMain withUnhandledErrorListener(UnhandledErrorListener listener) { backgrounding = new Backgrounding(backgrounding, listener); return this; }
@Override public Pathable<Void> withUnhandledErrorListener(UnhandledErrorListener listener) { backgrounding = new Backgrounding(backgrounding, listener); return this; }
@Override public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { return client.getUnhandledErrorListenable(); }
@Override public Pathable<List<String>> withUnhandledErrorListener(UnhandledErrorListener listener) { backgrounding = new Backgrounding(backgrounding, listener); return this; }
@Override public Pathable<List<ACL>> withUnhandledErrorListener(UnhandledErrorListener listener) { backgrounding = new Backgrounding(backgrounding, listener); return this; }
@Test public void testErrorListener() throws Exception { //The first call to the ACL provider will return a reasonable //value. The second will throw an error. This is because the ACL //provider is accessed prior to the backgrounding call. final AtomicBoolean aclProviderCalled = new AtomicBoolean(false); ACLProvider badAclProvider = new ACLProvider() { @Override public List<ACL> getDefaultAcl() { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } @Override public List<ACL> getAclForPath(String path) { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } }; CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) .aclProvider(badAclProvider) .build(); try { client.start(); final CountDownLatch errorLatch = new CountDownLatch(1); UnhandledErrorListener listener = new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { if ( e instanceof UnsupportedOperationException ) { errorLatch.countDown(); } } }; client.create().inBackground().withUnhandledErrorListener(listener).forPath("/foo"); Assert.assertTrue(new Timing().awaitLatch(errorLatch)); } finally { CloseableUtils.closeQuietly(client); } }
/** * CURATOR-126 * Shutdown the Curator client while there are still background operations running. */ @Test public void testShutdown() throws Exception { Timing timing = new Timing(); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(server.getConnectString()) .sessionTimeoutMs(timing.session()) .connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1)) .maxCloseWaitMs(timing.forWaiting().milliseconds()) .build(); try { final AtomicBoolean hadIllegalStateException = new AtomicBoolean(false); ((CuratorFrameworkImpl)client).debugUnhandledErrorListener = new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { if ( e instanceof IllegalStateException ) { hadIllegalStateException.set(true); } } }; client.start(); final CountDownLatch operationReadyLatch = new CountDownLatch(1); ((CuratorFrameworkImpl)client).debugListener = new CuratorFrameworkImpl.DebugBackgroundListener() { @Override public void listen(OperationAndData<?> data) { try { operationReadyLatch.await(); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } } }; // queue a background operation that will block due to the debugListener client.create().inBackground().forPath("/hey"); timing.sleepABit(); // close the client while the background is still blocked client.close(); // unblock the background operationReadyLatch.countDown(); timing.sleepABit(); // should not generate an exception Assert.assertFalse(hadIllegalStateException.get()); } finally { CloseableUtils.closeQuietly(client); } }
public Filters(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) { this.listener = listener; this.resultFilter = resultFilter; this.watcherFilter = watcherFilter; }
public UnhandledErrorListener getListener() { return listener; }
@Override public AsyncCuratorFrameworkDsl with(WatchMode mode, UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) { return new AsyncCuratorFrameworkImpl(client, new Filters(listener, filters.getResultFilter(), filters.getWatcherFilter()), mode, watched); }
@Override public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener) { return new AsyncCuratorFrameworkImpl(client, new Filters(listener, filters.getResultFilter(), filters.getWatcherFilter()), watchMode, watched); }
@Override public AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter) { return new AsyncCuratorFrameworkImpl(client, new Filters(listener, resultFilter, watcherFilter), watchMode, watched); }
@Test public void testErrorListener() throws Exception { //The first call to the ACL provider will return a reasonable //value. The second will throw an error. This is because the ACL //provider is accessed prior to the backgrounding call. final AtomicBoolean aclProviderCalled = new AtomicBoolean(false); ACLProvider badAclProvider = new ACLProvider() { @Override public List<ACL> getDefaultAcl() { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } @Override public List<ACL> getAclForPath(String path) { if(aclProviderCalled.getAndSet(true)) { throw new UnsupportedOperationException(); } else { return new ArrayList<>(); } } }; CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) .aclProvider(badAclProvider) .build(); try { client.start(); AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); final CountDownLatch errorLatch = new CountDownLatch(1); UnhandledErrorListener listener = (message, e) -> { if ( e instanceof UnsupportedOperationException ) { errorLatch.countDown(); } }; async.with(listener).create().forPath("/foo"); Assert.assertTrue(new Timing().awaitLatch(errorLatch)); } finally { CloseableUtils.closeQuietly(client); } }
/** * Allows catching unhandled errors in asynchornous operations. * * TODO: consider making public. */ @VisibleForTesting public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { return errorListeners; }
@Test public void testDeleteThenCreate() throws Exception { NodeCache cache = null; CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); client.start(); try { client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes()); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); client.getUnhandledErrorListenable().addListener ( new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { error.set(e); } } ); final Semaphore semaphore = new Semaphore(0); cache = new NodeCache(client, "/test/foo"); cache.getListenable().addListener ( new NodeCacheListener() { @Override public void nodeChanged() throws Exception { semaphore.release(); } } ); cache.start(true); Assert.assertEquals(cache.getCurrentData().getData(), "one".getBytes()); client.delete().forPath("/test/foo"); Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS)); client.create().forPath("/test/foo", "two".getBytes()); Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS)); Throwable t = error.get(); if ( t != null ) { Assert.fail("Assert", t); } Assert.assertEquals(cache.getCurrentData().getData(), "two".getBytes()); cache.close(); } finally { CloseableUtils.closeQuietly(cache); TestCleanState.closeAndTestClean(client); } }
@Override protected void configure() { install(new CuratorInnerModule(dependencies)); install(new BarrierModule()); install(new LastKnownStateModule()); bind(BlankCuratorService.class).annotatedWith(Private.class).to(BlankCuratorService.class).in(Scopes.SINGLETON); Multibinder.newSetBinder(binder(), CuratorService.class).addBinding() .to(Key.get(BlankCuratorService.class, Private.class)); Multibinder.newSetBinder(binder(), UnhandledErrorListener.class); Multibinder.newSetBinder(binder(), CuratorListener.class); Multibinder.newSetBinder(binder(), ServiceManager.class, Cultivar.class); Multibinder<ConnectionStateListener> connectionStateListeners = Multibinder.newSetBinder(binder(), ConnectionStateListener.class); connectionStateListeners.addBinding().to(ConnectionStateLogger.class).in(Singleton.class); bind(CultivarStartStopManager.class).to(DefaultCultivarStartStopManager.class).in(Singleton.class); }
@Override public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { return delegateFramework.getUnhandledErrorListenable(); }
/** * Adds an unhandled error listener to Curator. */ void addUnhandledErrorListener(UnhandledErrorListener listener);
/** * Returns a facade that adds the given UnhandledErrorListener to all background operations * * @param listener lister to use * @return facade */ AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener);
/** * Set any combination of listener or filters * * @param listener lister to use or <code>null</code> * @param resultFilter filter to use or <code>null</code> * @param watcherFilter filter to use or <code>null</code> * @see #with(java.util.function.UnaryOperator, java.util.function.UnaryOperator) * @see #with(org.apache.curator.framework.api.UnhandledErrorListener) * @return facade */ AsyncCuratorFrameworkDsl with(UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
/** * Set any combination of listener, filters or watch mode * * @param mode watch mode to use for subsequent calls to {@link #watched()} (cannot be <code>null</code>) * @param listener lister to use or <code>null</code> * @param resultFilter filter to use or <code>null</code> * @param watcherFilter filter to use or <code>null</code> * @see #with(WatchMode) * @see #with(java.util.function.UnaryOperator, java.util.function.UnaryOperator) * @see #with(org.apache.curator.framework.api.UnhandledErrorListener) * @return facade */ AsyncCuratorFrameworkDsl with(WatchMode mode, UnhandledErrorListener listener, UnaryOperator<CuratorEvent> resultFilter, UnaryOperator<WatchedEvent> watcherFilter);
/** * Use the given unhandledErrorListener for operations on the Modeled Curator's ZNode * * @param unhandledErrorListener listener * @return this for chaining */ public ModeledFrameworkBuilder<T> withUnhandledErrorListener(UnhandledErrorListener unhandledErrorListener) { this.unhandledErrorListener = unhandledErrorListener; return this; }