Java 类rx.functions.Action4 实例源码

项目:java-dcp-client    文件:HttpStreamingConfigProvider.java   
/**
 * Moves this provider into the {@code CONNECTING} lifecycle state and, unless it has been
 * stopped, kicks off a new attempt to connect to the hosts.
 *
 * <p>The connect attempt is wrapped in a retry policy that uses the reconnect delay and the
 * maximum number of attempts configured on {@code env}. Each scheduled retry writes one
 * info-level log line listing the current remote hosts.
 */
private void triggerReconnect() {
    transitionState(LifecycleState.CONNECTING);
    if (stopped) {
        // The state transition above has already happened; only the connect attempt is skipped.
        return;
    }

    // Pure side-effect callback: none of the retry arguments are used, it only logs.
    final Action4<Integer, Throwable, Long, TimeUnit> logRetry =
            new Action4<Integer, Throwable, Long, TimeUnit>() {
                @Override
                public void call(Integer attempt, Throwable cause, Long delayAmount, TimeUnit delayUnit) {
                    LOGGER.info("No host usable to fetch a config from, waiting and retrying (remote hosts: {}).",
                            system(remoteHosts.get()));
                }
            };

    tryConnectHosts()
            .retryWhen(any()
                    .delay(env.configProviderReconnectDelay())
                    .max(env.configProviderReconnectMaxAttempts())
                    .doOnRetry(logRetry)
                    .build())
            .subscribe();
}
项目:RxActions    文件:Actions.java   
/**
 * Merges six actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a6} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @param a4 action invoked fourth
 * @param a5 action invoked fifth
 * @param a6 action invoked sixth
 * @return a new action that calls {@code a1} through {@code a6} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
            a4.call(first, second, third, fourth);
            a5.call(first, second, third, fourth);
            a6.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges seven actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a7} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @param a4 action invoked fourth
 * @param a5 action invoked fifth
 * @param a6 action invoked sixth
 * @param a7 action invoked seventh
 * @return a new action that calls {@code a1} through {@code a7} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a7) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
            a4.call(first, second, third, fourth);
            a5.call(first, second, third, fourth);
            a6.call(first, second, third, fourth);
            a7.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges eight actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a8} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @param a4 action invoked fourth
 * @param a5 action invoked fifth
 * @param a6 action invoked sixth
 * @param a7 action invoked seventh
 * @param a8 action invoked eighth
 * @return a new action that calls {@code a1} through {@code a8} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a7,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a8) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
            a4.call(first, second, third, fourth);
            a5.call(first, second, third, fourth);
            a6.call(first, second, third, fourth);
            a7.call(first, second, third, fourth);
            a8.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges nine actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a9} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @param a4 action invoked fourth
 * @param a5 action invoked fifth
 * @param a6 action invoked sixth
 * @param a7 action invoked seventh
 * @param a8 action invoked eighth
 * @param a9 action invoked ninth
 * @return a new action that calls {@code a1} through {@code a9} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a6,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a7,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a8,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a9) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
            a4.call(first, second, third, fourth);
            a5.call(first, second, third, fourth);
            a6.call(first, second, third, fourth);
            a7.call(first, second, third, fourth);
            a8.call(first, second, third, fourth);
            a9.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:java-dcp-client    文件:DcpChannel.java   
/**
 * Starts a reconnect for this channel after its socket was closed, unless the channel has
 * already been shut down (in which case only a debug line is logged).
 *
 * <p>The connect attempt is retried on any error, with up to {@code Integer.MAX_VALUE}
 * attempts and an exponential delay built from
 * {@code Delay.exponential(TimeUnit.MILLISECONDS, 4096, 32)} (presumably upper and lower
 * bounds in milliseconds; confirm against {@code Delay.exponential}).
 *
 * <p>When the connect completes, every index of {@code openStreams} holding a non-zero value
 * is handed to {@code conductor.maybeMovePartition}. A terminal error is only logged.
 */
private void dispatchReconnect() {
    if (isShutdown) {
        LOGGER.debug("Ignoring reconnect on {} because already shutdown.", inetAddress);
        return;
    }
    LOGGER.info("Node {} socket closed, initiating reconnect.", system(inetAddress));

    // Logging-only hook; the attempt number, error and delay are not inspected.
    final Action4<Integer, Throwable, Long, TimeUnit> logRetry =
            new Action4<Integer, Throwable, Long, TimeUnit>() {
                @Override
                public void call(Integer attempt, Throwable cause, Long delayAmount, TimeUnit delayUnit) {
                    LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", inetAddress);
                }
            };

    final CompletableSubscriber reconnectSubscriber = new CompletableSubscriber() {
        @Override
        public void onSubscribe(Subscription d) {
            // ignored.
        }

        @Override
        public void onCompleted() {
            LOGGER.debug("Completed Node connect for DCP channel {}", inetAddress);
            for (short vbid = 0; vbid < openStreams.length(); vbid++) {
                if (openStreams.get(vbid) == 0) {
                    // Zero entries are skipped; only non-zero ones are passed to the conductor.
                    continue;
                }
                conductor.maybeMovePartition(vbid);
            }
        }

        @Override
        public void onError(Throwable e) {
            LOGGER.warn("Got error during connect (maybe retried) for node {}", system(inetAddress), e);
        }
    };

    connect()
            .retryWhen(any()
                    .max(Integer.MAX_VALUE)
                    .delay(Delay.exponential(TimeUnit.MILLISECONDS, 4096, 32))
                    .doOnRetry(logRetry)
                    .build())
            .subscribe(reconnectSubscriber);
}
项目:java-dcp-client    文件:Conductor.java   
/**
 * Requests the failover log for a partition from the channel returned by
 * {@code masterChannelByPartition}.
 *
 * <p>If the chain fails with a {@link NotConnectedException} it is retried with a fixed delay
 * of 200 milliseconds and up to {@code Integer.MAX_VALUE} attempts; each scheduled retry is
 * logged at debug level. The channel lookup sits upstream of the retry operator.
 *
 * @param partition id of the partition (vbid) whose failover log is requested.
 * @return a {@link Single} emitting the {@link ByteBuf} returned by the channel.
 */
@SuppressWarnings("unchecked")
public Single<ByteBuf> getFailoverLog(final short partition) {
    // The emitted value is ignored; the captured partition id is used for the lookup instead.
    final Func1<Short, DcpChannel> toMasterChannel = new Func1<Short, DcpChannel>() {
        @Override
        public DcpChannel call(Short ignoredPartition) {
            return masterChannelByPartition(partition);
        }
    };

    final Func1<DcpChannel, Single<ByteBuf>> requestFailoverLog = new Func1<DcpChannel, Single<ByteBuf>>() {
        @Override
        public Single<ByteBuf> call(DcpChannel master) {
            return master.getFailoverLog(partition);
        }
    };

    final Action4<Integer, Throwable, Long, TimeUnit> logRetry =
            new Action4<Integer, Throwable, Long, TimeUnit>() {
                @Override
                public void call(Integer attempt, Throwable cause, Long delayAmount, TimeUnit delayUnit) {
                    LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", partition);
                }
            };

    return Observable
            .just(partition)
            .map(toMasterChannel)
            .flatMapSingle(requestFailoverLog)
            .retryWhen(anyOf(NotConnectedException.class)
                    .max(Integer.MAX_VALUE)
                    .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
                    .doOnRetry(logRetry)
                    .build())
            .toSingle();
}
项目:java-dcp-client    文件:Conductor.java   
/**
 * Opens a stream for a partition on the channel returned by {@code masterChannelByPartition}.
 *
 * <p>If the chain fails with a {@link NotConnectedException} it is retried with a fixed delay
 * of 200 milliseconds and up to {@code Integer.MAX_VALUE} attempts; each scheduled retry is
 * logged at debug level. The channel lookup sits upstream of the retry operator.
 *
 * <p>All sequence-number arguments are passed through unchanged to
 * {@code DcpChannel.openStream}.
 *
 * @param partition id of the partition (vbid) to stream.
 * @param vbuuid forwarded to {@code openStream}.
 * @param startSeqno forwarded to {@code openStream}.
 * @param endSeqno forwarded to {@code openStream}.
 * @param snapshotStartSeqno forwarded to {@code openStream}.
 * @param snapshotEndSeqno forwarded to {@code openStream}.
 * @return a {@link Completable} derived from the retried open-stream chain.
 */
@SuppressWarnings("unchecked")
public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno,
                                           final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
    // The emitted value is ignored; the captured partition id is used for the lookup instead.
    final Func1<Short, DcpChannel> toMasterChannel = new Func1<Short, DcpChannel>() {
        @Override
        public DcpChannel call(Short ignoredPartition) {
            return masterChannelByPartition(partition);
        }
    };

    final Func1<DcpChannel, Completable> openStreamOnChannel = new Func1<DcpChannel, Completable>() {
        @Override
        public Completable call(DcpChannel master) {
            return master.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno);
        }
    };

    final Action4<Integer, Throwable, Long, TimeUnit> logRetry =
            new Action4<Integer, Throwable, Long, TimeUnit>() {
                @Override
                public void call(Integer attempt, Throwable cause, Long delayAmount, TimeUnit delayUnit) {
                    LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);
                }
            };

    return Observable
            .just(partition)
            .map(toMasterChannel)
            .flatMapCompletable(openStreamOnChannel)
            .retryWhen(anyOf(NotConnectedException.class)
                    .max(Integer.MAX_VALUE)
                    .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
                    .doOnRetry(logRetry)
                    .build())
            .toCompletable();
}
项目:java-dcp-client    文件:RetryWithDelayHandler.java   
/**
 * Protected constructor that additionally accepts the {@link Scheduler} used for the delay,
 * which is especially useful for tests.
 *
 * @param maxAttempts the requested maximum number of attempts; it is capped at
 *                    <code>{@link Integer#MAX_VALUE} - 1</code>.
 * @param retryDelay the {@link Delay} stored for use between retries.
 * @param errorInterruptingPredicate predicate over the error, stored as-is.
 * @param doOnRetry the action stored for invocation on retry; it takes an attempt number, the
 *                  error, and a delay amount with its {@link TimeUnit}.
 * @param scheduler the scheduler to store; optional (the public constructor passes
 *                  <code>null</code>).
 */
protected RetryWithDelayHandler(int maxAttempts, Delay retryDelay, Func1<Throwable, Boolean> errorInterruptingPredicate,
                                Action4<Integer, Throwable, Long, TimeUnit> doOnRetry, Scheduler scheduler) {
    this.doOnRetry = doOnRetry;
    this.optionalScheduler = scheduler;
    this.errorInterruptingPredicate = errorInterruptingPredicate;
    this.retryDelay = retryDelay;
    // Cap the attempt count one below Integer.MAX_VALUE.
    this.maxAttempts = Math.min(maxAttempts, Integer.MAX_VALUE - 1);
}
项目:RxActions    文件:Actions.java   
/**
 * Merges two actions into a single action that forwards its four arguments first to
 * {@code a1} and then to {@code a2}.
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by {@code a1} propagates to the caller and {@code a2} is not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @return a new action that calls {@code a1} and then {@code a2}
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges three actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a3} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @return a new action that calls {@code a1} through {@code a3} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges four actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a4} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @param a4 action invoked fourth
 * @return a new action that calls {@code a1} through {@code a4} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
            a4.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges five actions into a single action that forwards its four arguments to each of them,
 * in parameter order ({@code a1} first, {@code a5} last).
 *
 * <p>The calls are made one after another without any error handling, so an exception thrown
 * by one action propagates to the caller and the remaining actions are not invoked.
 *
 * @param a1 action invoked first
 * @param a2 action invoked second
 * @param a3 action invoked third
 * @param a4 action invoked fourth
 * @param a5 action invoked fifth
 * @return a new action that calls {@code a1} through {@code a5} in sequence
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a1,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a2,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a3,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a4,
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4> a5) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            a1.call(first, second, third, fourth);
            a2.call(first, second, third, fourth);
            a3.call(first, second, third, fourth);
            a4.call(first, second, third, fourth);
            a5.call(first, second, third, fourth);
        }
    };
    return sequence;
}
项目:RxActions    文件:Actions.java   
/**
 * Merges any number of actions into a single action that forwards its four arguments to each
 * element of the array, in array order.
 *
 * <p>The array is captured by reference and not copied, so it is read each time the returned
 * action is called. The calls have no error handling: an exception thrown by one action
 * propagates to the caller and the remaining actions are not invoked.
 *
 * @param a actions to invoke, in order
 * @return a new action that calls every element of {@code a} in sequence
 */
@SafeVarargs
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(
        @NonNull final Action4<? super T1, ? super T2, ? super T3, ? super T4>... a) {
    final Action4<T1, T2, T3, T4> sequence = new Action4<T1, T2, T3, T4>() {
        @Override
        public void call(T1 first, T2 second, T3 third, T4 fourth) {
            // Enhanced for over an array is an indexed loop under the hood; no iterator is created.
            for (Action4<? super T1, ? super T2, ? super T3, ? super T4> action : a) {
                action.call(first, second, third, fourth);
            }
        }
    };
    return sequence;
}
项目:android-common    文件:RetryWithDelayHandler.java   
/**
 * Protected constructor that additionally accepts the {@link Scheduler} used for the delay,
 * which is especially useful for tests.
 *
 * @param maxAttempts the requested maximum number of attempts; it is capped at <code>
 *     {@link Integer#MAX_VALUE} - 1</code>.
 * @param retryDelay the {@link Delay} stored for use between retries.
 * @param errorInterruptingPredicate predicate over the error, stored as-is.
 * @param doOnRetry the action stored for invocation on retry; it takes an attempt number, the
 *     error, and a delay amount with its {@link TimeUnit}.
 * @param scheduler the scheduler to store; optional (the public constructor passes <code>null
 *     </code>).
 */
protected RetryWithDelayHandler(
    int maxAttempts,
    Delay retryDelay,
    Func1<Throwable, Boolean> errorInterruptingPredicate,
    Action4<Integer, Throwable, Long, TimeUnit> doOnRetry,
    Scheduler scheduler) {
  this.doOnRetry = doOnRetry;
  this.optionalScheduler = scheduler;
  this.errorInterruptingPredicate = errorInterruptingPredicate;
  this.retryDelay = retryDelay;
  // Cap the attempt count one below Integer.MAX_VALUE.
  this.maxAttempts = Math.min(maxAttempts, Integer.MAX_VALUE - 1);
}
项目:java-dcp-client    文件:Conductor.java   
/**
 * Registers a new {@link DcpChannel} for the given node and starts connecting it, unless
 * {@code channels} already contains an entry for that address.
 *
 * <p>The connect is wrapped in a retry policy whose predicate returns {@code !stopped}
 * (presumably meaning "keep retrying while not stopped"; confirm against
 * {@code RetryBuilder.anyMatches}), using the maximum attempts and delay configured on
 * {@code env}. A terminal error is logged and, when {@code env.eventBus()} is non-null,
 * published as a {@code FailedToAddNodeEvent}.
 *
 * @param node address of the node to add a channel for.
 */
private void add(final InetSocketAddress node) {
    //noinspection SuspiciousMethodCalls: channel proxies equals/hashcode to its address
    if (channels.contains(node)) {
        return;
    }

    LOGGER.debug("Adding DCP Channel against {}", node);
    final DcpChannel channel = new DcpChannel(node, env, this);
    channels.add(channel);

    // The error itself is not inspected; the decision depends only on the stopped flag.
    final Func1<Throwable, Boolean> whileNotStopped = new Func1<Throwable, Boolean>() {
        @Override
        public Boolean call(Throwable cause) {
            return !stopped;
        }
    };

    final Action4<Integer, Throwable, Long, TimeUnit> logRetry =
            new Action4<Integer, Throwable, Long, TimeUnit>() {
                @Override
                public void call(Integer attempt, Throwable cause, Long delayAmount, TimeUnit delayUnit) {
                    LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", node);
                }
            };

    final CompletableSubscriber connectSubscriber = new CompletableSubscriber() {
        @Override
        public void onSubscribe(Subscription d) {
            // ignored.
        }

        @Override
        public void onCompleted() {
            LOGGER.debug("Completed Node connect for DCP channel {}", node);
        }

        @Override
        public void onError(Throwable e) {
            LOGGER.warn("Got error during connect (maybe retried) for node {}", system(node), e);
            if (env.eventBus() != null) {
                env.eventBus().publish(new FailedToAddNodeEvent(node, e));
            }
        }
    };

    channel
            .connect()
            .retryWhen(RetryBuilder.anyMatches(whileNotStopped)
                    .max(env.dcpChannelsReconnectMaxAttempts())
                    .delay(env.dcpChannelsReconnectDelay())
                    .doOnRetry(logRetry)
                    .build())
            .subscribe(connectSubscriber);
}
项目:android-common    文件:RetryWithDelayHandler.java   
/**
 * Construct a {@link RetryWithDelayHandler retry handler} that will retry on most errors but will
 * stop on specific errors.
 *
 * <p>Delegates to the protected constructor, passing <code>null</code> as the scheduler.
 *
 * @param maxAttempts the maximum number of retries before a {@link CannotRetryException} is
 *     thrown. It will be capped at <code>{@link Integer#MAX_VALUE} - 1</code>.
 * @param retryDelay the {@link Delay} to apply between each retry (can grow, eg. by using {@link
 *     com.jszczygiel.foundation.rx.time.ExponentialDelay}).
 * @param errorInterruptingPredicate a predicate that determine if an error must stop the retry
 *     cycle (when true), in which case said error is cascaded down.
 * @param doOnRetry the action handed to the handler for retries; it takes an attempt number, the
 *     error, and a delay amount with its {@link TimeUnit}.
 */
public RetryWithDelayHandler(
    int maxAttempts,
    Delay retryDelay,
    Func1<Throwable, Boolean> errorInterruptingPredicate,
    Action4<Integer, Throwable, Long, TimeUnit> doOnRetry) {
  this(maxAttempts, retryDelay, errorInterruptingPredicate, doOnRetry, null);
}
项目:java-dcp-client    文件:RetryWithDelayHandler.java   
/**
 * Construct a {@link RetryWithDelayHandler retry handler} that will retry on most errors but will stop on specific errors.
 *
 * <p>Delegates to the protected constructor, passing <code>null</code> as the scheduler.
 *
 * @param maxAttempts the maximum number of retries before a {@link CannotRetryException} is thrown. It will be
 *                    capped at <code>{@link Integer#MAX_VALUE} - 1</code>.
 * @param retryDelay the {@link Delay} to apply between each retry (can grow,
 *  eg. by using {@link ExponentialDelay}).
 * @param errorInterruptingPredicate a predicate that determine if an error must stop the retry cycle (when true),
 *  in which case said error is cascaded down.
 * @param doOnRetry the action handed to the handler for retries; it takes an attempt number, the error,
 *  and a delay amount with its {@link TimeUnit}.
 */
public RetryWithDelayHandler(int maxAttempts, Delay retryDelay, Func1<Throwable, Boolean> errorInterruptingPredicate,
                             Action4<Integer, Throwable, Long, TimeUnit> doOnRetry) {
    this(maxAttempts, retryDelay, errorInterruptingPredicate, doOnRetry, null);
}
项目:java-dcp-client    文件:RetryBuilder.java   
/**
 * Execute some code each time a retry is scheduled (at the moment the retriable exception
 * is caught, but before the retry delay is applied). Only quick executing code should be
 * performed, do not block in this action.
 *
 * The action receives the retry attempt number (1-n), the exception that caused the retry,
 * the delay duration and timeunit for the scheduled retry.
 *
 * Calling this method again replaces the previously stored action.
 *
 * @param doOnRetryAction the side-effect action to perform whenever a retry is scheduled.
 * @return this builder, to allow call chaining.
 * @see OnRetryAction if you want a shorter signature.
 */
public RetryBuilder doOnRetry(Action4<Integer, Throwable, Long, TimeUnit> doOnRetryAction) {
    this.doOnRetryAction = doOnRetryAction;
    return this;
}
项目:RxActions    文件:Actions.java   
/**
 * Single-action overload of {@code combine}: there is nothing to merge, so the given action
 * is returned unchanged (same instance, no wrapper).
 *
 * @param a1 Action
 * @return the {@code a1} instance that was passed in
 */
@CheckResult
@NonNull
public static <T1, T2, T3, T4> Action4<? super T1, ? super T2, ? super T3, ? super T4> combine(@NonNull Action4<? super T1, ? super T2, ? super T3, ? super T4> a1) {
    return a1;
}
项目:android-common    文件:RetryBuilder.java   
/**
 * Execute some code each time a retry is scheduled (at the moment the retriable exception is
 * caught, but before the retry delay is applied). Only quick executing code should be performed,
 * do not block in this action.
 *
 * <p>The action receives the retry attempt number (1-n), the exception that caused the retry, the
 * delay duration and timeunit for the scheduled retry.
 *
 * <p>Calling this method again replaces the previously stored action.
 *
 * @param doOnRetryAction the side-effect action to perform whenever a retry is scheduled.
 * @return this builder, to allow call chaining.
 * @see OnRetryAction if you want a shorter signature.
 */
public RetryBuilder doOnRetry(Action4<Integer, Throwable, Long, TimeUnit> doOnRetryAction) {
  this.doOnRetryAction = doOnRetryAction;
  return this;
}