Java 类io.reactivex.subjects.UnicastSubject 实例源码

项目:Learning-RxJava    文件:Ch5_27.java   
public static void main(String[] args) {
        Subject<String> subject =
                UnicastSubject.create();
        Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(l -> ((l + 1) * 300) + " milliseconds")
                .subscribe(subject);
        sleep(2000);
//multicast to support multiple Observers
        Observable<String> multicast =
                subject.publish().autoConnect();
//bring in first Observer
        multicast.subscribe(s -> System.out.println("Observer 1: "
                + s));
        sleep(2000);
//bring in second Observer
        multicast.subscribe(s -> System.out.println("Observer 2: "
                + s));
        sleep(1000);
    }
项目:RxJava2Jdk8Interop    文件:ObservableInteropTest.java   
@Test
public void toStreamCancel() {
    UnicastSubject<Integer> up = UnicastSubject.create();

    up.onNext(1);
    up.onNext(2);
    up.onNext(3);
    up.onNext(4);
    up.onNext(5);

    try (Stream<Integer> s = up
            .to(ObservableInterop.toStream()).limit(3)) {
        Assert.assertTrue(up.hasObservers());

        List<Integer> list = s.collect(Collectors.toList());
        Assert.assertEquals(Arrays.asList(1, 2, 3), list);
    }

    Assert.assertFalse(up.hasObservers());
}
项目:RxJava2Jdk8Interop    文件:ObservableInteropTest.java   
@Test
public void mapOptionalAsyncFused() {
    TestObserver<Integer> ts = TestHelper.fusedObserver(QueueSubscription.ANY);

    UnicastSubject<Integer> up = UnicastSubject.create();
    TestHelper.emit(up, 1, 2, 3, 4, 5);

    up
    .compose(ObservableInterop.mapOptional(v -> {
        if (v % 2 == 0) {
            return Optional.of(-v);
        }
        return Optional.empty();
    }))
    .subscribeWith(ts)
    .assertOf(TestHelper.assertFusedObserver(QueueSubscription.ASYNC))
    .assertResult(-2, -4);
}
项目:RxJava2Jdk8Interop    文件:ObservableInteropTest.java   
@Test
public void mapOptionalAsyncFusedConditional() {
    TestObserver<Integer> ts = TestHelper.fusedObserver(QueueSubscription.ANY);

    UnicastSubject<Integer> up = UnicastSubject.create();
    TestHelper.emit(up, 1, 2, 3, 4, 5);

    up
    .compose(ObservableInterop.mapOptional(v -> {
        if (v % 2 == 0) {
            return Optional.of(-v);
        }
        return Optional.empty();
    }))
    .filter(Functions.alwaysTrue())
    .subscribeWith(ts)
    .assertOf(TestHelper.assertFusedObserver(QueueSubscription.ASYNC))
    .assertResult(-2, -4);
}
项目:RxJava2Extensions    文件:AsyncObservableTest.java   
@Test
public void runAsyncProcessor() {
    AsyncObservable.runAsync(Schedulers.single(),
        UnicastSubject.<Object>create(),
    new BiConsumer<Observer<Object>, Disposable>() {
        @Override
        public void accept(Observer<? super Object> s, Disposable d) throws Exception {
            s.onNext(1);
            s.onNext(2);
            s.onNext(3);
            Thread.sleep(200);
            s.onNext(4);
            s.onNext(5);
            s.onComplete();
        }
    })
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5);
}
项目:Learning-RxJava    文件:Ch5_26.java   
public static void main(String[] args) {
    Subject<String> subject =
            UnicastSubject.create();
    Observable.interval(300, TimeUnit.MILLISECONDS)
            .map(l -> ((l + 1) * 300) + " milliseconds")
            .subscribe(subject);
    sleep(2000);
    subject.subscribe(s -> System.out.println("Observer 1: " +
            s));
    sleep(2000);
}