@Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); testScheduler = new TestScheduler(); homePresenter = new HomePresenter(contentRepository, new TestSchedulerProvider(testScheduler), new HomeItemViewModelFactory()); // mock the Single: when(contentRepository.getContentItems()).thenReturn(Single.just(Collections.emptyList())); homePresenter.attachView(homeView); // trigger the initial load: testScheduler.triggerActions(); // The presenter wont't update the view unless it's active: when(homeView.isActive()).thenReturn(true); }
@Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); int itemId = 2; viewModel = new ContentViewModel(); testScheduler = new TestScheduler(); contentPresenter = new ContentPresenter(contentRepository, new TestSchedulerProvider(testScheduler), itemId, viewModel); // mock the Single: when(contentRepository.getContentItem(itemId)).thenReturn(Single.just(contentItem)); contentPresenter.attachView(contentView); // trigger the initial load: testScheduler.triggerActions(); // The presenter wont't update the view unless it's active: when(contentView.isActive()).thenReturn(true); }
@Test public void test_interval() { TestScheduler testScheduler=new TestScheduler(); Observable<Long>observable=Observable.interval(1, TimeUnit.SECONDS,testScheduler).take(5); TestObserver<Long> testObserver=new TestObserver<>(); observable.subscribeOn(testScheduler).subscribe(testObserver); testObserver.assertNoValues(); testObserver.assertNotComplete(); testObserver.assertNoErrors(); testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); testObserver.assertValueCount(1); testObserver.assertValues(0l); testScheduler.advanceTimeTo(6, TimeUnit.SECONDS); testObserver.assertValueCount(5); testObserver.assertValues(0l,1l,2l,3l,4l); }
@Test public void shouldBeAbleToDisplayMovies() { // System.out.println(MainThread); TestScheduler testScheduler = new TestScheduler(); TestObserver<List<Movie>> testObserver = new TestObserver<>(); Observable<List<Movie>> responseObservable = Observable.just(movies) .subscribeOn(testScheduler) .observeOn(testScheduler); responseObservable.subscribe(testObserver); when(interactor.fetchMovies()).thenReturn(responseObservable); presenter.setView(view); testScheduler.triggerActions(); testObserver.assertNoErrors(); testObserver.onComplete(); verify(view).showMovies(movies); }
@Test public void shouldBeAbleToShowTrailers() { // TestScheduler testScheduler1 = new TestScheduler(); Observable<VideoWrapper> response1 = Observable.just(videoWrapper) .subscribeOn(testScheduler1) .observeOn(AndroidSchedulers.mainThread()); TestObserver<VideoWrapper> observer = new TestObserver<>(); response1.subscribe(observer); when(movieDetailsInteractor.fetchVideos(anyString())).thenReturn(response1); movieDetailsPresenter.displayTrails(movie.getId()); testScheduler1.triggerActions(); // 为什么在前面 observer.assertNoErrors(); observer.assertComplete(); verify(view).showTrailers(videoWrapper.getVideos()); }
@Test public void shouldBeAbleToShowReviews() { TestScheduler testScheduler = new TestScheduler(); TestObserver<ReviewWrapper> testObserver = new TestObserver<>(); Observable<ReviewWrapper> responseObservable = Observable.just(reviewWrapper) .subscribeOn(testScheduler) .observeOn(AndroidSchedulers.mainThread()); responseObservable.subscribe(testObserver); when(movieDetailsInteractor.fetchReviews(anyString())).thenReturn(responseObservable); movieDetailsPresenter.displayReviews(movie.getId()); testScheduler.triggerActions(); testObserver.assertNoErrors(); testObserver.assertComplete(); verify(view).showReviews(reviewWrapper.getReviews()); }
@Test public void testATestScheduler() throws Exception { TestSubscriber<Long> testSubscriber = new TestSubscriber<>(); TestScheduler testScheduler = new TestScheduler(); Flowable.interval(5, TimeUnit.MILLISECONDS, testScheduler) .map(x -> x + 1) .filter(x -> x % 2 == 0) .subscribe(testSubscriber); testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); testSubscriber.assertNoErrors(); testSubscriber.assertValues(2L); testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); testSubscriber.assertNoErrors(); testSubscriber.assertValues(2L, 4L); }
@Test public void testBufferingWithNoUpdates() throws InterruptedException { TestScheduler testScheduler = new TestScheduler(); SimpleFlowableList<Integer> list = new SimpleFlowableList<>(); FlowableList<Integer> bufferedList = list.buffer(50, TimeUnit.MILLISECONDS, testScheduler); TestSubscriber testSubscriber = new TestSubscriber(); bufferedList.updates().subscribe(testSubscriber); testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS); testScheduler.triggerActions(); testSubscriber.awaitCount(1); testSubscriber.assertNoErrors(); testSubscriber.assertValueCount(1); testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS); testScheduler.triggerActions(); testSubscriber.awaitCount(1); testSubscriber.dispose(); }
@Test public void should_not_send_notification_occurring_before_subscribe() { // given TestScheduler scheduler = new TestScheduler(); Recorded<String> event = new Recorded<>(10, Notification.createOnNext("Hello world!")); final HotObservable<String> hotObservable = HotObservable.create(scheduler, event); // when final TestObserver<String> observer = new TestObserver<>(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { hotObservable.subscribe(observer); } }, 15, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS); observer.assertNoValues(); }
@Test public void should_not_send_notification_occurring_after_unsubscribe() { // given TestScheduler scheduler = new TestScheduler(); Recorded<String> event = new Recorded<>(10, Notification.createOnNext("Hello world!")); final HotObservable<String> hotObservable = HotObservable.create(scheduler, event); // when final TestObserver<String> observer = new TestObserver<>(); hotObservable.subscribe(observer); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { observer.dispose(); } }, 5, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS); observer.assertNoValues(); }
@Test public void should_keep_track_of_subscriptions() { // given TestScheduler scheduler = new TestScheduler(); final HotObservable<String> hotObservable = HotObservable.create(scheduler); // when final TestObserver<String> observer = new TestObserver<>(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { hotObservable.subscribe(observer); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(hotObservable.getSubscriptions()) .containsExactly( new SubscriptionLog(42, Long.MAX_VALUE) ); }
@Test public void should_keep_track_of_unsubscriptions() { // given TestScheduler scheduler = new TestScheduler(); final HotObservable<String> hotObservable = HotObservable.create(scheduler); // when final TestObserver<String> observer = new TestObserver<>(); hotObservable.subscribe(observer); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { observer.dispose(); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(hotObservable.getSubscriptions()) .containsExactly( new SubscriptionLog(0, 42) ); }
@Test public void should_send_notification_on_subscribe_using_offset() { // given TestScheduler scheduler = new TestScheduler(); long offset = 10; Recorded<String> event = new Recorded<>(offset, Notification.createOnNext("Hello world!")); ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event); // when TestObserver<String> observer = new TestObserver<>(); coldObservable.subscribe(observer); // then scheduler.advanceTimeBy(9, TimeUnit.MILLISECONDS); observer.assertNoValues(); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); observer.assertValue("Hello world!"); }
@Test public void should_not_send_notification_after_unsubscribe() { // given TestScheduler scheduler = new TestScheduler(); long offset = 10; Recorded<String> event = new Recorded<>(offset, Notification.createOnNext("Hello world!")); ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event); final TestObserver<String> observer = new TestObserver<>(); coldObservable.subscribe(observer); // when scheduler.createWorker().schedule(new Runnable() { @Override public void run() { observer.dispose(); } }, 5, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS); observer.assertNoValues(); }
@Test public void should_be_cold_and_send_notification_at_subscribe_time() { // given TestScheduler scheduler = new TestScheduler(); Recorded<String> event = new Recorded<>(0, Notification.createOnNext("Hello world!")); final ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event); // when final TestObserver<String> observer = new TestObserver<>(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { coldObservable.subscribe(observer); } }, 42, TimeUnit.SECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.SECONDS); observer.assertValue("Hello world!"); }
@Test public void should_keep_track_of_subscriptions() { // given TestScheduler scheduler = new TestScheduler(); final ColdObservable<String> coldObservable = ColdObservable.create(scheduler); // when final TestObserver<String> observer = new TestObserver<>(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { coldObservable.subscribe(observer); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(coldObservable.getSubscriptions()) .containsExactly( new SubscriptionLog(42, Long.MAX_VALUE) ); }
@Test public void should_keep_track_of_unsubscriptions() { // given TestScheduler scheduler = new TestScheduler(); final ColdObservable<String> coldObservable = ColdObservable.create(scheduler); // when final TestObserver<String> observer = new TestObserver<>(); coldObservable.subscribe(observer); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { observer.dispose(); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(coldObservable.getSubscriptions()) .containsExactly( new SubscriptionLog(0, 42) ); }
@Test public void testMaxIdleTime() throws InterruptedException { TestScheduler s = new TestScheduler(); AtomicInteger count = new AtomicInteger(); AtomicInteger disposed = new AtomicInteger(); Pool<Integer> pool = NonBlockingPool // .factory(() -> count.incrementAndGet()) // .healthCheck(n -> true) // .maxSize(3) // .maxIdleTime(1, TimeUnit.MINUTES) // .disposer(n -> disposed.incrementAndGet()) // .scheduler(s) // .build(); TestSubscriber<Member<Integer>> ts = new FlowableSingleDeferUntilRequest<>( // pool.member()) // .doOnNext(m -> m.checkin()) // .doOnNext(System.out::println) // .doOnRequest(t -> System.out.println("test request=" + t)) // .test(1); s.triggerActions(); ts.assertValueCount(1); assertEquals(0, disposed.get()); s.advanceTimeBy(1, TimeUnit.MINUTES); s.triggerActions(); assertEquals(1, disposed.get()); }
@Test public void testConnectionPoolRecylesAlternating() { TestScheduler s = new TestScheduler(); AtomicInteger count = new AtomicInteger(); Pool<Integer> pool = NonBlockingPool // .factory(() -> count.incrementAndGet()) // .healthCheck(n -> true) // .maxSize(2) // .maxIdleTime(1, TimeUnit.MINUTES) // .scheduler(s) // .build(); TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<>(pool.member()) // .repeat() // .doOnNext(m -> m.checkin()) // .map(m -> m.value()) // .test(4); // s.triggerActions(); ts.assertValueCount(4) // .assertNotTerminated(); List<Object> list = ts.getEvents().get(0); // all 4 connections released were the same assertTrue(list.get(0) == list.get(1)); assertTrue(list.get(1) == list.get(2)); assertTrue(list.get(2) == list.get(3)); }
@Test public void testMemberAvailableAfterCreationScheduledIsUsedImmediately() throws InterruptedException { TestScheduler ts = new TestScheduler(); Scheduler s = createScheduleToDelayCreation(ts); AtomicInteger count = new AtomicInteger(); Pool<Integer> pool = NonBlockingPool // .factory(() -> count.incrementAndGet()) // .createRetryInterval(10, TimeUnit.MINUTES) // .maxSize(2) // .maxIdleTime(1, TimeUnit.HOURS) // .scheduler(s) // .build(); List<Member<Integer>> list = new ArrayList<Member<Integer>>(); pool.member().doOnSuccess(m -> list.add(m)).subscribe(); assertEquals(0, list.size()); ts.advanceTimeBy(1, TimeUnit.MINUTES); ts.triggerActions(); assertEquals(1, list.size()); pool.member().doOnSuccess(m -> list.add(m)).subscribe(); list.get(0).checkin(); ts.triggerActions(); assertEquals(2, list.size()); }
@Test public void takeSizeAndTime() { TestScheduler scheduler = new TestScheduler(); ReplayRelay<Integer> rp = ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2); rp.accept(1); rp.accept(2); rp.accept(3); rp .take(1) .test() .assertResult(2); }
@Test public void reentrantDrain() { TestScheduler scheduler = new TestScheduler(); final ReplayRelay<Integer> rp = ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2); TestObserver<Integer> ts = new TestObserver<Integer>() { @Override public void onNext(Integer t) { if (t == 1) { rp.accept(2); } super.onNext(t); } }; rp.subscribe(ts); rp.accept(1); ts.assertValues(1, 2); }
@Test public void testWithScheduler() { Exception ex = new IllegalArgumentException("boo"); TestSubscriber<Integer> ts = TestSubscriber.create(); TestScheduler scheduler = new TestScheduler(); Flowable.just(1, 2) // force error after 3 emissions .concatWith(Flowable.<Integer>error(ex)) // retry with backoff .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES) .scheduler(scheduler).build()) // go .subscribe(ts); ts.assertValues(1, 2); ts.assertNotComplete(); scheduler.advanceTimeBy(1, TimeUnit.MINUTES); ts.assertValues(1, 2, 1, 2); ts.assertNotComplete(); // next wait is 2 seconds so advancing by 1 should do nothing scheduler.advanceTimeBy(1, TimeUnit.MINUTES); ts.assertValues(1, 2, 1, 2); ts.assertNotComplete(); scheduler.advanceTimeBy(1, TimeUnit.MINUTES); ts.assertValues(1, 2, 1, 2, 1, 2); ts.assertError(ex); }
@SuppressWarnings("unchecked") @Test public void testRetryWhenSpecificExceptionFails() { Exception ex = new IllegalArgumentException("boo"); TestSubscriber<Integer> ts = TestSubscriber.create(); TestScheduler scheduler = new TestScheduler(); Flowable.just(1, 2) // force error after 3 emissions .concatWith(Flowable.<Integer>error(ex)) // retry with backoff .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES) .scheduler(scheduler).failWhenInstanceOf(IllegalArgumentException.class).build()) // go .subscribe(ts); ts.assertValues(1, 2); ts.assertError(ex); }
@SuppressWarnings("unchecked") @Test public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() { Exception ex = new IllegalArgumentException("boo"); TestSubscriber<Integer> ts = TestSubscriber.create(); TestScheduler scheduler = new TestScheduler(); Flowable.just(1, 2) // force error after 3 emissions .concatWith(Flowable.<Integer>error(ex)) // retry with backoff .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES) .scheduler(scheduler).retryWhenInstanceOf(SQLException.class).build()) // go .subscribe(ts); ts.assertValues(1, 2); ts.assertError(ex); }
@SuppressWarnings("unchecked") @Test public void testRetryWhenSpecificExceptionAllowed() { Exception ex = new IllegalArgumentException("boo"); TestSubscriber<Integer> ts = TestSubscriber.create(); TestScheduler scheduler = new TestScheduler(); Flowable.just(1, 2) // force error after 3 emissions .concatWith(Flowable.<Integer>error(ex)) // retry with backoff .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES) .scheduler(scheduler).retryWhenInstanceOf(IllegalArgumentException.class).build()) // go .subscribe(ts); ts.assertValues(1, 2); ts.assertNotComplete(); }
@Test public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsTrue() { Exception ex = new IllegalArgumentException("boo"); TestSubscriber<Integer> ts = TestSubscriber.create(); TestScheduler scheduler = new TestScheduler(); Predicate<Throwable> predicate = new Predicate<Throwable>() { @Override public boolean test(Throwable t) { return t instanceof IllegalArgumentException; } }; Flowable.just(1, 2) // force error after 3 emissions .concatWith(Flowable.<Integer>error(ex)) // retry with backoff .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES) .scheduler(scheduler).retryIf(predicate).build()) // go .subscribe(ts); ts.assertValues(1, 2); ts.assertNotComplete(); }
@Test public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsFalse() { Exception ex = new IllegalArgumentException("boo"); TestSubscriber<Integer> ts = TestSubscriber.create(); TestScheduler scheduler = new TestScheduler(); Predicate<Throwable> predicate = Predicates.alwaysFalse(); Flowable.just(1, 2) // force error after 3 emissions .concatWith(Flowable.<Integer>error(ex)) // retry with backoff .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES) .scheduler(scheduler).retryIf(predicate).build()) // go .subscribe(ts); ts.assertValues(1, 2); ts.assertError(ex); }
@Test public void onViewCreatedUsersReceived() throws Exception { List<User> users = new ArrayList<>(); TestScheduler scheduler = new TestScheduler(); PublishSubject<List<User>> subject = PublishSubject.create(); subject.subscribeOn(scheduler); when(mInteractor.getUserList()).thenReturn(subject); ListingActivity mock = Mockito.mock(ListingActivity.class, new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { if (invocation.getMethod().getReturnType().equals(Observable.class)) return PublishSubject.create(); else return null; } }); mPresenter.attachView(mock); mPresenter.detachView(false); verify(mInteractor).getUserList(); subject.onNext(users); scheduler.triggerActions(); verify(mView, never()).setUserList(users); verify(mView, never()).showContent(); verify(mView, never()).showError(any(Throwable.class)); }
@Test public void onViewCreatedFailed() throws Exception { TestScheduler scheduler = new TestScheduler(); PublishSubject<List<User>> subject = PublishSubject.create(); subject.subscribeOn(scheduler); when(mInteractor.getUserList()).thenReturn(subject); ListingActivity mock = Mockito.mock(ListingActivity.class, new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { if (invocation.getMethod().getReturnType().equals(Observable.class)) return PublishSubject.create(); else return null; } }); mPresenter.attachView(mock); mPresenter.detachView(false); verify(mInteractor).getUserList(); IOException e = new IOException(); subject.onError(e); scheduler.triggerActions(); verify(mView, never()).setUserList(any(List.class)); verify(mView, never()).showContent(); verify(mView, never()).showError(e); }
@Test public void onViewCreatedUsersReceived() throws Exception { List<User> users = new ArrayList<>(); TestScheduler scheduler = new TestScheduler(); PublishSubject<List<User>> subject = PublishSubject.create(); subject.subscribeOn(scheduler); when(mInteractor.getUserList()).thenReturn(subject); mPresenter.attachView(mView); verify(mView).showLoading(); verify(mInteractor).getUserList(); subject.onNext(users); scheduler.triggerActions(); verify(mView).setUserList(users); verify(mView).showContent(); verify(mView, never()).showError(any(Throwable.class)); }
@Test public void onViewCreatedFailed() throws Exception { TestScheduler scheduler = new TestScheduler(); PublishSubject<List<User>> subject = PublishSubject.create(); subject.subscribeOn(scheduler); when(mInteractor.getUserList()).thenReturn(subject); mPresenter.attachView(mView); verify(mView).showLoading(); verify(mInteractor).getUserList(); IOException e = new IOException(); subject.onError(e); scheduler.triggerActions(); verify(mView, never()).setUserList(any(List.class)); verify(mView, never()).showContent(); verify(mView).showError(e); }
@Test public void search_withNonEmptyQuery_doesNotTriggerImmediately() { TestScheduler testScheduler = new TestScheduler(); Act act = new ArrangeBuilder() .withTimeScheduler(testScheduler) .act() .bind(); act.search(DUMMY_QUERY); verify(searchDataModel).querySearch(searchTermCaptor.capture(), searchDelayCaptor.capture()); assertThat(searchTermCaptor.getValue()).isEqualTo(DUMMY_QUERY); searchDelayCaptor.getValue().test().assertNotTerminated(); }
@Test public void search_withNonEmptyQuery_triggersAfterDelay() { TestScheduler testScheduler = new TestScheduler(); Act act = new ArrangeBuilder() .withTimeScheduler(testScheduler, SEARCH_DEBOUNCE_TAG) .act() .bind(); act.search(DUMMY_QUERY); verify(searchDataModel).querySearch(searchTermCaptor.capture(), searchDelayCaptor.capture()); assertThat(searchTermCaptor.getValue()).isEqualTo(DUMMY_QUERY); TestObserver<Void> testObserver = searchDelayCaptor.getValue().test(); testScheduler.advanceTimeBy(SEARCH_DEBOUNCE_TIME_SECONDS, TimeUnit.SECONDS); testObserver.assertComplete(); }
@Test public void getTimePositionMsOnceAndStream_reportsProgressValue_whenPlaying_onEachUpdatePeriod() { TestScheduler testScheduler = new TestScheduler(); ArrangeBuilder arrangeBuilder = new ArrangeBuilder() .withTimeScheduler(testScheduler) .withPlayingExoPlayer() .withProgress(500L); TestObserver<Long> testObserver = defaultObservableExoPlayer .getTimePositionMsOnceAndStream(100L, TimeUnit.SECONDS) .test(); testScheduler.advanceTimeBy(50L, TimeUnit.SECONDS); arrangeBuilder.withProgress(1000L); testObserver.assertValues(500L, 1000L); }
@Test public void shouldEmitValueAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Integer> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestObserver<Integer> testObserver = Observable.just(0) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertValue(0); testObserver.assertComplete(); }
@Test public void shouldReplayAllValuesAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Integer> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestObserver<Integer> testObserver = Observable.just(0, 1, 2) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertValues(0, 1, 2); testObserver.assertComplete(); }
@Test public void shouldEmitErrorAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Integer> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); Exception exception = new RuntimeException(); TestObserver<Integer> testObserver = Observable.<Integer>error(exception) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertError(exception); testObserver.assertNotComplete(); }