@SuppressWarnings("unchecked") @Test public void multipleLeftTerminalRightOtherThread() { TestScheduler otherScheduler = Schedulers.test(); TestSubject<EventB> eventBSubject = TestSubject.create(otherScheduler); Observable<Either<EventA, EventB>> either = RxEither.from(eventASubject, eventBSubject); either.subscribe(subscriber); eventASubject.onNext(eventA); eventBSubject.onNext(eventB); eventBSubject.onCompleted(); eventASubject.onNext(eventA); testScheduler.triggerActions(); subscriber.assertNotCompleted(); otherScheduler.triggerActions(); subscriber.assertNoErrors(); subscriber.assertCompleted(); subscriber.assertUnsubscribed(); subscriber.assertValues(Either.<EventA, EventB>left(eventA), Either.<EventA, EventB>left(eventA), Either.<EventA, EventB>right(eventB)); }
@Test public void onErrorCallsCallback() { TestSubject<String> subject = TestSubject.create(scheduler); Loader.Callbacks<Result<String>> callbacks = mock(Loader.Callbacks.class); Exception error = new Exception(); RxLoader<String> loader = new RxLoader<>(subject); loader.setCallbacks(callbacks); loader.start(); subject.onError(error); scheduler.triggerActions(); verify(callbacks).onLoaderStart(); verify(callbacks).onLoaderResult(Result.<String>error(error)); verify(callbacks).onLoaderComplete(); }
@SmallTest public void testLoaderStartRemoveFragment() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().removeFragment(); } }); Thread.sleep(500); // Need to wait for onDestroy() to be called. subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getInstrumentation().waitForIdleSync(); assertThat(getActivity().<String>getNext()).isNull(); assertThat(getActivity().isCompleted()).isFalse().as("onCompleted() is not called if the activity is destroyed"); }
@SmallTest public void testLoaderStartDetachFragment() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().detachFragment(); } }); Thread.sleep(500); // Need to wait for onDestroy() to be called. subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getInstrumentation().waitForIdleSync(); assertThat(getActivity().<String>getNext()).isNull(); assertThat(getActivity().isCompleted()).isFalse().as("onCompleted() is not called if the fragment is detached"); }
@SmallTest public void testLoaderStartRotation() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().recreate(); } }); createLoader(getActivity(), subject); getActivity().waitForStarted(); assertThat(getActivity().isStarted()).isTrue().as("onStarted() called again after a configuration change"); }
@SmallTest public void testLoaderStartRotationNext() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getActivity().waitForNext(); getActivity().waitForCompleted(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().recreate(); } }); createLoader(getActivity(), subject); getActivity().waitForNext(); getActivity().waitForCompleted(); assertThat(getActivity().<String>getNext()).isEqualTo("test").as("result is delivered again after a configuration change"); assertThat(getActivity().isCompleted()).isTrue().as("onCompleted() is called again after a configuration change"); }
@SmallTest public void testLoaderStartRotationError() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); subject.onError(new Exception("test")); testScheduler.triggerActions(); getActivity().waitForError(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().recreate(); } }); createLoader(getActivity(), subject); getActivity().waitForError(); assertThat(getActivity().<String>getError()).hasMessage("test").as("onError() is called again after a configuration change"); }
@SmallTest public void testLoaderStartNextAfterDestroyed() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().finish(); } }); Thread.sleep(500); // Need to wait for onDestroy() to be called. subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getInstrumentation().waitForIdleSync(); assertThat(getActivity().<String>getNext()).isNull(); assertThat(getActivity().isCompleted()).isFalse().as("onCompleted() is not called if the activity is destroyed"); // Needed to recreate the activity since the test runner expects it to exist. getActivity(); }
@SmallTest public void testLoaderStartNextRotationClear() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); RxLoader<String> loader = createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getActivity().waitForNext(); getActivity().waitForCompleted(); loader.clear(); T newActivity = recreateActivity(); createLoader(newActivity, subject); getInstrumentation().waitForIdleSync(); Thread.sleep(500); // Give loader a chance to deliver the result. assertThat(newActivity.<String>getNext()).isNull(); assertThat(newActivity.isCompleted()).isFalse().as("onCompleted() is not called if the result was cleared"); }
@Before public void setUp() throws Exception { testScheduler = new TestScheduler(); statusSubject = TestSubject.create(testScheduler); jobSubject = TestSubject.create(testScheduler); when(uploadInteractor.getAll()).thenReturn(Observable.empty()); uploadManager = new UploadManager(uploadInteractor, uploadErrorAdapter, jobSubject, statusSubject, false); }
@Test public void testDanglingUpload() { final String jobId1 = "job-id-1"; final String jobId2 = "job-id-2"; final Job job1 = Job.builder() .setId(jobId1) .setFilepath("filepath") .setMetadata(Collections.emptyMap()) .setStatus(createQueued(jobId1)) .setMimeType("text/plain") .build(); final Job job2 = Job.builder() .setId(jobId2) .setFilepath("filepath") .setMetadata(Collections.emptyMap()) .setStatus(createSending(jobId2, 0)) .setMimeType("text/plain") .build(); when(uploadInteractor.getAll()) .thenReturn(Observable.from(Arrays.asList(job1, job2))); final TestScheduler testScheduler = new TestScheduler(); final TestSubject<Status> statusSubject = TestSubject.create(testScheduler); final TestSubject<Job> jobSubject = TestSubject.create(testScheduler); new UploadManager(uploadInteractor, uploadErrorAdapter, jobSubject, statusSubject, false); verify(uploadInteractor, times(1)).update(any(Status.class)); final Status expectedStatus = Status.createFailed(job2.id(), ErrorType.TERMINATED); verify(uploadInteractor).update(expectedStatus); }
@Before public void setUpServer() { tradeEventStreamClient = mock(EventStreamClient.class); scheduler = Schedulers.test(); vwapServer = new VwapServer(42, tradeEventStreamClient, scheduler); tradeSourceSubject = TestSubject.create(scheduler); when(tradeEventStreamClient.readServerSideEvents()).thenReturn(tradeSourceSubject); }
@Test public void newlyCreatedIsNotSubscribed() { TestSubject<String> subject = TestSubject.create(scheduler); RxLoader<String> loader = new RxLoader<>(subject); assertFalse(subject.hasObservers()); }
@Test public void startedSubscribes() { TestSubject<String> subject = TestSubject.create(scheduler); RxLoader<String> loader = new RxLoader<>(subject); loader.start(); assertTrue(subject.hasObservers()); }
@Test public void canceledUnsubscribes() { TestSubject<String> subject = TestSubject.create(scheduler); RxLoader<String> loader = new RxLoader<>(subject); loader.start(); loader.cancel(); assertFalse(subject.hasObservers()); }
@Test public void onNextCallsCallbacks() { TestSubject<String> subject = TestSubject.create(scheduler); Loader.Callbacks<String> callbacks = mock(Loader.Callbacks.class); RxLoader<String> loader = new RxLoader<>(subject); loader.setCallbacks(callbacks); loader.start(); subject.onNext("test"); scheduler.triggerActions(); verify(callbacks).onLoaderStart(); verify(callbacks).onLoaderResult("test"); }
@Test public void onErrorCallsCallback() { TestSubject<String> subject = TestSubject.create(scheduler); Loader.Callbacks<String> callbacks = mock(Loader.Callbacks.class); Exception error = new Exception(); RxLoader<String> loader = new RxLoader<>(subject); loader.setCallbacks(callbacks); loader.start(); subject.onError(error); scheduler.triggerActions(); verify(callbacks).onLoaderStart(); verify(callbacks).onLoaderError(error); }
@Test public void onCompleteCallsCallback() { TestSubject<String> subject = TestSubject.create(scheduler); Loader.Callbacks<String> callbacks = mock(Loader.Callbacks.class); RxLoader<String> loader = new RxLoader<>(subject); loader.setCallbacks(callbacks); loader.start(); subject.onCompleted(); scheduler.triggerActions(); verify(callbacks).onLoaderStart(); verify(callbacks).onLoaderSuccess(); }
@Test public void onNextCallsCallbacks() { TestSubject<String> subject = TestSubject.create(scheduler); Loader.Callbacks<Result<String>> callbacks = mock(Loader.Callbacks.class); RxLoader<String> loader = new RxLoader<>(subject); loader.setCallbacks(callbacks); loader.start(); subject.onNext("test"); scheduler.triggerActions(); verify(callbacks).onLoaderStart(); verify(callbacks).onLoaderResult(Result.success("test")); }
@Test public void onCompleteCallsCallback() { TestSubject<String> subject = TestSubject.create(scheduler); Loader.Callbacks<Result<String>> callbacks = mock(Loader.Callbacks.class); RxLoader<String> loader = new RxLoader<>(subject); loader.setCallbacks(callbacks); loader.start(); subject.onCompleted(); scheduler.triggerActions(); verify(callbacks).onLoaderStart(); verify(callbacks).onLoaderComplete(); }
@Test public void cancelShouldUnsubscribe() { TestSubject<String> single = TestSubject.create(new TestScheduler()); assertFalse(single.hasObservers()); T future = toFuture(single.toSingle()); assertTrue(single.hasObservers()); future.cancel(true); assertFalse(single.hasObservers()); }
@SmallTest public void testLoaderStartDetachAndAttachFragment() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().detachFragment(); } }); Thread.sleep(500); // Need to wait for onDestroy() to be called. subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getInstrumentation().waitForIdleSync(); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().reattchFragment(); } }); getInstrumentation().waitForIdleSync(); createLoader(getActivity(), subject); getActivity().waitForNext(); getActivity().waitForCompleted(); assertThat(getActivity().<String>getNext()).isEqualTo("test").as("result is value delivered from observable"); assertThat(getActivity().isCompleted()).isTrue().as("onCompleted() called when fragment is reattached"); }
@SmallTest public void testMultipleLoaderFragments() throws InterruptedException { final String fragment1 = "fragment1"; final String fragment2 = "fragment2"; TestSubject<String> subject1 = TestSubject.create(testScheduler); TestSubject<String> subject2 = TestSubject.create(testScheduler); getInstrumentation().runOnMainSync(new Runnable() { @Override public void run() { getActivity().addFragment(fragment1); getActivity().addFragment(fragment2); } }); createLoader(subject1, fragment1).start(); createLoader(subject2, fragment2).start(); getActivity().waitForStarted(fragment1); getActivity().waitForStarted(fragment2); subject1.onNext("test1"); subject2.onNext("test2"); subject1.onCompleted(); subject2.onCompleted(); testScheduler.triggerActions(); getActivity().waitForNext(fragment1); getActivity().waitForCompleted(fragment1); getActivity().waitForNext(fragment2); getActivity().waitForCompleted(fragment2); assertThat(getActivity().<String>getNext(fragment1)).isEqualTo("test1").as("result is value delivered from observable"); assertThat(getActivity().isCompleted(fragment1)).isTrue().as("onCompleted() called when observable completed"); assertThat(getActivity().<String>getNext(fragment2)).isEqualTo("test2").as("result is value delivered from observable"); assertThat(getActivity().isCompleted(fragment2)).isTrue().as("onCompleted() called when observable completed"); }
@SmallTest public void testLoaderStart() throws InterruptedException { final TestSubject<String> subject = TestSubject.create(testScheduler); final RxLoader<String> loader = createLoader(getActivity(), subject); assertThat(getActivity().isStarted()).isFalse().as("onStarted() not called until loader is started"); loader.start(); getActivity().waitForStarted(); assertThat(getActivity().isStarted()).isTrue().as("onStarted() called when loader is started"); }
@SmallTest public void testLoaderStartNext() throws InterruptedException { final TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); subject.onNext("test"); subject.onCompleted(); testScheduler.triggerActions(); getActivity().waitForNext(); getActivity().waitForCompleted(); assertThat(getActivity().<String>getNext()).isEqualTo("test").as("result is value delivered from observable"); assertThat(getActivity().isCompleted()).isTrue().as("onCompleted() called when observable completed"); }
@SmallTest public void testLoaderStartError() throws InterruptedException { TestSubject<String> subject = TestSubject.create(testScheduler); createLoader(getActivity(), subject).start(); getActivity().waitForStarted(); subject.onError(new Exception("test")); subject.onCompleted(); testScheduler.triggerActions(); getActivity().waitForError(); assertThat(getActivity().<String>getError()).hasMessage("test").as("onError() is called when sent by observable"); }