Java 类io.reactivex.parallel.ParallelFlowable 实例源码

项目:rxjava2    文件:ParallelFlowableRange.java   
/**
 * Demo entry point: squares the integers 1..10 on a fixed number of parallel rails,
 * keeps the squares divisible by 3, merges the rails back into one sequence and
 * prints each surviving value to standard output.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {

    int numberOfRails = 4; // number of parallel rails the range is split into

    ParallelFlowable
        .from(Flowable.range(1, 10), numberOfRails)
        .runOn(Schedulers.computation())
        .map(i -> i * i)
        .filter(i -> i % 3 == 0)
        .sequential()
        // Block the calling thread until the flow terminates: the rails run on
        // computation-scheduler workers, and a plain subscribe() would let main()
        // return (and the JVM exit) before any value is printed.
        .blockingSubscribe(System.out::println);
}
项目:RxJava2Extensions    文件:ParallelOrderedMerge.java   
/**
 * Stores the given settings in the corresponding fields; no validation happens here.
 *
 * @param source the parallel source kept in {@code this.source}
 * @param comparator the comparator kept in {@code this.comparator}
 * @param delayErrors the error-delaying flag kept in {@code this.delayErrors}
 * @param prefetch the prefetch amount kept in {@code this.prefetch}
 */
ParallelOrderedMerge(ParallelFlowable<T> source,
        Comparator<? super T> comparator,
        boolean delayErrors, int prefetch) {
    this.source = source;
    this.comparator = comparator;
    this.delayErrors = delayErrors;
    this.prefetch = prefetch;
}
项目:AutoDispose    文件:ParallelFlowableScoper.java   
/**
 * Builds a subscribe proxy around the given parallel source. Every call to the
 * proxy's {@code subscribe} wraps {@code upstream} in a fresh
 * {@code AutoDisposeParallelFlowable} together with the current {@code scope()}
 * and forwards the subscriber array to it.
 *
 * @param upstream the parallel source to wrap
 * @return a proxy whose {@code subscribe} goes through {@code AutoDisposeParallelFlowable}
 */
@Override public ParallelFlowableSubscribeProxy<T> apply(final ParallelFlowable<T> upstream) {
  return new ParallelFlowableSubscribeProxy<T>() {
    @Override public void subscribe(Subscriber<? super T>[] subscribers) {
      // scope() is evaluated on each subscribe call, not once when the proxy is created.
      AutoDisposeParallelFlowable<T> scoped =
          new AutoDisposeParallelFlowable<>(upstream, scope());
      scoped.subscribe(subscribers);
    }
  };
}
项目:akarnokd-misc    文件:ParallelPerf.java   
/**
 * Prepares the two benchmark flows. Both split {@code range(0, count)} into parallel
 * rails, burn {@code cost} units of CPU per item via {@code Blackhole.consumeCPU} in a
 * filter that rejects every item, and merge the rails back with {@code sequential()}.
 * They differ only in the scheduler: {@code flowable} uses the computation scheduler,
 * {@code flowableFJ} uses a scheduler built from the common fork-join pool.
 */
@Setup
public void setup() {
    ParallelFlowable<Integer> computationRails = ParallelFlowable
            .from(Flowable.range(0, count))
            .runOn(Schedulers.computation());
    flowable = computationRails
            .filter(v -> {
                Blackhole.consumeCPU(cost);
                return false;
            })
            .sequential();

    ParallelFlowable<Integer> forkJoinRails = ParallelFlowable
            .from(Flowable.range(0, count))
            .runOn(Schedulers.from(ForkJoinPool.commonPool()));
    flowableFJ = forkJoinRails
            .filter(v -> {
                Blackhole.consumeCPU(cost);
                return false;
            })
            .sequential();
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Delegates to {@code boxed.parallel()} and returns its result unchanged.
 *
 * @return the {@code ParallelFlowable} produced by the wrapped {@code boxed} instance
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel() {
    return boxed.parallel();
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Delegates to {@code boxed.parallel(parallelism)} and returns its result unchanged.
 *
 * @param parallelism forwarded as-is to the wrapped {@code boxed} instance
 * @return the {@code ParallelFlowable} produced by the wrapped {@code boxed} instance
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel(int parallelism) {
    return boxed.parallel(parallelism);
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Delegates to {@code boxed.parallel(parallelism, prefetch)} and returns its result unchanged.
 *
 * @param parallelism forwarded as-is to the wrapped {@code boxed} instance
 * @param prefetch forwarded as-is to the wrapped {@code boxed} instance
 * @return the {@code ParallelFlowable} produced by the wrapped {@code boxed} instance
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
@CheckReturnValue
@Beta
public ParallelFlowable<T> parallel(int parallelism, int prefetch) {
    return boxed.parallel(parallelism, prefetch);
}
项目:RxJava2Debug    文件:ParallelFlowableOnAssembly.java   
/**
 * Stores the wrapped source and creates a new {@code RxJavaAssemblyException}
 * at construction time, keeping it in {@code assembled}.
 * NOTE(review): presumably the exception records where the flow was assembled — confirm
 * against {@code RxJavaAssemblyException}.
 *
 * @param source the parallel source to wrap
 */
ParallelFlowableOnAssembly(ParallelFlowable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
项目:RxJava2Debug    文件:RxJava2AssemblyTrackingTest.java   
/**
 * Builds the parallel test source: the integers 1 through 5 followed by an
 * {@code IOException} error signal, converted to a {@code ParallelFlowable}.
 *
 * @return the parallel flow that emits 1..5 and then fails with {@code IOException}
 */
static ParallelFlowable<Integer> createParallelFlowable() {
    Flowable<Integer> values = Flowable.range(1, 5);
    Flowable<Integer> failure = Flowable.<Integer>error(new IOException());
    Flowable<Integer> valuesThenFailure = values.concatWith(failure);
    return valuesThenFailure.parallel();
}
项目:RxJava2Extensions    文件:ParallelFlowableValidator.java   
/**
 * Stores the wrapped source and the violation callback in their fields.
 *
 * @param source the parallel source kept in {@code this.source}
 * @param onViolation the consumer of {@code ProtocolNonConformanceException} kept in
 *                    {@code this.onViolation}
 */
ParallelFlowableValidator(ParallelFlowable<T> source, PlainConsumer<ProtocolNonConformanceException> onViolation) {
    this.source = source;
    this.onViolation = onViolation;
}
项目:RxJava2Extensions    文件:ParallelFlowableOnAssembly.java   
/**
 * Stores the wrapped source and creates a new {@code RxJavaAssemblyException}
 * at construction time, keeping it in {@code assembled}.
 * NOTE(review): presumably the exception records where the flow was assembled — confirm
 * against {@code RxJavaAssemblyException}.
 *
 * @param source the parallel source to wrap
 */
ParallelFlowableOnAssembly(ParallelFlowable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
项目:RxJava2Extensions    文件:ParallelSumInteger.java   
/**
 * Stores the parallel source of {@code Number} values in {@code this.source}.
 *
 * @param source the parallel source whose elements extend {@code Number}
 */
ParallelSumInteger(ParallelFlowable<? extends Number> source) {
    this.source = source;
}
项目:RxJava2Extensions    文件:ParallelSumInteger.java   
/**
 * Wraps the given parallel source in a new {@code ParallelSumInteger}.
 *
 * @param t the parallel source to wrap
 * @return the new {@code ParallelSumInteger}, typed as {@code ParallelFlowable<Integer>}
 */
@Override
public ParallelFlowable<Integer> apply(ParallelFlowable<T> t) {
    return new ParallelSumInteger<T>(t);
}
项目:RxJava2Extensions    文件:ParallelSumLong.java   
/**
 * Stores the parallel source of {@code Number} values in {@code this.source}.
 *
 * @param source the parallel source whose elements extend {@code Number}
 */
ParallelSumLong(ParallelFlowable<? extends Number> source) {
    this.source = source;
}
项目:RxJava2Extensions    文件:ParallelSumLong.java   
/**
 * Wraps the given parallel source in a new {@code ParallelSumLong}.
 *
 * @param t the parallel source to wrap
 * @return the new {@code ParallelSumLong}, typed as {@code ParallelFlowable<Long>}
 */
@Override
public ParallelFlowable<Long> apply(ParallelFlowable<T> t) {
    return new ParallelSumLong<T>(t);
}
项目:RxJava2Extensions    文件:ParallelSumDouble.java   
/**
 * Stores the parallel source of {@code Number} values in {@code this.source}.
 *
 * @param source the parallel source whose elements extend {@code Number}
 */
ParallelSumDouble(ParallelFlowable<? extends Number> source) {
    this.source = source;
}
项目:RxJava2Extensions    文件:ParallelSumDouble.java   
/**
 * Wraps the given parallel source in a new {@code ParallelSumDouble}.
 *
 * @param t the parallel source to wrap
 * @return the new {@code ParallelSumDouble}, typed as {@code ParallelFlowable<Double>}
 */
@Override
public ParallelFlowable<Double> apply(ParallelFlowable<T> t) {
    return new ParallelSumDouble<T>(t);
}
项目:RxJava2Extensions    文件:BasicMergeSubscription.java   
/**
 * Subscribes this object's {@code subscribers} array to the given parallel source.
 *
 * @param source the parallel source to subscribe to
 */
public void subscribe(ParallelFlowable<T> source) {
  source.subscribe(subscribers);
}
项目:RxJava2Extensions    文件:RxJavaProtocolValidatorTest.java   
/**
 * Checks that the protocol validator reports violations for a hand-written
 * {@code ParallelFlowable} whose single rail signals in a deliberately broken order:
 * terminal events before any {@code onSubscribe}, {@code null} arguments, repeated
 * {@code onSubscribe} calls and items after termination. The test expects exactly 15
 * recorded errors, in the order asserted below.
 */
@SuppressWarnings("unchecked")
@Test
public void parallelFlowable() {
    // Misbehaving single-rail source; the order of these calls determines the
    // order of the expected errors asserted further down, so it must not change.
    ParallelFlowable<Integer> source = new ParallelFlowable<Integer>() {

        @Override
        public void subscribe(Subscriber<? super Integer>[] s) {
            validate(s);
            s[0].onComplete();
            s[0].onError(null);
            s[0].onError(new IOException());
            s[0].onNext(null);
            s[0].onNext(1);
            s[0].onSubscribe(null);
            s[0].onSubscribe(new BooleanSubscription());
            s[0].onSubscribe(new BooleanSubscription());
            s[0].onComplete();
            s[0].onNext(2);
        }

        @Override
        public int parallelism() {
            return 1;
        }
    };

    // Register this test instance as the violation handler and verify the getter returns it.
    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    // Enable validation; the returned hooks are restored in the finally block.
    SavedHooks h = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Well-behaved flows are run first while validation is enabled.
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ParallelFlowable<Integer> c = RxJavaPlugins.onAssembly(source);

        // Raw Subscriber[] creation is the reason for the "unchecked" suppression above.
        c.subscribe(new Subscriber[] { new TestSubscriber<Integer>(0) });

        // NOTE(review): `errors` is presumably filled by this test's violation-handler
        // callback — confirm against the enclosing class.
        Assert.assertEquals(15, errors.size());
        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        Assert.assertTrue("" + errors.get(4).getCause(), errors.get(4).getCause() instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        // Undo the global state changed above: restore the saved hooks and clear the handler.
        h.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
项目:RxJava2Extensions    文件:RxJava2AssemblyTrackingTest.java   
/**
 * Builds the parallel test source: the integers 1 through 5 followed by an
 * {@code IOException} error signal, converted to a {@code ParallelFlowable}.
 *
 * @return the parallel flow that emits 1..5 and then fails with {@code IOException}
 */
static ParallelFlowable<Integer> createParallelFlowable() {
    Flowable<Integer> values = Flowable.range(1, 5);
    Flowable<Integer> failure = Flowable.<Integer>error(new IOException());
    Flowable<Integer> valuesThenFailure = values.concatWith(failure);
    return valuesThenFailure.parallel();
}
项目:AutoDispose    文件:ParallelFlowableScoper.java   
/**
 * Stores the wrapped parallel source and the scope in their fields.
 *
 * @param source the parallel source kept in {@code this.source}
 * @param scope the {@code Maybe} kept in {@code this.scope}
 */
AutoDisposeParallelFlowable(ParallelFlowable<T> source, Maybe<?> scope) {
  this.source = source;
  this.scope = scope;
}
项目:RxJava2Extensions    文件:ParallelTransformers.java   
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of the
 * available values from them (as determined by the Comparator), optionally delaying any error they
 * may signal and using the given prefetch amount when requesting from the rails.
 * <p>
 * The {@code comparator} and {@code source} arguments are checked with
 * {@code ObjectHelper.requireNonNull} and {@code prefetch} with {@code ObjectHelper.verifyPositive}
 * before the operator is created.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param comparator the comparator to use for comparing items;
 *                   it is called with the last known smallest in its first argument
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @param prefetch the number of items to prefetch from the sources
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator, boolean delayErrors, int prefetch) {
    ObjectHelper.requireNonNull(comparator, "comparator is null");
    // The message names the actual parameter ("source"), not a non-existent "sources".
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    return RxJavaPlugins.onAssembly(new ParallelOrderedMerge<T>(source, comparator, delayErrors, prefetch));
}
项目:RxJava2Extensions    文件:ParallelTransformers.java   
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of the
 * available values from them (as determined by their natural order).
 * <p>
 * Delegates to the full overload with {@code Functions.naturalOrder()}, {@code delayErrors = false}
 * and a prefetch of {@code Flowable.bufferSize()}.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(ParallelFlowable<T> source) {
    return orderedMerge(source, Functions.naturalOrder(), false, Flowable.bufferSize());
}
项目:RxJava2Extensions    文件:ParallelTransformers.java   
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of the
 * available values from them (as determined by their natural order), and allows delaying any error
 * they may signal.
 * <p>
 * Delegates to the full overload with {@code Functions.naturalOrder()} and a prefetch of
 * {@code Flowable.bufferSize()}.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(ParallelFlowable<T> source, boolean delayErrors) {
    return orderedMerge(source, Functions.naturalOrder(), delayErrors, Flowable.bufferSize());
}
项目:RxJava2Extensions    文件:ParallelTransformers.java   
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of the
 * available values from them (as determined by their natural order), allows delaying any error they
 * may signal and sets the prefetch amount used when requesting from the rails.
 * <p>
 * Delegates to the full overload with {@code Functions.naturalOrder()} as the comparator.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @param prefetch the number of items to prefetch from the sources
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(ParallelFlowable<T> source, boolean delayErrors, int prefetch) {
    return orderedMerge(source, Functions.naturalOrder(), delayErrors, prefetch);
}
项目:RxJava2Extensions    文件:ParallelTransformers.java   
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of the
 * available values from them (as determined by the Comparator).
 * <p>
 * Delegates to the full overload with {@code delayErrors = false} and a prefetch of
 * {@code Flowable.bufferSize()}.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param comparator the comparator to use for comparing items;
 *                   it is called with the last known smallest in its first argument
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator) {
    return orderedMerge(source, comparator, false, Flowable.bufferSize());
}
项目:RxJava2Extensions    文件:ParallelTransformers.java   
/**
 * Merges the rails of the source ParallelFlowable in an ordered fashion, picking the smallest of the
 * available values from them (as determined by the Comparator), and allows delaying any error they
 * may signal.
 * <p>
 * Delegates to the full overload with a prefetch of {@code Flowable.bufferSize()}.
 * @param <T> the value type of all sources
 * @param source the source ParallelFlowable
 * @param comparator the comparator to use for comparing items;
 *                   it is called with the last known smallest in its first argument
 * @param delayErrors if true, source errors are delayed until all sources terminate in some way
 * @return the new Flowable instance
 * @since 0.17.9
 */
public static <T> Flowable<T> orderedMerge(ParallelFlowable<T> source, Comparator<? super T> comparator, boolean delayErrors) {
    return orderedMerge(source, comparator, delayErrors, Flowable.bufferSize());
}