Java 类io.reactivex.ObservableTransformer 实例源码

项目:RxPay    文件:RxPayUtils.java   
/**
 * Builds a transformer that converts an unsuccessful WeChat pay result into a stream error.
 *
 * @return a transformer that forwards every result whose {@code isSucceed()} is true and
 *         throws a {@code PayFailedException} built from {@code getErrInfo()} otherwise
 */
public static ObservableTransformer<WxPayResult, WxPayResult> checkWechatResult() {
    // Stateless mapper, so a single instance can serve every application of the transformer.
    final Function<WxPayResult, WxPayResult> failOnUnsuccessful =
            new Function<WxPayResult, WxPayResult>() {
                @Override
                public WxPayResult apply(WxPayResult result) {
                    if (result.isSucceed()) {
                        return result;
                    }
                    throw new PayFailedException(result.getErrInfo());
                }
            };

    return new ObservableTransformer<WxPayResult, WxPayResult>() {
        @Override
        public ObservableSource<WxPayResult> apply(Observable<WxPayResult> source) {
            return source.map(failOnUnsuccessful);
        }
    };
}
项目:richeditor    文件:RxSchedulers.java   
/**
 * Creates a transformer that subscribes to the source on the IO scheduler and observes
 * the results on the Android main thread.
 *
 * <p>On subscription the network state is queried through {@code Utils.isNetworkAvailable()},
 * but no reaction is currently attached to the result.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> compose() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            final Consumer<Disposable> networkProbe = new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    if (!Utils.isNetworkAvailable()) {
                        // Intentionally empty: the former reactions (dispose the stream,
                        // throw an error, show a toast) are all disabled.
                    }
                }
            };

            Observable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
            Observable<T> probed = subscribedOnIo.doOnSubscribe(networkProbe);
            return probed.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:cleanarchitecture-unidirectional    文件:EmailLoginPresenter.java   
/**
 * Maps login actions to results: validates the e-mail and password locally, then delegates
 * to the use case.
 *
 * <p>Only {@code EmailLoginActions.LoginAction} items are handled. A validation failure is
 * emitted as an error {@code Result} (the stream itself does not fail). A valid action emits
 * a loading result followed by whatever the use case produces; use-case errors are converted
 * to error results as well.
 *
 * @return the action-to-result transformer, observed on the Android main thread
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return upstream -> {
        // Compiled once per transformer application rather than once per login attempt;
        // Pattern instances are immutable and safe to share between matchers.
        final Pattern passwordPattern = Pattern.compile("[0-9a-zA-Z]{6,}");

        final Observable<EmailLoginActions.LoginAction> loginActionObservable = upstream
                .ofType(EmailLoginActions.LoginAction.class);
        return loginActionObservable
                .flatMap(loginAction -> {
                    if (!Patterns.EMAIL_ADDRESS.matcher(loginAction.getEmail()).matches()) {
                        return Observable.just(Result.<Boolean, EmailLoginActions.LoginAction>error(loginAction, new FormValidationException("Must enter a valid email address!")));
                    }
                    // The pattern accepts six or more ASCII letters/digits only.
                    if (!passwordPattern.matcher(loginAction.getPassword()).matches()) {
                        return Observable.just(Result.<Boolean, EmailLoginActions.LoginAction>error(loginAction, new FormValidationException("Password must be at least 6 characters long!")));
                    }
                    return useCase.performAction(loginAction)
                            .onErrorReturn(throwable -> Result.error(loginAction, throwable))
                            .startWith(Result.<Boolean, EmailLoginActions.LoginAction>loading(loginAction));
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io());

    };
}
项目:RxLog    文件:RxLog.java   
/**
 * Creates a transformer that logs the selected events of an observable's lifecycle.
 *
 * <p>Each {@code LOG_*} flag set in {@code bitMask} composes the matching logging operator
 * onto the stream. {@code LOG_NEXT_DATA} takes precedence over {@code LOG_NEXT_EVENT}: when
 * both are set only the data logger is attached.
 *
 * @param msg     message passed to every attached logging operator
 * @param bitMask bitwise OR of the {@code LOG_*} flags to enable
 * @param <T>     element type of the stream
 * @return transformer that attaches the requested logging operators
 */
public static <T> ObservableTransformer<T, T> logObservable(final String msg, final int bitMask) {
    return upstream -> {
        // Flags are tested with "!= 0": a "> 0" test would miss a flag stored in the sign bit.
        if ((bitMask & LOG_SUBSCRIBE) != 0) {
            upstream = upstream.compose(oLogSubscribe(msg));
        }
        if ((bitMask & LOG_TERMINATE) != 0) {
            upstream = upstream.compose(oLogTerminate(msg));
        }
        if ((bitMask & LOG_ERROR) != 0) {
            upstream = upstream.compose(oLogError(msg));
        }
        if ((bitMask & LOG_COMPLETE) != 0) {
            upstream = upstream.compose(oLogComplete(msg));
        }
        if ((bitMask & LOG_NEXT_DATA) != 0) {
            upstream = upstream.compose(oLogNext(msg));
        } else if ((bitMask & LOG_NEXT_EVENT) != 0) {
            upstream = upstream.compose(oLogNextEvent(msg));
        }
        if ((bitMask & LOG_DISPOSE) != 0) {
            upstream = upstream.compose(oLogDispose(msg));
        }
        return upstream;
    };
}
项目:NeiHanDuanZiTV    文件:RxUtils.java   
/**
 * Creates a transformer that runs the source on the IO scheduler, delivers results on the
 * Android main thread, toggles the view's loading indicator and binds the stream to the
 * view's lifecycle.
 *
 * @param view view whose {@code showLoading()} / {@code hideLoading()} are invoked and whose
 *             lifecycle the stream is bound to
 * @param <T>  element type of the stream
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    final Consumer<Disposable> showProgress = new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            view.showLoading(); // show the progress indicator
        }
    };
    final Action hideProgress = new Action() {
        @Override
        public void run() {
            view.hideLoading(); // hide the progress indicator
        }
    };

    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> source) {
            Observable<T> withLoading = source
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(showProgress)
                    // The subscribeOn placed after doOnSubscribe selects the thread on which
                    // showLoading() runs; the first one applies to the source itself.
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doAfterTerminate(hideProgress);
            return withLoading.compose(RxUtils.bindToLifecycle(view));
        }
    };
}
项目:RetrofitSample    文件:NetUtil.java   
/**
 * Creates a transformer that unwraps a {@code BaseResponse<T>} stream into its payload.
 *
 * <p>The source is subscribed on a new thread and observed on the Android main thread.
 * For each response:
 * <ul>
 *   <li>error code {@code 0} with non-null data emits the data;</li>
 *   <li>error code {@code 0} with null data emits nothing, so only completion is seen;</li>
 *   <li>any other error code fails the stream with a {@code DlException} carrying the
 *       code and message.</li>
 * </ul>
 *
 * <p>Building the transformer cannot throw, so no try/catch is needed around it; failures
 * that occur while the stream runs are delivered through {@code onError}.
 *
 * @param <T> payload type
 * @return the unwrapping transformer
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
    return baseResponseObservable -> baseResponseObservable
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(baseResponse -> {
                if (baseResponse.getErrorCode() == 0) {
                    if (baseResponse.getData() != null) {
                        return Observable.just(baseResponse.getData());
                    } else {
                        // No data: let the subscriber handle this case in onComplete.
                        return Observable.empty();
                    }
                } else {
                    return Observable.error(new DlException(baseResponse.getErrorCode(), baseResponse.getMsg()));
                }
            });
}
项目:sqlbrite-sqlcipher    文件:BriteDatabaseTest.java   
/**
 * Opens a fresh test database file and wires up the {@code BriteDatabase} under test with a
 * message-collecting logger, a shared trigger subject and a query transformer that ends
 * query streams via {@code killSwitch}.
 */
@Before public void setUp() throws IOException {
  helper = new TestDb(InstrumentationRegistry.getContext(), dbFolder.newFile().getPath());
  real = helper.getWritableDatabase();

  // The same subject serves as both trigger source and trigger sink.
  PublishSubject<Set<String>> triggerSubject = PublishSubject.create();

  // Every log message is recorded so tests can assert on it.
  SqlBrite.Logger recordingLogger = new SqlBrite.Logger() {
    @Override public void log(String message) {
      logs.add(message);
    }
  };

  ObservableTransformer<Query, Query> stopOnKillSwitch =
      new ObservableTransformer<Query, Query>() {
        @Override public ObservableSource<Query> apply(Observable<Query> queries) {
          return queries.takeUntil(killSwitch);
        }
      };

  db = new BriteDatabase(
      helper, recordingLogger, triggerSubject, triggerSubject, scheduler, stopOnKillSwitch);
}
项目:MoligyMvpArms    文件:RxUtils.java   
/**
 * Creates a transformer that runs the source on the IO scheduler, delivers results on the
 * Android main thread, shows the view's loading indicator on subscription, hides it via
 * {@code doFinally}, and binds the stream to the view's lifecycle.
 *
 * @param view view whose {@code showLoading()} / {@code hideLoading()} are invoked and whose
 *             lifecycle the stream is bound to
 * @param <T>  element type of the stream
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    final Consumer<Disposable> showProgress = new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            view.showLoading(); // show the progress indicator
        }
    };
    final Action hideProgress = new Action() {
        @Override
        public void run() {
            view.hideLoading(); // hide the progress indicator
        }
    };

    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> source) {
            Observable<T> withLoading = source
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(showProgress)
                    // The subscribeOn placed after doOnSubscribe selects the thread on which
                    // showLoading() runs; the first one applies to the source itself.
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doFinally(hideProgress);
            return withLoading.compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
项目:RxGps    文件:StatusExceptionResumeNextTransformer.java   
/**
 * Creates a transformer that recovers from a {@code StatusException} whose status has a
 * resolution by emitting the result carried by the exception instead of failing.
 *
 * <p>Every other error is re-emitted unchanged.
 *
 * @param <R> result type of the stream
 * @return the error-resuming transformer
 */
public static <R extends Result> ObservableTransformer<R, R> forObservable() {
    return source -> source.onErrorResumeNext(error -> {
        // Errors of any other type are passed through untouched.
        if (!(error instanceof StatusException)) {
            return Observable.error(error);
        }

        StatusException statusError = (StatusException) error;
        if (!statusError.getStatus().hasResolution()) {
            return Observable.error(error);
        }

        // Resolvable status: hand the attached result to the subscriber.
        return Observable.just((R) statusError.getResult());
    });
}
项目:cleanarchitecture-unidirectional    文件:TimezonePresenter.java   
/**
 * Routes refresh, load-more and delete actions to their use cases and merges the results.
 *
 * <p>Each handled action first emits a loading result; use-case errors are converted into
 * error results for that action instead of failing the stream.
 *
 * @return the action-to-result transformer
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return actions -> {
        final Observable<Result> refreshResults = actions
                .ofType(TimezonesUiActions.RefreshAction.class)
                .flatMap(refresh -> listUseCase.performAction(refresh)
                        .onErrorReturn(error -> Result.error(refresh, error))
                        .startWith(Result.loading(refresh)));

        final Observable<Result> loadMoreResults = actions
                .ofType(TimezonesUiActions.LoadMoreAction.class)
                .flatMap(loadMore -> listUseCase.performAction(loadMore)
                        .onErrorReturn(error -> Result.error(loadMore, error))
                        .startWith(Result.loading(loadMore)));

        final Observable<Result> deleteResults = actions
                .ofType(DeleteTimezoneAction.class)
                .flatMap(delete -> deleteUseCase.performAction(delete)
                        .onErrorReturn(error -> Result.error(delete, error))
                        .startWith(Result.loading(delete)));

        return Observable.merge(refreshResults, loadMoreResults, deleteResults);
    };
}
项目:RxEasyHttp    文件:RxUtil.java   
/**
 * Creates a transformer that subscribes and unsubscribes on the IO scheduler, observes on
 * the Android main thread, and logs subscription and finalization through {@code HttpLog}.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> io_main() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> source) {
            final Consumer<Disposable> logSubscribe = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
                }
            };
            final Action logFinally = new Action() {
                @Override
                public void run() throws Exception {
                    HttpLog.i("+++doFinally+++");
                }
            };

            Observable<T> onIo = source
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io());
            Observable<T> logged = onIo
                    .doOnSubscribe(logSubscribe)
                    .doFinally(logFinally);
            return logged.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:RxPay    文件:RxPayUtils.java   
/**
 * Builds a transformer that converts an unsuccessful Alipay result into a stream error.
 *
 * @return a transformer that forwards every result whose {@code isSucceed()} is true and
 *         throws a {@code PayFailedException} built from {@code getErrInfo()} otherwise
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    // Stateless mapper, so a single instance can serve every application of the transformer.
    final Function<PayResult, PayResult> failOnUnsuccessful =
            new Function<PayResult, PayResult>() {
                @Override
                public PayResult apply(PayResult result) throws Exception {
                    if (result.isSucceed()) {
                        return result;
                    }
                    throw new PayFailedException(result.getErrInfo());
                }
            };

    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> source) {
            return source.map(failOnUnsuccessful);
        }
    };
}
项目:Aurora    文件:RxUtils.java   
/**
 * Creates a transformer that runs the source on the IO scheduler, delivers results on the
 * Android main thread, hides the view's loading indicator after termination and binds the
 * stream to the view's lifecycle.
 *
 * @param view view whose {@code hideLoading()} is invoked and whose lifecycle the stream is
 *             bound to
 * @param <T>  element type of the stream
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> applySchedulersWithLifeCycle(IView view) {
    final Action hideProgress = new Action() {
        @Override
        public void run() {
            view.hideLoading(); // hide the progress indicator
        }
    };

    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> source) {
            Observable<T> scheduled = source
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            return scheduled
                    .doAfterTerminate(hideProgress)
                    .compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
项目:Bing    文件:RxSchedulers.java   
/**
 * Creates a transformer that subscribes to the source on the IO scheduler and observes the
 * results on the Android main thread.
 *
 * <p>A subscription hook is attached but currently does nothing.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> compose() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            final Consumer<Disposable> onSubscribeHook = new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    // Intentionally empty: the connectivity check that showed a
                    // "network error" toast here is disabled.
                }
            };

            Observable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
            Observable<T> hooked = subscribedOnIo.doOnSubscribe(onSubscribeHook);
            return hooked.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:MVPArmsTest1    文件:RxUtils.java   
/**
 * Creates a transformer that runs the source on the IO scheduler, delivers results on the
 * Android main thread, toggles the view's loading indicator and binds the stream to the
 * view's lifecycle.
 *
 * @param view view whose {@code showLoading()} / {@code hideLoading()} are invoked and whose
 *             lifecycle the stream is bound to
 * @param <T>  element type of the stream
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    final Consumer<Disposable> showProgress = new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            view.showLoading(); // show the progress indicator
        }
    };
    final Action hideProgress = new Action() {
        @Override
        public void run() {
            view.hideLoading(); // hide the progress indicator
        }
    };

    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> source) {
            Observable<T> withLoading = source
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(showProgress)
                    // The subscribeOn placed after doOnSubscribe selects the thread on which
                    // showLoading() runs; the first one applies to the source itself.
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doAfterTerminate(hideProgress);
            return withLoading.compose(RxUtils.bindToLifecycle(view));
        }
    };
}
项目:gigreminder    文件:ConcertDetailsPresenter.java   
/**
 * Turns UI events into results: every {@code LoadConcertEvent} loads the concert identified
 * by {@code concertId}, and events that already are {@code Result}s are forwarded as-is.
 *
 * <p>A load emits {@code LoadConcertResult.IN_PROGRESS} first, then either a success result
 * or, on failure, an error result.
 *
 * @param events stream of UI events
 * @return merged stream of results
 */
@Override
protected Observable<Result> handleEvents(Observable<UiEvent> events) {
    ObservableTransformer<LoadConcertEvent, Result> loadConcert = loadRequests ->
            loadRequests.flatMap(request ->
                    getRepository().getConcert(concertId)
                            .map(LoadConcertResult::success)
                            .onErrorReturn(LoadConcertResult::error)
                            .toObservable()
                            .startWith(LoadConcertResult.IN_PROGRESS));

    // publish() shares a single upstream subscription between both branches.
    return events.publish(shared -> {
        return Observable.merge(
                handleEventsOfClass(shared, LoadConcertEvent.class, loadConcert),
                shared.ofType(Result.class));
    });
}
项目:cleanarchitecture-unidirectional    文件:CheckUserPresenter.java   
/**
 * Maps every UI event to a check-user action chosen from the last known state.
 *
 * <p>When the last state has no data, or its phase is {@code CheckPhases.STATE_START}, an
 * "exists" action for the current user id is produced; otherwise a "create record" action
 * carrying the user's id, name, e-mail and avatar is produced.
 *
 * @return the event-to-action transformer
 */
@Override
protected ObservableTransformer<UiEvent, Action> eventsToActions() {
    return uiEvents -> uiEvents.map(event -> {
        if (getLastState().getData() == null
                || getLastState().getData().getPhase() == CheckPhases.STATE_START) {
            return CheckUserActions.exists(userManager.getUserId());
        }
        return CheckUserActions.createRecord(
                userManager.getUserId(),
                userManager.getUserName(),
                userManager.getUserEmail(),
                userManager.getAvatar());
    });
}
项目:RxJava2-Android-Sample    文件:RxSchedulers.java   
/**
 * Creates a transformer that subscribes on the IO scheduler and observes on the Android
 * main thread.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableAsync() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            Observable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Creates a transformer that subscribes on the IO scheduler and observes on the Android
 * main thread.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableAsync() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            Observable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Creates a transformer that subscribes on the computation scheduler and observes on the
 * Android main thread.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableCompute() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            Observable<T> subscribedOnComputation = source.subscribeOn(Schedulers.computation());
            return subscribedOnComputation.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:sqlbrite-sqlcipher    文件:BriteDatabase.java   
/**
 * Calls {@code SQLiteDatabase.loadLibs(context)} and then stores the supplied collaborators.
 *
 * @param context          context handed to {@code SQLiteDatabase.loadLibs}
 * @param helper           open helper to store
 * @param password         database password; the array reference is stored, not copied
 * @param logger           logger to store
 * @param triggerSource    observable of table-name sets to store
 * @param triggerSink      observer of table-name sets to store
 * @param scheduler        scheduler to store
 * @param queryTransformer transformer for query streams to store
 */
BriteDatabase(
    Context context,
    SQLiteOpenHelper helper,
    @NonNull char[] password,
    Logger logger,
    Observable<Set<String>> triggerSource,
    Observer<Set<String>> triggerSink,
    Scheduler scheduler,
    ObservableTransformer<Query, Query> queryTransformer) {
  // Must run before anything else in this constructor, as in the original ordering.
  SQLiteDatabase.loadLibs(context);

  // Database access.
  this.helper = helper;
  this.password = password;

  // Trigger plumbing.
  this.triggerSource = triggerSource;
  this.triggerSink = triggerSink;

  // Query execution and diagnostics.
  this.scheduler = scheduler;
  this.queryTransformer = queryTransformer;
  this.logger = logger;
}
项目:GitHub    文件:Utils.java   
/**
 * Creates a transformer that re-subscribes to the source on error for as long as the
 * four-argument {@code retry(hint, retryCount, attempt, error)} overload returns true.
 *
 * @param hint       value forwarded to the retry decision
 * @param retryCount value forwarded to the retry decision
 * @param <U>        element type of the stream
 * @return the retrying transformer
 */
public static <U> ObservableTransformer<U, U> retry(final String hint, final int retryCount) {
    final BiPredicate<Integer, Throwable> shouldRetry = new BiPredicate<Integer, Throwable>() {
        @Override
        public boolean test(Integer attempt, Throwable error) throws Exception {
            return retry(hint, retryCount, attempt, error);
        }
    };

    return new ObservableTransformer<U, U>() {
        @Override
        public ObservableSource<U> apply(Observable<U> source) {
            return source.retry(shouldRetry);
        }
    };
}
项目:cleanarchitecture-unidirectional    文件:TimezoneEditPresenter.java   
/**
 * Routes get, save and delete timezone actions to their use cases and merges the results.
 *
 * <p>Save actions are validated first (non-empty name, non-empty city, difference within
 * -12..12); a validation failure is emitted as an error result. A valid save goes to the
 * save use case when the action has no id and to the update use case otherwise. Each
 * use-case call emits a loading result first and converts errors into error results.
 *
 * @return the action-to-result transformer
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return actions -> {
        final Observable<Result> getResults = actions
                .ofType(TimezoneEditUiActions.GetTimezoneAction.class)
                .flatMap(get -> getUseCase.performAction(get)
                        .onErrorReturn(error -> Result.error(get, error))
                        .startWith(Result.loading(get)));

        final Observable<Result> saveResults = actions
                .ofType(TimezoneEditUiActions.SaveTimezoneAction.class)
                .flatMap(save -> {
                    // Form validation: the first failing rule wins.
                    if (save.getName() == null || save.getName().isEmpty()) {
                        return Observable.just(Result.error(save, new FormValidationException("Name cannot be empty!")));
                    }
                    if (save.getCity() == null || save.getCity().isEmpty()) {
                        return Observable.just(Result.error(save, new FormValidationException("City cannot be empty!")));
                    }
                    if (save.getDifference() < -12 || save.getDifference() > 12) {
                        return Observable.just(Result.error(save, new FormValidationException("Difference must be an integer between -12 and +12!")));
                    }

                    // No id means a new record; otherwise an existing one is updated.
                    final Observable<Result> outcome = save.getId() == null
                            ? saveUseCase.performAction(save)
                            : updateUseCase.performAction(save);
                    return outcome
                            .onErrorReturn(error -> Result.error(save, error))
                            .startWith(Result.loading(save));
                });

        final Observable<Result> deleteResults = actions
                .ofType(DeleteTimezoneAction.class)
                .flatMap(delete -> deleteUseCase.performAction(delete)
                        .onErrorReturn(error -> Result.error(delete, error))
                        .startWith(Result.loading(delete)));

        return Observable.merge(getResults, saveResults, deleteResults);
    };
}
项目:RxPermission    文件:RealRxPermission.java   
/**
 * Creates a transformer that turns each item of the source observable into
 * {@link Permission} objects, one per requested permission.
 * <p>
 * Permissions that have never been requested are requested through the related framework
 * method, which asks the user whether to allow them.
 *
 * @param permissions permissions to ensure; validated by {@code checkPermissions} before the
 *                    transformer is created
 * @param <T>         element type of the source stream
 * @return transformer delegating to {@code request(source, permissions)}
 */
@NonNull @CheckReturnValue private <T> ObservableTransformer<T, Permission> ensureEach(@NonNull final String... permissions) {
  checkPermissions(permissions);

  final ObservableTransformer<T, Permission> requestEach = new ObservableTransformer<T, Permission>() {
    @Override @NonNull @CheckReturnValue public ObservableSource<Permission> apply(final Observable<T> source) {
      return request(source, permissions);
    }
  };
  return requestEach;
}
项目:GitHub    文件:ObservableUseCase.java   
/**
 * Creates the use case and prepares the transformer that subscribes on the executor's
 * scheduler and observes on the post-execution thread's scheduler.
 *
 * @param useCaseExecutor     supplies the scheduler used for subscription
 * @param postExecutionThread supplies the scheduler used for observation
 */
public ObservableUseCase(final UseCaseExecutor useCaseExecutor,
                         final PostExecutionThread postExecutionThread) {
    super(useCaseExecutor, postExecutionThread);
    schedulersTransformer = new ObservableTransformer<R, R>() {
        @Override
        public Observable<R> apply(Observable<R> source) {
            // Schedulers are looked up each time the transformer is applied.
            Observable<R> onExecutor = source.subscribeOn(useCaseExecutor.getScheduler());
            return onExecutor.observeOn(postExecutionThread.getScheduler());
        }
    };
}
项目:XDroid-Databinding    文件:XApi.java   
/**
 * Creates a transformer that retries according to {@code retryWhenHandler}, resumes
 * remaining errors through a {@code ServerResultErrorFunc}, subscribes on the IO scheduler
 * and observes on the Android main thread.
 *
 * @param retryWhenHandler handler passed to {@code retryWhen}
 * @param <T>              element type of the stream
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> getObservableScheduler(final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> retryWhenHandler) {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> source) {
            // Error handling first ...
            Observable<T> retried = source.retryWhen(retryWhenHandler);
            Observable<T> recovered = retried.onErrorResumeNext(new ServerResultErrorFunc<T>());

            // ... then thread switching.
            Observable<T> onIo = recovered.subscribeOn(Schedulers.io());
            return onIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:redux-observable    文件:SchedulerObservableTransformer.java   
/**
 * Returns a transformer that subscribes and observes on the trampoline scheduler.
 *
 * <p>The transformer is fully parameterized, so no raw types or unchecked casts are needed.
 *
 * @param <T> element type of the stream
 * @return the trampoline scheduling transformer
 */
@Override public <T> ObservableTransformer<T, T> transformer() {
  return new ObservableTransformer<T, T>() {
    @Override public ObservableSource<T> apply(Observable<T> upstream) {
      return upstream.subscribeOn(Schedulers.trampoline())
          .observeOn(Schedulers.trampoline());
    }
  };
}
项目:SuperHttp    文件:ApiCache.java   
/**
 * Creates a transformer that routes the API stream through the cache strategy selected for
 * {@code cacheMode}.
 *
 * @param cacheMode mode used to load the cache strategy
 * @param type      type passed on to the strategy
 * @param <T>       payload type of the API stream
 * @return transformer delegating to the strategy's {@code execute}
 */
public <T> ObservableTransformer<T, CacheResult<T>> transformer(CacheMode cacheMode, final Type type) {
    final ApiCache cache = this;
    // Resolve the cache strategy once, up front.
    final ICacheStrategy strategy = loadStrategy(cacheMode);

    return new ObservableTransformer<T, CacheResult<T>>() {
        @Override
        public ObservableSource<CacheResult<T>> apply(Observable<T> source) {
            // The cache key is read at application time, not when the transformer is built.
            return strategy.execute(cache, cache.cacheKey, source, type);
        }
    };
}
项目:cleanarchitecture-unidirectional    文件:EmailLoginPresenter.java   
/**
 * Maps every {@code LoginClicked} UI event to a login action carrying the entered e-mail
 * and password, observed on the Android main thread. Other events are ignored.
 *
 * @return the event-to-action transformer
 */
@Override
protected ObservableTransformer<UiEvent, Action> eventsToActions() {
    return uiEvents -> {
        final Function<EmailLoginUiEvents.LoginClicked, Action> toLoginAction =
                clicked -> EmailLoginActions.login(clicked.getEmail(), clicked.getPassword());

        return uiEvents
                .ofType(EmailLoginUiEvents.LoginClicked.class)
                .map(toLoginAction)
                .observeOn(AndroidSchedulers.mainThread());
    };
}
项目:Ec2m    文件:TransformUtils.java   
/**
 * Creates a transformer that both observes and subscribes on the IO scheduler.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> all_io() {
    return new ObservableTransformer<T, T>() {

        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            Observable<T> observedOnIo = source.observeOn(Schedulers.io());
            return observedOnIo.subscribeOn(Schedulers.io());
        }
    };
}
项目:gigreminder    文件:ArtistsPresenter.java   
/**
 * Turns UI events into results: load events fetch the artists, delete events remove the
 * artists selected in the event's UI model, and events that already are {@code Result}s are
 * forwarded as-is.
 *
 * <p>Both operations emit their {@code IN_PROGRESS} result first and convert failures into
 * error results.
 *
 * @param events stream of UI events
 * @return merged stream of results
 */
@Override
protected Observable<Result> handleEvents(Observable<UiEvent> events) {
    ObservableTransformer<LoadArtistsEvent, Result> loadArtists = loadRequests ->
            loadRequests.flatMap(request -> {
                return getRepository().getArtists()
                        .map(LoadArtistsResult::success)
                        .onErrorReturn(LoadArtistsResult::error)
                        .startWith(LoadArtistsResult.IN_PROGRESS);
            });

    ObservableTransformer<DeleteArtistsEvent, Result> deleteArtists = deleteRequests ->
            deleteRequests.flatMap(request -> {
                List<Artist> selected = request.getUiModel().getSelectedArtists();

                return getRepository().deleteArtists(selected)
                        .toSingleDefault(DeleteArtistsResult.SUCCESS)
                        .onErrorReturn(DeleteArtistsResult::error)
                        .toObservable()
                        .startWith(DeleteArtistsResult.IN_PROGRESS);
            });

    // publish() shares a single upstream subscription between all branches.
    return events.publish(shared -> {
        return Observable.merge(
                handleEventsOfClass(shared, LoadArtistsEvent.class, loadArtists),
                handleEventsOfClass(shared, DeleteArtistsEvent.class, deleteArtists),
                shared.ofType(Result.class));
    });
}
项目:cleanarchitecture-unidirectional    文件:ForgotPwPresenter.java   
/**
 * Maps forgot-password submissions to results: validates the e-mail locally, then delegates
 * to the use case.
 *
 * <p>An invalid e-mail is emitted as an error {@code Result}. A valid submission emits a
 * loading result followed by the use case's output, with use-case errors converted into
 * error results.
 *
 * @return the action-to-result transformer, observed on the Android main thread
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return actions -> {
        final Observable<ForgotPwActions.ForgotPwSubmit> submissions =
                actions.ofType(ForgotPwActions.ForgotPwSubmit.class);

        return submissions
                .flatMap(submit -> {
                    if (!Patterns.EMAIL_ADDRESS.matcher(submit.getEmail()).matches()) {
                        return Observable.just(Result.<Boolean, ForgotPwActions.ForgotPwSubmit>error(submit, new FormValidationException("Must enter a valid email address!")));
                    }

                    return useCase.performAction(submit)
                            .onErrorReturn(error -> Result.error(submit, error))
                            .startWith(Result.<Boolean, ForgotPwActions.ForgotPwSubmit>loading(submit));
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io());
    };
}
项目:FastLib    文件:FastTransformer.java   
/**
 * Thread switching: wraps the one-argument {@code switchSchedulers(Observable)} overload in
 * a transformer so it can be used with {@code compose}.
 *
 * @param <T> element type of the stream
 * @return transformer delegating to {@code switchSchedulers(upstream)}
 */
public static <T> ObservableTransformer<T, T> switchSchedulers() {
    final ObservableTransformer<T, T> switching = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> source) {
            return switchSchedulers(source);
        }
    };
    return switching;
}
项目:RxTransfer    文件:RxDownload.java   
/**
 * Creates a transformer that calls {@code showProgressDialog} before subscribing to the
 * source and calls {@code dismissProgressDialog} when the stream terminates.
 *
 * <p>The dialog call is scheduled on the Android main thread. It was previously subscribed
 * on the IO scheduler, which ran {@code showProgressDialog} on a background thread.
 * NOTE(review): this assumes {@code showProgressDialog} touches UI; confirm against its
 * implementation.
 *
 * @param activity   activity passed to {@code showProgressDialog}
 * @param messageRes message resource passed to {@code showProgressDialog}
 * @param <T>        element type of the stream
 * @return the dialog-showing transformer
 */
public static <T> ObservableTransformer<T, T> showDialog(final Activity activity, int messageRes) {
    return observable -> Observable.fromCallable(() -> {
        showProgressDialog(activity, messageRes);
        return true;
    })
    // Run the callable on the main thread; its emission is then already on the main thread,
    // so the source is still subscribed from there, exactly as before.
    .subscribeOn(AndroidSchedulers.mainThread())
    .flatMap(t -> observable)
    .doOnTerminate(() -> dismissProgressDialog());
}
项目:RetrofitCache    文件:CacheTransformer.java   
/**
 * Creates a transformer that returns the upstream unchanged.
 *
 * <p>When the upstream's runtime class name equals {@code CLASS_NAME1} or
 * {@code CLASS_NAME2}, {@code observable(upstream)} is invoked first; its return value is
 * not used here.
 *
 * @param <T> element type of the stream
 * @return the pass-through transformer
 */
public static  <T> ObservableTransformer<T, T> emptyTransformer(){
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(io.reactivex.Observable<T> source) {
            final String sourceClassName = source.getClass().getName();
            final boolean recognized =
                    sourceClassName.equals(CLASS_NAME1) || sourceClassName.equals(CLASS_NAME2);

            if (recognized) {
                observable(source);
            }
            return source;
        }
    };
}
项目:RxJava2-Android-Sample    文件:RxSchedulers.java   
/**
 * Creates a transformer that subscribes on the computation scheduler and observes on the
 * Android main thread.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableCompute() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            Observable<T> subscribedOnComputation = source.subscribeOn(Schedulers.computation());
            return subscribedOnComputation.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:POCenter    文件:RxUtils.java   
/**
 * Creates a transformer that fails the stream with a {@link NoSuchElementException} when
 * the upstream completes without emitting any item.
 *
 * <p>The fallback is built with {@code Observable.error(Callable)}, so the subscriber gets
 * {@code onSubscribe} before {@code onError}, and a fresh exception is created for every
 * subscription.
 *
 * @param <T> element type of the stream
 * @return transformer that turns an empty stream into an error
 */
public static <T> ObservableTransformer<T, T> notEmptyOrError() {
    return new ObservableTransformer<T, T>() {

        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.switchIfEmpty(Observable.<T>error(
                    new java.util.concurrent.Callable<Throwable>() {
                        @Override
                        public Throwable call() {
                            return new NoSuchElementException();
                        }
                    }));
        }
    };
}
项目:rxjava2_retrofit2    文件:ResultTransformer.java   
/**
 * Creates a transformer that unwraps {@code HttpResponseResult<T>} items through
 * {@code ResultTransformer.flatMap()} and then applies the scheduling of
 * {@code SchedulerTransformer.transformer()}.
 *
 * @param <T> payload type
 * @return the unwrapping and scheduling transformer
 */
public static <T> ObservableTransformer<HttpResponseResult<T>, T> transformer() {
    return new ObservableTransformer<HttpResponseResult<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<HttpResponseResult<T>> responses) {
            Observable<T> payloads = responses.flatMap(ResultTransformer.<T>flatMap());
            return payloads.compose(SchedulerTransformer.<T>transformer());
        }
    };
}
项目:rxjava2_retrofit2    文件:SchedulerTransformer.java   
/**
 * Creates a transformer that subscribes on the IO scheduler and observes on the Android
 * main thread.
 *
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> transformer() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            Observable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:rxjava2_retrofit2    文件:DialogTransformer.java   
/**
 * Creates a transformer that shows a {@code ProgressDialog} when the stream is subscribed
 * and cancels it, if still showing, when the stream terminates.
 *
 * <p>When {@code cancelable} is set, cancelling the dialog disposes the subscription.
 *
 * @param <T> element type of the stream
 * @return the dialog-showing transformer
 */
public <T> ObservableTransformer<T, T> transformer() {
    return new ObservableTransformer<T, T>() {
        // Dialog created by the most recent subscription.
        private ProgressDialog progressDialog;

        @Override
        public ObservableSource<T> apply(final Observable<T> upstream) {
            final Consumer<Disposable> showDialog = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull final Disposable disposable) throws Exception {
                    progressDialog = ProgressDialog.show(activity, null, msg, true, cancelable);
                    if (!cancelable) {
                        return;
                    }
                    // Cancelling the dialog also stops the running stream.
                    progressDialog.setOnCancelListener(new DialogInterface.OnCancelListener() {
                        @Override
                        public void onCancel(DialogInterface dialog) {
                            disposable.dispose();
                        }
                    });
                }
            };

            final Action hideDialog = new Action() {
                @Override
                public void run() throws Exception {
                    if (progressDialog.isShowing()) {
                        progressDialog.cancel();
                    }
                }
            };

            return upstream
                    .doOnSubscribe(showDialog)
                    .doOnTerminate(hideDialog);
        }
    };
}