Java 类io.reactivex.observers.DefaultObserver 实例源码

项目:Demos    文件:ExpressViewModel.java   
/**
 * 获取快递信息
 *
 * @param type   快递类型
 * @param postid 快递单号
 */
public void getExpressInfo(String type, String postid) {
    isShowLoading.set(true);

    dataManager.getExpressInfo(type, postid)
            .subscribeOn(Schedulers.io()) // 在子线程中进行Http访问
            .observeOn(AndroidSchedulers.mainThread()) // UI线程处理返回接口
            .compose(getProvider().<ExpressInfo>bindUntilEvent(ActivityEvent.DESTROY)) // onDestroy取消订阅
            .subscribe(new DefaultObserver<ExpressInfo>() {  // 订阅
                @Override
                public void onNext(@NonNull ExpressInfo expressInfo) {
                    ExpressViewModel.this.expressInfo.setExpressInfo(expressInfo);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    errorMessage.set(e.getMessage());
                    isShowLoading.set(false);
                }

                @Override
                public void onComplete() {
                    isShowLoading.set(false);
                }
            });
}
项目:Demos    文件:ExpressPresenter.java   
/**
 * 获取快递信息
 *
 * @param type   快递类型
 * @param postid 快递单号
 */
public void getExpressInfo(String type, String postid) {
    expressView.showProgressDialog();

    dataManager.getExpressInfo(type, postid)
            .subscribeOn(Schedulers.io()) // 在子线程中进行Http访问
            .observeOn(AndroidSchedulers.mainThread()) // UI线程处理返回接口
            .compose(getProvider().<ExpressInfo>bindUntilEvent(ActivityEvent.DESTROY)) // onDestroy取消订阅
            .subscribe(new DefaultObserver<ExpressInfo>() {  // 订阅
                @Override
                public void onNext(@NonNull ExpressInfo expressInfo) {
                    expressView.updateView(expressInfo);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    expressView.showError(e.getMessage());
                    expressView.hideProgressDialog();
                }

                @Override
                public void onComplete() {
                    expressView.hideProgressDialog();
                }
            });
}
项目:RxRelay    文件:BehaviorRelayTest.java   
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    BehaviorRelay<String> src = BehaviorRelay.createDefault("null"); // FIXME was plain null which is not allowed

    for (int i = 0; i < 10; i++) {
        final Observer<Object> o = TestHelper.mockObserver();
        InOrder inOrder = inOrder(o);
        String v = "" + i;
        src.accept(v);
        System.out.printf("Turn: %d%n", i);
        src.firstElement()
            .toObservable()
            .flatMap(new Function<String, Observable<String>>() {

                @Override
                public Observable<String> apply(String t1) {
                    return Observable.just(t1 + ", " + t1);
                }
            })
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    o.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }

                @Override
                public void onComplete() {
                    o.onComplete();
                }
            });
        inOrder.verify(o).onNext(v + ", " + v);
        inOrder.verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay    文件:BehaviorRelayTest.java   
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    BehaviorRelay<String> src = BehaviorRelay.createDefault("null"); // FIXME was plain null which is not allowed

    for (int i = 0; i < 10; i++) {
        final Observer<Object> o = TestHelper.mockObserver();
        InOrder inOrder = inOrder(o);
        String v = "" + i;
        src.accept(v);
        System.out.printf("Turn: %d%n", i);
        src.firstElement()
            .toObservable()
            .flatMap(new Function<String, Observable<String>>() {

                @Override
                public Observable<String> apply(String t1) {
                    return Observable.just(t1 + ", " + t1);
                }
            })
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    o.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }

                @Override
                public void onComplete() {
                    o.onComplete();
                }
            });
        inOrder.verify(o).onNext(v + ", " + v);
        inOrder.verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay    文件:ReplayRelayTest.java   
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    ReplayRelay<String> src = ReplayRelay.create();

    for (int i = 0; i < 10; i++) {
        final Observer<Object> o = TestHelper.mockObserver();
        InOrder inOrder = inOrder(o);
        String v = "" + i;
        src.accept(v);
        System.out.printf("Turn: %d%n", i);
        src.firstElement()
            .toObservable()
            .flatMap(new Function<String, Observable<String>>() {

                @Override
                public Observable<String> apply(String t1) {
                    return Observable.just(t1 + ", " + t1);
                }
            })
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    System.out.println(t);
                    o.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }

                @Override
                public void onComplete() {
                    o.onComplete();
                }
            });
        inOrder.verify(o).onNext("0, 0");
        inOrder.verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay    文件:BehaviorRelayTest.java   
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final BehaviorRelay<Object> rs = BehaviorRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {

                @Override
                public void onComplete() {
                    o.set(-1);
                    finish.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    o.set(e);
                    finish.countDown();
                }

                @Override
                public void onNext(Object t) {
                    o.set(t);
                    finish.countDown();
                }

            });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                fail("Timeout @ " + i);
                break;
            } else {
                Assert.assertEquals(1, o.get());
            }
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay    文件:ReplayRelayConcurrencyTest.java   
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {

                @Override
                public void onComplete() {
                    o.set(-1);
                    finish.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    o.set(e);
                    finish.countDown();
                }

                @Override
                public void onNext(Object t) {
                    o.set(t);
                    finish.countDown();
                }

            });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                Assert.fail("Timeout @ " + i);
                break;
            } else {
                Assert.assertEquals(1, o.get());
            }
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay    文件:PublishRelayTest.java   
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    PublishRelay<String> src = PublishRelay.create();

    for (int i = 0; i < 10; i++) {
        final Observer<Object> o = TestHelper.mockObserver();
        InOrder inOrder = inOrder(o);
        String v = "" + i;
        System.out.printf("Turn: %d%n", i);
        src.firstElement()
            .toObservable()
            .flatMap(new Function<String, Observable<String>>() {

                @Override
                public Observable<String> apply(String t1) {
                    return Observable.just(t1 + ", " + t1);
                }
            })
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    o.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }

                @Override
                public void onComplete() {
                    o.onComplete();
                }
            });
        src.accept(v);

        inOrder.verify(o).onNext(v + ", " + v);
        inOrder.verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay    文件:ReplayRelayBoundedConcurrencyTest.java   
@Test
    public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
        Scheduler s = Schedulers.io();
        Scheduler.Worker worker = Schedulers.io().createWorker();
        try {
            for (int i = 0; i < 50000; i++) {
                if (i % 1000 == 0) {
                    System.out.println(i);
                }
                final ReplayRelay<Object> rs = ReplayRelay.createWithSize(2);

                final CountDownLatch finish = new CountDownLatch(1);
                final CountDownLatch start = new CountDownLatch(1);

//                int j = i;

                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            start.await();
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }
//                        System.out.println("> " + j);
                        rs.accept(1);
                    }
                });

                final AtomicReference<Object> o = new AtomicReference<Object>();

                rs
//                .doOnSubscribe(v -> System.out.println("!! " + j))
//                .doOnNext(e -> System.out.println(">> " + j))
                .subscribeOn(s)
                .observeOn(Schedulers.io())
//                .doOnNext(e -> System.out.println(">>> " + j))
                .subscribe(new DefaultObserver<Object>() {

                    @Override
                    protected void onStart() {
                        super.onStart();
                    }

                    @Override
                    public void onComplete() {
                        o.set(-1);
                        finish.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        o.set(e);
                        finish.countDown();
                    }

                    @Override
                    public void onNext(Object t) {
                        o.set(t);
                        finish.countDown();
                    }

                });
                start.countDown();

                if (!finish.await(5, TimeUnit.SECONDS)) {
                    System.out.println(o.get());
                    System.out.println(rs.hasObservers());
                    Assert.fail("Timeout @ " + i);
                    break;
                } else {
                    Assert.assertEquals(1, o.get());
                }
            }
        } finally {
            worker.dispose();
        }
    }
项目:RetrofitRxErrorHandler    文件:RealExampleTest.java   
/**
 * This test executes the real query to github server.
 * Test created by Robert Zagorski on 19.10.2016
 */
@Test
public void main() throws IOException, InterruptedException {
    // Create a very simple REST adapter which points the GitHub API.
    RxCallAdapter rxCallAdapter = new RxCallAdapter.Builder()
            .addBackoffStrategy(Exponential.init()
                    .addThrowable(UnknownHostException.class)
                    .addThrowable(SocketTimeoutException.class)
                    .setMaxRetries(3).build())
            .build();
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(new RxErrorHandingFactory(rxCallAdapter))
            .build();

    // Create an instance of our GitHub API interface.
    GitHub github = retrofit.create(GitHub.class);

    // Create a call instance for looking up Retrofit contributors.
    Observable<List<Repository>> call = github.repos("square");

    final CountDownLatch latch = new CountDownLatch(1);
    // Fetch and print a list of the contributors to the retrofiterrorhandler.
    call.subscribe(new DefaultObserver<List<Repository>>() {
        @Override
        public void onComplete() {
            System.out.println(new GregorianCalendar().toInstant().toString() + " Finished");
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(new GregorianCalendar().toInstant().toString() + " Finished with error: " + e);
            onComplete();
        }

        @Override
        public void onNext(List<Repository> repositories) {
            for (Repository repository : repositories) {
                System.out.println(repository.name + " (" + repository.description + ")");
            }
        }
    });
    latch.await();
}
项目:RxRelay    文件:ReplayRelayTest.java   
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    ReplayRelay<String> src = ReplayRelay.create();

    for (int i = 0; i < 10; i++) {
        final Observer<Object> o = TestHelper.mockObserver();
        InOrder inOrder = inOrder(o);
        String v = "" + i;
        src.accept(v);
        System.out.printf("Turn: %d%n", i);
        src.firstElement()
            .toObservable()
            .flatMap(new Function<String, Observable<String>>() {

                @Override
                public Observable<String> apply(String t1) {
                    return Observable.just(t1 + ", " + t1);
                }
            })
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    System.out.println(t);
                    o.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }

                @Override
                public void onComplete() {
                    o.onComplete();
                }
            });
        inOrder.verify(o).onNext("0, 0");
        inOrder.verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay    文件:BehaviorRelayTest.java   
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final BehaviorRelay<Object> rs = BehaviorRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {

                @Override
                public void onComplete() {
                    o.set(-1);
                    finish.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    o.set(e);
                    finish.countDown();
                }

                @Override
                public void onNext(Object t) {
                    o.set(t);
                    finish.countDown();
                }

            });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                fail("Timeout @ " + i);
                break;
            } else {
                Assert.assertEquals(1, o.get());
            }
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay    文件:ReplayRelayConcurrencyTest.java   
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {

                @Override
                public void onComplete() {
                    o.set(-1);
                    finish.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    o.set(e);
                    finish.countDown();
                }

                @Override
                public void onNext(Object t) {
                    o.set(t);
                    finish.countDown();
                }

            });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                Assert.fail("Timeout @ " + i);
                break;
            } else {
                Assert.assertEquals(1, o.get());
            }
        }
    } finally {
        worker.dispose();
    }
}
项目:RxRelay    文件:PublishRelayTest.java   
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    PublishRelay<String> src = PublishRelay.create();

    for (int i = 0; i < 10; i++) {
        final Observer<Object> o = TestHelper.mockObserver();
        InOrder inOrder = inOrder(o);
        String v = "" + i;
        System.out.printf("Turn: %d%n", i);
        src.firstElement()
            .toObservable()
            .flatMap(new Function<String, Observable<String>>() {

                @Override
                public Observable<String> apply(String t1) {
                    return Observable.just(t1 + ", " + t1);
                }
            })
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String t) {
                    o.onNext(t);
                }

                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }

                @Override
                public void onComplete() {
                    o.onComplete();
                }
            });
        src.accept(v);

        inOrder.verify(o).onNext(v + ", " + v);
        inOrder.verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
项目:RxRelay    文件:ReplayRelayBoundedConcurrencyTest.java   
@Test
    public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
        Scheduler s = Schedulers.io();
        Scheduler.Worker worker = Schedulers.io().createWorker();
        try {
            for (int i = 0; i < 50000; i++) {
                if (i % 1000 == 0) {
                    System.out.println(i);
                }
                final ReplayRelay<Object> rs = ReplayRelay.createWithSize(2);

                final CountDownLatch finish = new CountDownLatch(1);
                final CountDownLatch start = new CountDownLatch(1);

//                int j = i;

                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            start.await();
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }
//                        System.out.println("> " + j);
                        rs.accept(1);
                    }
                });

                final AtomicReference<Object> o = new AtomicReference<Object>();

                rs
//                .doOnSubscribe(v -> System.out.println("!! " + j))
//                .doOnNext(e -> System.out.println(">> " + j))
                .subscribeOn(s)
                .observeOn(Schedulers.io())
//                .doOnNext(e -> System.out.println(">>> " + j))
                .subscribe(new DefaultObserver<Object>() {

                    @Override
                    protected void onStart() {
                        super.onStart();
                    }

                    @Override
                    public void onComplete() {
                        o.set(-1);
                        finish.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        o.set(e);
                        finish.countDown();
                    }

                    @Override
                    public void onNext(Object t) {
                        o.set(t);
                        finish.countDown();
                    }

                });
                start.countDown();

                if (!finish.await(5, TimeUnit.SECONDS)) {
                    System.out.println(o.get());
                    System.out.println(rs.hasObservers());
                    Assert.fail("Timeout @ " + i);
                    break;
                } else {
                    Assert.assertEquals(1, o.get());
                }
            }
        } finally {
            worker.dispose();
        }
    }