private void triggerReconnect() { transitionState(LifecycleState.CONNECTING); if (!stopped) { tryConnectHosts() .retryWhen(any() .delay(env.configProviderReconnectDelay()) .max(env.configProviderReconnectMaxAttempts()) .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { @Override public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) { LOGGER.info("No host usable to fetch a config from, waiting and retrying (remote hosts: {}).", system(remoteHosts.get())); } }) .build() ) .subscribe(); } }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); a4.call(t1, t2, t3, t4); a5.call(t1, t2, t3, t4); a6.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @param a7 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a7) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); a4.call(t1, t2, t3, t4); a5.call(t1, t2, t3, t4); a6.call(t1, t2, t3, t4); a7.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @param a7 Action * @param a8 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a7, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a8) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); a4.call(t1, t2, t3, t4); a5.call(t1, t2, t3, t4); a6.call(t1, t2, t3, t4); a7.call(t1, t2, t3, t4); a8.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @param a7 Action * @param a8 Action * @param a9 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a7, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a8, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a9) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); a4.call(t1, t2, t3, t4); a5.call(t1, t2, t3, t4); a6.call(t1, t2, t3, t4); a7.call(t1, t2, t3, t4); a8.call(t1, t2, t3, t4); a9.call(t1, t2, t3, t4); } }; }
private void dispatchReconnect() { if (isShutdown) { LOGGER.debug("Ignoring reconnect on {} because already shutdown.", inetAddress); return; } LOGGER.info("Node {} socket closed, initiating reconnect.", system(inetAddress)); connect() .retryWhen(any().max(Integer.MAX_VALUE).delay(Delay.exponential(TimeUnit.MILLISECONDS, 4096, 32)) .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { @Override public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) { LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", inetAddress); } }).build()) .subscribe(new CompletableSubscriber() { @Override public void onCompleted() { LOGGER.debug("Completed Node connect for DCP channel {}", inetAddress); for (short vbid = 0; vbid < openStreams.length(); vbid++) { if (openStreams.get(vbid) != 0) { conductor.maybeMovePartition(vbid); } } } @Override public void onError(Throwable e) { LOGGER.warn("Got error during connect (maybe retried) for node {}", system(inetAddress), e); } @Override public void onSubscribe(Subscription d) { // ignored. } }); }
@SuppressWarnings("unchecked") public Single<ByteBuf> getFailoverLog(final short partition) { return Observable .just(partition) .map(new Func1<Short, DcpChannel>() { @Override public DcpChannel call(Short aShort) { return masterChannelByPartition(partition); } }) .flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() { @Override public Single<ByteBuf> call(DcpChannel channel) { return channel.getFailoverLog(partition); } }) .retryWhen(anyOf(NotConnectedException.class) .max(Integer.MAX_VALUE) .delay(Delay.fixed(200, TimeUnit.MILLISECONDS)) .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { @Override public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) { LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", partition); } }) .build() ).toSingle(); }
@SuppressWarnings("unchecked") public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno, final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) { return Observable .just(partition) .map(new Func1<Short, DcpChannel>() { @Override public DcpChannel call(Short aShort) { return masterChannelByPartition(partition); } }) .flatMapCompletable(new Func1<DcpChannel, Completable>() { @Override public Completable call(DcpChannel channel) { return channel.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno); } }) .retryWhen(anyOf(NotConnectedException.class) .max(Integer.MAX_VALUE) .delay(Delay.fixed(200, TimeUnit.MILLISECONDS)) .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { @Override public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) { LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition); } }) .build() ) .toCompletable(); }
/** * Protected constructor that also allows to set a {@link Scheduler} for the delay, especially useful for tests. */ protected RetryWithDelayHandler(int maxAttempts, Delay retryDelay, Func1<Throwable, Boolean> errorInterruptingPredicate, Action4<Integer, Throwable, Long, TimeUnit> doOnRetry, Scheduler scheduler) { this.maxAttempts = Math.min(Integer.MAX_VALUE - 1, maxAttempts); this.retryDelay = retryDelay; this.errorInterruptingPredicate = errorInterruptingPredicate; this.optionalScheduler = scheduler; this.doOnRetry = doOnRetry; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); a4.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4, @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { a1.call(t1, t2, t3, t4); a2.call(t1, t2, t3, t4); a3.call(t1, t2, t3, t4); a4.call(t1, t2, t3, t4); a5.call(t1, t2, t3, t4); } }; }
/** * Combines provided actions into a single action stream * * @param a Actions * @return Single action stream combined from provided actions */ @SafeVarargs @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4>... a) { return new Action4<T1, T2, T3, T4>() { @Override public void call(T1 t1, T2 t2, T3 t3, T4 t4) { //noinspection ForLoopReplaceableByForEach for (int i = 0, count = a.length; i < count; i++) { a[i].call(t1, t2, t3, t4); } } }; }
/** * Protected constructor that also allows to set a {@link Scheduler} for the delay, especially * useful for tests. */ protected RetryWithDelayHandler( int maxAttempts, Delay retryDelay, Func1<Throwable, Boolean> errorInterruptingPredicate, Action4<Integer, Throwable, Long, TimeUnit> doOnRetry, Scheduler scheduler) { this.maxAttempts = Math.min(Integer.MAX_VALUE - 1, maxAttempts); this.retryDelay = retryDelay; this.errorInterruptingPredicate = errorInterruptingPredicate; this.optionalScheduler = scheduler; this.doOnRetry = doOnRetry; }
private void add(final InetSocketAddress node) { //noinspection SuspiciousMethodCalls: channel proxies equals/hashcode to its address if (channels.contains(node)) { return; } LOGGER.debug("Adding DCP Channel against {}", node); final DcpChannel channel = new DcpChannel(node, env, this); channels.add(channel); channel .connect() .retryWhen(RetryBuilder.anyMatches(new Func1<Throwable, Boolean>() { @Override public Boolean call(Throwable t) { return !stopped; } }).max(env.dcpChannelsReconnectMaxAttempts()).delay(env.dcpChannelsReconnectDelay()). doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() { @Override public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) { LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", node); } }).build()).subscribe(new CompletableSubscriber() { @Override public void onCompleted() { LOGGER.debug("Completed Node connect for DCP channel {}", node); } @Override public void onError(Throwable e) { LOGGER.warn("Got error during connect (maybe retried) for node {}", system(node), e); if (env.eventBus() != null) { env.eventBus().publish(new FailedToAddNodeEvent(node, e)); } } @Override public void onSubscribe(Subscription d) { // ignored. } }); }
/** * Construct a {@link RetryWithDelayHandler retry handler} that will retry on most errors but will * stop on specific errors. * * @param maxAttempts the maximum number of retries before a {@link CannotRetryException} is * thrown. It will be capped at <code>{@link Integer#MAX_VALUE} - 1</code>. * @param retryDelay the {@link Delay} to apply between each retry (can grow, eg. by using {@link * com.jszczygiel.foundation.rx.time.ExponentialDelay}). * @param errorInterruptingPredicate a predicate that determine if an error must stop the retry * cycle (when true), in which case said error is cascaded down. */ public RetryWithDelayHandler( int maxAttempts, Delay retryDelay, Func1<Throwable, Boolean> errorInterruptingPredicate, Action4<Integer, Throwable, Long, TimeUnit> doOnRetry) { this(maxAttempts, retryDelay, errorInterruptingPredicate, doOnRetry, null); }
/** * Construct a {@link RetryWithDelayHandler retry handler} that will retry on most errors but will stop on specific errors. * * @param maxAttempts the maximum number of retries before a {@link CannotRetryException} is thrown. It will be * capped at <code>{@link Integer#MAX_VALUE} - 1</code>. * @param retryDelay the {@link Delay} to apply between each retry (can grow, * eg. by using {@link ExponentialDelay}). * @param errorInterruptingPredicate a predicate that determine if an error must stop the retry cycle (when true), * in which case said error is cascaded down. */ public RetryWithDelayHandler(int maxAttempts, Delay retryDelay, Func1<Throwable, Boolean> errorInterruptingPredicate, Action4<Integer, Throwable, Long, TimeUnit> doOnRetry) { this(maxAttempts, retryDelay, errorInterruptingPredicate, doOnRetry, null); }
/** * Execute some code each time a retry is scheduled (at the moment the retriable exception * is caught, but before the retry delay is applied). Only quick executing code should be * performed, do not block in this action. * * The action receives the retry attempt number (1-n), the exception that caused the retry, * the delay duration and timeunit for the scheduled retry. * * @param doOnRetryAction the side-effect action to perform whenever a retry is scheduled. * @see OnRetryAction if you want a shorter signature. */ public RetryBuilder doOnRetry(Action4<Integer, Throwable, Long, TimeUnit> doOnRetryAction) { this.doOnRetryAction = doOnRetryAction; return this; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull Action4<? super T1, ? super T2, ? super T3, ? super T4> a1) { return a1; }
/** * Execute some code each time a retry is scheduled (at the moment the retriable exception is * caught, but before the retry delay is applied). Only quick executing code should be performed, * do not block in this action. * * <p>The action receives the retry attempt number (1-n), the exception that caused the retry, the * delay duration and timeunit for the scheduled retry. * * @param doOnRetryAction the side-effect action to perform whenever a retry is scheduled. * @see OnRetryAction if you want a shorter signature. */ public RetryBuilder doOnRetry(Action4<Integer, Throwable, Long, TimeUnit> doOnRetryAction) { this.doOnRetryAction = doOnRetryAction; return this; }