Java 类io.reactivex.subjects.ReplaySubject 实例源码

项目:showroom-android    文件:RxFaker.java   
/**
 * Builds a {@link ReplaySubject} fed by a one-shot source that constructs a {@link Faker}.
 * The source is subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @return the subject that has been subscribed to the Faker source
 */
private static ReplaySubject<Faker> createFaker() {
    final ReplaySubject<Faker> replay = ReplaySubject.create();

    final Observable<Faker> fakerSource = Observable.create(new ObservableOnSubscribe<Faker>() {
        @Override
        public void subscribe(ObservableEmitter<Faker> emitter) throws Exception {
            // The Faker is always constructed; only the emissions are skipped when disposed.
            final Faker created = new Faker();

            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(created);
            emitter.onComplete();
        }
    });

    fakerSource
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(replay);

    return replay;
}
项目:GitHub    文件:ReplaySubjectExampleActivity.java   
/**
 * Pushes 1 through 4 and a completion into a ReplaySubject, subscribing one observer
 * before the emissions and a second one after the subject has completed.
 */
private void doSomeWork() {
    final ReplaySubject<Integer> replay = ReplaySubject.create();

    // Subscribed up front: it will get 1, 2, 3, 4
    replay.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        replay.onNext(value);
    }
    replay.onComplete();

    // Subscribed late: the subject replays 1, 2, 3, 4 for the second observer as well.
    replay.subscribe(getSecondObserver());
}
项目:RxJava2-Android-Sample    文件:ReplaySubjectExampleActivity.java   
/**
 * A ReplaySubject emits every item it received from the source Observable to each observer,
 * regardless of when that observer subscribes. Other ReplaySubject variants discard old items
 * (those emitted by the source Observable) once the replay buffer grows past a certain size
 * or after a certain time span has elapsed.
 *
 * If a ReplaySubject is used as an observer, take care not to call its onNext method (or any
 * other on* method) from multiple threads: that can produce concurrent (non-sequential)
 * calls, which violates the Observable contract and makes the Subject's output
 * nondeterministic.
 */
private void doSomeWork() {
    final ReplaySubject<Integer> replay = ReplaySubject.create();

    // Subscribed before any emission: it will get 1, 2, 3, 4
    replay.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        replay.onNext(value);
    }
    replay.onComplete();

    // Subscribed after completion: 1, 2, 3, 4 are replayed for the second observer too.
    replay.subscribe(getSecondObserver());
}
项目:RxJava2-Android-Samples    文件:ReplaySubjectExampleActivity.java   
/**
 * Emits 1, 2, 3, 4 and completes a ReplaySubject; the first observer is attached before
 * the emissions and the second one only afterwards.
 */
private void doSomeWork() {
    final ReplaySubject<Integer> replay = ReplaySubject.create();

    // Early subscriber: it will get 1, 2, 3, 4
    replay.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        replay.onNext(value);
    }
    replay.onComplete();

    // Late subscriber: because the subject replays, it also gets 1, 2, 3, 4.
    replay.subscribe(getSecondObserver());
}
项目:RxGroups    文件:SubscriptionProxyTest.java   
/**
 * Disposes the proxy before the subject emits anything, then checks that a fresh observer
 * subscribed afterwards sees both the value and the completion.
 */
@Test public void testUnsubscribeBeforeEmit() {
  final ReplaySubject<String> source = ReplaySubject.create();
  final SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  final TestObserver<String> disposedEarly = new TestObserver<>();
  proxy.subscribe(disposedEarly);
  proxy.dispose();

  disposedEarly.assertNotComplete();
  disposedEarly.assertNoValues();

  source.onNext("Avanti!");
  source.onComplete();

  // A disposed observer may not be reused in RxJava2, so a new one is subscribed.
  final TestObserver<String> resubscribed = new TestObserver<>();
  proxy.subscribe(resubscribed);
  resubscribed.assertComplete();
  resubscribed.assertValue("Avanti!");
}
项目:RxGroups    文件:SubscriptionProxyTest.java   
/**
 * Emits and completes the subject while the proxy is disposed, then verifies that a newly
 * subscribed observer receives the value.
 */
@Test public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() {
  final ReplaySubject<String> source = ReplaySubject.create();
  final SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  final TestObserver<String> disposedEarly = new TestObserver<>();
  proxy.subscribe(disposedEarly);
  proxy.dispose();

  disposedEarly.assertNoValues();

  source.onNext("Avanti!");
  source.onComplete();

  // A disposed observer may not be reused in RxJava2, so a new one is subscribed.
  final TestObserver<String> resubscribed = new TestObserver<>();
  proxy.subscribe(resubscribed);

  resubscribed.awaitTerminalEvent(3, TimeUnit.SECONDS);
  resubscribed.assertValue("Avanti!");
}
项目:RxGroups    文件:SubscriptionProxyTest.java   
/**
 * Verifies that results already delivered to one observer are delivered again to a second
 * observer subscribed after the proxy was disposed.
 */
@Test public void shouldRedeliverSameResultsToDifferentSubscriber() {
  // Use case: When rotating an activity, ObservableManager will re-subscribe original request's
  // Observable to a new Observer, which is a member of the new activity instance. In this
  // case, we may want to redeliver any previous results (if the request is still being
  // managed by ObservableManager).
  final ReplaySubject<String> source = ReplaySubject.create();
  final SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  final TestObserver<String> original = new TestObserver<>();
  proxy.subscribe(original);

  source.onNext("Avanti!");
  source.onComplete();

  proxy.dispose();

  final TestObserver<String> replacement = new TestObserver<>();
  proxy.subscribe(replacement);

  replacement.awaitTerminalEvent(3, TimeUnit.SECONDS);
  replacement.assertComplete();
  replacement.assertValue("Avanti!");

  // The first observer must have seen the same events while it was subscribed.
  original.assertComplete();
  original.assertValue("Avanti!");
}
项目:Learning-RxJava    文件:Ch5_24.java   
/**
 * Prints the items of a ReplaySubject for two observers: one subscribed before the
 * emissions and one subscribed after the subject has completed.
 */
public static void main(String[] args) {
    final Subject<String> replay = ReplaySubject.create();

    replay.subscribe(item -> System.out.println("Observer 1: " + item));

    for (String letter : new String[] {"Alpha", "Beta", "Gamma"}) {
        replay.onNext(letter);
    }
    replay.onComplete();

    // Subscribed after completion; the subject replays the earlier items to it.
    replay.subscribe(item -> System.out.println("Observer 2: " + item));
}
项目:showroom-android    文件:RxFaker.java   
/**
 * Returns the shared {@code rxFaker} subject, creating it with {@code createFaker()} the
 * first time it is requested. No synchronization is performed here.
 *
 * @return the cached subject
 */
public static ReplaySubject<Faker> getInstance() {
    if (rxFaker != null) {
        return rxFaker;
    }

    rxFaker = createFaker();
    return rxFaker;
}
项目:jobson    文件:JobManagerTest.java   
/**
 * Submits a job whose executor stdout is a subject pre-loaded with random bytes and expects
 * the stdout listener to receive exactly those bytes within one second.
 */
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
    final byte[] expectedStdoutBytes = generateRandomBytes();
    final Subject<byte[]> stdoutSubject = ReplaySubject.create();
    stdoutSubject.onNext(expectedStdoutBytes);

    final JobExecutor executor = MockJobExecutor.thatUses(stdoutSubject, Observable.never());
    final JobManager manager = createManagerWith(executor);

    // Start with the single permit taken so that tryAcquire below waits for a release.
    final Semaphore bytesReceived = new Semaphore(1);
    bytesReceived.acquire();

    final Observer<byte[]> stdoutObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] received) {
            assertThat(received).isEqualTo(expectedStdoutBytes);
            bytesReceived.release();
        }

        @Override
        public void onComplete() {}

        @Override
        public void onError(@NonNull Throwable error) {
            // NOTE(review): if fail(...) throws, the release() below is never reached — confirm intent.
            fail("Error from observable");
            bytesReceived.release();
        }
    };
    final JobEventListeners listeners = JobEventListeners.createStdoutListener(stdoutObserver);

    manager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean receivedInTime = bytesReceived.tryAcquire(1, SECONDS);
    if (!receivedInTime) {
        fail("Timed out before any bytes received");
    }
}
项目:jobson    文件:JobManagerTest.java   
/**
 * Submits a job whose executor stderr is a subject pre-loaded with random bytes and expects
 * the stderr listener to receive exactly those bytes within one second.
 */
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
    final byte[] stderrBytes = generateRandomBytes();
    final Subject<byte[]> stderr = ReplaySubject.create();
    stderr.onNext(stderrBytes);

    final JobExecutor executor = MockJobExecutor.thatUses(Observable.never(), stderr);
    final JobManager manager = createManagerWith(executor);

    // Start with the single permit taken so that tryAcquire below waits for a release.
    final Semaphore bytesReceived = new Semaphore(1);
    bytesReceived.acquire();

    final Observer<byte[]> stderrObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] received) {
            assertThat(received).isEqualTo(stderrBytes);
            bytesReceived.release();
        }

        @Override
        public void onComplete() {}

        @Override
        public void onError(@NonNull Throwable error) {
            // NOTE(review): if fail(...) throws, the release() below is never reached — confirm intent.
            fail("Error from observable");
            bytesReceived.release();
        }
    };
    final JobEventListeners listeners = JobEventListeners.createStderrListener(stderrObserver);

    manager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean receivedInTime = bytesReceived.tryAcquire(1, SECONDS);
    if (!receivedInTime) {
        fail("Timed out before any bytes received");
    }
}
项目:RxGroups    文件:SubscriptionProxyTest.java   
/**
 * Checks that an observer subscribed after a dispose receives both the value emitted before
 * the dispose and the value emitted after the resubscription.
 */
@Test public void shouldKeepDeliveringEventsAfterResubscribed() {
  final ReplaySubject<String> source = ReplaySubject.create();
  final SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

  final TestObserver<String> first = new TestObserver<>();
  proxy.subscribe(first);
  source.onNext("Avanti 1");
  proxy.dispose();

  final TestObserver<String> second = new TestObserver<>();
  proxy.subscribe(second);

  source.onNext("Avanti!");

  second.assertValues("Avanti 1", "Avanti!");
}
项目:arctor    文件:WaitViewReplayTransformer.java   
/**
 * Records the materialized upstream in a {@link ReplaySubject} and exposes the recorded
 * notifications only while {@code view} emits {@code true}. Disposing the returned stream
 * also disposes the observer that feeds the subject.
 *
 * @param upstream the source to record
 * @return the dematerialized stream gated by {@code view}
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final ReplaySubject<Notification<T>> replay = ReplaySubject.create();

    // Subscribed immediately, so notifications are recorded from the moment apply() runs.
    final DisposableObserver<Notification<T>> upstreamObserver = upstream.materialize()
            .subscribeWith(new DisposableObserver<Notification<T>>() {
                @Override
                public void onNext(Notification<T> notification) {
                    replay.onNext(notification);
                }

                @Override
                public void onError(Throwable error) {
                    replay.onError(error);
                }

                @Override
                public void onComplete() {
                    replay.onComplete();
                }
            });

    return view
            .switchMap(new Function<Boolean, Observable<Notification<T>>>() {
                @Override
                public Observable<Notification<T>> apply(final Boolean flag) {
                    if (!flag) {
                        return Observable.empty();
                    }
                    return replay;
                }
            })
            .doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    upstreamObserver.dispose();
                }
            })
            .dematerialize();
}
项目:RxShell    文件:RxShellTest.java   
/**
 * Prepares the mocked process and session used by the tests: a replaying session publisher,
 * mock command/output/error streams, and stubs for the session's lifecycle calls.
 */
@Before
public void setup() throws Exception {
    super.setup();
    // Pre-load the session so that firstOrError() below can resolve from the replay buffer.
    sessionPub = ReplaySubject.create();
    sessionPub.onNext(rxProcessSession);
    when(rxProcess.open()).thenAnswer(invocation -> {
        // Each open() re-stubs waitFor() with a Single whose emitter is captured in waitForEmitter.
        when(rxProcessSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
        return sessionPub.firstOrError();
    });

    cmdStream = new MockOutputStream(new MockOutputStream.Listener() {
        @Override
        public void onNewLine(String line) {
            // An "exit" line closes the command stream and reports 0 through waitForEmitter.
            if (line.equals("exit" + LineReader.getLineSeparator())) {
                try {
                    cmdStream.close();
                } catch (IOException e) {
                    Timber.e(e);
                } finally {
                    // Reported even when close() failed.
                    waitForEmitter.onSuccess(0);
                }
            }
        }

        @Override
        public void onClose() {

        }
    });
    outputStream = new MockInputStream();
    errorStream = new MockInputStream();

    when(rxProcessSession.input()).thenReturn(cmdStream);
    when(rxProcessSession.output()).thenReturn(outputStream);
    when(rxProcessSession.error()).thenReturn(errorStream);
    // isAlive() reports whether the command stream is still open.
    when(rxProcessSession.isAlive()).thenReturn(Single.create(e -> e.onSuccess(cmdStream.isOpen())));

    // destroy() closes the command stream, reports 1 through waitForEmitter, then completes.
    when(rxProcessSession.destroy()).then(invocation -> Completable.create(e -> {
        cmdStream.close();
        waitForEmitter.onSuccess(1);
        e.onComplete();
    }));
}
项目:Reactive-Programming-With-Java-9    文件:Demo_ReplaySubject.java   
/**
 * Emits two values into a ReplaySubject before subscribing a printing observer, then emits
 * one more value and completes.
 */
public static void main(String[] args) {
    final Observer<Long> printer = new Observer<Long>() {

        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Long value) {
            System.out.println(":" + value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("It's Done");
        }
    };

    final ReplaySubject<Long> replay = ReplaySubject.create();

    // Emitted before anyone subscribes.
    replay.onNext(1L);
    replay.onNext(2L);

    replay.subscribe(printer);

    replay.onNext(10L);
    replay.onComplete();
}
项目:RxBusLib    文件:SubscriberReplayEvent.java   
/**
 * Initializes {@code subject} with a new {@link ReplaySubject}.
 */
@Override
protected final void initObservable() {
    subject = ReplaySubject.create();
    // NOTE(review): the Observable returned by observeOn(...).subscribeOn(...) is neither stored
    // nor subscribed to here, so as written this chain does not appear to affect how `subject`
    // delivers events — confirm whether the result was meant to be kept.
    subject.observeOn(EventThread.getScheduler(observeThread))
            .subscribeOn(EventThread.getScheduler(subscribeThread));
}
项目:clustercode    文件:ExternalProcessServiceImpl.java   
/**
 * Starts reading {@code stream} into a serialized ReplaySubject and hands the subject's
 * String items, observed on the IO scheduler, to {@code observer}.
 *
 * @param observer receives the Observable of String items
 * @param stream   the input passed to {@code readStreamAsync}
 */
private void captureOutput(Consumer<Observable<String>> observer, InputStream stream) {
    final ReplaySubject<Object> replay = ReplaySubject.create();
    final Subject<Object> serialized = replay.toSerialized();
    readStreamAsync(serialized, stream);

    // Only String items are forwarded; anything else pushed into the subject is filtered out.
    final Observable<String> strings = serialized.ofType(String.class)
                                                 .observeOn(Schedulers.io());
    observer.accept(strings);
}
项目:webtrekk-android-sdk    文件:HttpServer.java   
/**
 * Replaces {@code mSubject} with a new {@link ReplaySubject} and returns it. Every call
 * creates a fresh subject; the previous one is no longer referenced by this field.
 *
 * @return the newly created subject
 */
public Subject<String> getSubject() {
    final ReplaySubject<String> fresh = ReplaySubject.create();
    mSubject = fresh;
    return fresh;
}
项目:RHub    文件:RxJava2Proxies.java   
/**
 * Creates a proxy around a new {@link ReplaySubject} with {@code Roxy.TePolicy.PASS}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy replaySubjectProxy() {
    final ReplaySubject<Object> replay = ReplaySubject.create();
    return new RxJava2SubjProxy(replay, Roxy.TePolicy.PASS);
}
项目:RHub    文件:RxJava2Proxies.java   
/**
 * Creates a proxy around a new serialized {@link ReplaySubject} with
 * {@code Roxy.TePolicy.PASS}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy serializedReplaySubjectProxy() {
    final ReplaySubject<Object> replay = ReplaySubject.create();
    return new RxJava2SubjProxy(replay.toSerialized(), Roxy.TePolicy.PASS);
}
项目:RHub    文件:RxJava2Proxies.java   
/**
 * Creates a proxy around a new {@link ReplaySubject} with {@code Roxy.TePolicy.WRAP}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy safeReplaySubjectProxy() {
    final ReplaySubject<Object> replay = ReplaySubject.create();
    return new RxJava2SubjProxy(replay, Roxy.TePolicy.WRAP);
}
项目:RHub    文件:RxJava2Proxies.java   
/**
 * Creates a proxy around a new serialized {@link ReplaySubject} with
 * {@code Roxy.TePolicy.WRAP}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy safeSerializedReplaySubjectProxy() {
    final ReplaySubject<Object> replay = ReplaySubject.create();
    return new RxJava2SubjProxy(replay.toSerialized(), Roxy.TePolicy.WRAP);
}
项目:GitHubAndroidOAuth    文件:OAuthActivity.java   
/**
 * Restores saved state, binds this instance to the shared OAuth result subject, and then
 * either forwards a browser redirect result, fails on an invalid launch, or resumes the
 * flow according to {@code mState}.
 *
 * @param savedInstanceState state passed to {@code Icepick.restoreInstanceState}; logged as-is
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    Log.d(GitHubOAuth.TAG, "OAuthActivity: onCreate "
                           + "savedInstanceState = " + savedInstanceState
                           + ", getIntent() = " + getIntent());

    setContentView(R.layout.progress_dialog);

    Icepick.restoreInstanceState(this, savedInstanceState);

    if (mGitHubOAuth == null) {
        mGitHubOAuth = getIntent().getParcelableExtra(ARG_KEY_AUTH);
    }

    // init reference
    // Dereference the WeakReference exactly once, straight into the strong field. Calling
    // get() a second time after a null check could return null if the referent was
    // collected in between, which would leave mOAuthResultSubject null.
    mOAuthResultSubject = sOAuthResultSubject == null ? null : sOAuthResultSubject.get();
    if (mOAuthResultSubject == null) {
        mOAuthResultSubject = ReplaySubject.create();
        sOAuthResultSubject = new WeakReference<>(mOAuthResultSubject);
    }

    if (isBrowserIntent(getIntent())) {
        Log.d(GitHubOAuth.TAG, "OAuthActivity: Got browser intent in new created instance.");

        // Publish the result parsed from the intent on the shared subject, then close.
        Pair<OAuthResult, String> result = getOAuthResult(getIntent());
        mOAuthResultSubject.onNext(result);
        finish();
        return;
    } else if (mGitHubOAuth == null) {
        authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "Invalid launch intent");
        return;
    }

    mOAuthPresenter = new OAuthPresenter(mGitHubOAuth);
    mOAuthPresenter.attach(this);

    Log.d(GitHubOAuth.TAG, "OAuthActivity: onCreate mState = " + mState);
    switch (mState) {
        case STATE_SEND_REQ:
            // recreated after send request, check `sOAuthResultSubject`
            mState = STATE_WAIT_CODE;
            mOAuthPresenter.waitCode(mOAuthResultSubject);
            break;
        case STATE_CALL_API:
            // recreated after got code, because code can only be used once, so we fail
            authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "Activity killed when call api");
            break;
        case STATE_NOT_REQ:
            handleLaunchIntent();
            break;
        default:
            // we may got killed at STATE_WAIT_CODE, it's too complicated to handle, just fail
            authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "un-handled state " + mState);
            break;
    }
}
项目:rxjava2-extras    文件:FlowableFetchPagesByRequest.java   
/**
 * Creates a Flowable that fetches pages on demand: each valid downstream request of
 * {@code n} calls {@code fetch} with the current position and {@code n}, advances the
 * position by {@code n}, and the fetched Flowables are concatenated eagerly.
 *
 * @param fetch          called with (position, requested count) to obtain the next page
 * @param start          the initial position
 * @param maxConcurrency passed to {@code Flowable.concatEager} as its maximum concurrency
 * @param <T>            the item type
 * @return a deferred Flowable, so each subscriber gets its own subject and position
 */
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
        final long start, final int maxConcurrency) {
    return Flowable.defer(new Callable<Flowable<T>>() {
        @Override
        public Flowable<T> call() throws Exception {
            // need a ReplaySubject because multiple requests can come
            // through before concatEager has established subscriptions to
            // the subject
            final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
            final AtomicLong position = new AtomicLong(start);
            LongConsumer request = new LongConsumer() {
                @Override
                public void accept(final long n) throws Exception {
                    // Validate before touching the position so that an invalid
                    // request (n <= 0) cannot shift where the next page starts.
                    if (!SubscriptionHelper.validate(n)) {
                        return;
                    }
                    final long pos = position.getAndAdd(n);
                    Flowable<T> flowable;
                    try {
                        flowable = fetch.apply(pos, n);
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        subject.onError(e);
                        return;
                    }
                    // reduce allocations by incorporating the onNext
                    // and onComplete actions into the mutable count
                    // object
                    final Count count = new Count(subject, n);
                    flowable = flowable //
                            .doOnNext(count) //
                            .doOnComplete(count);
                    subject.onNext(flowable);
                }
            };
            return Flowable //
                    .concatEager(subject.serialize() //
                            .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
                    .doOnRequest(request);
        }
    });
}
项目:RxDelay    文件:DelayReplayObservableTransformer.java   
/**
 * Composes {@code upstream} with a {@code DelayObservableTransformer} built from
 * {@code pauseLifecycle} and a ReplaySubject created with {@code ReplaySubject.create()}.
 *
 * @param upstream the source to transform
 * @return the composed source
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final ReplaySubject<T> replayAll = ReplaySubject.create();
    return upstream.compose(new DelayObservableTransformer<>(pauseLifecycle, replayAll));
}
项目:RxDelay    文件:DelayLatestObservableTransformer.java   
/**
 * Composes {@code upstream} with a {@code DelayObservableTransformer} built from
 * {@code pauseLifecycle} and a ReplaySubject limited to a buffer size of 1.
 *
 * @param upstream the source to transform
 * @return the composed source
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final ReplaySubject<T> replayLatest = ReplaySubject.createWithSize(1);
    return upstream.compose(new DelayObservableTransformer<>(pauseLifecycle, replayLatest));
}
项目:Reactive-Android-Programming    文件:Sandbox.java   
/**
 * Feeds a one-second interval (mapped to strings) into a ReplaySubject, waits 3.1 seconds,
 * and only then subscribes a logging observer to the subject.
 *
 * @throws InterruptedException if the sleep is interrupted
 */
private static void demo4() throws InterruptedException {
    final Subject<String> replay = ReplaySubject.create();

    final Observable<String> ticks = Observable.interval(0, 1, TimeUnit.SECONDS)
            .map(Objects::toString);
    ticks.subscribe(replay);

    // Let several ticks accumulate in the subject before anyone listens.
    Thread.sleep(3100);

    replay.subscribe(v -> log(v));
}