@Override public <R> Observable<retrofit2.adapter.rxjava.Result<R>> adapt(Call<R> call) { Observable<retrofit2.adapter.rxjava.Result<R>> observable = Observable.create(new CallOnSubscribe<>(call)) // .map(new Func1<Response<R>, retrofit2.adapter.rxjava.Result<R>>() { @Override public retrofit2.adapter.rxjava.Result<R> call(Response<R> response) { return retrofit2.adapter.rxjava.Result.response(response); } }).onErrorReturn(new Func1<Throwable, retrofit2.adapter.rxjava.Result<R>>() { @Override public retrofit2.adapter.rxjava.Result<R> call(Throwable throwable) { return Result.error(throwable); } }); if (scheduler != null) { return observable.subscribeOn(scheduler); } return observable; }
@Override public void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); mRepositories = new ArrayList<>(); timespanSubject = PublishSubject.create(); Injector.obtain(getContext()).inject(this); timespanAdapter = new TrendingTimespanAdapter( new ContextThemeWrapper(getContext(), R.style.Theme_U2020_TrendingTimespan)); repositoryAdapter = new RepositoryAdapter(mRepositories); repositoryAdapter.setRepositoryClickListener(repository -> { startActivity(mIntentFactory.createUrlIntent(repository.html_url)); }); Observable<Result<RepositoriesResponse>> result = timespanSubject // .debounce(300, TimeUnit.MILLISECONDS) // .flatMap(trendingSearch) // .observeOn(mainThread()) // .share(); subscriptions.add(result // .filter(Results.isSuccess()) // .map(SearchResultToRepositoryList.instance()) // .subscribe(repositories -> { if (page == FRIST_PAGE) mRepositories.clear(); mRepositories.addAll(repositories); if (binding.trendingListView.getAdapter() == null) { binding.trendingListView.setAdapter(repositoryAdapter); } repositoryAdapter.notifyDataSetChanged(); })); subscriptions.add(result // .filter(Funcs.not(Results.isSuccess())) // .subscribe(trendingError)); }
@Override public Observable<Result<Gallery>> listGallery(Section section, Sort sort, int page) { // Fetch desired section. List<Image> images = serverDatabase.getImagesForSection(section); if (images == null) { return Observable.just(Result.response(Response.success(BAD_REQUEST))); } // Figure out proper list subset. int pageStart = (page - 1) * PAGE_SIZE; if (pageStart >= images.size() || pageStart < 0) { return Observable.just(Result.response(Response.success(BAD_REQUEST))); } int pageEnd = Math.min(pageStart + PAGE_SIZE, images.size()); // Sort and trim images. SortUtil.sort(images, sort); images = images.subList(pageStart, pageEnd); return Observable.just(Result.response(Response.success(new Gallery(200, true, images)))); }
@Override protected void onAttachedToWindow() { super.onAttachedToWindow(); Observable<Result<RepositoriesResponse>> result = timespanSubject // .flatMap(trendingSearch) // .observeOn(AndroidSchedulers.mainThread()) // .share(); subscriptions.add(result // .filter(Results.isSuccessful()) // .map(SearchResultToRepositoryList.instance()) // .subscribe(trendingAdapter)); subscriptions.add(result // .filter(Funcs.not(Results.isSuccessful())) // .subscribe(trendingError)); // Load the default selection. onRefresh(); }
/** {@code Result.response(null)} must throw an NPE with the message "response == null". */
@Test public void nullResponseThrows() {
  NullPointerException thrown = null;
  try {
    Result.response(null);
  } catch (NullPointerException e) {
    thrown = e;
  }

  if (thrown == null) {
    fail();
  }
  assertThat(thrown).hasMessage("response == null");
}
/** {@code Result.error(null)} must throw an NPE with the message "error == null". */
@Test public void nullErrorThrows() {
  NullPointerException thrown = null;
  try {
    Result.error(null);
  } catch (NullPointerException e) {
    thrown = e;
  }

  if (thrown == null) {
    fail();
  }
  assertThat(thrown).hasMessage("error == null");
}
/**
 * Returns a function that unwraps a {@code Result} into an observable of its response body.
 *
 * <p>An error result becomes {@code Observable.error} carrying the result's throwable. Otherwise
 * the response body is emitted; anything thrown while reading it is logged and emitted as an
 * error.
 *
 * <p>NOTE(review): the response status is not inspected here, so the body is emitted as-is
 * whatever the HTTP status was. Confirm callers expect that.
 */
public static <T> Func1<Result<T>, Observable<T>> handleResult() {
  return result -> {
    if (result.isError()) {
      return Observable.error(result.error());
    }

    try {
      T body = result.response().body();
      return Observable.just(body);
    } catch (Throwable t) {
      Timber.e(t, "Error handling result");
      return Observable.error(t);
    }
  };
}
/**
 * Sends a form-encoded POST to {@code oauth2/token}.
 *
 * <p>The arguments are submitted as the form fields {@code client_id}, {@code client_secret},
 * {@code code}, {@code grant_type} and {@code redirect_uri} respectively.
 *
 * @return an observable of the call's {@code Result}, with an {@code ApiOAuthToken} body type
 */
@FormUrlEncoded @POST("oauth2/token") Observable<Result<ApiOAuthToken>> oAuthToken(@Field("client_id") String clientId, @Field("client_secret") String clientSecret, @Field("code") String code, @Field("grant_type") String grantType, @Field("redirect_uri") String redirectUri );
/**
 * Builds a transformer that routes results through the cache for {@code itemClass} and maps
 * them to {@code Response} values.
 *
 * <p>A result that is neither an error nor an unsuccessful response has its body converted with
 * {@code mapper} and wrapped in {@code Response.success}; any other result becomes
 * {@code Response.error()}. Android schedulers and the error-to-error-response handling are
 * applied afterwards.
 *
 * @param mapper converts a response body into the value exposed downstream
 * @param itemClass class passed to {@code observableCache.on}
 */
@Override public Observable.Transformer<Result<F>, Response<T>> withMapper(Func1<F, T> mapper, Class<F> itemClass) {
  return observable -> observable
      .compose(observableCache.on(itemClass))
      .map(result -> {
        if (result.isError() || !result.response().isSuccessful()) {
          return Response.<T>error();
        }
        return Response.success(mapper.call(result.response().body()));
      })
      .compose(applyAndroidSchedulers())
      .compose(onErrorToErrorResponse());
}
/**
 * Lazily reads the cached item for {@code itemClass} from preferences.
 *
 * <p>On each subscription the stored string is looked up. If it is present and deserialises
 * with Gson, a single successful result is emitted. If it is missing, or deserialisation
 * throws, the failure is logged where applicable and the observable is empty.
 */
private Observable<Result<T>> cachedItems(Class<T> itemClass) {
  return defer(() -> {
    String json = mondoPreferences.getStringPreference(getPreferenceKey(itemClass));
    if (!StringUtils.isNotEmptyNorNull(json)) {
      return empty();
    }

    try {
      T cached = gson.fromJson(json, itemClass);
      return just(response(success(cached)));
    } catch (Exception e) {
      Logger.error("ObservableCache", "Error reading cached item", e);
      return empty();
    }
  });
}
/** With an empty upstream, the item supplied by the cache is still mapped and emitted. */
@Test public void usesCache() {
  observableCache.emitsSuccessfulResultFor(Integer.class, 1);

  Observable<Result<Integer>> emptyUpstream = Observable.<Result<Integer>>empty();
  emptyUpstream
      .compose(call.withMapper(Object::toString, Integer.class))
      .subscribe(subscriber);

  subscriber.assertFinishedWithItems(sameBeanAs(Response.success("1")));
}
/**
 * Subscribes to a source whose subscription blocks on a semaphore with no permits, and releases
 * the permit only after {@code subscribeTo} returns. If the subscription ran on the calling
 * thread, the release would never be reached.
 */
@Test public void isExecutedAsynchronously() {
  Semaphore gate = new Semaphore(0);

  Observable<Result<Integer>> blockingSource =
      Observable.<Result<Integer>>create(ignored -> gate.acquireUninterruptibly());
  subscribeTo(blockingSource);

  gate.release();
}
/**
 * Returns the subject registered in {@code subjects} for {@code itemClass}, creating and
 * registering a new {@code ReplaySubject} when none is present.
 */
@NonNull private ReplaySubject<Result<T>> getOrCreateSubjectFor(Class<T> itemClass) {
  ReplaySubject<Result<T>> subject = subjects.get(itemClass);
  if (subject == null) {
    subject = ReplaySubject.create();
    subjects.put(itemClass, subject);
  }
  return subject;
}
/**
 * Looks up the observable stored in {@code oAuthSubjects} under the key that {@code keyFor}
 * derives from all five arguments, and returns it.
 *
 * <p>NOTE(review): nothing here handles a missing entry; presumably {@code get} returns null in
 * that case. Confirm against how {@code oAuthSubjects} is populated.
 */
@Override public Observable<Result<ApiOAuthToken>> oAuthToken(@Field("client_id") String clientId, @Field("client_secret") String clientSecret, @Field("code") String code, @Field("grant_type") String grantType, @Field("redirect_uri") String redirectUri) { return oAuthSubjects.get(keyFor(clientId, clientSecret, code, grantType, redirectUri)); }
@Override public Observable<Result<RepositoriesResponse>> call(TrendingTimespan trendingTimespan) { SearchQuery trendingQuery = new SearchQuery.Builder() // .createdSince(trendingTimespan.createdSince()) // .build(); return githubService.repositories(trendingQuery, Sort.STARS, Order.DESC, page) .subscribeOn(Schedulers.io()); }
@Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); mBinding = DataBindingUtil.setContentView(this, R.layout.activity_repo_detail); Injector.obtain(this).inject(this); String url = getIntent().getData().toString(); Observable<Result<Repository>> result = mGithubService.repository(url) // .subscribeOn(Schedulers.io()) // .observeOn(mainThread()) // .share(); subscriptions.add( // result.filter(Results.isSuccess()) // .map(Result::response) // .map(Response::body) // .subscribe(repository -> mBinding.setRepo(repository))); subscriptions.add( result.filter(Funcs.not(Results.isSuccess())) // .map(Result::error) // .subscribe(throwable -> // Timber.d(throwable, "repo err")) ); }
@Override public Observable<Result<RepositoriesResponse>> call(TrendingTimespan trendingTimespan) { SearchQuery trendingQuery = new SearchQuery.Builder() // .createdSince(trendingTimespan.createdSince()) // .build(); return githubService.repositories(trendingQuery, Sort.STARS, Order.DESC) .subscribeOn(Schedulers.io()); }
/**
 * Handles a failed trending request: logs either the response code or the error, stops the
 * refresh indicator, and switches the animator to the error view.
 */
@Override public void call(Result<RepositoriesResponse> result) {
  if (!result.isError()) {
    // A response was received; only its status code is logged.
    Timber.e("Failed to get trending repositories. Server returned %d",
        result.response().code());
  } else {
    Timber.e(result.error(), "Failed to get trending repositories");
  }

  swipeRefreshView.setRefreshing(false);
  animatorView.setDisplayedChildId(R.id.trending_error);
}
/**
 * Issues a GET to {@code /jokes/random/{num}}, with {@code numJokes} substituted for the
 * {@code num} path segment.
 *
 * @return an observable of the call's {@code Result}, with an {@code IcndbResult} body type
 */
@GET("/jokes/random/{num}") Observable<Result<IcndbResult>> getRandomJokes(@Path("num") int numJokes);
/**
 * Issues a GET to {@code /v1/gifs/search}, with {@code api_key} fixed to
 * {@code PUBLIC_API_KEY} in the URL.
 *
 * @param query sent as the {@code q} query parameter
 * @param limit sent as the {@code limit} query parameter
 * @param offset sent as the {@code offset} query parameter
 * @return an observable of the call's {@code Result}, with a {@code SearchResult} body type
 */
@GET("/v1/gifs/search?api_key=" + PUBLIC_API_KEY) Observable<Result<SearchResult>> search(@Query("q") String query, @Query("limit") int limit, @Query("offset") int offset);
/**
 * Issues a GET to {@code balance}.
 *
 * @return an observable of the call's {@code Result}, with an {@code ApiBalance} body type
 */
@GET("balance") Observable<Result<ApiBalance>> getBalance();
/**
 * Issues a GET to {@code transactions} with the fixed query string {@code expand[]=merchant}.
 *
 * @return an observable of the call's {@code Result}, with an {@code ApiTransactions} body type
 */
@GET("transactions?expand[]=merchant") Observable<Result<ApiTransactions>> getTransactions();
/**
 * Issues a GET to {@code accounts}.
 *
 * @return an observable of the call's {@code Result}, with an {@code ApiAccounts} body type
 */
@GET("accounts") Observable<Result<ApiAccounts>> getAccounts();
/**
 * Returns a transformer that emits the cached item for {@code itemClass} (if any) ahead of the
 * upstream items, and passes every emitted item to {@code save}.
 */
@Override public Observable.Transformer<Result<T>, Result<T>> on(Class<T> itemClass) {
  return observable -> {
    Observable<Result<T>> cachedThenUpstream = concat(cachedItems(itemClass), observable);
    return cachedThenUpstream.doOnNext(item -> save(item, itemClass));
  };
}
/**
 * Stores the body of a successful result in preferences as JSON, under the key for
 * {@code itemClass}. Error results and unsuccessful responses are not stored.
 */
private void save(Result<T> result, Class<T> itemClass) {
  if (result.isError() || !result.response().isSuccessful()) {
    return;
  }

  mondoPreferences.putStringPreference(
      getPreferenceKey(itemClass),
      gson.toJson(result.response().body()));
}
/** A successful result carrying 1 is mapped to a successful response carrying "1". */
@Test public void mapsResponsesWithMapper() {
  Observable<Result<Integer>> source = Observable.just(Result.response(success(1)));

  subscribeTo(source);

  subscriber.assertFinishedWithItems(sameBeanAs(Response.success("1")));
}
/** An upstream {@code onError} is surfaced as an error response item. */
@Test public void respondsWithResponseErrorForErrors() {
  Observable<Result<Integer>> failing = Observable.<Result<Integer>>error(new IOException());

  subscribeTo(failing);

  subscriber.assertFinishedWithItems(sameBeanAs(Response.error()));
}
/** A result wrapping an HTTP 500 response is surfaced as an error response item. */
@Test public void respondsWithResponseErrorWhenResultHasNon200Response() {
  retrofit2.Response<Integer> serverError =
      retrofit2.Response.error(500, ResponseBody.create(null, "1"));
  Observable<Result<Integer>> source =
      Observable.<Result<Integer>>just(Result.response(serverError));

  subscribeTo(source);

  subscriber.assertFinishedWithItems(sameBeanAs(Response.error()));
}
/** A result wrapping a throwable is surfaced as an error response item. */
@Test public void respondsWithResponseErrorWhenResultHasError() {
  Observable<Result<Integer>> source =
      Observable.<Result<Integer>>just(Result.error(new IOException()));

  subscribeTo(source);

  subscriber.assertFinishedWithItems(sameBeanAs(Response.error()));
}
/**
 * Runs {@code observable} through {@code call.withMapper} (mapping with {@code toString}) and
 * subscribes the shared test subscriber to the outcome.
 */
private Subscription subscribeTo(Observable<Result<Integer>> observable) {
  Observable<Response<String>> mapped =
      observable.compose(call.withMapper(Object::toString, Integer.class));
  return mapped.subscribe(subscriber);
}
/** Subscribes {@code subscriber} to {@code subject} routed through the String cache. */
private Subscription subscribe(HamcrestTestSubscriber<Result<String>> subscriber, PublishSubject<Result<String>> subject) {
  Observable<Result<String>> cached = subject.compose(cache.on(String.class));
  return cached.subscribe(subscriber);
}
/**
 * Returns a transformer that hands back the source observable unchanged.
 * {@code itemClass} is not used.
 */
@Override public Observable.Transformer<Result<T>, Result<T>> on(Class<T> itemClass) { return observable -> observable; }
/**
 * Returns a transformer that maps each result's response body with {@code mapper} and wraps it
 * in {@code Response.success}. {@code itemClass} is not used.
 *
 * <p>NOTE(review): {@code result.response()} is dereferenced without consulting
 * {@code isError()}. Confirm that only non-error results reach this transformer.
 */
@Override public Observable.Transformer<Result<F>, Response<T>> withMapper(Func1<F, T> mapper, Class<F> itemClass) {
  return observable -> observable.map(result -> {
    F body = result.response().body();
    return Response.success(mapper.call(body));
  });
}
/**
 * Returns a transformer that discards the upstream observable and substitutes the subject
 * registered (or newly created) for {@code itemClass}.
 */
@Override public Observable.Transformer<Result<T>, Result<T>> on(Class<T> itemClass) {
  return ignoredUpstream -> {
    return getOrCreateSubjectFor(itemClass);
  };
}
/**
 * Pushes a single successful result carrying {@code item} onto the subject for
 * {@code itemClass}, then completes that subject.
 */
public void emitsSuccessfulResultFor(Class<T> itemClass, T item) {
  ReplaySubject<Result<T>> target = getOrCreateSubjectFor(itemClass);
  Result<T> successful = Result.response(Response.success(item));
  target.onNext(successful);
  target.onCompleted();
}
/**
 * Replaces {@code balanceSubject} with a new {@code ReplaySubject} and returns it, so every
 * call hands out a fresh subject.
 */
@Override public Observable<Result<ApiBalance>> getBalance() {
  ReplaySubject<Result<ApiBalance>> fresh = ReplaySubject.create();
  balanceSubject = fresh;
  return fresh;
}
/**
 * Replaces {@code transactionsSubject} with a new {@code ReplaySubject} and returns it, so
 * every call hands out a fresh subject.
 */
@Override public Observable<Result<ApiTransactions>> getTransactions() {
  ReplaySubject<Result<ApiTransactions>> fresh = ReplaySubject.create();
  transactionsSubject = fresh;
  return fresh;
}
/**
 * Replaces {@code accountsSubject} with a new {@code ReplaySubject} and returns it, so every
 * call hands out a fresh subject.
 */
@Override public Observable<Result<ApiAccounts>> getAccounts() {
  accountsSubject = ReplaySubject.create();
  return accountsSubject;
}