@Test public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() { TestScheduler testScheduler = new TestScheduler(); Observable<Integer> source = Observable.concat(Observable.<Integer> error(new TestException()), Observable.just(1)); @SuppressWarnings("unchecked") Observer<Integer> o = mock(Observer.class); InOrder inOrder = inOrder(o); source.observeOn(testScheduler).subscribe(o); inOrder.verify(o, never()).onError(any(TestException.class)); testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); inOrder.verify(o).onError(any(TestException.class)); inOrder.verify(o, never()).onNext(anyInt()); inOrder.verify(o, never()).onComplete(); }
@SuppressWarnings("unchecked") @Test public void multipleLeftTerminalRightOtherThread() { TestScheduler otherScheduler = Schedulers.test(); TestSubject<EventB> eventBSubject = TestSubject.create(otherScheduler); Observable<Either<EventA, EventB>> either = RxEither.from(eventASubject, eventBSubject); either.subscribe(subscriber); eventASubject.onNext(eventA); eventBSubject.onNext(eventB); eventBSubject.onCompleted(); eventASubject.onNext(eventA); testScheduler.triggerActions(); subscriber.assertNotCompleted(); otherScheduler.triggerActions(); subscriber.assertNoErrors(); subscriber.assertCompleted(); subscriber.assertUnsubscribed(); subscriber.assertValues(Either.<EventA, EventB>left(eventA), Either.<EventA, EventB>left(eventA), Either.<EventA, EventB>right(eventB)); }
@Test public void basic() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); subject.onNext(0); o.takeNext(); subject.onNext(1); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); subject.onCompleted(); o.assertOnCompleted(); }
@Test public void completion() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); subject.onCompleted(); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertOnCompleted(); }
@Test public void error() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); subject.onError(new RuntimeException("Blah")); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); assertThat(o.takeError()).isInstanceOf(RuntimeException.class); }
@Test public void unsubscription() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); Subscription sub = subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); o.assertNoMoreEvents(); sub.unsubscribe(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.assertNoMoreEvents(); }
@Test public void overDelay_shouldEmitImmediately() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); scheduler.advanceTimeBy(2, TimeUnit.SECONDS); subject.onNext(1); o.takeNext(); }
@Test public void Should_transmit_events_from_the_target_client() { // given TestScheduler scheduler = Schedulers.test(); PublishSubject<String> subject = PublishSubject.create(); EventStreamClient targetClient = mock(EventStreamClient.class); when(targetClient.readServerSideEvents()).thenReturn(subject); MulticastEventStreamClient multicastEventStreamClient = new MulticastEventStreamClient(targetClient, scheduler); Observable<String> events = multicastEventStreamClient.readServerSideEvents(); // when TestSubscriber<String> subscriber = new TestSubscriber<>(); events.subscribe(subscriber); subject.onNext("Hello!"); // then assertThat(subscriber.getOnNextEvents()).hasSize(1).contains("Hello!"); }
@Test public void testExpectedReplayBehavior() { final TestScheduler scheduler = new TestScheduler(); final TestSubject<Integer> subject = TestSubject.create(scheduler); final TestSubscriber<Integer> subscriber = new TestSubscriber<>(); final ConnectableObservable<Integer> sums = subject.scan((a, b) -> a + b).replay(1); sums.connect(); subject.onNext(1); subject.onNext(2); subject.onNext(3); scheduler.triggerActions(); sums.subscribe(subscriber); subscriber.assertValueCount(1); subscriber.assertValues(6); }
@Test public void testFlakyReplayBehavior() { final TestScheduler scheduler = new TestScheduler(); final TestSubject<Integer> subject = TestSubject.create(scheduler); final TestSubscriber<Integer> subscriber = new TestSubscriber<>(); final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1); sums.connect(); subject.onNext(2); subject.onNext(3); scheduler.triggerActions(); sums.subscribe(subscriber); // subscriber.assertValueCount(1); subscriber.assertValues(6); }
@Test public void should_test_observable_interval() { TestScheduler scheduler = new TestScheduler(); final List<Long> result = new ArrayList<>(); Observable.interval(1, TimeUnit.SECONDS, scheduler) .take(5) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { result.add(aLong); } }); assertTrue(result.isEmpty()); scheduler.advanceTimeBy(2, TimeUnit.SECONDS); assertEquals(2, result.size()); scheduler.advanceTimeBy(10, TimeUnit.SECONDS); assertEquals(5, result.size()); }
@Test public void shouldEmitErrorAfterViewIsAttached() { TestScheduler testScheduler = Schedulers.test(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Object> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestSubscriber<Object> testSubscriber = new TestSubscriber<>(); Observable.error(new RuntimeException()) .compose(transformer) .subscribe(testSubscriber); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testSubscriber.awaitTerminalEvent(); testSubscriber.assertError(RuntimeException.class); }
@Test public void shouldEmitValueAfterViewIsAttached() { TestScheduler testScheduler = Schedulers.test(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Integer> transformer = new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestSubscriber<Integer> testSubscriber = new TestSubscriber<>(); Observable.just(0) .compose(transformer) .subscribe(testSubscriber); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testSubscriber.awaitTerminalEvent(); testSubscriber.assertValue(0); testSubscriber.assertCompleted(); }
@Test public void testSampleWindowIsConstantDuration() { @SuppressWarnings("unchecked") Observer<Integer> observer = mock(Observer.class); TestScheduler s = new TestScheduler(); PublishSubject<Integer> o = PublishSubject.create(); o.compose(Transformers.<Integer> sampleFirst(1000, TimeUnit.MILLISECONDS, s)) .subscribe(observer); // send events with simulated time increments s.advanceTimeTo(0, TimeUnit.MILLISECONDS); o.onNext(1); s.advanceTimeTo(1200, TimeUnit.MILLISECONDS); o.onNext(2); s.advanceTimeTo(2100, TimeUnit.MILLISECONDS); o.onNext(3); o.onCompleted(); InOrder inOrder = inOrder(observer); inOrder.verify(observer).onNext(1); inOrder.verify(observer).onNext(2); inOrder.verify(observer).onNext(3); inOrder.verify(observer).onCompleted(); inOrder.verifyNoMoreInteractions(); }
@Test public void testCachedScheduledReset() { TestScheduler scheduler = new TestScheduler(); Worker worker = scheduler.createWorker(); try { final AtomicInteger count = new AtomicInteger(0); Observable<Integer> source = Observable.defer(new Func0<Observable<Integer>>() { @Override public Observable<Integer> call() { return Observable.just(count.incrementAndGet()); } }) // cache .compose(Transformers.<Integer> cache(5, TimeUnit.MINUTES, worker)); assertEquals(1, (int) source.toBlocking().single()); scheduler.advanceTimeBy(1, TimeUnit.MINUTES); assertEquals(1, (int) source.toBlocking().single()); scheduler.advanceTimeBy(1, TimeUnit.MINUTES); assertEquals(1, (int) source.toBlocking().single()); scheduler.advanceTimeBy(3, TimeUnit.MINUTES); assertEquals(2, (int) source.toBlocking().single()); assertEquals(2, (int) source.toBlocking().single()); } finally { worker.unsubscribe(); } }
@Test public void startSchedule() { TestScheduler scheduler = new TestScheduler(); RxState<Integer> state = new RxState<>(0, scheduler); TestSubscriber<Integer> subscriber = new TestSubscriber<>(); state.values(StartWith.SCHEDULE).subscribe(subscriber); subscriber.assertValues(); state.apply(it -> it + 1); scheduler.triggerActions(); subscriber.assertValues(0, 1); }
@Test public void should_not_send_notification_occurring_before_subscribe() { // given TestScheduler scheduler = new TestScheduler(); Recorded<String> event = new Recorded<>(10, Notification.createOnNext("Hello world!")); final HotObservable<String> hotObservable = HotObservable.create(scheduler, event); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); scheduler.createWorker().schedule(new Action0() { @Override public void call() { hotObservable.subscribe(subscriber); } }, 15, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertThat(subscriber.getOnNextEvents()).isEmpty(); }
@Test public void should_not_send_notification_occurring_after_unsubscribe() { // given TestScheduler scheduler = new TestScheduler(); Recorded<String> event = new Recorded<>(10, Notification.createOnNext("Hello world!")); final HotObservable<String> hotObservable = HotObservable.create(scheduler, event); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); final Subscription subscription = hotObservable.subscribe(subscriber); scheduler.createWorker().schedule(new Action0() { @Override public void call() { subscription.unsubscribe(); } }, 5, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertThat(subscriber.getOnNextEvents()).isEmpty(); }
@Test public void should_keep_track_of_subscriptions() { // given TestScheduler scheduler = new TestScheduler(); final HotObservable<String> hotObservable = HotObservable.create(scheduler); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); scheduler.createWorker().schedule(new Action0() { @Override public void call() { hotObservable.subscribe(subscriber); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(hotObservable.subscriptions) .containsExactly( new SubscriptionLog(42, Long.MAX_VALUE) ); }
@Test public void should_keep_track_of_unsubscriptions() { // given TestScheduler scheduler = new TestScheduler(); final HotObservable<String> hotObservable = HotObservable.create(scheduler); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); final Subscription subscription = hotObservable.subscribe(subscriber); scheduler.createWorker().schedule(new Action0() { @Override public void call() { subscription.unsubscribe(); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(hotObservable.subscriptions) .containsExactly( new SubscriptionLog(0, 42) ); }
@Test public void should_send_notification_on_subscribe_using_offset() { // given TestScheduler scheduler = new TestScheduler(); long offset = 10; Recorded<String> event = new Recorded<>(offset, Notification.createOnNext("Hello world!")); ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event); // when TestSubscriber<String> subscriber = new TestSubscriber<>(); coldObservable.subscribe(subscriber); // then scheduler.advanceTimeBy(9, TimeUnit.MILLISECONDS); assertThat(subscriber.getOnNextEvents()).isEmpty(); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); assertThat(subscriber.getOnNextEvents()).containsExactly("Hello world!"); }
@Test public void should_not_send_notification_after_unsubscribe() { // given TestScheduler scheduler = new TestScheduler(); long offset = 10; Recorded<String> event = new Recorded<>(offset, Notification.createOnNext("Hello world!")); ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event); TestSubscriber<String> subscriber = new TestSubscriber<>(); final Subscription subscription = coldObservable.subscribe(subscriber); // when scheduler.createWorker().schedule(new Action0() { @Override public void call() { subscription.unsubscribe(); } }, 5, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertThat(subscriber.getOnNextEvents()).isEmpty(); }
@Test public void should_be_cold_and_send_notification_at_subscribe_time() { // given TestScheduler scheduler = new TestScheduler(); Recorded<String> event = new Recorded<>(0, Notification.createOnNext("Hello world!")); final ColdObservable<String> coldObservable = ColdObservable.create(scheduler, event); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); scheduler.createWorker().schedule(new Action0() { @Override public void call() { coldObservable.subscribe(subscriber); } }, 42, TimeUnit.SECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.SECONDS); assertThat(subscriber.getOnNextEvents()).containsExactly("Hello world!"); }
@Test public void should_keep_track_of_subscriptions() { // given TestScheduler scheduler = new TestScheduler(); final ColdObservable<String> coldObservable = ColdObservable.create(scheduler); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); scheduler.createWorker().schedule(new Action0() { @Override public void call() { coldObservable.subscribe(subscriber); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(coldObservable.getSubscriptions()) .containsExactly( new SubscriptionLog(42, Long.MAX_VALUE) ); }
@Test public void should_keep_track_of_unsubscriptions() { // given TestScheduler scheduler = new TestScheduler(); final ColdObservable<String> coldObservable = ColdObservable.create(scheduler); // when final TestSubscriber<String> subscriber = new TestSubscriber<>(); final Subscription subscription = coldObservable.subscribe(subscriber); scheduler.createWorker().schedule(new Action0() { @Override public void call() { subscription.unsubscribe(); } }, 42, TimeUnit.MILLISECONDS); // then scheduler.advanceTimeBy(42, TimeUnit.MILLISECONDS); assertThat(coldObservable.getSubscriptions()) .containsExactly( new SubscriptionLog(0, 42) ); }
@Test public void buffer() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> subject = PublishSubject.create(); RecordingObserver<Integer> o = new RecordingObserver<>(); subject .lift(new OperatorNormalize<Integer>(1, TimeUnit.SECONDS, scheduler)) .subscribe(o); // First emits immediately subject.onNext(0); o.takeNext(); subject.onNext(1); subject.onNext(2); subject.onNext(3); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertNoMoreEvents(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); o.takeNext(); o.assertNoMoreEvents(); subject.onCompleted(); o.assertOnCompleted(); }
/** * Test 21 */ @Test @Ignore public void Should_generate_only_one_subscription_side_effect_with_multiple_subscribers() { // given Observable<String> source = Observable.create(subscriber -> { subscriber.onNext("open"); }); TestScheduler scheduler = Schedulers.test(); EventStreamClient targetClient = mock(EventStreamClient.class); when(targetClient.readServerSideEvents()).thenReturn(source); MulticastEventStreamClient multicastEventStreamClient = new MulticastEventStreamClient(targetClient, scheduler); Observable<String> events = multicastEventStreamClient.readServerSideEvents(); // when TestSubscriber<String> subscriber1 = new TestSubscriber<>(); TestSubscriber<String> subscriber2 = new TestSubscriber<>(); events.subscribe(subscriber1); events.subscribe(subscriber2); // then assertThat(subscriber1.getOnNextEvents()).hasSize(1); assertThat(subscriber2.getOnNextEvents()).isEmpty(); }
@Before public void setUp() { fetchFails = new AtomicBoolean(false); fetchError = new RuntimeException("Fail!"); scheduler = new TestScheduler(); producer = new Func0<Single<Long>>() { @Override public Single<Long> call() { if (fetchFails.get()) { return Single.error(fetchError); } else { return Single.just(scheduler.now()); } } }; cache = new RxCache<>(EXPIRY, scheduler, Single.defer( new Func0<Single<Long>>() { @Override public Single<Long> call() { return producer.call(); } })); subscriber = new TestSubscriber<>(); }
@Test public void testNoResults() { final TestScheduler scheduler = new TestScheduler(); final List<Project> projects = Arrays.asList( ); final MockApiClient apiClient = new MockApiClient() { @Override public @NonNull Observable<DiscoverEnvelope> fetchProjects(final @NonNull DiscoveryParams params) { return Observable.just(DiscoverEnvelopeFactory.discoverEnvelope(projects)); } }; final Environment env = environment().toBuilder() .scheduler(scheduler) .apiClient(apiClient) .build(); setUpEnvironment(env); // populate search and overcome debounce this.vm.inputs.search("__"); scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); this.searchProjects.assertValueCount(2); }
@Test public void testWithScheduler() throws Exception { TestScheduler scheduler = new TestScheduler(); RxTestWrapper<Long> assertion = RxTestWrapper.assertThat(Observable.interval(1, TimeUnit.MILLISECONDS, scheduler).take(3)); assertion .notCompleted() .hasNoValues() .hasNoErrors(); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); assertion.hasValueCount(1); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); assertion.hasValueCount(2); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); assertion.hasValueCount(3) .completed(); }
@Override public Subscription invoke(DeliveryMethod method, TestSubscriber<Notification<Long>> testSubscriber, final TestScheduler scheduler, RestartableSet set) { return set .channel(RESTARTABLE_ID, method, new ObservableFactory<String, Long>() { @Override public Observable<Long> call(final String a) { return interval(1, 1, TimeUnit.SECONDS, scheduler).map(new Func1<Long, Long>() { @Override public Long call(Long aLong) { return aLong + Long.parseLong(a); } }); } }) .subscribe(testSubscriber); }
@Test public void testStartWithWithScheduler() { TestScheduler scheduler = new TestScheduler(); Observable<Integer> observable = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler); @SuppressWarnings("unchecked") Observer<Integer> observer = mock(Observer.class); observable.subscribe(observer); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); InOrder inOrder = inOrder(observer); inOrder.verify(observer, times(1)).onNext(1); inOrder.verify(observer, times(1)).onNext(2); inOrder.verify(observer, times(1)).onNext(3); inOrder.verify(observer, times(1)).onNext(4); inOrder.verify(observer, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); }
@Test public void testRangeWithScheduler() { TestScheduler scheduler = new TestScheduler(); Observable<Integer> observable = Observable.range(3, 4, scheduler); @SuppressWarnings("unchecked") Observer<Integer> observer = mock(Observer.class); observable.subscribe(observer); scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); InOrder inOrder = inOrder(observer); inOrder.verify(observer, times(1)).onNext(3); inOrder.verify(observer, times(1)).onNext(4); inOrder.verify(observer, times(1)).onNext(5); inOrder.verify(observer, times(1)).onNext(6); inOrder.verify(observer, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); }
@Test(timeout = 1000) public void testSimple() { TestScheduler scheduler = new TestScheduler(); BlockingObservable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlocking(); Iterable<Long> iter = source.latest(); Iterator<Long> it = iter.iterator(); // only 9 because take(10) will immediately call onComplete() when receiving the 10th item // which onComplete() will overwrite the previous value for (int i = 0; i < 9; i++) { scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Assert.assertEquals(true, it.hasNext()); Assert.assertEquals(Long.valueOf(i), it.next()); } scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Assert.assertEquals(false, it.hasNext()); }
@Test(timeout = 1000) public void testSameSourceMultipleIterators() { TestScheduler scheduler = new TestScheduler(); BlockingObservable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlocking(); Iterable<Long> iter = source.latest(); for (int j = 0; j < 3; j++) { Iterator<Long> it = iter.iterator(); // only 9 because take(10) will immediately call onComplete() when receiving the 10th item // which onComplete() will overwrite the previous value for (int i = 0; i < 9; i++) { scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Assert.assertEquals(true, it.hasNext()); Assert.assertEquals(Long.valueOf(i), it.next()); } scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Assert.assertEquals(false, it.hasNext()); } }
@Test(timeout = 1000, expected = NoSuchElementException.class) public void testSimpleJustNext() { TestScheduler scheduler = new TestScheduler(); BlockingObservable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlocking(); Iterable<Long> iter = source.latest(); Iterator<Long> it = iter.iterator(); // only 9 because take(10) will immediately call onComplete() when receiving the 10th item // which onComplete() will overwrite the previous value for (int i = 0; i < 10; i++) { scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Assert.assertEquals(Long.valueOf(i), it.next()); } }
@Before @SuppressWarnings("unchecked") public void before() { scheduler = new TestScheduler(); innerScheduler = scheduler.createWorker(); observer = mock(Observer.class); }
@Test public void testSkipLastTimed() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> source = PublishSubject.create(); Observable<Integer> result = source.skipLast(1, TimeUnit.SECONDS, scheduler); @SuppressWarnings("unchecked") Observer<Object> o = mock(Observer.class); result.subscribe(o); source.onNext(1); source.onNext(2); source.onNext(3); scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); source.onNext(4); source.onNext(5); source.onNext(6); scheduler.advanceTimeBy(950, TimeUnit.MILLISECONDS); source.onComplete(); InOrder inOrder = inOrder(o); inOrder.verify(o).onNext(1); inOrder.verify(o).onNext(2); inOrder.verify(o).onNext(3); inOrder.verify(o, never()).onNext(4); inOrder.verify(o, never()).onNext(5); inOrder.verify(o, never()).onNext(6); inOrder.verify(o).onComplete(); inOrder.verifyNoMoreInteractions(); verify(o, never()).onError(any(Throwable.class)); }
@Test public void testSkipLastTimedWhenAllElementsAreValid() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> source = PublishSubject.create(); Observable<Integer> result = source.skipLast(1, TimeUnit.MILLISECONDS, scheduler); @SuppressWarnings("unchecked") Observer<Object> o = mock(Observer.class); result.subscribe(o); source.onNext(1); source.onNext(2); source.onNext(3); scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); source.onComplete(); InOrder inOrder = inOrder(o); inOrder.verify(o).onNext(1); inOrder.verify(o).onNext(2); inOrder.verify(o).onNext(3); inOrder.verify(o).onComplete(); inOrder.verifyNoMoreInteractions(); }
@Test public void testTakeTimed() { TestScheduler scheduler = new TestScheduler(); PublishSubject<Integer> source = PublishSubject.create(); Observable<Integer> result = source.take(1, TimeUnit.SECONDS, scheduler); @SuppressWarnings("unchecked") Observer<Object> o = mock(Observer.class); result.subscribe(o); source.onNext(1); source.onNext(2); source.onNext(3); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); source.onNext(4); InOrder inOrder = inOrder(o); inOrder.verify(o).onNext(1); inOrder.verify(o).onNext(2); inOrder.verify(o).onNext(3); inOrder.verify(o).onComplete(); inOrder.verifyNoMoreInteractions(); verify(o, never()).onNext(4); verify(o, never()).onError(any(Throwable.class)); }