Java 类io.reactivex.Flowable 实例源码

项目:reactive-architectures-playground    文件:FactsPresenterTests.java   
@Test public void shouldPresent_AnotherError_IntoView() throws Exception {

        Flowable<FactAboutNumber> broken = Flowable.error(new IllegalAccessError("WTF!"));
        when(usecase.fetchTrivia()).thenReturn(broken);

        presenter.fetchRandomFacts();

        BehavioursRobot.with(view)
                .showLoadingFirstHideLoadingAfter()
                .disableRefreshFirstAndEnableAfter()
                .shouldShowErrorState()
                .shouldNotShowEmptyState()
                .shouldNotReportNetworkingError();

        DataFlowWatcher.with(onNext, onError, onCompleted).shouldFinishWithError();
        verify(strategist, oneTimeOnly()).applyStrategy(any());
    }
项目:Renrentou    文件:XApi.java   
/**
 * 异常处理变换
 *
 * @return
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {

    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {

                    if (model == null || model.isNull()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
                    } else if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    } else if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    } else {
                        return Flowable.just(model);
                    }
                }
            });
        }
    };
}
项目:fluid    文件:ReviewRecentRating.java   
@Transformation
public void computeStatistics() {
  reviews
    .transformPayloadFlow(flow ->
      flow
        .groupBy(data -> data.getString("course"))
        .flatMap(group ->
          group
            .map(i -> i.getInteger("rating"))
            .buffer(1, TimeUnit.MINUTES)
            .map(Flowable::fromIterable)
            .flatMap(MathFlowable::averageDouble)
            .map(avg -> Pair.pair(group.getKey(), avg)
            ))
    )
    .to(Sink.forEachPayload(pair -> System.out.println("Window rating of " + pair.left() + " : " + pair.right())));
}
项目:rx-progress-dialog    文件:MainActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_main);

  mLoginObservable = Observable
      .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
      .map(aLong -> "User id is " + UUID.randomUUID().toString());
  mLoginFlowable = Flowable
      .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
      .map(aLong -> "User id is " + UUID.randomUUID().toString());

  mCompositeDisposable = new CompositeDisposable();

  findViewById(R.id.button_observable).setOnClickListener(this);
  findViewById(R.id.button_flowable).setOnClickListener(this);
}
项目:RxJava2-Android-Sample    文件:ComposeOperatorExampleActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_compose_operator_example);

    /*
        Compose for reusable code.
     */
    Observable.just(1, 2, 3, 4, 5)
            .compose(schedulers.<Integer>applyObservableAsync())
            .subscribe(/* */);

    Flowable.just(1, 2, 3, 4, 5)
            .compose(schedulers.<Integer>applyFlowableAsysnc())
            .subscribe(/* */);

}
项目:GitHub    文件:ComposeOperatorExampleActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_compose_operator_example);

    /*
        Compose for reusable code.
     */
    Observable.just(1, 2, 3, 4, 5)
            .compose(schedulers.<Integer>applyObservableAsync())
            .subscribe(/* */);

    Flowable.just(1, 2, 3, 4, 5)
            .compose(schedulers.<Integer>applyFlowableAsysnc())
            .subscribe(/* */);

}
项目:GitHub    文件:FlowableTest.java   
@Test public void bodyRespectsBackpressure() {
  server.enqueue(new MockResponse().setBody("Hi"));

  RecordingSubscriber<String> subscriber = subscriberRule.createWithInitialRequest(0);
  Flowable<String> o = service.body();

  o.subscribe(subscriber);
  assertThat(server.getRequestCount()).isEqualTo(1);
  subscriber.assertNoEvents();

  subscriber.request(1);
  subscriber.assertAnyValue().assertComplete();

  subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications.
  assertThat(server.getRequestCount()).isEqualTo(1);
}
项目:RxjavaExample    文件:ExampleUnitTest.java   
@Test
public void zip() throws Exception{
    Consumer<Object> consumer = v -> System.out.println("[" + System.currentTimeMillis() / 100 + "] " + v);
    Flowable<Long> f1 = Flowable.interval(100, TimeUnit.MILLISECONDS);
    Flowable<Long> f2 = Flowable.interval(200, TimeUnit.MILLISECONDS);

    Flowable<Long> f3 = Flowable.zip(f1, f2, (x, y) -> x * 10000 + y);
    f3.subscribe(consumer);
}
项目:Renrentou    文件:RegService.java   
@POST("user/regist/person")
@Multipart
Flowable<ResponseDto<User>> regPerson(
        @PartMap Map<String, RequestBody> param,
        @Part MultipartBody.Part front,
        @Part MultipartBody.Part back
);
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            listRefs.get().acquireReference(list);
            final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(list);
                    }
                }
            };
            list.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(listener);
                    observableRealm.close();
                    listRefs.get().releaseReference(list);
                }
            }));

            // Emit current value immediately
            emitter.onNext(list);

        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * 取消监听
 * @param tag
 * @param flowable
 * @return
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Flowable<?> flowable) {
    if (null == flowable)
        return getInstance();
    List<FlowableProcessor> processors = mProcessorMapper.get(tag);
    if (null != processors) {
        processors.remove((FlowableProcessor<?>) flowable);
        if (isEmpty(processors)) {
            mProcessorMapper.remove(tag);
        }
    }
    return getInstance();
}
项目:Mix    文件:MainActivity.java   
private void definedFlowable() {
    Flowable.interval(1, TimeUnit.MICROSECONDS)
            .onBackpressureDrop()  //加上背压策略
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Long aLong) {
                    Log.d(TAG, "onNext: " + aLong);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
项目:Java-EX    文件:RxIteratorTest.java   
@Test
public void testFlowable() throws Exception {
  Iterator<Integer> iterator = Flowable.range(0, 10)
      .to(RxIterator.flowableIterator());
  int i = 0;
  while (iterator.hasNext()) {
    assertEquals(i++, iterator.next().intValue());
  }
}
项目:reactive-grpc    文件:ServerErrorIntegrationTest.java   
@Test
public void oneToMany() {
    RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
    Flowable<HelloResponse> resp = stub.sayHelloRespStream(Single.just(HelloRequest.getDefaultInstance()));
    TestSubscriber<HelloResponse> test = resp
            .doOnNext(msg -> System.out.println(msg))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .test();

    test.awaitTerminalEvent(3, TimeUnit.SECONDS);
    test.assertError(t -> t instanceof StatusRuntimeException);
    test.assertError(t -> ((StatusRuntimeException)t).getStatus() == Status.INTERNAL);
}
项目:store2store    文件:TestStore.java   
@Override
public Flowable<Optional<TestModel>> insert(TestModel item) {
    if(shouldThrowError){
        return Flowable.error(new Exception("insertSingle.error"));
    }
    return Flowable.just(Optional.wrap(item)).delay(1, TimeUnit.SECONDS);
}
项目:MVPtemplate    文件:RxManager.java   
public <T> void on(String eventName, Consumer<T> consumer) {
    Flowable<T> flowable = mRxBus.register(eventName);
    mProcessorMap.put(eventName, flowable);
    mDisposable.add(flowable.observeOn(AndroidSchedulers.mainThread())
                            .subscribe(consumer, throwable -> {
                                        throwable.printStackTrace();
                                    }));
}
项目:fluid    文件:SimpleExample.java   
@Test
public void testComplexShaping() {
  Function<Flowable<Quote>, Flowable<String>> toAuthor = flow -> flow.map(q -> q.author);
  Function<Flowable<Quote>, Flowable<String>> toWords = flow -> flow
    .concatMap(q -> Flowable.fromArray(q.quote.split(" ")));
  CacheSink<String> authors = new CacheSink<>();
  CacheSink<String> words = new CacheSink<>();

  List<Quote> quotes = new ArrayList<>();
  quotes.add(new Quote("Attitude is everything", "Diane Von Furstenberg"));
  quotes.add(new Quote("Life is short, heels shouldn't be", "Brian Atwood"));
  quotes.add(new Quote("Red is the color for fall", "Piera Gelardi"));
  quotes.add(new Quote("Rhinestones make everything better", "Piera Gelardi"));
  quotes.add(new Quote("Design is so simple, that's why it's so complicated", "Paul Rand"));

  List<DataStream<Quote>> broadcast = Source.from(quotes.stream().map(Data::new)).broadcast(2);

  broadcast.get(0)
    .transformPayloadFlow(toAuthor)
    .transformPayloadFlow(Flowable::distinct)
    .to(authors);

  broadcast.get(1)
    .transformPayloadFlow(toWords)
    .transformPayloadFlow(Flowable::distinct)
    .to(words);

  await().until(() -> authors.cache().size() == 4);
  assertThat(authors.cache()).hasSize(4);
  assertThat(words.cache()).isNotEmpty();
}
项目:EasyHttp    文件:RxEasyHttpManager.java   
/**
 * Get请求的Rxjava方式.
 * @param url
 * @param requestParams
 * @param cacheType
 * @return
 */
public <T> Flowable<T> get(String url, EasyRequestParams requestParams, int cacheType, RxEasyConverter<T> rxEasyConverter) {
    final Request request = new Request.Builder().url(EasyHttpClientManager.getInstance().buildUrl(url, requestParams)).build();
    // 接口没有单独设定缓存类型,使用全局缓存类型.
    if (cacheType == EasyCacheType.CACHE_TYPE_NO_SETTING) {
        cacheType = EasyHttpClientManager.getInstance().getConfig().getGlobalCacheType();
    }

    Call call = EasyHttpClientManager.getInstance().getOkHttpClient(cacheType).newCall(request);

    return Flowable.create(new CallFlowableOnSubscribe(call, rxEasyConverter), BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io());
}
项目:GitHub    文件:StatisticsPresenter.java   
private void loadStatistics() {
    mStatisticsView.setProgressIndicator(true);

    // The network request might be handled in a different thread so make sure Espresso knows
    // that the app is busy until the response is handled.
    EspressoIdlingResource.increment(); // App is busy until further notice

    Flowable<Task> tasks = mTasksRepository
            .getTasks()
            .flatMap(Flowable::fromIterable);
    Flowable<Long> completedTasks = tasks.filter(Task::isCompleted).count().toFlowable();
    Flowable<Long> activeTasks = tasks.filter(Task::isActive).count().toFlowable();
    Disposable disposable = Flowable
            .zip(completedTasks, activeTasks, (completed, active) -> Pair.create(active, completed))
            .subscribeOn(mSchedulerProvider.computation())
            .observeOn(mSchedulerProvider.ui())
            .doFinally(() -> {
                if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) {
                    EspressoIdlingResource.decrement(); // Set app as idle.
                }
            })
            .subscribe(
                    // onNext
                    stats -> mStatisticsView.showStatistics(Ints.saturatedCast(stats.first), Ints.saturatedCast(stats.second)),
                    // onError
                    throwable -> mStatisticsView.showLoadingStatisticsError(),
                    // onCompleted
                    () -> mStatisticsView.setProgressIndicator(false));
    mCompositeDisposable.add(disposable);
}
项目:streamingpool-core    文件:IntervalStreamFactory.java   
@SuppressWarnings("unchecked")
@Override
public <T> ErrorStreamPair<T> create(StreamId<T> id, DiscoveryService discoveryService) {

    if (!(id instanceof IntervalStreamId)) {
        return ErrorStreamPair.empty();
    }

    IntervalStreamId typedId = (IntervalStreamId) id;
    ErrorDeflector ed = ErrorDeflector.create();

    Publisher<Long> dataPublisher = Flowable.interval(typedId.getPeriod(), typedId.getPeriodTimeUnit())
            .delay(typedId.getInitialDelay(), typedId.getInitialDelayTimeUnit());
    return ed.stream((Publisher<T>) dataPublisher);
}
项目:RxProgress    文件:MainActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
    setSupportActionBar(toolbar);
    toolbar.setSubtitle("By soussidev");

    FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
    fab.setOnClickListener(view -> {

    //Set Default Text to Button
   btn_loading_observe.setText("Show Loading Observe");
   btn_loadig_flowable.setText("Show Loading Flowable");

    });

    InitView();

    //Init Observe for btn observe
    user_Observable = Observable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .doOnTerminate(() -> btn_loading_observe.setText("Observing Again"))
            .doOnComplete(() -> AnimationView()) //if complete show animation
            .map(aLong -> getMessageResult()); //Call function messageresult()

    //Init Flowable for btn Floable
    user_Flowable = Flowable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .doOnComplete(() -> btn_loadig_flowable.setText("Flowable Again"))
            .doOnComplete(() -> AnimationView()) //if complete show animation
            .map(aLong -> getMessageResult());  //Call function messageresult()

}
项目:XDroid-Databinding    文件:XApi.java   
public static <T> FlowableTransformer<T, T> getFlowableScheduler(final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryWhenHandler) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream
                    .retryWhen(retryWhenHandler)
                    .onErrorResumeNext(new ServerResultErrorFunc2<T>())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:DailyStudy    文件:RxJavaActivity.java   
private void flowable() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
            Log.e(TAG, "start send data ");
            for (int i = 0; i < 100; i++) {
                e.onNext(i);
            }
            e.onComplete();
        }
    }, BackpressureStrategy.DROP)//指定背压策略
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(@NonNull Subscription s) {
                    //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
                    //2, 参数为  Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
                    //3,  必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
                    Log.e(TAG, "onSubscribe...");
                    s.request(10);
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "onNext:" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.e(TAG, "onError..." + t);
                }

                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete...");
                }
            });

}
项目:Java-EX    文件:ComparatorUtilTest.java   
@Test
public void testStartWith() throws Exception {
  Flowable.range(0, 10)
      .sorted(startWith(3, 5, 7))
      .take(5)
      .test()
      .assertResult(3, 5, 7, 0, 1);
}
项目:Inshorts    文件:RepositoryImpl.java   
@Override
public Flowable<List<Article>> getAllArticlesByCategory() {
    ArticleDao dao = mDatabase.articleDao();
    return dao.getAllArticlesByCategory()//query the local db for article list
            .map(articles -> {
                if(articles.size() > 0) return articles; //if the list size is > 0, return it
                else return fetchFromNetwork(dao); //if list is empty fetch from network
            });
}
项目:GitHub    文件:PaginationActivity.java   
/**
 * Simulation of network data
 */
private Flowable<List<String>> dataFromNetwork(final int page) {
    return Flowable.just(true)
            .delay(2, TimeUnit.SECONDS)
            .map(new Function<Boolean, List<String>>() {
                @Override
                public List<String> apply(@NonNull Boolean value) throws Exception {
                    List<String> items = new ArrayList<>();
                    for (int i = 1; i <= 10; i++) {
                        items.add("Item " + (page * 10 + i));
                    }
                    return items;
                }
            });
}
项目:rxtools    文件:SubjectMap.java   
private Processor<V, V> attachSource(K key)
{
    _writeLock.lock();
    try {
        // if our source is being attached, we expect that all existing sources have been
        // cleaned up properly. If not, this is a serious issue
        assert(!_weakSources.containsKey(key));

        Processor<V, V> value = BehaviorProcessor.create();

        WeakReference<Flowable<V>> weakConnector = _weakCache.get(key);

        // if an observable is being attached then it must have been added to the weak cache
        // and it must still be referenced
        Flowable<V> connector = weakConnector.get();

        // the observable must be retained by someone since it is being attached
        assert(connector != null);

        // strongly retain the observable and add the subject so future next
        // calls will be piped through the subject
        _weakSources.put(key, new WeakReference<>(value));
        _cache.put(key, connector);

        return value;
    }
    finally {
        _writeLock.unlock();
    }
}
项目:Reactive-Programming-With-Java-9    文件:DemoCompositeDisposable.java   
public static void main(String[] args) {
    // TODO Auto-generated method stub
    CompositeDisposable disposable = new CompositeDisposable();
    disposable.add(Flowable.rangeLong(10, 5).subscribe(System.out::println));
    disposable.add(Flowable.rangeLong(1, 5).subscribe(item -> System.out.println("two" + item)));

    disposable.add(Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            // TODO Auto-generated method stub

            try {
                String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct",
                        "Nov", "Dec" };

                List<String> months = Arrays.asList(monthArray);

                for (String month : months) {
                    emitter.onNext(month);
                }
                emitter.onComplete();
            } catch (Exception e) {
                // TODO: handle exception
                emitter.onError(e);
            }
        }
    }).subscribe(s -> System.out.println(s)));

    disposable.dispose();


}
项目:android-arch-mvvm    文件:ModuleManager.java   
private static void checkReturnType(Method method1, Method method2) {
    Class<?> returnType;
    Type returnType1, returnType2;
    if (ModuleCall.class.equals(method1.getReturnType())) { // 异步回调的方法
        returnType = method2.getReturnType();
        if (returnType.equals(Observable.class) || returnType.equals(Single.class) || returnType.equals(Flowable.class) || returnType.equals(Maybe.class)) {

            returnType1 = method1.getGenericReturnType();
            returnType2 = method2.getGenericReturnType();

            if (returnType1 instanceof ParameterizedType && returnType2 instanceof ParameterizedType) { // 都带泛型
                // 检查泛型的类型是否一样
                if (!((ParameterizedType) returnType1).getActualTypeArguments()[0].equals(((ParameterizedType) returnType2).getActualTypeArguments()[0])) {
                    throw new IllegalArgumentException(method1.getName() + "方法的返回值类型的泛型的须一样");
                }
            } else if (!(returnType1 instanceof Class && returnType2 instanceof Class)) {
                throw new IllegalArgumentException(method1.getName() + "方法的返回值类型的泛型的须一样");
            }
        } else {
            throw new IllegalArgumentException(String.format("%s::%s的返回值类型必须是Observable,Single,Flowable,Maybe之一", method2.getDeclaringClass().getSimpleName(), method2.getName()));
        }
    } else {
        if (!method1.getGenericReturnType().equals(method2.getGenericReturnType())) { //同步调用的返回值必须一样
            throw new IllegalArgumentException(method1.getName() + "方法的返回值类型不一样");
        }
    }
}
项目:CustomizableCalendar    文件:AUCalendar.java   
public Flowable<ChangeSet> observeChangesOnCalendar() {
    return Flowable.create((FlowableEmitter<ChangeSet> emitter) -> {
        CalendarObjectChangeListener objectChangeListener = emitter::onNext;
        addChangeListener(objectChangeListener);
        emitter.setCancellable(() -> removeChangeListener(objectChangeListener));
    }, BackpressureStrategy.BUFFER);
}
项目:streamingpool-core    文件:LocalPoolHookTest.java   
@Test
public void noStreamIdEmittedIfNoStreamIsProvided() {
    TestSubscriber<StreamId<?>> subscriber = new TestSubscriber<>();
    Flowable.fromPublisher(newStreamHook()).subscribe(subscriber);

    subscriber.awaitTerminalEvent(1, SECONDS);
    subscriber.assertNoValues();
}
项目:grooves    文件:QueryExecutor.java   
/**
 * Applies all revert events from a list and returns the list with only valid forward events.
 *
 * @param events The list of events
 *
 * @return An Flowable of forward only events
 */
@NotNull
@Override
public Flowable<EventT> applyReverts(@NotNull Flowable<EventT> events) {

    return events.toList().toFlowable().flatMap(eventList -> {
        log.debug("     Event Ids (includes reverts that won't be applied): {}",
                ids(eventList));
        List<EventT> forwardEvents = new ArrayList<>();
        while (!eventList.isEmpty()) {
            EventT head = eventList.remove(eventList.size() - 1);
            if (head instanceof RevertEvent) {
                final EventIdT revertedEventId =
                        (EventIdT) ((RevertEvent) head).getRevertedEventId();
                final Optional<EventT> revertedEvent = eventList.stream()
                        .filter(it -> Objects.equals(it.getId(), revertedEventId))
                        .findFirst();

                if (revertedEvent.isPresent()) {
                    eventList.remove(revertedEvent.get());
                } else {
                    throw new GroovesException(String.format(
                            "Cannot revert event that does not exist in unapplied list - %s",
                            String.valueOf(revertedEventId)));
                }

            } else {
                forwardEvents.add(0, head);
            }

        }

        assert forwardEvents.stream().noneMatch(it -> it instanceof RevertEvent);

        return fromIterable(forwardEvents);
    });
}
项目:mvvm-template    文件:AllReposRepo.java   
public Flowable<Resource<Pageable<Repo>>> searchRemote(String query, int page) {
    return RestHelper.createRemoteSourceMapper(searchService.searchRepositories(query, page), repoPageable -> {
        dao.addAllAsync(repoPageable.getItems());
    });
}
项目:store2store    文件:StoreService.java   
public final Flowable<Optional<List<T>>> getAll(final Filter filter) {
    return getAll(filter, SortingMode.DEFAULT);
}
项目:CrazyDaily    文件:ZhihuDataRepository.java   
@Override
public Flowable<ZhihuNewsEntity> getZhihuNewsList() {
    return mZhihuService.getZhihuNewsList()
            .compose(RxTransformerUtil.normalTransformer());
}
项目:store2store    文件:StoreDao.java   
/**
 * @return int  Number of items deleted
 */
public Flowable<Integer> delete(final List<T> items) {
    throw new UnsupportedOperationException("This method has not been implemented in the child class");
}
项目:Android-MVVM    文件:UserApi.java   
@GET("user/{email}")
Flowable<User> getUser(@Path("email") String email);
项目:pyplyn    文件:TaskManager.java   
public Flowable<T> createTask(T task) {
    return Flowable.interval(0, task.repeatIntervalMillis(), MILLISECONDS)

            // prevent configurations from running too often
            .onBackpressureDrop()

            .map(i -> task)
            // stop if shutting down
            .takeWhile(t -> !shutdownHook.isShutdown())

            // stop after the first task if only running once
            .takeUntil(ignored -> runOnce)

            // remove subscriptions that are disposed or completed
            .doFinally(() -> ACTIVE_SUBSCRIPTIONS.remove(task))

            // lifecycle management
            .doOnSubscribe(subscription -> Optional.ofNullable(ACTIVE_SUBSCRIPTIONS.put(task, subscription)).ifPresent(Subscription::cancel))
            .doOnNext(disposableTask -> HAS_STARTED_PROCESSING.countDown())

            .delay((item) -> {
                // get last execution time
                Instant lastRun = LAST_EXECUTED.computeIfAbsent(task,
                        // or trigger a run by creating an Instant at the point in time where the configuration should have run last
                        t -> Instant.now().minusMillis(task.repeatIntervalMillis()));

                // compute duration between lastRun and now; D=lastRun-lastRun
                Duration duration = Duration.between(Instant.now(), lastRun)
                        // substract the repeat interval; D=repeatInterval-(lastRun-now)
                        .plusMillis(task.repeatIntervalMillis());

                // D<=0; need to run now
                if (duration.isNegative() || duration.isZero()) {
                    return Flowable.just(0L);

                    // D>0; need to run in D
                } else {
                    return Flowable.timer(duration.toMillis(), TimeUnit.MILLISECONDS);
                }
            })

            // mark the time at which we ran last
            .doOnNext(results -> LAST_EXECUTED.put(task, Instant.now()));
}
项目:RxRedux    文件:BaseViewModel.java   
/**
 * A Transformer, given eventObservable returns UIModels by applying the redux pattern.
 *
 * @return {@link FlowableTransformer} the Redux pattern transformer.
 */
@NonNull
public Flowable<UIModel<S>> uiModels(S initialState) {
    return eventsSubject.toFlowable(BackpressureStrategy.BUFFER)
            .compose(uiModelsTransformer(initialState));
}
项目:rebase-android    文件:Rebase.java   
@POST("categories/{username}/{category}/feeds")
Flowable<Feed> newFeed(
    @Path("username") String username,
    @Path("category") String category, @Body Feed feed);