Java 类io.reactivex.functions.Function3 实例源码

项目:code-examples-android-expert    文件:lessonC_BooleanLogicAndErrorHandling.java   
/**
 * In this experiment, we will use RxJava to pick a lock. Our lock has three
 * tumblers. We will need them all to be up to unlock the lock!
 *
 * <p>Each tumbler repeats a random boolean up to 1000 times; a tumbler is "up"
 * when {@code nextInt(20) > 15}, i.e. for 4 of the 20 possible values.
 * {@code combineLatest} recomputes the lock state whenever any tumbler emits,
 * and {@code takeUntil} lets the first "all up" state through and then
 * completes the stream, so the last recorded value is expected to be
 * {@code true}.
 */
@Test
public void combineLatestShouldTakeTheLastEventsOfASetOfObservablesAndCombinesThem() {

    // Note: the delay argument is evaluated once per tumbler when the chain is
    // assembled; only the map() lambda is re-run on every repeat.
    Observable<Boolean> tumbler1Observable = Observable.just(20).map(integer -> new Random().nextInt(integer) > 15)
            .delay(new Random().nextInt(20), TimeUnit.MILLISECONDS).repeat(1000);
    Observable<Boolean> tumbler2Observable = Observable.just(20).map(integer -> new Random().nextInt(integer) > 15)
            .delay(new Random().nextInt(20), TimeUnit.MILLISECONDS).repeat(1000);
    Observable<Boolean> tumbler3Observable = Observable.just(20).map(integer -> new Random().nextInt(integer) > 15)
            .delay(new Random().nextInt(20), TimeUnit.MILLISECONDS).repeat(1000);

    // The lock opens only when every tumbler is up at the same time.
    Function3<Boolean, Boolean, Boolean, Boolean> combineTumblerStatesFunction = (tumblerOneUp, tumblerTwoUp,
            tumblerThreeUp) -> tumblerOneUp && tumblerTwoUp && tumblerThreeUp;

    Observable<Boolean> lockIsPickedObservable = Observable
            .combineLatest(tumbler1Observable, tumbler2Observable, tumbler3Observable, combineTumblerStatesFunction)
            .takeUntil(unlocked -> unlocked == true);
    lockIsPickedObservable.subscribe(testObservable);
    testObservable.awaitTerminalEvent();
    List<Object> onNextEvents = testObservable.values();
    // RxJava 2 never emits null, so the previous isEqualTo(null) could not pass.
    // takeUntil emits the item that satisfied the predicate before completing,
    // which makes the final event the "unlocked" (true) state.
    assertThat(onNextEvents.get(onNextEvents.size() - 1)).isEqualTo(true);
}
项目:rxjava2-extras    文件:FlowableStateMachine.java   
/**
 * Captures the configuration of a state machine applied to {@code source}.
 *
 * <p>{@code initialState}, {@code transition} and {@code backpressureStrategy}
 * are passed through {@code Preconditions.checkNotNull}; {@code source},
 * {@code completionAction} and {@code errorAction} are stored without a null
 * check here.
 *
 * @param source               upstream flowable
 * @param initialState         supplies the starting state
 * @param transition           receives the state, an input item and an emitter, and returns the next state
 * @param completionAction     receives the state and an emitter
 * @param errorAction          receives the state, the error and an emitter
 * @param backpressureStrategy strategy stored for later use
 * @param requestBatchSize     must be greater than zero
 */
public FlowableStateMachine(Flowable<In> source, //
        Callable<? extends State> initialState, //
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message now names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0,
            "requestBatchSize must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
项目:rxjava2-extras    文件:FlowableStateMachine.java   
/**
 * Stores the state machine callbacks, the backpressure settings and the
 * downstream subscriber. The {@code count} field starts out equal to
 * {@code requestBatchSize}.
 */
StateMachineSubscriber(
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition,
        BiConsumer<? super State, ? super Emitter<Out>> completionAction,
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize,
        Subscriber<? super Out> child) {
    // downstream
    this.child = child;

    // state handling callbacks
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;

    // request handling
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    this.count = requestBatchSize;
}
项目:NovelReader    文件:RxUtils.java   
/**
 * Zips the detail, the best comments and the regular comments into a single
 * {@link DetailBean}, which is produced once all three sources have succeeded.
 *
 * @param detailSingle       source of the detail object
 * @param bestCommentsSingle source of the best comments
 * @param commentsSingle     source of the regular comments
 * @param <T>                type of the detail object
 * @return a Single emitting the combined {@link DetailBean}
 */
public static <T> Single<DetailBean<T>> toCommentDetail(Single<T> detailSingle,
                                            Single<List<CommentBean>> bestCommentsSingle,
                                            Single<List<CommentBean>> commentsSingle){
    final Function3<T, List<CommentBean>, List<CommentBean>, DetailBean<T>> assembleDetail =
            new Function3<T, List<CommentBean>, List<CommentBean>, DetailBean<T>>() {
                @Override
                public DetailBean<T> apply(T detail, List<CommentBean> bestComments,
                                           List<CommentBean> comments) throws Exception {
                    return new DetailBean<T>(detail, bestComments, comments);
                }
            };
    return Single.zip(detailSingle, bestCommentsSingle, commentsSingle, assembleDetail);
}
项目:rxjava2-extras    文件:TransformerStateMachine.java   
/**
 * Captures the configuration of the transformer. All reference arguments are
 * passed through {@code Preconditions.checkNotNull} and
 * {@code requestBatchSize} must be greater than zero.
 *
 * @param initialState         supplies the starting state
 * @param transition           receives the state, an input item and an emitter, and returns the next state
 * @param completion           predicate receiving the state and an emitter
 * @param backpressureStrategy strategy stored for later use
 * @param requestBatchSize     must be greater than zero
 */
private TransformerStateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(completion);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message now names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0, "requestBatchSize must be greater than zero");
    this.initialState = initialState;
    this.transition = transition;
    this.completion = completion;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
项目:rxjava2-extras    文件:TransformerStateMachine.java   
/**
 * Static factory for {@link TransformerStateMachine}; forwards every argument
 * unchanged to the private constructor.
 *
 * @return the newly constructed state machine, typed as a transformer
 */
public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final TransformerStateMachine<State, In, Out> machine = new TransformerStateMachine<State, In, Out>(
            initialState,
            transition,
            completion,
            backpressureStrategy,
            requestBatchSize);
    return machine;
}
项目:rxjava2-extras    文件:TransformerStateMachine.java   
/**
 * Builds the function that turns one upstream {@link Notification} into a
 * {@link Flowable} of output notifications by running the state machine
 * callbacks against the shared mutable {@code state}.
 *
 * <ul>
 * <li>onNext: {@code transition} is applied and its result stored back into
 * {@code state.value}.</li>
 * <li>onComplete: {@code completion} is tested against the current state.</li>
 * <li>otherwise: the notification's error is forwarded.</li>
 * </ul>
 *
 * @param transition           computes the next state from the state, the item and an emitter
 * @param completion           predicate invoked with the state and an emitter on upstream completion
 * @param state                holder whose {@code value} is read and overwritten on each item
 * @param backpressureStrategy strategy passed to {@code Flowable.create}
 * @return the mapping function described above
 */
private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute(
        final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state,
        final BackpressureStrategy backpressureStrategy) {

    return new Function<Notification<In>, Flowable<Notification<Out>>>() {

        @Override
        public Flowable<Notification<Out>> apply(final Notification<In> in) {

            return Flowable.create(new FlowableOnSubscribe<Notification<Out>>() {

                @Override
                public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception {
                    // NOTE(review): wrap(...) is defined elsewhere; presumably it adapts the
                    // notification emitter into an emitter of plain Out values - confirm.
                    FlowableEmitter<Out> w = wrap(emitter);
                    if (in.isOnNext()) {
                        // advance the state machine with the incoming item
                        state.value = transition.apply(state.value, in.getValue(), w);
                        if (!emitter.isCancelled())
                            // finish this per-item flowable
                            emitter.onComplete();
                        else {
                            // this is a special emission to indicate that
                            // the transition called unsubscribe. It will be
                            // filtered later.
                            emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
                        }
                    } else if (in.isOnComplete()) {
                        // completion.test is always invoked (it is evaluated first); the
                        // completion is only signalled through w when it returned true
                        // and the emitter has not been cancelled
                        if (completion.test(state.value, w) && !emitter.isCancelled()) {
                            w.onComplete();
                        }
                    } else if (!emitter.isCancelled()) {
                        // neither onNext nor onComplete: forward the error
                        w.onError(in.getError());
                    }
                }

            }, backpressureStrategy);
        }
    };
}
项目:rxjava2-extras    文件:Transformers.java   
/**
 * Convenience entry point that delegates to
 * {@code TransformerStateMachine.create} with the same arguments.
 *
 * @return the transformer produced by {@code TransformerStateMachine.create}
 */
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final FlowableTransformer<In, Out> transformer = TransformerStateMachine.create(
            initialState,
            transition,
            completion,
            backpressureStrategy,
            requestBatchSize);
    return transformer;
}
项目:ReactiveChannel    文件:LambdaChannelResponder.java   
/**
 * Stores the downstream subscriber, the starting state and the two callbacks,
 * and creates a serialized {@code PublishSubject} for the {@code cancelled}
 * field.
 *
 * @param actual        the channel subscriber kept in {@code actual}
 * @param initialState  value the {@code state} field starts with
 * @param queryMapper   maps a state, a request and the terminal events to a response publisher
 * @param stateConsumer consumer of state values
 */
public LambdaChannelResponder(
        ChannelSubscriber<Request, Response> actual,
        State initialState,
        Function3<State, Request, ChannelTerminalEvents, ? extends Publisher<Response>> queryMapper,
                Consumer<? super State> stateConsumer) {
    // collaborators supplied by the caller
    this.actual = actual;
    this.queryMapper = queryMapper;
    this.stateConsumer = stateConsumer;

    // mutable starting point
    this.state = initialState;

    // internally created subject
    this.cancelled = PublishSubject.create().toSerialized();
}
项目:RxEasyHttp    文件:SceneActivity.java   
/**
 * Click handler demonstrating {@code Observable.zip}: three independent HTTP
 * requests are started and their results are delivered to the subscriber
 * together, as one {@code List<Object>}, which is then shown in a toast.
 *
 * @param view the clicked view (unused)
 */
public void onZipRequest(View view) {
    // Use the zip operator to wait until several network requests have all
    // completed before refreshing the UI.
    // In the example below the data comes from 3 different endpoints.
    // NOTE(review): the API key below is hard-coded in source; consider moving it to configuration.
    Observable<ResultBean> mobileObservable = EasyHttp.get("http://apis.juhe.cn/mobile/get")
            .params("phone", "18688994275")
            .params("dtype", "json")
            .params("key", "5682c1f44a7f486e40f9720d6c97ffe4")
            .execute(new CallClazzProxy<TestApiResult1<ResultBean>, ResultBean>(ResultBean.class) {
            });

    Observable<Content> searchObservable = EasyHttp.get("/ajax.php")
            .baseUrl("http://fy.iciba.com")
            .params("a", "fy")
            .params("f", "auto")
            .params("t", "auto")
            .params("w", "hello world")
            // use a proxy
            .execute(new CallClazzProxy<TestApiResult6<Content>, Content>(Content.class) {
            });

    Observable<List<SectionItem>> listObservable = EasyHttp.get("http://news-at.zhihu.com/api/3/sections")
            .execute(new CallClazzProxy<TestApiResult5<List<SectionItem>>, List<SectionItem>>(new TypeToken<List<SectionItem>>() {
            }.getType()) {
            });
    // The last type argument of Function3 is List<Object> here, meaning the 3 results are put
    // into one collection and returned in a single emission. Any other result type may be used
    // instead; it does not have to be List<Object>.
    // If all three endpoints returned TestBean, a concrete List<TestBean> could be used directly.
    Observable.zip(mobileObservable, searchObservable, listObservable, new Function3<ResultBean, Content, List<SectionItem>, List<Object>>() {
        @Override
        public List<Object> apply(@NonNull ResultBean resultbean, @NonNull Content content, @NonNull List<SectionItem> sectionItems) throws Exception {
            // Collect the 3 received values so they reach the subscriber in one go.
            // Typed as List<Object> (rather than a raw List) to match the declared return type.
            List<Object> list = new ArrayList<Object>();
            list.add(resultbean);
            list.add(content);
            list.add(sectionItems);
            return list;
        }
    }).subscribe(new BaseSubscriber<List<Object>>() {
        @Override
        public void onError(ApiException e) {
            showToast(e.getMessage());
        }

        @Override
        public void onNext(@NonNull List<Object> objects) {
            showToast(objects.toString());
        }
    });
}
项目:RxJava2Extensions    文件:Pattern3.java   
/**
 * Creates the plan that fires once every observable sequence of this pattern
 * has an element available, projecting those elements through the given
 * selector.
 *
 * @param <R> the result type
 * @param selector
 *            the function invoked with one element from each source sequence
 * @return the plan for the matching
 * @throws NullPointerException
 *             if selector is null
 */
public <R> Plan<R> then(Function3<T1, T2, T3, R> selector) {
    if (selector != null) {
        return new Plan3<T1, T2, T3, R>(this, selector);
    }
    // a missing selector is a programming error
    throw new NullPointerException();
}
项目:rxtools    文件:FlowableList.java   
/**
 * Maps this list through {@code transform}. Besides the item itself, the
 * function is handed two Flowables bound to the item's previous and next
 * neighbours in the list; these emit when the item moves within the list or
 * when surrounding items are moved.
 * @param transform A function converting a source item (plus its neighbour Flowables) to the target type
 * @param <R> The type of the mapped value
 * @return A new FlowableList whose values are produced by the supplied function
 */
public <R> FlowableList<R> indexedMap(final Function3<T, Flowable<Optional<T>>, Flowable<Optional<T>>, R> transform)
{
    final FlowableList<R> mapped = new IndexedFlowableList<>(this, transform);
    return mapped;
}