Java 类io.reactivex.processors.PublishProcessor 实例源码

项目:RxShell    文件:CmdProcessorTest.java   
/**
 * Submits 100 commands, each subscribed on its own new thread and each with an output
 * processor whose listener sleeps on every notification without switching threads, then
 * verifies that every command completes and that every listener received 11 values.
 */
@Test
public void testCommand_callback_sync() throws IOException, InterruptedException {
    processor.attach(session);

    final int commandCount = 100;
    List<Pair<TestObserver<Cmd.Result>, TestSubscriber<String>>> observerPairs = new ArrayList<>();
    for (int cmdIndex = 0; cmdIndex < commandCount; cmdIndex++) {
        // Ten fixed echo lines followed by one line that is unique to this command.
        List<String> echoLines = new ArrayList<>();
        for (int line = 0; line < 10; line++) {
            echoLines.add("echo " + line);
        }
        echoLines.add("echo " + cmdIndex);

        PublishProcessor<String> outputListener = PublishProcessor.create();
        // No observeOn here: the sleep runs on whichever thread delivers the output.
        TestSubscriber<String> outputObserver = outputListener
                .doOnEach(notification -> TestHelper.sleep(1))
                .test();

        Cmd cmd = Cmd.builder(echoLines).outputProcessor(outputListener).build();
        TestObserver<Cmd.Result> resultObserver = processor.submit(cmd)
                .subscribeOn(Schedulers.newThread())
                .test();

        observerPairs.add(new Pair<>(resultObserver, outputObserver));
    }

    for (Pair<TestObserver<Cmd.Result>, TestSubscriber<String>> observers : observerPairs) {
        observers.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
        observers.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
    }
}
项目:RxShell    文件:CmdProcessorTest.java   
/**
 * Same scenario as the synchronous callback test, except that each output listener is moved
 * to a new thread with {@code observeOn} before the per-notification sleep is applied.
 * Verifies that every command completes and that every listener received 11 values.
 */
@Test
public void testCommand_callback_async() throws IOException, InterruptedException {
    processor.attach(session);

    final int commandCount = 100;
    List<Pair<TestObserver<Cmd.Result>, TestSubscriber<String>>> observerPairs = new ArrayList<>();
    for (int cmdIndex = 0; cmdIndex < commandCount; cmdIndex++) {
        // Ten fixed echo lines followed by one line that is unique to this command.
        List<String> echoLines = new ArrayList<>();
        for (int line = 0; line < 10; line++) {
            echoLines.add("echo " + line);
        }
        echoLines.add("echo " + cmdIndex);

        PublishProcessor<String> outputListener = PublishProcessor.create();
        // observeOn hands the notifications to a new thread before the sleep is applied.
        TestSubscriber<String> outputObserver = outputListener
                .observeOn(Schedulers.newThread())
                .doOnEach(notification -> TestHelper.sleep(1))
                .test();

        Cmd cmd = Cmd.builder(echoLines).outputProcessor(outputListener).build();
        TestObserver<Cmd.Result> resultObserver = processor.submit(cmd)
                .subscribeOn(Schedulers.newThread())
                .test();

        observerPairs.add(new Pair<>(resultObserver, outputObserver));
    }

    for (Pair<TestObserver<Cmd.Result>, TestSubscriber<String>> observers : observerPairs) {
        observers.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
        observers.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
    }
}
项目:RxShell    文件:CmdBuilderTest.java   
/**
 * Builds a command with non-default settings, copies it through {@code Cmd.from}, and checks
 * that commands, buffer flags, timeout and both processors are equal on the copy.
 */
@Test
public void testBuilder_from() {
    // A random UUID serves as the command string.
    Cmd original = Cmd.builder(UUID.randomUUID().toString())
            .outputBuffer(false)
            .errorBuffer(false)
            .timeout(1337)
            .outputProcessor(PublishProcessor.create())
            .errorProcessor(PublishProcessor.create())
            .build();

    Cmd duplicate = Cmd.from(original).build();

    assertEquals(original.getCommands(), duplicate.getCommands());
    assertEquals(original.isOutputBufferEnabled(), duplicate.isOutputBufferEnabled());
    assertEquals(original.isErrorBufferEnabled(), duplicate.isErrorBufferEnabled());
    assertEquals(original.getTimeout(), duplicate.getTimeout());
    assertEquals(original.getOutputProcessor(), duplicate.getOutputProcessor());
    assertEquals(original.getErrorProcessor(), duplicate.getErrorProcessor());
}
项目:RxShell    文件:CmdBuilderTest.java   
/**
 * Builds a command with explicit settings and checks that each getter returns the value that
 * was handed to the builder.
 */
@Test
public void testBuild() {
    final PublishProcessor<String> stdoutProcessor = PublishProcessor.create();
    final PublishProcessor<String> stderrProcessor = PublishProcessor.create();

    Cmd built = Cmd.builder("cmd1")
            .outputBuffer(false)
            .errorBuffer(false)
            .timeout(1337)
            .outputProcessor(stdoutProcessor)
            .errorProcessor(stderrProcessor)
            .build();

    assertThat(built.getCommands(), contains("cmd1"));
    assertThat(built.getOutputProcessor(), is(stdoutProcessor));
    assertThat(built.getErrorProcessor(), is(stderrProcessor));
    assertThat(built.getTimeout(), is(1337L));
    assertThat(built.isOutputBufferEnabled(), is(false));
    assertThat(built.isErrorBufferEnabled(), is(false));
}
项目:reactivejournal    文件:BytesToWordsProcessor.java   
/**
 * Converts a stream of bytes into a stream of words.
 *
 * <p>Bytes are accumulated as characters until a space (byte value 32) arrives; at that point
 * the accumulated text is emitted as one word and the buffer is cleared. The input is
 * subscribed to immediately, before the result is returned.
 *
 * <p>NOTE(review): characters buffered after the last space are not emitted when the input
 * completes; confirm whether the trailing word is meant to be dropped.
 *
 * @param observableInput the bytes to split into words
 * @return a processor-backed {@link Flowable} that emits one item per space-terminated word,
 *         completes when the input completes, and fails when the input fails
 */
public Flowable<String> process(Flowable<Byte> observableInput) {
    PublishProcessor<String> publishProcessor = PublishProcessor.create();

    StringBuilder word = new StringBuilder();
    observableInput.subscribe(
        b -> {
            if (b == 32) { // send out a new word on a space
                publishProcessor.onNext(word.toString());
                word.setLength(0);
            } else {
                word.append((char) b.byteValue());
            }
        },
        e -> {
            LOG.error("Error in BytesToWordsProcessor [{}]", e);
            // Forward the failure; otherwise subscribers of the returned Flowable would
            // never receive a terminal signal after an upstream error.
            publishProcessor.onError(e);
        },
        publishProcessor::onComplete
    );

    return publishProcessor;
}
项目:DisposableAttach    文件:DisposableAttachFlowableTest.java   
/**
 * Subscribes through {@code DisposableAttach.to(composite)} and checks that the subscription
 * is registered in the composite, and that disposing the composite empties it and disposes
 * both the returned disposable and the test subscriber.
 */
@Test public void test() {
    PublishProcessor<String> subject = PublishProcessor.create();
    Flowable<String> source = subject.hide();

    // Parameterized instead of raw so the subscription chain is type-checked.
    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    CompositeDisposable composite = new CompositeDisposable();
    Disposable disposable = source
            .compose(DisposableAttach.<String>to(composite))
            .subscribeWith(testSubscriber);

    subject.onNext("Foo");
    testSubscriber.assertValue("Foo");
    assertTrue(composite.size() == 1);

    composite.dispose();
    assertTrue(composite.size() == 0);
    assertTrue(composite.isDisposed());
    assertTrue(disposable.isDisposed());
    assertTrue(testSubscriber.isDisposed());
}
项目:rxtools    文件:IndexedFlowableList.java   
/**
 * Reports whether at least one of the two neighbouring processor references still yields a
 * non-null processor.
 *
 * @return {@code true} if {@code _next} or {@code _previous} is set and its {@code get()}
 *         returns a non-null value; {@code false} otherwise
 */
public boolean isActive()
{
    // The next reference is consulted first; a live referent there settles the answer.
    if (_next != null && _next.get() != null)
    {
        return true;
    }

    return _previous != null && _previous.get() != null;
}
项目:RxJava2Extensions    文件:FlowableOnBackpressureTimeoutTest.java   
/**
 * Uses an eviction callback that records the item and cancels the subscriber, emits five
 * items to a subscriber with zero initial demand, advances virtual time by one minute, and
 * expects all five items to have been recorded as evicted.
 */
@Test
public void evictCancels() {
    TestScheduler testScheduler = new TestScheduler();
    PublishProcessor<Integer> source = PublishProcessor.create();

    // Zero initial request: nothing can be delivered downstream.
    final TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>(0L);

    Consumer<Integer> onEvict = new Consumer<Integer>() {
        @Override
        public void accept(Integer item) throws Exception {
            evicted.add(item);
            subscriber.cancel();
        }
    };

    source
        .compose(FlowableTransformers.<Integer>onBackpressureTimeout(
                10, 1, TimeUnit.SECONDS, testScheduler, onEvict))
        .subscribe(subscriber);

    TestHelper.emit(source, 1, 2, 3, 4, 5);

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), evicted);
}
项目:RxJava2Extensions    文件:FlowableRefCountTimeoutTest.java   
/**
 * Keeps one subscriber attached at all times while handing over to a new subscriber three
 * times, then cancels the last one and expects the source processor to have no subscribers.
 */
@Test
public void comeAndGo() {
    PublishProcessor<Integer> pp = PublishProcessor.create();

    Flowable<Integer> shared = pp
        .publish()
        .compose(FlowableTransformers.<Integer>refCount(1));

    TestSubscriber<Integer> current = shared.test(0);

    Assert.assertTrue(pp.hasSubscribers());

    int handovers = 3;
    while (handovers-- > 0) {
        // Attach the replacement before cancelling the current subscriber.
        TestSubscriber<Integer> replacement = shared.test();
        current.cancel();
        current = replacement;
    }

    current.cancel();

    Assert.assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableSpanoutTest.java   
/**
 * Emits one item followed by an error through {@code spanout} (three-argument overload) and,
 * after advancing virtual time by 100 ms, expects the subscriber to have failed with
 * {@link IOException} and no values.
 */
@Test
public void errorImmediate() {
    TestScheduler testScheduler = new TestScheduler();
    PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<Integer> subscriber = source
        .compose(FlowableTransformers.<Integer>spanout(100, TimeUnit.MILLISECONDS, testScheduler))
        .test();

    source.onNext(1);
    IOException failure = new IOException();
    source.onError(failure);

    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subscriber.assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableSpanoutTest.java   
/**
 * Emits one item followed by an error through {@code spanout} with the trailing boolean
 * argument set to {@code true} and, after advancing virtual time by 100 ms, expects the
 * subscriber to have received the value 1 and then failed with {@link IOException}.
 */
@Test
public void errorDelayed() {
    TestScheduler testScheduler = new TestScheduler();
    PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<Integer> subscriber = source
        .compose(FlowableTransformers.<Integer>spanout(100L, TimeUnit.MILLISECONDS, testScheduler, true))
        .test();

    source.onNext(1);
    IOException failure = new IOException();
    source.onError(failure);

    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subscriber.assertFailure(IOException.class, 1);
}
项目:RxJava2Extensions    文件:FlowableExpandTest.java   
/**
 * Repeats 1000 times: races an emission from the inner processor against cancellation of the
 * downstream subscriber of a depth-first {@code expand}. The test makes no assertions after
 * the race.
 */
@Test
public void depthEmitCancelRace() {
    for (int round = 0; round < 1000; round++) {

        final PublishProcessor<Integer> inner = PublishProcessor.create();

        final TestSubscriber<Integer> subscriber = Flowable.just(0)
        .compose(FlowableTransformers.<Integer>expand(Functions.justFunction(inner), ExpandStrategy.DEPTH_FIRST))
        .test(1);

        Runnable emit = new Runnable() {
            @Override
            public void run() {
                inner.onNext(1);
            }
        };
        Runnable cancel = new Runnable() {
            @Override
            public void run() {
                subscriber.cancel();
            }
        };

        TestHelper.race(emit, cancel, Schedulers.single());
    }
}
项目:RxJava2Extensions    文件:FlowableExpandTest.java   
/**
 * Repeats 1000 times: races completion of the inner processor against cancellation of the
 * downstream subscriber of a depth-first {@code expand}. The test makes no assertions after
 * the race.
 */
@Test
public void depthCompleteCancelRace() {
    for (int round = 0; round < 1000; round++) {

        final PublishProcessor<Integer> inner = PublishProcessor.create();

        final TestSubscriber<Integer> subscriber = Flowable.just(0)
        .compose(FlowableTransformers.<Integer>expand(Functions.justFunction(inner), ExpandStrategy.DEPTH_FIRST))
        .test(1);

        Runnable complete = new Runnable() {
            @Override
            public void run() {
                inner.onComplete();
            }
        };
        Runnable cancel = new Runnable() {
            @Override
            public void run() {
                subscriber.cancel();
            }
        };

        TestHelper.race(complete, cancel, Schedulers.single());
    }
}
项目:RxJava2Extensions    文件:FlowableFilterAsyncTest.java   
/**
 * Cancels a {@code filterAsync} pipeline whose predicate publisher is always the same
 * processor, and expects that processor to have no subscribers afterwards.
 */
@Test
public void cancel() {
    final PublishProcessor<Boolean> pp = PublishProcessor.create();

    // Every source item is mapped to the same processor, which never emits in this test.
    Function<Integer, Publisher<Boolean>> alwaysPending = new Function<Integer, Publisher<Boolean>>() {
        @Override
        public Publisher<Boolean> apply(Integer value) throws Exception {
            return pp;
        }
    };

    Flowable.range(1, 5)
    .compose(FlowableTransformers.filterAsync(alwaysPending, 16))
    .test()
    .cancel();

    Assert.assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableZipLatestTest.java   
/**
 * Subscribes to {@code zipLatest} with a subscriber that requests one item and, on receiving
 * it, cancels and signals completion to itself. Expects nothing until both sources have
 * emitted, then exactly the result {@code "[1, 2]"}.
 */
@Test
public void cancelAfterOneBackpressured() {
    PublishProcessor<Integer> first = PublishProcessor.create();
    PublishProcessor<Integer> second = PublishProcessor.create();

    TestSubscriber<String> subscriber = new TestSubscriber<String>(1) {
        @Override
        public void onNext(String item) {
            super.onNext(item);
            cancel();
            onComplete();
        }
    };

    Flowables.zipLatest(first, second, toString2).subscribe(subscriber);
    subscriber.assertEmpty();

    // Only one of the two sources has a value so far.
    first.onNext(1);
    subscriber.assertEmpty();

    second.onNext(2);
    subscriber.assertResult("[1, 2]");
}
项目:RxJava2Extensions    文件:FlowableZipLatestTest.java   
/**
 * Feeds {@code zipLatest} a source that calls {@code onSubscribe} twice. Expects the first
 * subscription to stay active, the second to be cancelled, and a
 * {@code ProtocolViolationException} to be recorded by the plugin error tracker.
 */
@Test
public void badSource() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final PublishProcessor<Integer> wellBehaved = PublishProcessor.create();
        final Flowable<Integer> doubleSubscribing = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                BooleanSubscription firstSubscription = new BooleanSubscription();
                subscriber.onSubscribe(firstSubscription);

                // Second onSubscribe call on the same subscriber.
                BooleanSubscription secondSubscription = new BooleanSubscription();
                subscriber.onSubscribe(secondSubscription);

                Assert.assertFalse(firstSubscription.isCancelled());
                Assert.assertTrue(secondSubscription.isCancelled());
            }
        };

        Flowables.zipLatest(wellBehaved, doubleSubscribing, toString2).test();

        TestHelper.assertError(pluginErrors, 0, ProtocolViolationException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableMapAsyncTest.java   
/**
 * Cancels a {@code mapAsync} pipeline whose mapper always returns the same processor, and
 * expects that processor to have no subscribers afterwards.
 */
@Test
public void cancel() {
    final PublishProcessor<Object> pp = PublishProcessor.create();

    // Every source item is mapped to the same processor, which never emits in this test.
    Function<Integer, Publisher<Object>> alwaysPending = new Function<Integer, Publisher<Object>>() {
        @Override
        public Publisher<Object> apply(Integer value) throws Exception {
            return pp;
        }
    };

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapAsync(alwaysPending))
    .test()
    .cancel();

    Assert.assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableCoalesceTest.java   
/**
 * Re-enters the source from within the subscriber's {@code onNext}: when the list starting
 * with 1 arrives, 100 is pushed into the processor. Expects two lists, {@code [1]} and
 * {@code [100]}, followed by completion.
 */
@Test
@SuppressWarnings("unchecked")
public void slowPathQueueUse() {
    final PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<List<Integer>> subscriber = new TestSubscriber<List<Integer>>() {
        @Override
        public void onNext(List<Integer> batch) {
            super.onNext(batch);
            // Emit again while the operator is still delivering the first batch.
            if (batch.get(0) == 1) {
                source.onNext(100);
            }
        }
    };

    source.compose(FlowableTransformers.coalesce(listSupplier, listAdd)).subscribe(subscriber);

    source.onNext(1);
    source.onComplete();

    subscriber.assertResult(Arrays.asList(1), Arrays.asList(100));
}
项目:RxJava2Extensions    文件:FlowableCoalesceTest.java   
/**
 * Same re-entrant emission as {@code slowPathQueueUse}, but coalescing with
 * {@code listAddCrash100}. Expects the list {@code [1]} followed by an {@link IOException}.
 */
@Test
@SuppressWarnings("unchecked")
public void slowPathQueueUseCrash() {
    final PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<List<Integer>> subscriber = new TestSubscriber<List<Integer>>() {
        @Override
        public void onNext(List<Integer> batch) {
            super.onNext(batch);
            // Emit again while the operator is still delivering the first batch.
            if (batch.get(0) == 1) {
                source.onNext(100);
            }
        }
    };

    source.compose(FlowableTransformers.coalesce(listSupplier, listAddCrash100)).subscribe(subscriber);

    source.onNext(1);
    source.onComplete();

    subscriber.assertFailure(IOException.class, Arrays.asList(1));
}
项目:AutoDispose    文件:AutoDisposeParallelFlowableTest.java   
/**
 * Subscribes two subscribers to a parallel flowable auto-disposed with
 * {@code ScopeProvider.UNBOUND} and checks that the first receives 1 and the second
 * receives 2.
 */
@Test public void unbound_shouldStillPassValues() {
  PublishProcessor<Integer> source = PublishProcessor.create();
  TestSubscriber<Integer> firstRail = new TestSubscriber<>();
  TestSubscriber<Integer> secondRail = new TestSubscriber<>();
  //noinspection unchecked
  Subscriber<Integer>[] rails = new Subscriber[] {firstRail, secondRail};

  source
      .parallel(DEFAULT_PARALLELISM)
      .as(AutoDispose.<Integer>autoDisposable(ScopeProvider.UNBOUND))
      .subscribe(rails);

  source.onNext(1);
  source.onNext(2);

  firstRail.assertValue(1);
  secondRail.assertValue(2);

  firstRail.dispose();
  secondRail.dispose();
}
项目:AutoDispose    文件:AutoDisposeSubscriberTest.java   
/**
 * Auto-disposes against a {@code MaybeSubject} scope that never fires, lets the source
 * complete normally, and checks that values and completion are delivered, the returned
 * disposable is not disposed, and both source and scope end up without subscribers.
 */
@Test public void autoDispose_withMaybe_normal() {
  PublishProcessor<Integer> source = PublishProcessor.create();
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();
  TestSubscriber<Integer> subscriber = new TestSubscriber<>();

  Disposable disposable = source
      .as(AutoDispose.<Integer>autoDisposable(lifecycle))
      .subscribeWith(subscriber);
  subscriber.assertSubscribed();

  assertThat(source.hasSubscribers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  source.onNext(1);
  subscriber.assertValue(1);

  source.onNext(2);
  source.onComplete();
  subscriber.assertValues(1, 2);
  subscriber.assertComplete();

  // Because it completes normally
  assertThat(disposable.isDisposed()).isFalse();
  assertThat(source.hasSubscribers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
}
项目:AutoDispose    文件:AutoDisposeSubscriberTest.java   
/**
 * Auto-disposes against a {@code MaybeSubject} scope, fires the scope after the first value,
 * and checks that later source emissions are not delivered and that both source and scope
 * end up without subscribers.
 */
@Test public void autoDispose_withMaybe_interrupted() {
  PublishProcessor<Integer> source = PublishProcessor.create();
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();

  TestSubscriber<Integer> subscriber = source
      .as(AutoDispose.<Integer>autoDisposable(lifecycle))
      .test();
  subscriber.assertSubscribed();

  assertThat(source.hasSubscribers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  source.onNext(1);
  subscriber.assertValue(1);

  // Fire the scope, then try to emit once more.
  lifecycle.onSuccess(2);
  source.onNext(2);

  // The second emission must not have arrived.
  subscriber.assertValue(1);

  // Both sides are detached.
  assertThat(source.hasSubscribers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
}
项目:AutoDispose    文件:AutoDisposeSubscriberTest.java   
/**
 * Installs a no-op outside-lifecycle handler, subscribes with a provider backed by an empty
 * {@code BehaviorSubject}, and checks that nothing is subscribed and that the subscriber sees
 * neither values nor errors.
 */
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
  Consumer<OutsideLifecycleException> noOpHandler = new Consumer<OutsideLifecycleException>() {
    @Override public void accept(OutsideLifecycleException e) { }
  };
  AutoDisposePlugins.setOutsideLifecycleHandler(noOpHandler);

  // The lifecycle subject is created without any value.
  BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
  LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle);
  PublishProcessor<Integer> source = PublishProcessor.create();

  TestSubscriber<Integer> subscriber = source
      .as(AutoDispose.<Integer>autoDisposable(provider))
      .test();

  assertThat(source.hasSubscribers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
  subscriber.assertNoValues();
  subscriber.assertNoErrors();
}
项目:AutoDispose    文件:AutoDisposeSubscriberTest.java   
/**
 * Installs a no-op outside-lifecycle handler, subscribes with a provider whose lifecycle
 * subject has already been advanced from 0 through 3, and checks that nothing is subscribed
 * and that the subscriber sees neither values nor errors.
 */
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() {
  Consumer<OutsideLifecycleException> noOpHandler = new Consumer<OutsideLifecycleException>() {
    @Override public void accept(OutsideLifecycleException e) {
      // Intentionally empty.
    }
  };
  AutoDisposePlugins.setOutsideLifecycleHandler(noOpHandler);

  BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
  for (int event = 1; event <= 3; event++) {
    lifecycle.onNext(event);
  }
  LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle);
  PublishProcessor<Integer> source = PublishProcessor.create();

  TestSubscriber<Integer> subscriber = source
      .as(AutoDispose.<Integer>autoDisposable(provider))
      .test();

  assertThat(source.hasSubscribers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
  subscriber.assertNoValues();
  subscriber.assertNoErrors();
}
项目:AutoDispose    文件:AutoDisposeSubscriberTest.java   
/**
 * Installs an outside-lifecycle handler that rethrows wrapped in
 * {@link IllegalStateException}, subscribes with a provider backed by an empty
 * {@code BehaviorSubject}, and checks that the subscriber receives that wrapped exception
 * and no values.
 */
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() {
  Consumer<OutsideLifecycleException> rethrowingHandler = new Consumer<OutsideLifecycleException>() {
    @Override public void accept(OutsideLifecycleException e) {
      // The IllegalStateException wrapper lets the assertion below recognise that the
      // error it sees came through this handler.
      throw new IllegalStateException(e);
    }
  };
  AutoDisposePlugins.setOutsideLifecycleHandler(rethrowingHandler);

  BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
  LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle);
  PublishProcessor<Integer> source = PublishProcessor.create();

  TestSubscriber<Integer> subscriber = source
      .as(AutoDispose.<Integer>autoDisposable(provider))
      .test();

  subscriber.assertNoValues();
  subscriber.assertError(new Predicate<Throwable>() {
    @Override public boolean test(Throwable error) {
      if (!(error instanceof IllegalStateException)) {
        return false;
      }
      return error.getCause() instanceof OutsideLifecycleException;
    }
  });
}
项目:RxJava2Jdk9Interop    文件:FlowInteropTest.java   
/**
 * Wraps a {@code PublishProcessor} as a {@code Flow.Processor}, pushes 1 through 5 and
 * completion into the original processor, and expects the Flow-side subscriber to see the
 * same values followed by completion.
 */
@Test
public void flowableProcessorToFlowProcessor() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    Flow.Processor<Integer, Integer> flowProcessor = FlowInterop.toFlowProcessor(source);

    FlowTestSubscriber<Integer> subscriber = new FlowTestSubscriber<>();
    flowProcessor.subscribe(subscriber);

    for (int value = 1; value <= 5; value++) {
        source.onNext(value);
    }
    source.onComplete();

    subscriber.assertResult(1, 2, 3, 4, 5);
}
项目:RxJava2Jdk9Interop    文件:FlowInteropTest.java   
/**
 * Wraps a {@code PublishProcessor} as a {@code Flow.Processor}, pushes 1 through 5 and then
 * an {@link IOException} into the original processor, and expects the Flow-side subscriber
 * to see the same values followed by that failure type.
 */
@Test
public void flowableProcessorToFlowProcessorError() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    Flow.Processor<Integer, Integer> flowProcessor = FlowInterop.toFlowProcessor(source);

    FlowTestSubscriber<Integer> subscriber = new FlowTestSubscriber<>();
    flowProcessor.subscribe(subscriber);

    for (int value = 1; value <= 5; value++) {
        source.onNext(value);
    }
    source.onError(new IOException());

    subscriber.assertFailure(IOException.class, 1, 2, 3, 4, 5);
}
项目:RxReplayingShare    文件:ReplayingShareFlowableTest.java   
/**
 * After the only subscriber has received "Foo" and been disposed, a new subscriber is
 * expected to receive "Foo" immediately on subscribing.
 */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> early = new TestSubscriber<>();
  shared.subscribe(early);
  early.assertNoValues();

  source.onNext("Foo");
  early.assertValues("Foo");
  early.dispose();

  // Nothing new is emitted before the late subscriber arrives.
  TestSubscriber<String> late = new TestSubscriber<>();
  shared.subscribe(late);
  late.assertValues("Foo");
}
项目:RxReplayingShare    文件:ReplayingShareFlowableTest.java   
/**
 * A value emitted by the source while there are no subscribers is expected not to reach
 * either the disposed subscriber or a subscriber that arrives afterwards.
 */
@Test public void valueMissedWhenNoSubscribers() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> early = new TestSubscriber<>();
  shared.subscribe(early);
  early.assertNoValues();
  early.dispose();

  // Emitted while nobody is subscribed.
  source.onNext("Foo");
  early.assertNoValues();

  TestSubscriber<String> late = new TestSubscriber<>();
  shared.subscribe(late);
  late.assertNoValues();
}
项目:RxReplayingShare    文件:ReplayingShareFlowableTest.java   
/**
 * Subscribes, after a value has been emitted, with a consumer that throws
 * {@link OutOfMemoryError}, and requires that error to propagate out of {@code subscribe}.
 * The test fails if {@code subscribe} returns without throwing.
 */
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  flowable.subscribe();
  subject.onNext("Foo");

  Consumer<String> brokenAction = new Consumer<String>() {
    @Override public void accept(String s) {
      throw new OutOfMemoryError("broken!");
    }
  };
  try {
    flowable.subscribe(brokenAction);
  } catch (OutOfMemoryError e) {
    assertEquals("broken!", e.getMessage());
    return;
  }
  // Without this, the test would pass silently when the error is swallowed.
  throw new AssertionError("Expected OutOfMemoryError to be thrown from subscribe()");
}
项目:RxReplayingShare    文件:ReplayingShareFlowableTest.java   
/**
 * A second subscriber with zero initial demand is expected to receive nothing on
 * subscribing; after the source emits "Bar" and the subscriber requests one item, it is
 * expected to have received exactly "Bar".
 */
@Test public void backpressureHonoredWhenCached() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> unbounded = new TestSubscriber<>();
  shared.subscribe(unbounded);
  unbounded.assertNoValues();

  source.onNext("Foo");
  unbounded.assertValues("Foo");

  // Zero initial demand.
  TestSubscriber<String> throttled = new TestSubscriber<>(0);
  shared.subscribe(throttled);
  throttled.assertNoValues();

  // Emit a newer value first, then request a single item.
  source.onNext("Bar");
  throttled.request(1);
  throttled.assertValues("Bar");
}
项目:RxReplayingShare    文件:ReplayingShareFlowableTest.java   
/**
 * Two independent streams composed with {@code ReplayingShare.instance()} are each expected
 * to hand their own last value ("Foo" and "Bar" respectively) to a late subscriber.
 */
@Test public void streamsDoNotShareInstances() {
  PublishProcessor<String> sourceA = PublishProcessor.create();
  Flowable<String> sharedA = sourceA.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> firstOnA = new TestSubscriber<>();
  sharedA.subscribe(firstOnA);

  PublishProcessor<String> sourceB = PublishProcessor.create();
  Flowable<String> sharedB = sourceB.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> firstOnB = new TestSubscriber<>();
  sharedB.subscribe(firstOnB);

  sourceA.onNext("Foo");
  firstOnA.assertValues("Foo");
  sourceB.onNext("Bar");
  firstOnB.assertValues("Bar");

  // Late subscribers on each stream.
  TestSubscriber<String> secondOnA = new TestSubscriber<>();
  sharedA.subscribe(secondOnA);
  secondOnA.assertValues("Foo");

  TestSubscriber<String> secondOnB = new TestSubscriber<>();
  sharedB.subscribe(secondOnB);
  secondOnB.assertValues("Bar");
}
项目:RxJava-Android-Samples    文件:DoubleBindingTextViewFragment.java   
/**
 * Inflates the fragment layout, binds views, creates the result processor and subscribes a
 * consumer that writes each emitted value into the result view, then triggers
 * {@code onNumberChanged()} and moves focus to the second number field.
 */
@Override
public View onCreateView(
    LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
  View root = inflater.inflate(R.layout.fragment_double_binding_textview, container, false);
  unbinder = ButterKnife.bind(this, root);

  _resultEmitterSubject = PublishProcessor.create();

  // Each emitted value is rendered as text in the result view.
  _disposable =
      _resultEmitterSubject.subscribe(value -> _result.setText(String.valueOf(value)));

  onNumberChanged();
  _number2.requestFocus();

  return root;
}
项目:RxJava-Android-Samples    文件:NetworkDetectorFragment.java   
/**
 * Creates the connectivity processor, subscribes a main-thread consumer that logs whether
 * the device is online or offline (starting with the current status and skipping
 * consecutive duplicates), then calls {@code listenToNetworkConnectivity()}.
 */
@Override
public void onStart() {
  super.onStart();

  publishProcessor = PublishProcessor.create();

  disposable =
      publishProcessor
          .startWith(getConnectivityStatus(getActivity()))
          .distinctUntilChanged()
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(online -> log(online ? "You are online" : "You are offline"));

  listenToNetworkConnectivity();
}
项目:RxShell    文件:HarvesterTest.java   
/**
 * Composes the output harvester onto a publisher that has already completed, and then onto
 * a fresh publisher that has already failed. In both cases the stream is expected to
 * complete with a single crop whose {@code isComplete} flag is {@code false}.
 */
@Test
public void testUpstreamTerminated_output() {
    // Case 1: upstream completed before subscription.
    publisher.onComplete();
    OutputHarvester.Crop crop = publisher
            .compose(harvesterFactory.forOutput(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(crop.isComplete, is(false));

    // Case 2: upstream failed before subscription.
    publisher = PublishProcessor.create();
    publisher.onError(new InterruptedException());
    crop = publisher
            .compose(harvesterFactory.forOutput(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(crop.isComplete, is(false));
}
项目:RxShell    文件:HarvesterTest.java   
/**
 * Composes the error harvester onto a publisher that has already completed, and then onto a
 * fresh publisher that has already failed. In both cases the stream is expected to complete
 * with a single crop whose {@code isComplete} flag is {@code false}.
 */
@Test
public void testUpstreamTerminated_error() {
    // Case 1: upstream completed before subscription.
    publisher.onComplete();
    ErrorHarvester.Crop crop = publisher
            .compose(harvesterFactory.forError(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(crop.isComplete, is(false));

    // Case 2: upstream failed before subscription.
    publisher = PublishProcessor.create();
    publisher.onError(new InterruptedException());
    crop = publisher
            .compose(harvesterFactory.forError(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(crop.isComplete, is(false));
}
项目:RxBus2    文件:RxBus.java   
/**
 * Looks up the processor registered for a key, optionally creating and registering a new
 * serialized {@code PublishProcessor} when the key is absent.
 *
 * @param key             the key to look up
 * @param createIfMissing whether to create and register a processor when the key is absent
 * @return the value mapped to the key if the key is present; otherwise the newly created
 *         processor, or {@code null} when {@code createIfMissing} is {@code false}
 */
synchronized Processor getProcessor(RxQueueKey key, boolean createIfMissing)
{
    // An existing mapping always wins.
    if (mProcessorKeys.containsKey(key))
    {
        return mProcessorKeys.get(key);
    }

    if (!createIfMissing)
    {
        return null;
    }

    // No mapping yet: create one and remember it for later lookups.
    Processor created = PublishProcessor.create().toSerialized();
    mProcessorKeys.put(key, created);
    return created;
}
项目:RxAndroid-Examples    文件:MainActivity.java   
/**
 * Sets the content view, binds views, creates the result processor and subscribes a consumer
 * that shows each emitted value in the sum view, then triggers {@code onNumberChanged()}.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    ButterKnife.bind(this);

    resultPublisher = PublishProcessor.create();
    // Each emitted value is rendered with a "Sum = " prefix.
    subscriber = resultPublisher.subscribe(sum -> tvSum.setText("Sum = " + sum));

    onNumberChanged();
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * Creates a new {@code PublishProcessor}, stores it in the list kept for the given tag
 * (creating and registering that list on first use), and returns it.
 *
 * @param tag the key under which the new processor is stored
 * @param <T> the element type of the returned flowable
 * @return the newly created processor
 */
public <T> Flowable<T> register(@NonNull Object tag) {
    List<FlowableProcessor> registered = mProcessorMapper.get(tag);
    if (registered == null) {
        // First registration for this tag.
        registered = new ArrayList<FlowableProcessor>();
        mProcessorMapper.put(tag, registered);
    }

    FlowableProcessor<T> created = PublishProcessor.create();
    registered.add(created);
    return created;
}
项目:rxtools    文件:IndexedFlowableList.java   
/**
 * Stores the index, the backing list and the weak references to the previous and next
 * processors.
 *
 * @param index        value stored in {@code _index}
 * @param internalList list stored in {@code _internalList}
 * @param previous     weak reference stored in {@code _previous}; may be {@code null}
 *                     (the {@code isActive()} check guards against that)
 * @param next         weak reference stored in {@code _next}; may be {@code null}
 *                     (the {@code isActive()} check guards against that)
 */
private IndexHolder(int index, List<T> internalList, WeakReference<PublishProcessor<Optional<T>>> previous, WeakReference<PublishProcessor<Optional<T>>> next)
{
    this._internalList = internalList;
    this._index = index;

    // Neighbour links are held weakly.
    this._next = next;
    this._previous = previous;
}