Java 类io.reactivex.functions.Function 实例源码

项目:store2store    文件:TestStore.java   
/**
 * Test implementation of {@code deleteAll}.
 *
 * <p>When {@code shouldThrowError} is set, the flag is cleared and an error is emitted.
 * Otherwise the list from {@code getAll(null, null)} is delayed by one second and its size
 * is emitted.
 *
 * @return a flow emitting the number of items, or an error
 */
@Override
public Flowable<Integer> deleteAll() {
    if (shouldThrowError) {
        // Clear the flag before failing: the StoreService needs to call getAll() again.
        shouldThrowError = false;
        return Flowable.error(new Exception("deleteAll.error"));
    }

    final Function<Optional<List<TestModel>>, Flowable<Integer>> countItems =
            new Function<Optional<List<TestModel>>, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Optional<List<TestModel>> all) throws Exception {
                    final int size = all.get().size();
                    return Flowable.just(size);
                }
            };

    return getAll(null, null)
            .delay(1, TimeUnit.SECONDS)
            .flatMap(countItems);
}
项目:android-study    文件:AllAppInfoFragment.java   
/**
 * Wires the list to its adapter, then loads the app info on the IO scheduler and passes the
 * result to the adapter. The stream is bound until {@code FragmentEvent.DESTROY}.
 */
@Override public void initView(View view) {
  requestBaseInit(getPageTitle());

  mAdapter = new AllAppInfoAdapter(this, mAppInfos);
  mList.setLayoutManager(new LinearLayoutManager(getActivity()));
  mList.setAdapter(mAdapter);

  // The emitted string is only a trigger; its value is not used.
  final Function<String, List<AppUtils.AppInfo>> loadApps =
      new Function<String, List<AppUtils.AppInfo>>() {
        @Override public List<AppUtils.AppInfo> apply(@NonNull String ignored) throws Exception {
          return AppUtils.getAppsInfo();
        }
      };
  final Consumer<List<AppUtils.AppInfo>> showApps =
      new Consumer<List<AppUtils.AppInfo>>() {
        @Override public void accept(@NonNull List<AppUtils.AppInfo> loaded) throws Exception {
          mAdapter.resetData(loaded);
        }
      };

  Observable.just("2")
      .subscribeOn(Schedulers.io())
      .map(loadApps)
      .observeOn(PausedHandlerScheduler.from(getHandler()))
      .compose(mLifecycleProvider.<List<AppUtils.AppInfo>>bindUntilEvent(FragmentEvent.DESTROY))
      .subscribe(showApps);
}
项目:rxtools    文件:SimpleFlowableList.java   
/**
 * Queues {@code operation} when {@code _batchedOperations} is non-null; otherwise passes it to
 * {@code applyUpdate}, wrapped so that it works on a copy of the list it is given.
 *
 * @param operation the list transformation to run
 */
void applyOperation(final Function<List<T>, Update<T>> operation)
{
    synchronized (_batchingLock) {
        final boolean batching = _batchedOperations != null;
        if (batching) {
            _batchedOperations.add(operation);
            return;
        }
    }

    final Function<List<T>, Update<T>> applyToCopy = new Function<List<T>, Update<T>>() {
        @Override
        public Update<T> apply(List<T> current) throws Exception
        {
            // The operation receives its own ArrayList copy rather than the incoming list.
            final List<T> copy = new ArrayList<>(current);
            return operation.apply(copy);
        }
    };
    applyUpdate(applyToCopy);
}
项目:GSB-2017-Android    文件:ClientsNetworkCalls.java   
/**
 * Requests all clients and converts the JSON response into a list of {@code Client}.
 *
 * <p>A null element yields an empty list, a JSON array is parsed, and anything else is
 * reported as an error. Results are observed on the Android main thread.
 *
 * @return an observable emitting the parsed client list
 */
public static Observable<List<Client>> getAllClients() {
    final ClientsService service = ServiceGenerator.createService(ClientsService.class);

    final Function<JsonElement, Observable<List<Client>>> parseClients =
            new Function<JsonElement, Observable<List<Client>>>() {
                @Override
                public Observable<List<Client>> apply(JsonElement json) throws Exception {
                    if (json == null) {
                        return Observable.just((List<Client>) new ArrayList<Client>());
                    }
                    Log.i("Get All Clients" , "JSON: "+json.toString());
                    if (!json.isJsonArray()) {
                        return Observable.error(new Exception("Expected a JSON Array"));
                    }
                    final List<Client> parsed =
                            Client.ClientsListParser.fromJsonArray(json.getAsJsonArray());
                    return Observable.just(parsed);
                }
            };

    return service.getAllClients(UrlManager.getAllClientsURL())
            .flatMap(parseClients)
            .observeOn(AndroidSchedulers.mainThread());
}
项目:vt-support    文件:StorageImpl.java   
/**
 * Writes every entry into the TILES table with an INSERT OR REPLACE statement.
 *
 * <p>The entry's vector is gzip-compressed and the row is passed through {@code flipY} before
 * being bound. Each entry produces one {@code StorageResult}; a failed update is reported as a
 * result carrying the wrapped error instead of failing the stream.
 *
 * @param entries the entries to store
 * @return one result per entry
 */
@Override
public Observable<StorageResult> put(Observable<Entry> entries) {
  final String insertSql =
      "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
          + " values (?, ?, ?, ?);";

  return entries.flatMap((Function<Entry, ObservableSource<StorageResult>>) entry -> {
    final byte[] gzippedTile;
    try {
      gzippedTile = CompressUtil.getCompressedAsGzip(entry.getVector());
    } catch (final IOException ex) {
      throw Exceptions.propagate(ex);
    }

    // Parameter order matches the column order of the statement above.
    final Observable<Object> params = Observable.<Object>just(
        entry.getZoomLevel(),
        entry.getColumn(),
        flipY(entry.getRow(), entry.getZoomLevel()),
        gzippedTile);

    return dataSource.update(insertSql)
        .parameterStream(params.toFlowable(BackpressureStrategy.BUFFER))
        .counts()
        .map(count -> new StorageResult(entry))
        .onErrorReturn(error -> new StorageResult(entry, new Exception(error)))
        .toObservable();
  });
}
项目:GitHub    文件:MainPresenter.java   
/**
 * Subscribes to {@code NightModeEvent}s from the RxBus and forwards each event's night-mode
 * flag to the view.
 */
void registerEvent() {
    final Function<NightModeEvent, Boolean> toNightModeFlag =
            new Function<NightModeEvent, Boolean>() {
                @Override
                public Boolean apply(NightModeEvent event) {
                    return event.getNightMode();
                }
            };

    addSubscribe(RxBus.getDefault().toFlowable(NightModeEvent.class)
            .compose(RxUtil.<NightModeEvent>rxSchedulerHelper())
            .map(toNightModeFlag)
            .subscribeWith(new CommonSubscriber<Boolean>(mView, "切换模式失败ヽ(≧Д≦)ノ") {
                @Override
                public void onNext(Boolean nightMode) {
                    mView.useNightMode(nightMode);
                }
            })
    );
}
项目:RxJava2Swing    文件:RxSwingPluginsTest.java   
/**
 * Verifies that an exception thrown by the onSchedule hook propagates to the caller, and that
 * after {@code reset()} the runnable is returned unchanged.
 */
@Test
public void onScheduleCrashes() {
    RxSwingPlugins.setOnSchedule(new Function<Runnable, Runnable>() {
        @Override
        public Runnable apply(Runnable r) throws Exception {
            throw new IllegalStateException("Failure");
        }
    });

    try {
        try {
            RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE);
            Assert.fail("Should have thrown!");
        } catch (IllegalStateException ex) {
            Assert.assertEquals("Failure", ex.getMessage());
        }
    } finally {
        // Always remove the crashing hook, even if an assertion above fails, so it cannot
        // leak into other tests.
        RxSwingPlugins.reset();
    }

    Assert.assertSame(Functions.EMPTY_RUNNABLE, RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE));
}
项目:Android-Allocine-Api    文件:AllocineApi.java   
/**
 * Requests a news item from the Allocine service.
 *
 * <p>The call is signed with the SED and SIG values produced by {@code ServiceSecurity}.
 *
 * @param idNews  the news identifier, sent as the CODE parameter
 * @param profile the profile, sent as the PROFILE parameter
 * @return the mapped service response
 */
private Single<News> news(String idNews, String profile) {

    final String query = ServiceSecurity.construireParams(false,
            AllocineService.CODE, idNews,
            AllocineService.PROFILE, profile
    );
    final String sed = ServiceSecurity.getSED();
    final String sig = ServiceSecurity.getSIG(query, sed);

    final Function<AllocineResponse, News> toNews = new Function<AllocineResponse, News>() {
        @Override
        public News apply(AllocineResponse response) throws Exception {
            // NOTE(review): RxJava 2 does not accept a null mapper result, so this stub
            // presumably fails with a NullPointerException once the request succeeds.
            // The AllocineResponse -> News conversion still needs to be implemented.
            return null;
        }
    };

    return allocineService.news(idNews, profile, sed, sig).map(toNews);
}
项目:store2store    文件:StoreService.java   
/**
 * Looks up an item by id.
 *
 * <p>Without a synced store only the DAO result is emitted. With one, the synced store's
 * result is emitted first, followed by the DAO result after it has been passed through
 * {@code syncedStore.insertOrUpdate}.
 *
 * @param id the item id
 * @return the concatenation of the lookups described above
 */
@Override
public Flowable<Optional<T>> getById(final int id) {
    final List<Flowable<Optional<T>>> sources = new ArrayList<>();
    Flowable<Optional<T>> fromDao = dao.getById(id);

    if (hasSyncedStore()) {
        final Function<Optional<T>, Flowable<Optional<T>>> pushToSyncedStore =
                new Function<Optional<T>, Flowable<Optional<T>>>() {
                    @Override
                    public Flowable<Optional<T>> apply(final Optional<T> found) throws Exception {
                        return syncedStore.insertOrUpdate(found.get());
                    }
                };
        fromDao = fromDao.flatMap(pushToSyncedStore);

        // The synced store is queried first; the DAO flow is appended below.
        sources.add(syncedStore.getById(id));
    }

    sources.add(fromDao);
    return Flowable.concat(sources);
}
项目:SuperHttp    文件:FirstRemoteStrategy.java   
/**
 * Remote-first strategy: subscribes to the remote source first and then to the cache, and
 * emits the first result that is non-null and carries non-null cache data.
 *
 * <p>A failure while reading the cache is treated as "no cached value" rather than as an
 * error of the whole request.
 *
 * @param apiCache the cache to read from and write to
 * @param cacheKey the key of the cached entry
 * @param source   the remote request
 * @param type     the type of the cached payload
 * @return an observable emitting at most one usable result
 */
@Override
public <T> Observable<CacheResult<T>> execute(ApiCache apiCache, String cacheKey, Observable<T> source, Type type) {
    Observable<CacheResult<T>> cache = loadCache(apiCache, cacheKey, type);
    // Operators return a new Observable, so the fallback has to be re-assigned to take effect.
    // RxJava 2 forbids null items, so a cache failure becomes an empty stream instead of
    // a null result.
    cache = cache.onErrorResumeNext(Observable.<CacheResult<T>>empty());
    Observable<CacheResult<T>> remote = loadRemote(apiCache, cacheKey, source);
    return Observable.concat(remote, cache).filter(new Predicate<CacheResult<T>>() {
        @Override
        public boolean test(CacheResult<T> tCacheResult) throws Exception {
            return tCacheResult != null && tCacheResult.getCacheData() != null;
        }
    }).firstElement().toObservable();
}
项目:MultiTypeRecyclerViewAdapter    文件:RxAdapterHelper.java   
/**
 * Computes the diff for a refresh request on the computation scheduler and applies the
 * result on the Android main thread.
 *
 * @param refreshData the refresh request holding the new data, header, footer and types
 */
@Override
protected void startRefresh(HandleBase<MultiHeaderEntity> refreshData) {
    final Function<HandleBase<MultiHeaderEntity>, DiffUtil.DiffResult> computeDiff =
            new Function<HandleBase<MultiHeaderEntity>, DiffUtil.DiffResult>() {
                @Override
                public DiffUtil.DiffResult apply(@NonNull HandleBase<MultiHeaderEntity> request) throws Exception {
                    return handleRefresh(
                            request.getNewData(),
                            request.getNewHeader(),
                            request.getNewFooter(),
                            request.getType(),
                            request.getRefreshType());
                }
            };
    final Consumer<DiffUtil.DiffResult> applyDiff = new Consumer<DiffUtil.DiffResult>() {
        @Override
        public void accept(@NonNull DiffUtil.DiffResult result) throws Exception {
            handleResult(result);
        }
    };

    Flowable.just(refreshData)
            .onBackpressureDrop()
            .observeOn(Schedulers.computation())
            .map(computeDiff)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(applyDiff);
}
项目:android-mvp-architecture    文件:AppDataManager.java   
/**
 * Seeds the questions from the bundled JSON asset when {@code mDbHelper.isQuestionEmpty()}
 * reports true.
 *
 * @return the result of {@code saveQuestionList} when seeding ran, otherwise a single
 *     {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseQuestions() {

    // Only fields carrying the @Expose annotation take part in deserialization.
    final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

    final Function<Boolean, ObservableSource<? extends Boolean>> seedIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty)
                        throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }

                    final Type questionListType = $Gson$Types
                            .newParameterizedTypeWithOwner(null, List.class,
                                    Question.class);
                    final List<Question> questions = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext,
                                    AppConstants.SEED_DATABASE_QUESTIONS),
                            questionListType);

                    return saveQuestionList(questions);
                }
            };

    return mDbHelper.isQuestionEmpty().concatMap(seedIfEmpty);
}
项目:android-study    文件:PixelEffectFragment.java   
/**
 * Decodes {@code R.drawable.test1} on the IO scheduler, stores the bitmap in {@code mBitmap}
 * and shows it in {@code mImage}. The stream is bound until {@code FragmentEvent.DESTROY}.
 */
@Override protected void initView(View parent) {
  requestBaseInit(getPageTitle());

  // The emitted string is only a trigger; its value is not used.
  final Function<String, Bitmap> decodeTestImage = new Function<String, Bitmap>() {
    @Override public Bitmap apply(@NonNull String ignored) throws Exception {
      return BitmapFactory.decodeResource(getResources(), R.drawable.test1);
    }
  };
  final Consumer<Bitmap> showImage = new Consumer<Bitmap>() {
    @Override public void accept(@NonNull Bitmap decoded) throws Exception {
      mBitmap = decoded;
      mImage.setImageBitmap(mBitmap);
    }
  };

  Observable.just("123")
      .subscribeOn(Schedulers.io())
      .map(decodeTestImage)
      .observeOn(PausedHandlerScheduler.from(getHandler()))
      .compose(mLifecycleProvider.<Bitmap>bindUntilEvent(FragmentEvent.DESTROY))
      .subscribe(showImage);
}
项目:GitHub    文件:AndroidSchedulersTest.java   
/**
 * Verifies that every call to {@code AndroidSchedulers.mainThread()} goes through the
 * registered main-thread scheduler handler and returns the handler's scheduler.
 */
@Test
public void mainThreadCallsThroughToHook() {
    final AtomicInteger hookCalls = new AtomicInteger();
    final Scheduler replacement = new EmptyScheduler();
    final Function<Scheduler, Scheduler> countingHook = new Function<Scheduler, Scheduler>() {
        @Override public Scheduler apply(Scheduler original) {
            hookCalls.getAndIncrement();
            return replacement;
        }
    };
    RxAndroidPlugins.setMainThreadSchedulerHandler(countingHook);

    // First lookup: the hook is consulted once.
    assertSame(replacement, AndroidSchedulers.mainThread());
    assertEquals(1, hookCalls.get());

    // Second lookup: the hook is consulted again.
    assertSame(replacement, AndroidSchedulers.mainThread());
    assertEquals(2, hookCalls.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Verifies that {@code scheduleDirect} passes the runnable through the schedule handler and
 * runs the runnable returned by the handler instead of the original one.
 */
@Test
public void directScheduleOnceUsesHook() {
    final CountingRunnable substitute = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    final Function<Runnable, Runnable> swappingHook = new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable scheduled) {
            seenByHook.set(scheduled);
            return substitute;
        }
    };
    RxJavaPlugins.setScheduleHandler(swappingHook);

    final CountingRunnable submitted = new CountingRunnable();
    scheduler.scheduleDirect(submitted);

    // The hook must have received exactly the runnable that was scheduled.
    assertSame(submitted, seenByHook.get());

    runUiThreadTasks();
    // Only the hook's substitute may have run.
    assertEquals(1, substitute.get());
    assertEquals(0, submitted.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Verifies that a delayed {@code scheduleDirect} passes the runnable through the schedule
 * handler and, once the delay has elapsed, runs the runnable returned by the handler.
 */
@Test
public void directScheduleOnceWithDelayUsesHook() {
    final CountingRunnable substitute = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    final Function<Runnable, Runnable> swappingHook = new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable scheduled) {
            seenByHook.set(scheduled);
            return substitute;
        }
    };
    RxJavaPlugins.setScheduleHandler(swappingHook);

    final CountingRunnable submitted = new CountingRunnable();
    scheduler.scheduleDirect(submitted, 1, MINUTES);

    // The hook must have received exactly the runnable that was scheduled.
    assertSame(submitted, seenByHook.get());

    idleMainLooper(1, MINUTES);
    runUiThreadTasks();
    // Only the hook's substitute may have run.
    assertEquals(1, substitute.get());
    assertEquals(0, submitted.get());
}
项目:GitHub    文件:HandlerSchedulerTest.java   
/**
 * Verifies that a delayed {@code Worker.schedule} passes the runnable through the schedule
 * handler and, once the delay has elapsed, runs the runnable returned by the handler.
 */
@Test
public void workerScheduleOnceWithDelayUsesHook() {
    final CountingRunnable substitute = new CountingRunnable();
    final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
    final Function<Runnable, Runnable> swappingHook = new Function<Runnable, Runnable>() {
        @Override public Runnable apply(Runnable scheduled) {
            seenByHook.set(scheduled);
            return substitute;
        }
    };
    RxJavaPlugins.setScheduleHandler(swappingHook);

    final Worker worker = scheduler.createWorker();
    final CountingRunnable submitted = new CountingRunnable();
    worker.schedule(submitted, 1, MINUTES);

    // The hook must have received exactly the runnable that was scheduled.
    assertSame(submitted, seenByHook.get());

    idleMainLooper(1, MINUTES);
    runUiThreadTasks();
    // Only the hook's substitute may have run.
    assertEquals(1, substitute.get());
    assertEquals(0, submitted.get());
}
项目:code-examples-android-expert    文件:lessonD_AdvancedStreams.java   
/**
 * Lesson test: splits a stream of integers into groups by parity, reduces each group with
 * addition and stores the result in {@code averages} at the group's key.
 *
 * <p>NOTE(review): the pipeline appears to run synchronously, in which case the array would
 * already hold the per-group totals when the assertions execute. Confirm whether the zero
 * expectations are intentional exercise placeholders.
 */
@Test
public void shouldBeNecessaryToSubscribetoStreamAfterSplitting() {
    final double[] averages = {0, 0};
    Function<Integer, Integer> parity = value -> value % 2;
    Observable<GroupedObservable<Integer, Integer>> groups =
            Observable.just(22, 22, 99, 22, 101, 22).groupBy(parity);

    groups.subscribe(group -> {
        Observable<Double> asDoubles = group.map(value -> (double) value);
        // Writes the reduced value into the slot selected by the group's key (0 or 1).
        Function<Double, Double> storeForGroup = total -> averages[group.getKey()] = total;
        asDoubles
                .reduce((left, right) -> left + right)
                .map(storeForGroup)
                .subscribe();
    });

    assertThat(averages[0]).isEqualTo(0);
    assertThat(averages[1]).isEqualTo(0);
}
项目:store2store    文件:TestStore.java   
/**
 * Returns the stored models whose id matches one of the requested {@code items}.
 *
 * <p>Matches are collected in the order of {@code items}; for each requested item the full
 * list from {@code getAll(null, null)} is scanned.
 *
 * @param items the models to look for
 * @return the matching stored models wrapped in an {@code Optional}
 */
@Override
public Flowable<Optional<List<TestModel>>> getAll(final List<TestModel> items) {
    final Function<Optional<List<TestModel>>, Optional<List<TestModel>>> keepRequested =
            new Function<Optional<List<TestModel>>, Optional<List<TestModel>>>() {
                @Override
                public Optional<List<TestModel>> apply(Optional<List<TestModel>> stored) throws Exception {
                    final List<TestModel> matches = new ArrayList<>();
                    for (TestModel wanted : items) {
                        for (TestModel candidate : stored.get()) {
                            if (candidate.getId() != wanted.getId()) {
                                continue;
                            }
                            matches.add(candidate);
                        }
                    }
                    return Optional.wrap(matches);
                }
            };

    return getAll(null, null).map(keepRequested);
}
项目:MVPArmsTest1    文件:UserModel.java   
/**
 * Loads one page of users through the RxCache layer.
 *
 * @param lastIdQueried the id the page starts after; also used as the cache key
 * @param update        passed to {@code EvictDynamicKey} to control cache eviction
 * @return the users contained in the cache reply
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    final Observable<List<User>> remoteUsers = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    final Function<Reply<List<User>>, ObservableSource<List<User>>> unwrapReply =
            new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> reply) throws Exception {
                    return Observable.just(reply.getData());
                }
            };

    // Use RxCache: a refresh does not read the cache, loading more reads from the cache.
    return mRepositoryManager.obtainCacheService(CommonCache.class)
            .getUsers(remoteUsers
                    , new DynamicKey(lastIdQueried)
                    , new EvictDynamicKey(update))
            .flatMap(unwrapReply);
}
项目:RIBs    文件:TicTacToeView.java   
/**
 * Merges the click streams of all nine board buttons.
 *
 * @return an observable emitting the coordinate of each clicked square
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
  final ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
  // Walk the 3x3 grid in row-major order using a single cell index.
  for (int cell = 0; cell < 9; cell++) {
    final int row = cell / 3;
    final int column = cell % 3;
    final Function<Object, BoardCoordinate> toCoordinate =
        new Function<Object, BoardCoordinate>() {
          @Override
          public BoardCoordinate apply(Object irrelevant) throws Exception {
            return new BoardCoordinate(row, column);
          }
        };
    clickStreams.add(RxView.clicks(imageButtons[row][column]).map(toCoordinate));
  }
  return Observable.merge(clickStreams);
}
项目:GitHub    文件:DownloadHelper.java   
/**
 * Checks the given url.
 *
 * <p>A successful response is passed to {@code saveFileInfo}; an unsuccessful one falls back
 * to {@code checkUrlByGet}. The whole request goes through the retry transformer.
 *
 * @param url the url to check
 * @return the source produced by the follow-up step
 */
private ObservableSource<Object> checkUrl(final String url) {
    final Function<Response<Void>, ObservableSource<Object>> handleResponse =
            new Function<Response<Void>, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(@NonNull Response<Void> response)
                        throws Exception {
                    if (response.isSuccessful()) {
                        return saveFileInfo(url, response);
                    }
                    return checkUrlByGet(url);
                }
            };

    return downloadApi.check(url)
            .flatMap(handleResponse)
            .compose(retry(REQUEST_RETRY_HINT, maxRetryCount));
}
项目:XinFramework    文件:RetryWithDelay.java   
/**
 * Maps each upstream error to a retry signal.
 *
 * <p>While the retry count does not exceed {@code maxRetries}, a timer of
 * {@code retryDelaySecond} seconds is returned; after that the error is passed along.
 *
 * @param throwableObservable the stream of errors
 * @return the stream of retry signals
 */
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {

    final Function<Throwable, ObservableSource<?>> retryOrFail =
            new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull Throwable error) throws Exception {
                    if (++retryCount > maxRetries) {
                        // Retry budget exhausted: forward the error unchanged.
                        return Observable.error(error);
                    }
                    Log.d("get error, it will try after " + retryDelaySecond
                            + " second, retry count " + retryCount);
                    // The timer's onNext triggers the re-subscription to the original source.
                    return Observable.timer(retryDelaySecond,
                            TimeUnit.SECONDS);
                }
            };

    return throwableObservable.flatMap(retryOrFail);
}
项目:My-Android-Base-Code    文件:LocationRepository.java   
/**
 * Returns the cached location when no new request is needed, otherwise requests a fresh one.
 *
 * <p>A fresh location is cached on success. If the request fails or times out, the last known
 * location is used when available. Without one, a timeout becomes a
 * {@code LocationTimeoutException} and any other error is passed through.
 *
 * @param request the location request to execute
 * @return the resolved location
 */
private Single<Location> getLocation(LocationRequest request) {
    if (!shouldRequestNewLocation()) {
        return Single.just(mLastLocation);
    }

    final Consumer<Location> cacheLocation = new Consumer<Location>() {
        @Override
        public void accept(Location fresh) throws Exception {
            setLocationCache(fresh);
        }
    };
    final Function<Throwable, SingleSource<? extends Location>> fallBackToLast =
            new Function<Throwable, SingleSource<? extends Location>>() {
                @Override
                public SingleSource<? extends Location> apply(Throwable e) throws Exception {
                    if (mLastLocation != null) {
                        return Single.just(mLastLocation);
                    }
                    if (e instanceof TimeoutException) {
                        return Single.error(new LocationTimeoutException());
                    }
                    return Single.error(e);
                }
            };

    return mFusedLocation.getLocation(request)
            .doOnSuccess(cacheLocation)
            .timeout(LOCATION_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(fallBackToLast);
}
项目:GSB-2017-Android    文件:UserNetworkCalls.java   
/**
 * Logs a user in and parses the JSON response into a {@code User}.
 *
 * <p>On success the user's full name and username are written to {@code PrefUtils}. A null
 * response or a response that is not a JSON object is reported as an error. Results are
 * observed on the Android main thread.
 *
 * @param username the login name
 * @param password the password
 * @return an observable emitting the logged-in user
 */
public static Observable<User> loginUser(final String username , String password) {
    final UserService service = ServiceGenerator.createService(UserService.class);

    final Function<JsonElement, Observable<User>> parseUser =
            new Function<JsonElement, Observable<User>>() {
                @Override
                public Observable<User> apply(JsonElement json) throws Exception {
                    if (json == null) {
                        return Observable.error(new Exception("Login Failed"));
                    }
                    Log.i("Login User" , "JSON: "+json.toString());
                    if (!json.isJsonObject()) {
                        return Observable.error(new Exception("Expected a JSON Object"));
                    }
                    final User user = (new Gson()).fromJson(json.getAsJsonObject() , User.class);
                    PrefUtils.setUsername(user.getUsrFullname());
                    PrefUtils.setUserEmail(user.getUsrUsername());
                    return Observable.just(user);
                }
            };

    return service.login(UrlManager.loginURL() , new User(username , password))
            .flatMap(parseUser)
            .observeOn(AndroidSchedulers.mainThread());
}
项目:RxJava2-Android-Sample    文件:GroupByExampleActivity.java   
/**
 * Splits the integers 0..7 into two groups keyed by parity ("偶数" for even, "奇数" for odd),
 * logs each group's key and subscribes an observer created for that key.
 */
private void doSomeWork() {

    Observable.range(0, 8).groupBy(new Function<Integer, String>() {
        @Override
        public String apply(@NonNull Integer integer) throws Exception {
            return integer % 2 == 0 ? "偶数" : "奇数";
        }
    }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
        @Override
        public void accept(@NonNull GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
            String key = stringIntegerGroupedObservable.getKey();
            Log.i(TAG, "accept: key=" + key);
            // Both groups are handled identically, so there is no need to branch on the key.
            stringIntegerGroupedObservable.subscribe(getObserver(key));
        }
    });
}
项目:MBEStyle    文件:IconShowPresenter.java   
/**
 * Builds an {@code IconBean} for every name in the {@code R.array.whatsnew} resource and
 * delivers the collected list to the view on the Android main thread.
 *
 * @return the disposable of the running subscription
 */
public Disposable getWhatsNewIcons() {
    final Function<String, IconBean> toIconBean = new Function<String, IconBean>() {
        @Override
        public IconBean apply(@NonNull String iconName) throws Exception {
            final IconBean icon = new IconBean();
            // Resolve the drawable id by name within the application's package.
            icon.id = mView.getResources().getIdentifier(iconName, "drawable", BuildConfig.APPLICATION_ID);
            icon.name = iconName;
            return icon;
        }
    };
    final Consumer<List<IconBean>> showIcons = new Consumer<List<IconBean>>() {
        @Override
        public void accept(List<IconBean> icons) throws Exception {
            mView.onLoadData(icons);
        }
    };

    return Observable.fromArray(mView.getResources().getStringArray(R.array.whatsnew))
            .map(toIconBean)
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showIcons);
}
项目:J-Chain    文件:GuardTest.java   
/**
 * Verifies that when the guard does not fail, {@code onErrorMap} produces no value and the
 * default passed to {@code defaultIfEmpty} is returned.
 */
@Test
public void onErrorMapWithNoErrorThenReturnEmptyOptional() {
    final Consumer<Integer> noOpGuard = new Consumer<Integer>() {
        @Override
        public void accept(Integer value) throws Exception {
            // do nothing
        }
    };
    final Function<Throwable, String> errorToBang = new Function<Throwable, String>() {
        @Override
        public String apply(Throwable error) throws Exception {
            return "!";
        }
    };

    final String result = Chain.let(0)
            .guard(noOpGuard)
            .onErrorMap(errorToBang)
            .defaultIfEmpty("")
            .call();

    assertEquals("", result);
}
项目:RIBs    文件:TicTacToeView.java   
/**
 * Collects one click stream per board button and merges them.
 *
 * @return an observable emitting the coordinate of each clicked square
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
  final ArrayList<Observable<BoardCoordinate>> perSquare = new ArrayList<>();
  for (int row = 0; row < 3; row++) {
    for (int column = 0; column < 3; column++) {
      // Final copies so the anonymous mapper can capture the current position.
      final int clickedRow = row;
      final int clickedColumn = column;
      final Function<Object, BoardCoordinate> toCoordinate =
          new Function<Object, BoardCoordinate>() {
            @Override
            public BoardCoordinate apply(Object irrelevant) throws Exception {
              return new BoardCoordinate(clickedRow, clickedColumn);
            }
          };
      perSquare.add(RxView.clicks(imageButtons[row][column]).map(toCoordinate));
    }
  }
  return Observable.merge(perSquare);
}
项目:RxPay    文件:RxPayUtils.java   
/**
 * Creates a transformer that lets successful pay results through and turns unsuccessful ones
 * into a {@code PayFailedException} carrying the result's error info.
 *
 * @return the validating transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    final Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
        @Override
        public PayResult apply(PayResult result) throws Exception {
            if (result.isSucceed()) {
                return result;
            }
            throw new PayFailedException(result.getErrInfo());
        }
    };

    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
            return upstream.map(requireSuccess);
        }
    };
}
项目:rxjava2_retrofit2    文件:ResultTransformer.java   
/**
 * Creates the mapper that unwraps an {@code HttpResponseResult}.
 *
 * <p>A successful response is turned into a source emitting its result; an unsuccessful one
 * into a source failing with an {@code HttpResponseException} built from the response's
 * message and state.
 *
 * <p>The sources are built with the standard factories instead of a hand-written
 * {@code Observable}, which never called {@code onSubscribe} on its observer. A successful
 * response whose result is null is reported as an error, since RxJava 2 does not allow null
 * items.
 *
 * @param <T> the payload type
 * @return the unwrapping mapper
 */
private static <T> Function<HttpResponseResult<T>, ObservableSource<T>> flatMap() {
    return new Function<HttpResponseResult<T>, ObservableSource<T>>() {
        @Override
        public ObservableSource<T> apply(@NonNull final HttpResponseResult<T> tHttpResponseResult) throws Exception {
            if (tHttpResponseResult.isSuccess()) {
                return Observable.just(tHttpResponseResult.getResult());
            }
            return Observable.error(new HttpResponseException(
                    tHttpResponseResult.getMsg(), tHttpResponseResult.getState()));
        }
    };
}
项目:RxEasyHttp    文件:BaseStrategy.java   
/**
 * Loads a cached value and wraps it in a {@code CacheResult} flagged as coming from the cache.
 *
 * @param rxCache   the cache to read from
 * @param type      the type of the cached value
 * @param key       the cache key
 * @param time      passed through to {@code rxCache.load}
 * @param needEmpty when true, any error is replaced by an empty stream
 * @return the cached result, an error, or an empty stream as described above
 */
<T> Observable<CacheResult<T>> loadCache(final RxCache rxCache, Type type, final String key, final long time, final boolean needEmpty) {
    final Function<T, ObservableSource<CacheResult<T>>> wrapCached =
            new Function<T, ObservableSource<CacheResult<T>>>() {
                @Override
                public ObservableSource<CacheResult<T>> apply(@NonNull T cached) throws Exception {
                    if (cached != null) {
                        return Observable.just(new CacheResult<T>(true, cached));
                    }
                    return Observable.error(new NullPointerException("Not find the cache!"));
                }
            };
    final Observable<CacheResult<T>> observable = rxCache.<T>load(type, key, time).flatMap(wrapCached);

    if (!needEmpty) {
        return observable;
    }
    return observable
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
                @Override
                public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable error) throws Exception {
                    return Observable.empty();
                }
            });
}
项目:Rxjava2.0Demo    文件:SplashActivity.java   
/**
 * Waits for the first one-second interval tick, then opens {@code MainActivity} and finishes
 * the splash screen. The subscription is recorded in {@code dLists}.
 */
private void start() {
    final Function<Throwable, ObservableSource<? extends Long>> onTimerError =
            new Function<Throwable, ObservableSource<? extends Long>>() {
                @Override
                public ObservableSource<? extends Long> apply(Throwable error) throws Exception {
                    // NOTE(review): null is not a valid resume source in RxJava 2; confirm
                    // what should happen if the timer ever fails.
                    return null;
                }
            };
    final Consumer<Long> openMainScreen = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            Log.e(MainActivity.TAG, "accept: " + tick);
            startActivity(new Intent(SplashActivity.this, MainActivity.class));
            finish();
        }
    };

    final Disposable splashTimer = Observable.interval(1, TimeUnit.SECONDS)
            .take(1)
            .onErrorResumeNext(onTimerError)
            .subscribe(openMainScreen);
    dLists.add(splashTimer);

}
项目:AndroidSensors    文件:BasicSensorConfigTest.java   
/**
 * Verifies that every GPS, Wi-Fi and Bluetooth sensor type reports {@code MIN_DELAY_MS} as its
 * minimum sensor delay.
 */
@Test
public void getMinSensorDelay_forAllNonIMUSensor_returnsInMs() throws Exception {
    final HashSet<SensorType> nonImuSensors = new HashSet<>();
    nonImuSensors.addAll(Arrays.asList(SensorType.gpsValues()));
    nonImuSensors.addAll(Arrays.asList(SensorType.wifiValues()));
    nonImuSensors.addAll(Arrays.asList(SensorType.bluetoothValues()));

    final Function<SensorType, Long> toMinDelay = new Function<SensorType, Long>() {
        @Override
        public Long apply(SensorType type) throws Exception {
            return basicSensorConfig.getMinSensorDelay(type);
        }
    };
    final Predicate<Long> isExpectedDelay = new Predicate<Long>() {
        @Override
        public boolean test(Long delay) throws Exception {
            return delay.equals(MIN_DELAY_MS);
        }
    };

    // Count how many sensor types report the expected delay; all of them should.
    final Long matching = Observable.fromIterable(nonImuSensors)
            .map(toMinDelay)
            .filter(isExpectedDelay)
            .count()
            .blockingGet();

    assertThat(matching.intValue(), equalTo(nonImuSensors.size()));
}
项目:Android-Code-Demos    文件:OperatorTest.java   
/**
 * Demo of {@code debounce} combined with {@code switchMap}: the value is debounced for one
 * second, converted to a string and printed.
 *
 * @param i the value to emit
 */
private static void debounceTest(int i) {
    final Function<Integer, ObservableSource<String>> toText =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer value) throws Exception {
                    return Observable.just(String.valueOf(value));
                }
            };
    final Consumer<String> print = new Consumer<String>() {
        @Override
        public void accept(String text) throws Exception {
            System.out.println(text);
        }
    };

    Observable.just(i)
            .debounce(1000, TimeUnit.MILLISECONDS)
            /* the most recent request wins */
            .switchMap(toText)
            .subscribe(print);

}
项目:RxEasyHttp    文件:NoStrategy.java   
/**
 * No-cache strategy: every item of {@code source} is wrapped in a {@code CacheResult} flagged
 * as not coming from the cache. The cache arguments are not used.
 *
 * @return the wrapped source
 */
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String cacheKey, long cacheTime, Observable<T> source, Type type) {
    final Function<T, CacheResult<T>> wrapAsRemote = new Function<T, CacheResult<T>>() {
        @Override
        public CacheResult<T> apply(@NonNull T item) throws Exception {
            return new CacheResult<T>(false, item);
        }
    };
    return source.map(wrapAsRemote);
}
项目:store2realm    文件:PostService.java   
/**
 * Loads all posts from the API.
 *
 * <p>When both a filter and a sorting mode are given, the posts are sorted by
 * {@code userId} and restricted to the user id taken from the first filter entry. This
 * branch is hardcoded for the demo. Otherwise the API response is returned as is.
 *
 * @param filter      the filter; only its first entry's value is used
 * @param sortingMode the sorting mode; only checked for presence
 * @return the posts wrapped in an {@code Optional}
 */
@Override
public Flowable<Optional<List<Post>>> getAll(Filter filter, final SortingMode sortingMode) {

    // this IF case is here only to demonstrate the usage of filtering and sorting mode in the UI
    // this logic should be on the server side and not here !
    // !!!! The filter and the sort are hardcoded here (to match presenter choices).
    if(sortingMode != null && filter != null){
        final int userIdAllowed = (int) filter.entrySet().iterator().next().getValue().value;

        // special return for demo
        return wrapOptional(apiService.getPosts()
                .flatMapIterable(new Function<List<Post>, Iterable<Post>>() {
                    @Override
                    public Iterable<Post> apply(List<Post> posts) throws Exception {
                        Collections.sort(posts, new Comparator<Post>() {
                            @Override
                            public int compare(Post p0, Post p1) {
                                // hardcoded ordering by userId; Integer.compare avoids the
                                // overflow that subtracting the two ids can produce
                                return Integer.compare(p0.userId, p1.userId);
                            }
                        });
                        return posts;
                    }
                })
                .filter(new Predicate<Post>() {
                    @Override
                    public boolean test(Post post) throws Exception {
                        return post.userId == userIdAllowed;
                    }
                })
                .toList()
                .toFlowable()
        );
    }

    // you can wrap the retrofit response directly in a
    // Optional object by default for more convenience
    return wrapOptional(apiService.getPosts());
}
项目:RxProgress    文件:RxProgress.java   
/**
 * Wraps {@code source} so that a progress dialog is created for the subscription and
 * dismissed when the flow terminates or is cancelled.
 *
 * <p>Dismissing the dialog completes the flow, as does cancelling it when the builder marks
 * it as cancelable. The subscription to {@code source} is registered with the emitter, so it
 * is disposed when the flow completes or the downstream cancels instead of running on
 * unobserved.
 *
 * @param source               the flow to decorate
 * @param backpressureStrategy the strategy used for the bridging {@code Flowable.create}
 * @return the decorated flow
 */
private <T> Flowable<T> forFlowable(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    return Flowable.using(this::makeDialog,
            new Function<ProgressDialog, Publisher<? extends T>>() {
                @Override
                public Publisher<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
                    return Flowable.create(emitter -> {
                        if (builder.cancelable) {
                            dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                        }
                        dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                        // Tie the upstream subscription to the emitter's lifecycle.
                        emitter.setDisposable(
                                source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
                    }, backpressureStrategy);
                }
            }, Dialog::dismiss);
}
项目:Android-Gank-Share    文件:CollectFragment.java   
/**
 * Reloads all {@code CollectItem}s on the IO scheduler into {@code mItemList}, then on the
 * Android main thread toggles the empty view and refreshes the adapter.
 */
private void loadData(){
    Observable
            .create(new ObservableOnSubscribe<List<CollectItem>>() {
                @Override
                public void subscribe(ObservableEmitter<List<CollectItem>> e) throws Exception {
                    mItemList.clear();
                    e.onNext(DataSupport.findAll(CollectItem.class));
                    // Terminate the one-shot source so the subscription does not stay open.
                    e.onComplete();
                }
            })
            .map(new Function<List<CollectItem>, Boolean>() {
                @Override
                public Boolean apply(List<CollectItem> items) throws Exception {
                    // True only when there were items and they were added to mItemList.
                    return items != null && items.size() > 0
                            && mItemList.addAll(items);
                }
            })
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    addDisposable(disposable);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    showEmptyView(!aBoolean);
                    mAdapter.notifyDataSetChanged();
                }
            });
}
项目:RxJava2-Android-Sample    文件:TimestampExampleActivity.java   
/**
 * Demo of the {@code timestamp} operator: every string from {@code getObservable()} is
 * re-emitted after a 500 ms delay, timestamped, and delivered to the observer on the Android
 * main thread.
 */
private void doSomeWork() {
    final Function<String, ObservableSource<String>> delayEach =
            new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String value) throws Exception {
                    return Observable.just(value).delay(500, TimeUnit.MILLISECONDS);
                }
            };

    getObservable()
            .flatMap(delayEach)
            .timestamp()
            // Subscribe on the IO scheduler
            .subscribeOn(Schedulers.io())
            // Deliver results on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}