Java 类io.reactivex.observables.ConnectableObservable 实例源码

项目:GitHub    文件:ReplayExampleActivity.java   
private void doSomeWork() {

        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
        connectableObservable.connect(); // connecting the connectableObservable

        connectableObservable.subscribe(getFirstObserver());

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onComplete();

        /*
         * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
         */
        connectableObservable.subscribe(getSecondObserver());

    }
项目:RxJava2-Android-Sample    文件:ConnectableExampleActivity.java   
private void doSomeWork() {

        Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6);

        //使用publish操作符将普通Observable转换为可连接的Observable
        ConnectableObservable<Long> connectableObservable = observable
//                .publish();
                .replay(5);
        //第一个订阅者订阅,不会开始发射数据
        connectableObservable
                .compose(Utils.<Long>ioToMain())
                .subscribe(getFirstObserver());
        //如果不调用connect方法,connectableObservable则不会发射数据
        //即使没有任何订阅者订阅它,你也可以使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。
        //这样,你可以将一个”冷”的Observable变为”热”的。
        connectableObservable.connect();
        //第二个订阅者延迟2s订阅,这将导致丢失前面2s内发射的数据
        connectableObservable.delaySubscription(2, TimeUnit.SECONDS)//0,1数据丢失
                .compose(Utils.<Long>ioToMain())
                .subscribe(getSecondObserver());
    }
项目:RxJava2-Android-Sample    文件:ReplayExampleActivity.java   
private void doSomeWork() {

        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
        connectableObservable.connect(); // connecting the connectableObservable

        connectableObservable.subscribe(getFirstObserver());

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onComplete();

        /*
         * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
         */
        connectableObservable.subscribe(getSecondObserver());

    }
项目:RxJava2-Android-Samples    文件:ReplayExampleActivity.java   
private void doSomeWork() {

        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
        connectableObservable.connect(); // connecting the connectableObservable

        connectableObservable.subscribe(getFirstObserver());

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onComplete();

        /*
         * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
         */
        connectableObservable.subscribe(getSecondObserver());

    }
项目:loklak_wok_android    文件:TweetPostingFragment.java   
private void postTextOnlyTweet(String status) {
    mProgressDialog.show();
    ConnectableObservable<StatusUpdate> observable =
            mTwitterApi.postTweet(status, null, mLatitude, mLongitude)
            .subscribeOn(Schedulers.io())
            .publish();

    Disposable postingDisposable = observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::onSuccessfulTweetPosting, this::onErrorTweetPosting);
    mCompositeDisposable.add(postingDisposable);

    Disposable crossPostingDisposable = observable
            .flatMap(this::pushTweetToLoklak)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    push -> Log.e(LOG_TAG, push.getStatus()),
                    t -> Log.e(LOG_TAG, "Cross posting failed: " + t.toString())
            );
    mCompositeDisposable.add(crossPostingDisposable);

    Disposable publishDisposable = observable.connect();
    mCompositeDisposable.add(publishDisposable);
}
项目:Learning-RxJava    文件:Ch5_6.java   
public static void main(String[] args) {
    ConnectableObservable<Integer> threeRandoms =
            Observable.range(1, 3)
                    .map(i -> randomInt()).publish();

    //Observer 1 - print each random integer
    threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));

    //Observer 2 - sum the random integers, then print
    threeRandoms.reduce(0, (total, next) -> total + next)
            .subscribe(i -> System.out.println("Observer 2: " + i));

    threeRandoms.connect();
}
项目:Learning-RxJava    文件:Ch5_5.java   
public static void main(String[] args) {
    ConnectableObservable<Integer> threeRandoms =
            Observable.range(1, 3)
                    .map(i -> randomInt()).publish();
    threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
    threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
    threeRandoms.connect();
}
项目:Learning-RxJava    文件:Ch5_2.java   
public static void main(String[] args) {
    ConnectableObservable<Integer> threeIntegers =
            Observable.range(1, 3).publish();
    threeIntegers.subscribe(i -> System.out.println("Observer One:" + i));
    threeIntegers.subscribe(i -> System.out.println("Observer Two:" + i));
    threeIntegers.connect();
}
项目:Learning-RxJava    文件:Ch5_11.java   
public static void main(String[] args) {

        ConnectableObservable<Integer> threeRandoms =
                Observable.range(1, 3)
                        .map(i -> randomInt()).publish();

        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));

        threeRandoms.connect();
    }
项目:Learning-RxJava    文件:Ch5_4.java   
public static void main(String[] args) {
    ConnectableObservable<Integer> threeInts =
            Observable.range(1, 3).publish();
    Observable<Integer> threeRandoms = threeInts.map(i ->
            randomInt());
    threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
    threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
    threeInts.connect();
}
项目:Learning-RxJava    文件:Ch5_12.java   
public static void main(String[] args) {

        ConnectableObservable<Integer> threeRandoms =
                Observable.range(1, 3)
                        .map(i -> randomInt()).publish();

//Observer 1 - print each random integer
        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));

//Observer 2 - sum the random integers, then print
        threeRandoms.reduce(0, (total, next) -> total + next)
                .subscribe(i -> System.out.println("Observer 2: " + i));

        threeRandoms.connect();
    }
项目:Learning-RxJava    文件:Ch2_19.java   
public static void main(String[] args) {
        ConnectableObservable<Long> seconds =
                Observable.interval(1, TimeUnit.SECONDS).publish();
//observer 1
        seconds.subscribe(l -> System.out.println("Observer 1: " + l));
        seconds.connect();
//sleep 5 seconds
        sleep(5000);
//observer 2
        seconds.subscribe(l -> System.out.println("Observer 2: " + l));
//sleep 5 seconds
        sleep(5000);
    }
项目:Learning-RxJava    文件:Ch2_14.java   
public static void main(String[] args) {
        ConnectableObservable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                        .publish();
//Set up observer 1
        source.subscribe(s -> System.out.println("Observer 1: " + s));
//Set up observer 2
        source.map(String::length)
                .subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
        source.connect();
    }
项目:Spring-5.0-Cookbook    文件:EmployeeRxJavaServiceImpl.java   
@Override
public ConnectableObservable<String> freeFlowEmps() {
     List<String> rosterNames = new ArrayList<>();
     Function<Employee, String> familyNames = (emp) -> emp.getLastName().toUpperCase();
     ConnectableObservable<String> flowyNames = Observable.fromIterable(employeeDaoImpl.getEmployees()).map(familyNames).cache().publish();

     flowyNames.subscribe(System.out::println);
     flowyNames.subscribe((name) ->{
         rosterNames.add(name);
     }); 
     System.out.println(rosterNames);
    return flowyNames;
}
项目:RxCommand    文件:RxCommand.java   
/**
 * If the receiver is enabled, this method will:
 * <p>
 * 1. Invoke the `func` given at the time of creation.
 * 2. Multicast the returned observable.
 * 3. Send the multicasted observable on {@link #executionObservables()}.
 * 4. Subscribe (connect) to the original observable on the main thread.
 *
 * @param input The input value to pass to the receiver's `func`. This may be null.
 * @return the multicasted observable, after subscription. If the receiver is not
 * enabled, returns a observable that will send an error.
 */
@MainThread
public final Observable<T> execute(@Nullable Object input) {
    boolean enabled = mImmediateEnabled.blockingFirst();
    if (!enabled) {
        return Observable.error(new IllegalStateException("The command is disabled and cannot be executed"));
    }
    try {
        Observable<T> observable = mFunc.apply(input);
        if (observable == null) {
            throw new RuntimeException(String.format("null Observable returned from observable func for value %s", input));
        }

        // This means that `executing` and `enabled` will send updated values before
        // the observable actually starts performing work.
        final ConnectableObservable<T> connection = observable
                .subscribeOn(AndroidSchedulers.mainThread())
                .replay();

        mAddedExecutionObservableSubject.onNext(connection);
        connection.connect();
        return connection;
    } catch (Exception e) {
        e.printStackTrace();
        return Observable.error(e);
    }
}
项目:RxJava2-Android-Sample    文件:ConnectableExampleActivity.java   
private void testRefCount() {
    Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6);
    ConnectableObservable<Long> connectableObservable = observable.publish();
    connectableObservable.connect();
    Observable<Long> longObservable = connectableObservable.refCount();
    longObservable.delaySubscription(2, TimeUnit.SECONDS)
            .compose(Utils.<Long>ioToMain())
            .subscribe(getFirstObserver());
    longObservable.compose(Utils.<Long>ioToMain()).subscribe(getSecondObserver());
}
项目:gigreminder    文件:SyncRepositoryTest.java   
private void checkSyncStartAndFinishEvents(ConnectableObservable<BaseEvent> syncEventBus) {
    syncEventBus
            .test()
            .awaitCount(2, () -> {
            }, MAX_TIMEOUT_MILLIS)
            .assertNoErrors()
            .assertValueAt(0, event -> {
                assertThat(event).isInstanceOf(SyncStartEvent.class);
                return true;
            })
            .assertValueAt(1, event -> {
                assertThat(event).isInstanceOf(SyncFinishEvent.class);
                return true;
            });
}
项目:loklak_wok_android    文件:TweetPostingFragment.java   
private void postImageAndTextTweet(List<Observable<String>> imageIdObservables, String status) {
    mProgressDialog.show();
    ConnectableObservable<StatusUpdate> observable = Observable.zip(
            imageIdObservables,
            mediaIdArray -> {
                String mediaIds = "";
                for (Object mediaId : mediaIdArray) {
                    mediaIds = mediaIds + String.valueOf(mediaId) + ",";
                }
                return mediaIds.substring(0, mediaIds.length() - 1);
            })
            .flatMap(imageIds -> mTwitterApi.postTweet(status, imageIds, mLatitude, mLongitude))
            .subscribeOn(Schedulers.io())
                .publish();

    Disposable postingDisposable = observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::onSuccessfulTweetPosting, this::onErrorTweetPosting);
    mCompositeDisposable.add(postingDisposable);

    Disposable crossPostingDisposable = observable
            .flatMap(this::pushTweetToLoklak)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    push -> {},
                    t -> Log.e(LOG_TAG, "Cross posting failed: " + t.toString())
            );
    mCompositeDisposable.add(crossPostingDisposable);

    Disposable publishDisposable = observable.connect();
    mCompositeDisposable.add(publishDisposable);
}
项目:loklak_wok_android    文件:TweetHarvestingFragment.java   
private void displayAndPostScrapedData() {
    progressBar.setVisibility(View.VISIBLE);
    ConnectableObservable<ScrapedData> observable = Observable.interval(4, TimeUnit.SECONDS)
            .flatMap(this::getSuggestionsPeriodically)
            .flatMap(query -> {
                mSuggestionQuerries.add(query);
                return getScrapedTweets(query);
            })
            .retry(2)
            .publish();

    Disposable viewDisposable = observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    this::displayScrapedData,
                    this::setNetworkErrorView
            );
    mCompositeDisposable.add(viewDisposable);

    Disposable pushDisposable = observable
            .flatMap(this::pushScrapedData)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    push -> {
                        mHarvestedTweets += push.getRecords();
                        harvestedTweetsCountTextView.setText(String.valueOf(mHarvestedTweets));
                    },
                    throwable -> Log.e(LOG_TAG, throwable.toString())
            );
    mCompositeDisposable.add(pushDisposable);

    Disposable publishDisposable = observable.connect();
    mCompositeDisposable.add(publishDisposable);
}
项目:reark    文件:RepositoriesViewModel.java   
@Override
public void subscribeToDataStoreInternal(@NonNull final CompositeDisposable compositeDisposable) {
    checkNotNull(compositeDisposable);
    Log.v(TAG, "subscribeToDataStoreInternal");

    ConnectableObservable<DataStreamNotification<GitHubRepositorySearch>> repositorySearchSource =
            searchString
                    .debounce(SEARCH_INPUT_DELAY, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .filter(value -> value.length() > 2)
                    .doOnNext(value -> Log.d(TAG, "Searching with: " + value))
                    .switchMap(getGitHubRepositorySearch::call)
                    .publish();

    compositeDisposable.add(repositorySearchSource
            .map(toProgressStatus())
            .doOnNext(progressStatus -> Log.d(TAG, "Progress status: " + progressStatus.name()))
            .subscribe(this::setNetworkStatusText));

    compositeDisposable.add(repositorySearchSource
            .filter(DataStreamNotification::isOnNext)
            .map(DataStreamNotification::getValue)
            .map(GitHubRepositorySearch::getItems)
            .flatMap(toGitHubRepositoryList())
            .doOnNext(list -> Log.d(TAG, "Publishing " + list.size() + " repositories from the ViewModel"))
            .subscribe(repositories::onNext));

    compositeDisposable.add(repositorySearchSource.connect());
}
项目:reark    文件:RepositoriesViewModel.java   
@Override
public void subscribeToDataStoreInternal(@NonNull final CompositeDisposable compositeDisposable) {
    checkNotNull(compositeDisposable);
    Log.v(TAG, "subscribeToDataStoreInternal");

    ConnectableObservable<DataStreamNotification<GitHubRepositorySearch>> repositorySearchSource =
            searchString
                    .debounce(SEARCH_INPUT_DELAY, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .filter(value -> value.length() > 2)
                    .doOnNext(value -> Log.d(TAG, "Searching with: " + value))
                    .switchMap(getGitHubRepositorySearch::call)
                    .publish();

    compositeDisposable.add(repositorySearchSource
            .map(toProgressStatus())
            .doOnNext(progressStatus -> Log.d(TAG, "Progress status: " + progressStatus.name()))
            .subscribe(this::setNetworkStatusText));

    compositeDisposable.add(repositorySearchSource
            .filter(DataStreamNotification::isOnNext)
            .map(DataStreamNotification::getValue)
            .map(GitHubRepositorySearch::getItems)
            .flatMap(toGitHubRepositoryList())
            .doOnNext(list -> Log.d(TAG, "Publishing " + list.size() + " repositories from the ViewModel"))
            .subscribe(repositories::onNext));

    compositeDisposable.add(repositorySearchSource.connect());
}
项目:RxJava2Debug    文件:ObservableOnAssemblyConnectable.java   
ObservableOnAssemblyConnectable(ConnectableObservable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
项目:rxjavatraining    文件:HotVsColdObservableTest.java   
@Test
public void testHotObservable() throws Exception {

    ConnectableObservable<Long> connectableObservable = Observable.interval(1,
            TimeUnit.SECONDS,
            Schedulers.from(executorService)).publish();

    connectableObservable.subscribe(x -> System.out.println("L1:" + x));

    Thread.sleep(2000);

    Disposable disposable = connectableObservable.connect(); //Let us begin

    connectableObservable.subscribe(x -> System.out.println("L2:" + x));

    Thread.sleep(2000);

    connectableObservable.subscribe(x -> System.out.println("L3:" + x));

    disposable.dispose();

    Thread.sleep(10000);
}
项目:RxUi    文件:MainPresenter.java   
@NonNull
Disposable bind(MainView view) {
    final CompositeDisposable disposable = new CompositeDisposable();

    Observable<String> login = view.login().share();
    Observable<String> password = view.password().share();

    // Boolean is valid/invalid flag.
    ConnectableObservable<Triplet<String, String, Boolean>> credentials = Observable
            .combineLatest(login, password, (l, p) -> Triplet.with(l, p, !l.isEmpty() && !p.isEmpty()))
            .publish();

    Observable<Object> signInEnable = credentials
            .filter(creds -> creds.getValue2())
            .map(enable -> new Object());

    Observable<Object> signInDisable = credentials
            .filter(creds -> !creds.getValue2())
            .map(disable -> new Object());

    // You can use static import for RxUi.bind()
    disposable.add(RxUi.bind(signInEnable, view.singInEnable()));
    disposable.add(RxUi.bind(signInDisable, view.singInDisable()));

    Observable<Object> signInResult = view
            .signInClicks()
            .withLatestFrom(credentials, (click, creds) -> creds.removeFrom2()) // Leave only login and password.
            .switchMap(loginAndPassword -> authService
                    .signIn(loginAndPassword.getValue0(), loginAndPassword.getValue1())
                    .subscribeOn(ioScheduler)) // "API request".
            .share();

    disposable.add(credentials.connect());

    Observable<Success> signInSuccess = signInResult
            .filter(it -> it instanceof Success)
            .cast(Success.class);

    Observable<Failure> signInFailure = signInResult
            .filter(it -> it instanceof Failure)
            .cast(Failure.class);

    // You can use static import for RxUi.bind()
    disposable.add(RxUi.bind(signInSuccess, view.signInSuccess()));
    disposable.add(RxUi.bind(signInFailure, view.signInFailure()));

    return disposable;
}
项目:RxJava2Extensions    文件:ConnectableObservableValidator.java   
ConnectableObservableValidator(ConnectableObservable<T> source, PlainConsumer<ProtocolNonConformanceException> onViolation) {
    this.source = source;
    this.onViolation = onViolation;
}
项目:RxJava2Extensions    文件:ObservableOnAssemblyConnectable.java   
ObservableOnAssemblyConnectable(ConnectableObservable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
项目:RxJava2Extensions    文件:RxJavaProtocolValidatorTest.java   
@Test
public void connectableObservable() {
    ConnectableObservable<Integer> source = new ConnectableObservable<Integer>() {

        @Override
        protected void subscribeActual(Observer<? super Integer> s) {
            s.onComplete();
            s.onError(null);
            s.onError(new IOException());
            s.onNext(null);
            s.onNext(1);
            s.onSubscribe(null);
            s.onSubscribe(Disposables.empty());
            s.onSubscribe(Disposables.empty());
            s.onComplete();
            s.onNext(2);
        }

        @Override
        public void connect(Consumer<? super Disposable> connection) {
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks h = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        Observable.just(1).test().assertResult(1);
        Observable.empty().test().assertResult();
        Observable.error(new IOException()).test().assertFailure(IOException.class);
        TestHelper.checkDisposed(RxJavaPlugins.onAssembly(PublishSubject.create()));

        ConnectableObservable<Integer> c = RxJavaPlugins.onAssembly(source);

        c.test();

        c.connect();

        Assert.assertEquals(15, errors.size());
        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        Assert.assertTrue("" + errors.get(4).getCause(), errors.get(4).getCause() instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        h.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
项目:RxGroups    文件:SubscriptionProxy.java   
private SubscriptionProxy(Observable<T> sourceObservable, Action onTerminate) {
  final ConnectableObservable<T> replay = sourceObservable.replay();
  sourceDisposable = replay.connect();
  proxy = replay.doOnTerminate(onTerminate);
  disposableList = new CompositeDisposable(sourceDisposable);
}
项目:projectBlue    文件:PlaybackEngine.java   
private ConnectableObservable<PositionedPlaybackFile> startPlayback(PreparedPlayableFileQueue preparedPlaybackQueue, final long filePosition) throws IOException {
    if (playbackSubscription != null)
        playbackSubscription.dispose();

    activePlayer = playbackBootstrapper.startPlayback(preparedPlaybackQueue, filePosition);
    isPlaying = true;

    final ConnectableObservable<PositionedPlaybackFile> observable = activePlayer.observe();

    playbackSubscription = observable.subscribe(
        p -> {
            isPlaying = true;
            positionedPlaybackFile = p;

            if (onPlayingFileChanged != null)
                onPlayingFileChanged.onPlayingFileChanged(p);

            saveStateToLibrary(p);
        },
        e -> {
            if (e instanceof PreparationException) {
                final PreparationException preparationException =
                    (PreparationException)e;

                saveStateToLibrary(
                    new PositionedPlaybackFile(
                        new EmptyPlaybackHandler(0),
                        preparationException.getPositionedFile()));
            }

            if (onPlaylistError != null)
                onPlaylistError.runWith(e);
        },
        () -> {
            isPlaying = false;
            positionedPlaybackFile = null;
            activePlayer = null;

            changePosition(0, 0)
                .then(positionedFile -> {
                    if (onPlaylistReset != null)
                        onPlaylistReset.onPlaylistReset(positionedFile);

                    if (onPlaybackCompleted != null)
                        onPlaybackCompleted.onPlaybackCompleted();

                    return null;
                });
        });

    observable.firstElement()
        .subscribe(
            p -> {
                if (onPlaybackStarted != null)
                    onPlaybackStarted.onPlaybackStarted(p);
            },
            e -> {});

    return observable;
}
项目:projectBlue    文件:ActivePlayer.java   
@Override
public ConnectableObservable<PositionedPlaybackFile> observe() {
    return observableProxy;
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("none")
public ConnectableObservable<T> publish() {
    return boxed.publish();
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("none")
public ConnectableObservable<T> replay() {
    return boxed.replay();
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("none")
public ConnectableObservable<T> replay(int bufferSize) {
    return boxed.replay(bufferSize);
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("io.reactivex:computation")
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
    return boxed.replay(bufferSize, time, unit);
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("custom")
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
    return boxed.replay(bufferSize, time, unit, scheduler);
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("custom")
public ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
    return boxed.replay(bufferSize, scheduler);
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("io.reactivex:computation")
public ConnectableObservable<T> replay(long time, TimeUnit unit) {
    return boxed.replay(time, unit);
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("custom")
public ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
    return boxed.replay(time, unit, scheduler);
}
项目:cyclops    文件:ObservableKind.java   
@CheckReturnValue
@SchedulerSupport("custom")
public ConnectableObservable<T> replay(Scheduler scheduler) {
    return boxed.replay(scheduler);
}
项目:Learning-RxJava    文件:Ch5_10.java   
public static void main(String[] args) {

        ConnectableObservable<Integer> threeInts =
                Observable.range(1, 3).publish();

        Observable<Integer> threeRandoms = threeInts.map(i ->
                randomInt());

        threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
        threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));

        threeInts.connect();
    }