private Observable<List<Song>> getAutoPlaylistSongs(AutoPlaylist playlist) { BehaviorSubject<List<Song>> subject; if (mPlaylistContents.containsKey(playlist)) { subject = mPlaylistContents.get(playlist); } else { subject = BehaviorSubject.create(); mPlaylistContents.put(playlist, subject); playlist.generatePlaylist(mMusicStore, this, mPlayCountStore) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subject::onNext, subject::onError); subject.observeOn(Schedulers.io()) .subscribe(contents -> { MediaStoreUtil.editPlaylist(mContext, playlist, contents); }, throwable -> { Timber.e(throwable, "Failed to save playlist contents"); }); } return subject.asObservable(); }
public static void behaviourSubject() { BehaviorSubject<Integer> subject = BehaviorSubject.create(); subject.onNext(5); Action1<Integer> action1 = integer -> Log.i("From action1", String.valueOf(integer)); Subscription subscription1 = subject.subscribe(action1); subject.onNext(10); Action1<Integer> action2 = integer -> Log.i("From action2", String.valueOf(integer)); Subscription subscription2 = subject.subscribe(action2); subject.onNext(20); subscription1.unsubscribe(); subject.onNext(40); subscription2.unsubscribe(); subject.onNext(80); }
@Test public void shouldEmitErrorAfterViewIsAttached() { TestScheduler testScheduler = Schedulers.test(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Object> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestSubscriber<Object> testSubscriber = new TestSubscriber<>(); Observable.error(new RuntimeException()) .compose(transformer) .subscribe(testSubscriber); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testSubscriber.awaitTerminalEvent(); testSubscriber.assertError(RuntimeException.class); }
@Test public void shouldEmitValueAfterViewIsAttached() { TestScheduler testScheduler = Schedulers.test(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Integer> transformer = new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestSubscriber<Integer> testSubscriber = new TestSubscriber<>(); Observable.just(0) .compose(transformer) .subscribe(testSubscriber); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testSubscriber.awaitTerminalEvent(); testSubscriber.assertValue(0); testSubscriber.assertCompleted(); }
public void testExceptionSubjectAction() { // test the behavior of surfacing exceptions from a subject BehaviorSubject<Integer> subject = BehaviorSubject.create(); final List<Notification<Integer>> notifications = new ArrayList<Notification<Integer>>(4); Subscription s = subject.subscribe(new Action1<Integer>() { @Override public void call(Integer t) { notifications.add(Notification.createOnNext(t)); throw new RuntimeException("call " + t); } }); try { subject.onNext(0); // (unreachable) expect an exception to be thrown fail(); } catch (RuntimeException e) { assertEquals("call 0", e.getMessage()); } }
private BehaviorSubject<ParseChange<P>> getUpdates(I unit) { final FileObject resource = unit.source(); final FileName name = resource.getName(); // THREADING: it is possible that two different threads asking for a subject may do the parsing twice here, as // this is not an atomic operation. However, the chance is very low and it does not break anything (only // duplicates some work), so it is acceptable. BehaviorSubject<ParseChange<P>> updates = updatesPerResource.get(name); if(updates == null) { updates = BehaviorSubject.create(); updatesPerResource.put(name, updates); try { logger.trace("Parsing for {}", resource); final P result = syntaxService.parse(unit); updates.onNext(ParseChange.update(result)); } catch(ParseException e) { final String message = String.format("Parsing for %s failed", name); logger.error(message, e); updates.onNext(ParseChange.<P>error(e)); } } return updates; }
@Test public void testPagingCapabilities() { PublishSubject<Object> view = PublishSubject.create(); BehaviorSubject<Integer> nextPageRequests = BehaviorSubject.create(); final TestObserver<Delivery<Object, String>> testObserver = new TestObserver<>(); nextPageRequests .concatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer targetPage) { return targetPage <= requestedPageCount ? Observable.<Integer>never() : Observable.range(requestedPageCount, targetPage - requestedPageCount); } }) .doOnNext(new Action1<Integer>() { @Override public void call(Integer it) { requestedPageCount = it + 1; } }) .startWith(Observable.range(0, requestedPageCount)) .concatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(final Integer page) { return requestPage(page, PAGE_SIZE); } }) .compose(new DeliverReplay<Object, String>(view)) .subscribe(testObserver); ArrayList<Delivery<Object, String>> onNext = new ArrayList<>(); testObserver.assertReceivedOnNext(onNext); view.onNext(999); addOnNext(onNext, 999, 0, 1, 2); testObserver.assertReceivedOnNext(onNext); nextPageRequests.onNext(2); addOnNext(onNext, 999, 3, 4, 5); testObserver.assertReceivedOnNext(onNext); view.onNext(null); assertEquals(0, testObserver.getOnCompletedEvents().size()); testObserver.assertReceivedOnNext(onNext); nextPageRequests.onNext(3); assertEquals(0, testObserver.getOnCompletedEvents().size()); testObserver.assertReceivedOnNext(onNext); view.onNext(9999); addOnNext(onNext, 9999, 0, 1, 2, 3, 4, 5, 6, 7, 8); assertEquals(0, testObserver.getOnCompletedEvents().size()); testObserver.assertReceivedOnNext(onNext); }
@Override protected void onStartLoading() { Log.d(getClass().getSimpleName(), "onStartLoading"); super.onStartLoading(); if (subject == null) { subject = BehaviorSubject.create(); } subscription = subject.observeOn(AndroidSchedulers.mainThread()) .subscribe(this::onResult, this::onError); if (subjectSubscription == null) { subjectSubscription = searchQuerySubject.flatMap(this::create) .subscribeOn(Schedulers.io()).subscribe(subject); } }
@Inject @SuppressWarnings("checkstyle:parameternumber") public AnalyseAppLibraryPermission(DecompileApp decompileApp, AnalyseLibraryMethodInvocations analyseLibraryMethodInvocations, AnalysePermissionsFromMethodInvocations analysePermissionsFromMethodInvocations, LibraryService libraryService, PermissionService permissionService, StorageProvider storageProvider, FileUtils fileUtils, AppService appService) { this.decompileApp = decompileApp; this.analyseLibraryMethodInvocations = analyseLibraryMethodInvocations; this.analysePermissionsFromMethodInvocations = analysePermissionsFromMethodInvocations; this.libraryService = libraryService; this.permissionService = permissionService; this.storageProvider = storageProvider; this.fileUtils = fileUtils; this.appService = appService; this.progressSubject = BehaviorSubject.create(); }
@Override protected final void initObservable() { subject = BehaviorSubject.create(); subject.onBackpressureBuffer() .observeOn(EventThread.getScheduler(observeThread)) .subscribeOn(EventThread.getScheduler(subscribeThread)) .subscribe(event -> { try { if (valid) { handleEvent(event); } } catch (InvocationTargetException e) { throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberBehaviorEvent.this, e); } }); }
protected Observable<ParseObject<T>> findAll(ParseQuery.Builder<DefaultParseColumn> queryBuilder) { ParseRestApi api = ParseRestClientFactory.masterClient(); BehaviorSubject<Date> relay = BehaviorSubject.create(); relay.onNext(new Date()); return relay.flatMap(date -> { ParseQuery<DefaultParseColumn> queryOlders = queryBuilder.olderThan(DefaultParseColumn.createdAt, date).build(); Observable<Response<QueryResults>> response = api.query(className, queryOlders.params); return response; }).flatMap((r) -> flatMapResponse(r, relay)); }
protected Observable<ParseObject<T>> flatMapResponse(Response<QueryResults> r, @Nullable BehaviorSubject<Date> relay) { if (!r.isSuccessful()) { if (relay!= null) { relay.onCompleted();} throw errorFrom(r); } List<ParseObject<T>> objects = new ArrayList<>(); List<ParseMap> found = r.body().results; if (found == null) { found = Collections.emptyList(); } for (ParseMap parseMap : found) { ParseObject<T> o = new ParseObject<T>(parseMap); objects.add(o); } if (relay != null) { if (found.size() == QUERY_ALL_LIMIT) { ParseObject<DefaultParseColumn> object = new ParseObject<>(found.get(QUERY_ALL_LIMIT - 1)); Date createdAt = object.createdAt(); relay.onNext(createdAt); } else { relay.onCompleted(); } } return Observable.from(objects); }
void behaviourSubject(){ Action1<Integer> onNextFuc= new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "New Event received on behaviourSubject: " + integer); } }; BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(); behaviorSubject.onNext(1); behaviorSubject.onNext(2); Subscription subscription = behaviorSubject.doOnSubscribe(new Action0() { @Override public void call() { Log.i(TAG, "Observer subscribed to behaviorSubject"); } }).doOnUnsubscribe(new Action0() { @Override public void call() { Log.i(TAG, "Observer subscribed to behaviorSubject"); } }).subscribe(onNextFuc); behaviorSubject.onNext(3); behaviorSubject.onNext(4); subscription.unsubscribe(); behaviorSubject.onNext(5); behaviorSubject.onCompleted(); }
@Nonnull public static <T> Observable.Transformer<? super T, ? extends T> behaviorRefCount() { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> tObservable) { return new OperatorMulticast<>(tObservable, new Func0<Subject<? super T, ? extends T>>() { @Override public Subject<? super T, ? extends T> call() { return BehaviorSubject.<T>create(); } }).refCount(); } }; }
private void updateMarketTypes() { BehaviorSubject<Map.Entry<Integer, Integer>> subject = BehaviorSubject.create(); CompositeSubscription subscriptions = new CompositeSubscription(); subscriptions.add(subject .observeOn(AndroidSchedulers.mainThread()) .doOnNext(progress -> { onProgressUpdate(progress.getKey(), progress.getValue(), "Storing items list..."); if (progress.getKey().equals(progress.getValue())) { subscriptions.unsubscribe(); decrementUpdatingCount(); updateFinished(); } }).subscribe()); Observable.defer(() -> Observable.just(getAllMarketTypes())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(orders -> MarketTypeEntry.addNewMarketTypes(orders, subject)) .subscribe(); }
public static void addNewMarketTypes(List<CrestMarketType> types, BehaviorSubject<Map.Entry<Integer, Integer>> subject) { int size = types.size(); List<MarketTypeEntry> entries = new ArrayList<>(size); for (int i = 0; i < size; i++) { CrestMarketType type = types.get(i); MarketTypeEntry entry = new MarketTypeEntry(); entry.id = type.getTypeId(); entry.groupId = type.getGroupId(); entry.href = type.getTypeHref(); entry.icon = type.getTypeIcon(); entry.name = type.getTypeName(); entries.add(entry); } TransactionManager manager = TransactionManager.getInstance(); ProcessModelTransaction transaction = new SaveModelTransaction<>(ProcessModelInfo.withModels(entries)); transaction.setChangeListener((current, maxProgress, modifiedModel) -> { if (current % 25 == 0 || current == maxProgress) { subject.onNext(new AbstractMap.SimpleEntry<>((int) current, (int) maxProgress)); } }); manager.addTransaction(transaction); }
private void registerFacebookCallback() { final PublishSubject<String> fbAccessToken = this.facebookAccessToken; final BehaviorSubject<FacebookException> fbAuthError = this.facebookAuthorizationError; this.callbackManager = CallbackManager.Factory.create(); LoginManager.getInstance().registerCallback(this.callbackManager, new FacebookCallback<LoginResult>() { @Override public void onSuccess(final @NonNull LoginResult result) { fbAccessToken.onNext(result.getAccessToken().getToken()); } @Override public void onCancel() { // continue } @Override public void onError(final @NonNull FacebookException error) { if (error instanceof FacebookAuthorizationException) { fbAuthError.onNext(error); } } }); }
@Test public void shouldSucceedHasValuesAndClear() throws Exception { BehaviorSubject<String> subject = BehaviorSubject.create("first item"); RxTestWrapper<String> assertion = assertThat(subject) .hasValuesAndClear("first item"); subject.onNext("second value"); assertion.hasValueCount(1); }
public ComposeMonitor(DockerService dockerService, String fileName) { this.monitor = BehaviorSubject.create(); this.dockerService = dockerService; this.fileName = fileName; this.observer = Observers.create(t -> { List<String> containerIds = getContainerIds(); containerIds.forEach(s -> { ContainerDetails details = dockerService.getContainer(s); log.debug("get container {}", details); if (checkContainer(details)) { log.error("Container crashed {}", details); monitor.onNext(ResultCode.ERROR); monitor.onCompleted(); return; } }); }); }
public TestDataStore() { this.data = new TestData(); this.nonNullName = BehaviorSubject.create(); this.name = BehaviorSubject.create(); this.passName = BehaviorSubject.create(); this.defaultName = BehaviorSubject.create(this.data.getDefaultName()); this.messages = BehaviorSubject.create(); this.map = BehaviorSubject.create(this.data.getMap()); this.i = BehaviorSubject.create(); this.l = BehaviorSubject.create(); this.f = BehaviorSubject.create(); this.d = BehaviorSubject.create(); this.b = BehaviorSubject.create(); this.s = BehaviorSubject.create(); this.bb = BehaviorSubject.create(); this.c = BehaviorSubject.create(); this.ia = BehaviorSubject.create(); this.la = BehaviorSubject.create(); this.fa = BehaviorSubject.create(); }
public Observable submit(UUID id, CarbonMessage cMsg, CarbonCallback cCallback) { RxContext rxContext = new RxContext(id.toString(), cMsg, cCallback); BehaviorSubject<RxContext> behaviorSubject = BehaviorSubject.create(); Future f = WORKER_EXECUTOR_SERVICE.submit(() -> { try { log.debug("Thread using carbon message with UUID " + id); mediators.getFirstMediator().receive(cMsg, cCallback); log.debug("mediator receive returned"); behaviorSubject.onNext(rxContext); } catch (Exception e) { log.error("Error while mediating", e); //behaviorSubject.onNext(rxContext); } }); f.isDone(); return behaviorSubject; }
@Inject ConstitutionServiceImpl(final Gson gson, @ConstitutionPref final StringPreference cachedConstitution) { // initialize constitution from a cached value final String json = cachedConstitution.get(); LOGD(TAG, json); UsConstitution cached = gson.fromJson(json, UsConstitution.class); constitutionSubject = BehaviorSubject.create(cached); constitutionSubject.skip(1) .subscribe(new Action1<Constitution>() { @Override public void call(Constitution constitution) { //save all subsequent updates to the constitution final String newConstitution = gson.toJson(constitution); cachedConstitution.set(newConstitution); } }); }
@Test public void testObserveChildren() throws Exception { BehaviorSubject<FirebaseChildEvent<DataSnapshot>> events = BehaviorSubject.create(); observeChildren(reference) .subscribe(events); await(setValue(reference.child("foo"), "bar")); FirebaseChildEvent<DataSnapshot> add = events.getValue(); assertThat(add.eventType, is(FirebaseChildEvent.TYPE_ADD)); assertThat(add.value.getValue(String.class), is("bar")); assertThat(add.value.getKey(), is("foo")); await(setValue(reference.child("foo"), "baz")); FirebaseChildEvent<DataSnapshot> edit = events.getValue(); assertThat(edit.eventType, is(FirebaseChildEvent.TYPE_CHANGE)); assertThat(edit.value.getValue(String.class), is("baz")); await(setValue(reference.child("foo"), null)); FirebaseChildEvent<DataSnapshot> remove = events.getValue(); assertThat(remove.eventType, is(FirebaseChildEvent.TYPE_REMOVE)); }
public FrpBased(Composite parent, int initialValue) { super(parent, initialValue); value = BehaviorSubject.create(initialValue); RxBox<String> rwText = SwtRx.textImmediate(inputField); Rx.subscribe(rwText, text -> { try { int parsed = Integer.parseInt(text); value.onNext(parsed); } catch (Exception error) { outputField.setText(msgForError(error)); } }); scale.addListener(SWT.Selection, e -> { value.onNext(scale.getSelection()); }); Rx.subscribe(value.map(Object::toString), rwText::set); Rx.subscribe(value.map(this::msgForValue), outputField::setText); Rx.subscribe(value, scale::setSelection); }
public Subscription subscribeData(@NonNull Observer<List<Image>> observer){ if(null == mCache){ mCache = BehaviorSubject.create(); Observable.create(new Observable.OnSubscribe<List<Image>>(){ @Override public void call(Subscriber<? super List<Image>> subscriber) { List<Image> images = DataBase.instance().readImagesInfos(); if(null == images){ setDataSource(DATA_SOURCE_NETWORK); loadDataFromNetwork(); }else{ setDataSource(DATA_SOURCE_DISK); subscriber.onNext(images); } } }) .subscribeOn(Schedulers.io()) .subscribe(mCache); }else{ setDataSource(DATA_SOURCE_MEMORY); } return mCache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer); }
public Observable<String> updateRepo(String userName) { BehaviorSubject<String> requestSubject = BehaviorSubject.create(); Observable<List<Repo>> observable = mClient.getRepos(userName); observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(l -> { mDatabase.insertRepoList(l); requestSubject.onNext(userName);}, e -> requestSubject.onError(e), () -> requestSubject.onCompleted()); return requestSubject.asObservable(); }
@Test public void awaitEventWorks_onNext() throws Exception { final TestSubscriber<String> testSubscriber = new TestSubscriber<>(); final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber); final BehaviorSubject<String> subject = BehaviorSubject.create(); final Subscription subscription = subject.subscribe(sub); async.run(() -> subject.onNext("hello")); sub.awaitEvent(); testSubscriber.assertValue("hello"); testSubscriber.assertNoTerminalEvent(); subscription.unsubscribe(); }
@Test public void awaitEventWorks_onError() throws Exception { final TestSubscriber<String> testSubscriber = new TestSubscriber<>(); final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber); final BehaviorSubject<String> subject = BehaviorSubject.create(); final Subscription subscription = subject.subscribe(sub); final RuntimeException e = new RuntimeException("doesn't matter"); async.run(() -> subject.onError(e)); sub.awaitEvent(); testSubscriber.assertNoValues(); testSubscriber.assertError(e); subscription.unsubscribe(); }
@Test public void awaitEventWorks_onCompleted() throws Exception { final TestSubscriber<String> testSubscriber = new TestSubscriber<>(); final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber); final BehaviorSubject<String> subject = BehaviorSubject.create(); final Subscription subscription = subject.subscribe(sub); async.run(subject::onCompleted); sub.awaitEvent(); testSubscriber.assertNoValues(); testSubscriber.assertCompleted(); subscription.unsubscribe(); }
@Test public void awaitEventWorks() throws Exception { final TestSubscriber<String> testSubscriber = new TestSubscriber<>(); final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber); final BehaviorSubject<String> subject = BehaviorSubject.create(); final Subscription subscription = subject.subscribe(sub); async.run(() -> { subject.onNext("hello"); subject.onNext("world"); subject.onNext("!"); subject.onCompleted(); }); sub.awaitEvent(Integer.MAX_VALUE); testSubscriber.assertValues("hello", "world", "!"); testSubscriber.assertCompleted(); testSubscriber.assertNoErrors(); subscription.unsubscribe(); }
@Override public Observable<Boolean> refresh() { mSongLoadingState.onNext(true); mArtistLoadingState.onNext(true); mAlbumLoadingState.onNext(true); mGenreLoadingState.onNext(true); BehaviorSubject<Boolean> result = BehaviorSubject.create(); MediaStoreUtil.promptPermission(mContext) .observeOn(Schedulers.io()) .map(granted -> { if (granted) { if (mSongs != null) { mSongs.onNext(getAllSongs()); } if (mArtists != null) { mArtists.onNext(getAllArtists()); } if (mAlbums != null) { mAlbums.onNext(getAllAlbums()); } if (mGenres != null) { mGenres.onNext(getAllGenres()); } } mSongLoadingState.onNext(false); mArtistLoadingState.onNext(false); mAlbumLoadingState.onNext(false); mGenreLoadingState.onNext(false); return granted; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(result); return result.asObservable(); }
@Override public Observable<List<Song>> getSongs() { if (mSongs == null) { mSongs = BehaviorSubject.create(); mSongLoadingState.onNext(true); MediaStoreUtil.getPermission(mContext) .observeOn(Schedulers.io()) .subscribe(granted -> { if (granted) { mSongs.onNext(getAllSongs()); } else { mSongs.onNext(Collections.emptyList()); } mSongLoadingState.onNext(false); }, throwable -> { Timber.e(throwable, "Failed to query MediaStore for songs"); }); } return mSongs.asObservable().observeOn(AndroidSchedulers.mainThread()); }
@Override public Observable<List<Genre>> getGenres() { if (mGenres == null) { mGenres = BehaviorSubject.create(); mGenreLoadingState.onNext(true); MediaStoreUtil.getPermission(mContext) .observeOn(Schedulers.io()) .subscribe(granted -> { if (granted) { mGenres.onNext(getAllGenres()); } else { mGenres.onNext(Collections.emptyList()); } mGenreLoadingState.onNext(false); }, throwable -> { Timber.e(throwable, "Failed to query MediaStore for genres"); }); } return mGenres.asObservable().observeOn(AndroidSchedulers.mainThread()); }
@TargetApi(Build.VERSION_CODES.M) public static Observable<Boolean> promptPermission(Context context) { if (sPermissionObservable == null) { sPermissionObservable = BehaviorSubject.create(); } if (hasPermission(context)) { if (!sPermissionObservable.hasValue() || !sPermissionObservable.getValue()) { sPermissionObservable.onNext(true); } return getPermissionObservable(); } RxPermissions.getInstance(context).request(READ_EXTERNAL_STORAGE, WRITE_EXTERNAL_STORAGE) .subscribe(sPermissionObservable::onNext, throwable -> { Timber.i(throwable, "Failed to get storage permission"); }); return getPermissionObservable(); }
@Override public Observable<List<Song>> getSongs() { if (mSongs == null) { BehaviorSubject<List<Song>> subject = BehaviorSubject.create(); Observable.fromCallable(() -> this.<Song>parseJson(SONGS_FILENAME, Song[].class)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subject::onNext, subject::onError); mSongs = subject.asObservable(); } return mSongs; }
@Override public Observable<List<Album>> getAlbums() { if (mAlbums == null) { BehaviorSubject<List<Album>> subject = BehaviorSubject.create(); Observable.fromCallable(() -> this.<Album>parseJson(ALBUMS_FILENAME, Album[].class)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subject::onNext, subject::onError); mAlbums = subject.asObservable(); } return mAlbums; }
@Override public Observable<List<Artist>> getArtists() { if (mArtists == null) { BehaviorSubject<List<Artist>> subject = BehaviorSubject.create(); Observable.fromCallable(() -> this.<Artist>parseJson(ARTISTS_FILENAME, Artist[].class)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subject::onNext, subject::onError); mArtists = subject.asObservable(); } return mArtists; }
@Override public Observable<List<Genre>> getGenres() { if (mGenres == null) { BehaviorSubject<List<Genre>> subject = BehaviorSubject.create(); Observable.fromCallable(() -> this.<Genre>parseJson(GENRES_FILENAME, Genre[].class)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subject::onNext, subject::onError); mGenres = subject.asObservable(); } return mGenres; }