Java 类rx.functions.Action2 实例源码

项目:rxjava-extras    文件:Transformers.java   
public static <T> Transformer<T, Set<T>> toSet() {
    return new Transformer<T, Set<T>>() {

        @Override
        public Observable<Set<T>> call(Observable<T> o) {
            return o.collect(new Func0<Set<T>>() {

                @Override
                public Set<T> call() {
                    return new HashSet<T>();
                }
            }, new Action2<Set<T>, T>() {

                @Override
                public void call(Set<T> set, T t) {
                    set.add(t);
                }
            });
        }
    };
}
项目:LiteReader    文件:DoubanMovieDetailViewModel.java   
private void initActions() {
    onReviewItemClick = new Action2<Boolean, Review>() {
        @Override
        public void call(Boolean lastOne, Review review) {
            // 如果是最后一项,则跳到评论列表,否则跳到评论详情
            Intent intent;
            if (lastOne) {
                intent = DoubanMovieMoreReviewActivity.intentFor(getContext());
                intent.putExtra(Constant.EXTRA_DOUBAN_MOVIE_ID, id);
            } else {
                intent = DoubanMovieReviewActivity.intentFor(getContext());
                intent.putExtra(Constant.EXTRA_DOUBAN_MOVIE_REVIEW, review);
            }
            intent.putExtra(Constant.EXTRA_DOUBAN_MOVIE_TITLE, title);
            getContext().startActivity(intent);
        }
    };
}
项目:pcloud-networking-java    文件:RxCallAdapter.java   
@Override
public Observable<T> adapt(final Call<T> call) {

    return Observable.<T>create(SyncOnSubscribe.createSingleState(new Func0<Call<T>>() {
        @Override
        public Call<T> call() {
            return call.clone();
        }
    }, new Action2<Call<T>, Observer<? super T>>() {
        @Override
        public void call(Call<T> callClone, Observer<? super T> observer) {
            try {
                observer.onNext(callClone.execute());
                observer.onCompleted();
            } catch (Throwable throwable) {
                observer.onError(throwable);
            }
        }
    }, new Action1<Call<T>>() {
        @Override
        public void call(Call<T> tCall) {
            tCall.cancel();
        }
    }));
}
项目:buddysearch    文件:FirebaseEntityStore.java   
private <T> Observable<T> getQuery(Query query, Action2<Subscriber<? super T>, DataSnapshot> onNextAction, boolean subscribeForSingleEvent) {
    return Observable.create(subscriber -> {
        ValueEventListener eventListener = new ValueEventListener() {

            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
                onNextAction.call(subscriber, dataSnapshot);
            }

            @Override
            public void onCancelled(DatabaseError databaseError) {
                subscriber.onError(new FirebaseException(databaseError.getMessage()));
            }
        };
        if (subscribeForSingleEvent) {
            query.addListenerForSingleValueEvent(eventListener);
        } else {
            query.addValueEventListener(eventListener);
        }
        subscriber.add(Subscriptions.create(() -> query.removeEventListener(eventListener)));
    });
}
项目:RxAndroidBle    文件:ByteArrayBatchObservable.java   
@NonNull
private static SyncOnSubscribe<ByteBuffer, byte[]> createSyncOnSubscribe(final byte[] bytes, final int maxBatchSize) {
    return SyncOnSubscribe.createSingleState(
            new Func0<ByteBuffer>() {
                @Override
                public ByteBuffer call() {
                    return ByteBuffer.wrap(bytes);
                }
            },
            new Action2<ByteBuffer, Observer<? super byte[]>>() {
                @Override
                public void call(ByteBuffer byteBuffer, Observer<? super byte[]> observer) {
                    int nextBatchSize = Math.min(byteBuffer.remaining(), maxBatchSize);
                    if (nextBatchSize == 0) {
                        observer.onCompleted();
                        return;
                    }
                    final byte[] nextBatch = new byte[nextBatchSize];
                    byteBuffer.get(nextBatch);
                    observer.onNext(nextBatch);
                }
            }
    );
}
项目:RxActions    文件:Actions.java   
/**
 * Combines provided actions into a single action stream
 *
 * @param a1 Action
 * @param a2 Action
 * @param a3 Action
 * @param a4 Action
 * @param a5 Action
 * @param a6 Action
 * @return Single action stream combined from provided actions
 */
@CheckResult
@NonNull
public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6) {
    return new Action2<T1, T2>() {
        @Override
        public void call(T1 t1, T2 t2) {
            a1.call(t1, t2);
            a2.call(t1, t2);
            a3.call(t1, t2);
            a4.call(t1, t2);
            a5.call(t1, t2);
            a6.call(t1, t2);
        }
    };
}
项目:RxActions    文件:Actions.java   
/**
 * Combines provided actions into a single action stream
 *
 * @param a1 Action
 * @param a2 Action
 * @param a3 Action
 * @param a4 Action
 * @param a5 Action
 * @param a6 Action
 * @param a7 Action
 * @return Single action stream combined from provided actions
 */
@CheckResult
@NonNull
public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6, @NonNull final Action2<? super T1, ? super T2> a7) {
    return new Action2<T1, T2>() {
        @Override
        public void call(T1 t1, T2 t2) {
            a1.call(t1, t2);
            a2.call(t1, t2);
            a3.call(t1, t2);
            a4.call(t1, t2);
            a5.call(t1, t2);
            a6.call(t1, t2);
            a7.call(t1, t2);
        }
    };
}
项目:RxActions    文件:Actions.java   
/**
 * Combines provided actions into a single action stream
 *
 * @param a1 Action
 * @param a2 Action
 * @param a3 Action
 * @param a4 Action
 * @param a5 Action
 * @param a6 Action
 * @param a7 Action
 * @param a8 Action
 * @return Single action stream combined from provided actions
 */
@CheckResult
@NonNull
public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6, @NonNull final Action2<? super T1, ? super T2> a7, @NonNull final Action2<? super T1, ? super T2> a8) {
    return new Action2<T1, T2>() {
        @Override
        public void call(T1 t1, T2 t2) {
            a1.call(t1, t2);
            a2.call(t1, t2);
            a3.call(t1, t2);
            a4.call(t1, t2);
            a5.call(t1, t2);
            a6.call(t1, t2);
            a7.call(t1, t2);
            a8.call(t1, t2);
        }
    };
}
项目:RxActions    文件:Actions.java   
/**
 * Combines provided actions into a single action stream
 *
 * @param a1 Action
 * @param a2 Action
 * @param a3 Action
 * @param a4 Action
 * @param a5 Action
 * @param a6 Action
 * @param a7 Action
 * @param a8 Action
 * @param a9 Action
 * @return Single action stream combined from provided actions
 */
@CheckResult
@NonNull
public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6, @NonNull final Action2<? super T1, ? super T2> a7, @NonNull final Action2<? super T1, ? super T2> a8, @NonNull final Action2<? super T1, ? super T2> a9) {
    return new Action2<T1, T2>() {
        @Override
        public void call(T1 t1, T2 t2) {
            a1.call(t1, t2);
            a2.call(t1, t2);
            a3.call(t1, t2);
            a4.call(t1, t2);
            a5.call(t1, t2);
            a6.call(t1, t2);
            a7.call(t1, t2);
            a8.call(t1, t2);
            a9.call(t1, t2);
        }
    };
}
项目:openwebnet-android    文件:DeviceListAdapter.java   
private void updateEnergyValues(EnergyViewHolder holder, EnergyModel energy) {
    holder.linearLayoutEnergyValues.setVisibility(View.VISIBLE);
    holder.imageViewCardAlert.setVisibility(View.INVISIBLE);

    if (energy.getInstantaneousPower() == null &&
            energy.getDailyPower() == null &&
            energy.getMonthlyPower() == null) {
        log.warn("energy values are null or invalid: unable to update");
        holder.linearLayoutEnergyValues.setVisibility(View.GONE);
        holder.imageViewCardAlert.setVisibility(View.VISIBLE);
        return;
    }

    Action2<TextView, String> updateEnergy = (textView, value) ->
        textView.setText(TextUtils.isEmpty(value) ? utilityService.getString(R.string.energy_none) :
            value + " " + utilityService.getString(R.string.energy_power_unit));

    updateEnergy.call(holder.textViewEnergyInstantaneousPower, energy.getInstantaneousPower());
    updateEnergy.call(holder.textViewEnergyDailyPower, energy.getDailyPower());
    updateEnergy.call(holder.textViewEnergyMonthlyPower, energy.getMonthlyPower());
}
项目:RxJavaFlow    文件:ObservableTests.java   
@Test
public void testCollectToString() {
    String value = Observable.just(1, 2, 3).collect(new Supplier<StringBuilder>() {

        @Override
        public StringBuilder call() {
            return new StringBuilder();
        }

    }, new Action2<StringBuilder, Integer>() {

        @Override
        public void call(StringBuilder sb, Integer v) {
            if (sb.length() > 0) {
                sb.append("-");
            }
            sb.append(v);
        }
    }).toBlocking().last().toString();

    assertEquals("1-2-3", value);
}
项目:RxJavaFlow    文件:OperatorScanTest.java   
/**
 * This uses the public API collect which uses scan under the covers.
 */
@Test
public void testSeedFactory() {
    Observable<List<Integer>> o = Observable.range(1, 10)
            .collect(new Supplier<List<Integer>>() {

                @Override
                public List<Integer> call() {
                    return new ArrayList<Integer>();
                }

            }, new Action2<List<Integer>, Integer>() {

                @Override
                public void call(List<Integer> list, Integer t2) {
                    list.add(t2);
                }

            }).takeLast(1);

    assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
    assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
}
项目:proxypref    文件:ProxyHandlerTypesTest.java   
private <T> void testType(String methodName, T getValue, T setValue,
    Func1<SharedPreferences.Editor, SharedPreferences.Editor> whenPut,
    Action2<SharedPreferences.Editor, T> verifyPut,
    Func1<TestTypes, T> doGet, Action2<TestTypes, T> doSet) {

    SharedPreferences pref = mock(SharedPreferences.class);
    TestTypes test = ProxyPreferences.build(TestTypes.class, pref);

    when(pref.getAll())
        .thenAnswer(answerMapValue(methodName, getValue));

    assertEquals(getValue, doGet.call(test));

    SharedPreferences.Editor editor = mock(SharedPreferences.Editor.class);
    when(pref.edit())
        .thenReturn(editor);
    when(whenPut.call(editor))
        .thenReturn(editor);

    doSet.call(test, setValue);

    InOrder order = inOrder(pref, editor);
    order.verify(pref, times(1)).edit();
    verifyPut.call(order.verify(editor, times(1)), setValue);
    order.verify(editor, times(1)).apply();
}
项目:rxjava-extras    文件:Transformers.java   
/**
 * <p>
 * Returns a {@link Transformer} that returns an {@link Observable} that is
 * a buffering of the source Observable into lists of sequential items that
 * satisfy the condition {@code condition}.
 * 
 * <p>
 * <img src=
 * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/toListWhile.png?raw=true"
 * alt="marble diagram">
 * 
 * @param condition
 *            condition function that must return true if an item is to be
 *            part of the list being prepared for emission
 * @param <T>
 *            the generic type of the source Observable
 * @return transformer as above
 */
public static <T> Transformer<T, List<T>> toListWhile(
        final Func2<? super List<T>, ? super T, Boolean> condition) {

    Func0<List<T>> initialState = new Func0<List<T>>() {
        @Override
        public List<T> call() {
            return new ArrayList<T>();
        }
    };

    Action2<List<T>, T> collect = new Action2<List<T>, T>() {

        @Override
        public void call(List<T> list, T n) {
            list.add(n);
        }
    };
    return collectWhile(initialState, collect, condition);
}
项目:rxjava-extras    文件:ObservableServerSocket.java   
private static Observable<Observable<byte[]>> createServerSocketObservable(
        ServerSocket serverSocket, final long timeoutMs, final int bufferSize,
        final Action0 preAcceptAction, final Func1<? super Socket, Boolean> acceptSocket) {
    return Observable.create( //
            SyncOnSubscribe.<ServerSocket, Observable<byte[]>> createSingleState( //
                    Functions.constant0(serverSocket), //
                    new Action2<ServerSocket, Observer<? super Observable<byte[]>>>() {

                        @Override
                        public void call(ServerSocket ss,
                                Observer<? super Observable<byte[]>> observer) {
                            acceptConnection(timeoutMs, bufferSize, ss, observer,
                                    preAcceptAction, acceptSocket);
                        }
                    }));
}
项目:rxjava-slf4j    文件:Log.java   
private static <T> Func1<Action1<T>, Action1<T>> chainAction(
        final Action2<Action1<T>, T> action2) {
    return new Func1<Action1<T>, Action1<T>>() {

        @Override
        public Action1<T> call(final Action1<T> action) {
            return new Action1<T>() {

                @Override
                public void call(T t) {
                    action2.call(action, t);
                }
            };
        }
    };
}
项目:Android-Gradle-Samples    文件:ObservableTests.java   
@Test
public void testCollectToString() {
    String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
        @Override
        public StringBuilder call() {
            return new StringBuilder();
        }

    }, new Action2<StringBuilder, Integer>() {

        @Override
        public void call(StringBuilder sb, Integer v) {
            if (sb.length() > 0) {
                sb.append("-");
            }
            sb.append(v);
        }
    }).toBlocking().last().toString();

    assertEquals("1-2-3", value);
}
项目:Orpheus    文件:GenreDetailsScreenModule.java   
@Provides @ScreenScope
public BundleablePresenterConfig providePresenterConfig(
        ItemClickListener itemClickListener,
        MenuHandler menuConfig
) {
    return BundleablePresenterConfig.builder()
            .setWantsGrid(true)
            .setAllowLongPressSelection(false)
            .setItemClickListener(itemClickListener)
            .setMenuConfig(menuConfig)
            .setFabClickAction(new Action2<Context, BundleablePresenter>() {
                @Override
                public void call(Context context, final BundleablePresenter presenter) {
                    UtilsCommon.addTracksToQueue(context,
                            Collections.singletonList(screen.genre.getTracksUri()),
                            new Action1<List<Uri>>() {
                                @Override
                                public void call(List<Uri> uris) {
                                    presenter.getPlaybackController().playAll(uris, 0);
                                }
                            });
                }
            })
            .build();
}
项目:Orpheus    文件:ArtistDetailsScreenModule.java   
@Provides @ScreenScope
public BundleablePresenterConfig providePresenterConfig(
        ItemClickListener itemClickListener,
        MenuHandler menuConfig
) {
    return BundleablePresenterConfig.builder()
            .setWantsGrid(true)
            .setItemClickListener(itemClickListener)
            .setAllowLongPressSelection(false)
            .setMenuConfig(menuConfig)
            .setFabClickAction(new Action2<Context, BundleablePresenter>() {
                @Override
                public void call(Context context, final BundleablePresenter presenter) {
                    UtilsCommon.addTracksToQueue(context,
                            Collections.singletonList(screen.artist.getTracksUri()),
                            new Action1<List<Uri>>() {
                                @Override
                                public void call(List<Uri> uris) {
                                    presenter.getPlaybackController().playAll(uris, 0);
                                }
                            });
                }
            })
            .build();
}
项目:Orpheus    文件:FoldersScreenModule.java   
@Provides @Named("fab_action")
Action2<Context, BundleablePresenter> provideFabAction() {
    return new Action2<Context, BundleablePresenter>() {
        @Override
        public void call(Context context, BundleablePresenter presenter) {
            boolean indexed = presenter.getIndexClient().isIndexed(screen.container);
            if (!indexed) {
                if (screen.container instanceof Playlist) {
                    Toast.makeText(context, R.string.msg_playlist_import_not_implemented, Toast.LENGTH_LONG).show();
                    return;
                }
                presenter.getIndexClient().add(screen.container);
            } else {
                presenter.getFm().showDialog(LibraryOpScreenFragment.ni(
                        LibraryOpScreen.unIndexOp(Collections.singletonList(screen.container))));
            }

        }
    };
}
项目:GitHub    文件:RxPresenter.java   
/**
 * This is a shortcut that can be used instead of combining together
 * {@link #restartable(int, Func0)},
 * {@link #deliverFirst()},
 * {@link #split(Action2, Action2)}.
 *
 * @param restartableId     an id of the restartable.
 * @param observableFactory a factory that should return an Observable when the restartable should run.
 * @param onNext            a callback that will be called when received data should be delivered to view.
 * @param onError           a callback that will be called if the source observable emits onError.
 * @param <T>               the type of the observable.
 */
public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory,
    final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {

    restartable(restartableId, new Func0<Subscription>() {
        @Override
        public Subscription call() {
            return observableFactory.call()
                .compose(RxPresenter.this.<T>deliverFirst())
                .subscribe(split(onNext, onError));
        }
    });
}
项目:GitHub    文件:RxPresenter.java   
/**
 * This is a shortcut that can be used instead of combining together
 * {@link #restartable(int, Func0)},
 * {@link #deliverLatestCache()},
 * {@link #split(Action2, Action2)}.
 *
 * @param restartableId     an id of the restartable.
 * @param observableFactory a factory that should return an Observable when the restartable should run.
 * @param onNext            a callback that will be called when received data should be delivered to view.
 * @param onError           a callback that will be called if the source observable emits onError.
 * @param <T>               the type of the observable.
 */
public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory,
    final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {

    restartable(restartableId, new Func0<Subscription>() {
        @Override
        public Subscription call() {
            return observableFactory.call()
                .compose(RxPresenter.this.<T>deliverLatestCache())
                .subscribe(split(onNext, onError));
        }
    });
}
项目:GitHub    文件:RxPresenter.java   
/**
 * This is a shortcut that can be used instead of combining together
 * {@link #restartable(int, Func0)},
 * {@link #deliverReplay()},
 * {@link #split(Action2, Action2)}.
 *
 * @param restartableId     an id of the restartable.
 * @param observableFactory a factory that should return an Observable when the restartable should run.
 * @param onNext            a callback that will be called when received data should be delivered to view.
 * @param onError           a callback that will be called if the source observable emits onError.
 * @param <T>               the type of the observable.
 */
public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory,
    final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {

    restartable(restartableId, new Func0<Subscription>() {
        @Override
        public Subscription call() {
            return observableFactory.call()
                .compose(RxPresenter.this.<T>deliverReplay())
                .subscribe(split(onNext, onError));
        }
    });
}
项目:GitHub    文件:DeliveryTest.java   
private void testWithOnNextOnError(Action2<Action2, Action2> test) {
    Action2 onNext = mock(Action2.class);
    Action2 onError = mock(Action2.class);

    test.call(onNext, onError);

    verifyNoMoreInteractions(onNext);
    verifyNoMoreInteractions(onError);
}
项目:GitHub    文件:DeliveryTest.java   
@Test
public void testSplitOnNext() throws Exception {
    testWithOnNextOnError(new Action2<Action2, Action2>() {
        @Override
        public void call(Action2 onNext, Action2 onError) {
            new Delivery(1, Notification.createOnNext(2)).split(onNext, onError);
            verify(onNext, times(1)).call(1, 2);
        }
    });
}
项目:GitHub    文件:DeliveryTest.java   
@Test
public void testSplitOnError() throws Exception {
    testWithOnNextOnError(new Action2<Action2, Action2>() {
        @Override
        public void call(Action2 onNext, Action2 onError) {
            Throwable throwable = new Throwable();
            new Delivery(1, Notification.createOnError(throwable)).split(onNext, onError);
            verify(onError, times(1)).call(1, throwable);
        }
    });
}
项目:GitHub    文件:DeliveryTest.java   
@Test
public void testSplitOnComplete() throws Exception {
    testWithOnNextOnError(new Action2<Action2, Action2>() {
        @Override
        public void call(Action2 onNext, Action2 onError) {
            new Delivery(1, Notification.createOnCompleted()).split(onNext, onError);
        }
    });
}
项目:GitHub    文件:DeliverFirstTest.java   
@Test(expected = OnErrorNotImplementedException.class)
    public void testThrowDuringOnNext() throws Exception {

//        Observable
//            .just(1)
//            .filter(new Func1<Integer, Boolean>() {
//                @Override
//                public Boolean call(Integer integer) {
//                    return true;
//                }
//            })
//            .first()
//            .subscribe(new Action1<Integer>() {
//                @Override
//                public void call(Integer integer) {
//                    throw new RuntimeException();
//                }
//            });

        PublishSubject<Object> view = PublishSubject.create();

        final PublishSubject<Integer> subject = PublishSubject.create();
        new DeliverFirst<Object, Integer>(view)
            .call(subject)
            .subscribe(new Action1<Delivery<Object, Integer>>() {
                @Override
                public void call(Delivery<Object, Integer> delivery) {
                    delivery.split(
                        new Action2<Object, Integer>() {
                            @Override
                            public void call(Object o, Integer integer) {
                                throw new RuntimeException();
                            }
                        },
                        null
                    );
                }
            });

        subject.onNext(3);
        view.onNext(100);
    }
项目:GitHub    文件:RxPresenter.java   
/**
 * This is a shortcut that can be used instead of combining together
 * {@link #restartable(int, Func0)},
 * {@link #deliverFirst()},
 * {@link #split(Action2, Action2)}.
 *
 * @param restartableId     an id of the restartable.
 * @param observableFactory a factory that should return an Observable when the restartable should run.
 * @param onNext            a callback that will be called when received data should be delivered to view.
 * @param onError           a callback that will be called if the source observable emits onError.
 * @param <T>               the type of the observable.
 */
public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory,
    final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {

    restartable(restartableId, new Func0<Subscription>() {
        @Override
        public Subscription call() {
            return observableFactory.call()
                .compose(RxPresenter.this.<T>deliverFirst())
                .subscribe(split(onNext, onError));
        }
    });
}
项目:GitHub    文件:RxPresenter.java   
/**
 * This is a shortcut that can be used instead of combining together
 * {@link #restartable(int, Func0)},
 * {@link #deliverLatestCache()},
 * {@link #split(Action2, Action2)}.
 *
 * @param restartableId     an id of the restartable.
 * @param observableFactory a factory that should return an Observable when the restartable should run.
 * @param onNext            a callback that will be called when received data should be delivered to view.
 * @param onError           a callback that will be called if the source observable emits onError.
 * @param <T>               the type of the observable.
 */
public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory,
    final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {

    restartable(restartableId, new Func0<Subscription>() {
        @Override
        public Subscription call() {
            return observableFactory.call()
                .compose(RxPresenter.this.<T>deliverLatestCache())
                .subscribe(split(onNext, onError));
        }
    });
}
项目:GitHub    文件:RxPresenter.java   
/**
 * This is a shortcut that can be used instead of combining together
 * {@link #restartable(int, Func0)},
 * {@link #deliverReplay()},
 * {@link #split(Action2, Action2)}.
 *
 * @param restartableId     an id of the restartable.
 * @param observableFactory a factory that should return an Observable when the restartable should run.
 * @param onNext            a callback that will be called when received data should be delivered to view.
 * @param onError           a callback that will be called if the source observable emits onError.
 * @param <T>               the type of the observable.
 */
public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory,
    final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {

    restartable(restartableId, new Func0<Subscription>() {
        @Override
        public Subscription call() {
            return observableFactory.call()
                .compose(RxPresenter.this.<T>deliverReplay())
                .subscribe(split(onNext, onError));
        }
    });
}
项目:GitHub    文件:DeliveryTest.java   
private void testWithOnNextOnError(Action2<Action2, Action2> test) {
    Action2 onNext = mock(Action2.class);
    Action2 onError = mock(Action2.class);

    test.call(onNext, onError);

    verifyNoMoreInteractions(onNext);
    verifyNoMoreInteractions(onError);
}
项目:GitHub    文件:DeliveryTest.java   
@Test
public void testSplitOnNext() throws Exception {
    testWithOnNextOnError(new Action2<Action2, Action2>() {
        @Override
        public void call(Action2 onNext, Action2 onError) {
            new Delivery(1, Notification.createOnNext(2)).split(onNext, onError);
            verify(onNext, times(1)).call(1, 2);
        }
    });
}
项目:GitHub    文件:DeliveryTest.java   
@Test
public void testSplitOnError() throws Exception {
    testWithOnNextOnError(new Action2<Action2, Action2>() {
        @Override
        public void call(Action2 onNext, Action2 onError) {
            Throwable throwable = new Throwable();
            new Delivery(1, Notification.createOnError(throwable)).split(onNext, onError);
            verify(onError, times(1)).call(1, throwable);
        }
    });
}
项目:GitHub    文件:DeliveryTest.java   
@Test
public void testSplitOnComplete() throws Exception {
    testWithOnNextOnError(new Action2<Action2, Action2>() {
        @Override
        public void call(Action2 onNext, Action2 onError) {
            new Delivery(1, Notification.createOnCompleted()).split(onNext, onError);
        }
    });
}
项目:GitHub    文件:DeliverFirstTest.java   
@Test(expected = OnErrorNotImplementedException.class)
    public void testThrowDuringOnNext() throws Exception {

//        Observable
//            .just(1)
//            .filter(new Func1<Integer, Boolean>() {
//                @Override
//                public Boolean call(Integer integer) {
//                    return true;
//                }
//            })
//            .first()
//            .subscribe(new Action1<Integer>() {
//                @Override
//                public void call(Integer integer) {
//                    throw new RuntimeException();
//                }
//            });

        PublishSubject<Object> view = PublishSubject.create();

        final PublishSubject<Integer> subject = PublishSubject.create();
        new DeliverFirst<Object, Integer>(view)
            .call(subject)
            .subscribe(new Action1<Delivery<Object, Integer>>() {
                @Override
                public void call(Delivery<Object, Integer> delivery) {
                    delivery.split(
                        new Action2<Object, Integer>() {
                            @Override
                            public void call(Object o, Integer integer) {
                                throw new RuntimeException();
                            }
                        },
                        null
                    );
                }
            });

        subject.onNext(3);
        view.onNext(100);
    }
项目:azure-libraries-for-java    文件:ContainerGroupImpl.java   
@Override
protected Observable<ContainerGroupInner> createInner() {
    final ContainerGroupImpl self = this;

    if (!isInCreateMode()) {
        throw new UnsupportedOperationException("Update on an existing container group resource is not supported");
    } else if (newFileShares == null || creatableStorageAccountKey == null) {
        return this.manager().inner().containerGroups().createOrUpdateAsync(this.resourceGroupName(), this.name(), this.inner());
    } else {
        final StorageAccount storageAccount = this.<StorageAccount>taskResult(this.creatableStorageAccountKey);
        return createFileShareAsync(storageAccount)
            .collect(new Func0<List<Triple<String, String, String>>>() {
                @Override
                public List<Triple<String, String, String>> call() {
                    return new ArrayList<>();
                }
            }, new Action2<List<Triple<String, String, String>>, Triple<String, String, String>>() {
                @Override
                public void call(List<Triple<String, String, String>> cloudFileShares, Triple<String, String, String> fileShare) {
                    cloudFileShares.add(fileShare);
                }
            })
            .flatMap(new Func1<List<Triple<String, String, String>>, Observable<? extends ContainerGroupInner>>() {
                @Override
                public Observable<? extends ContainerGroupInner> call(List<Triple<String, String, String>> fileShares) {
                    for (Triple<String, String, String> fileShareEntry : fileShares) {
                        self.defineVolume(fileShareEntry.getLeft())
                            .withExistingReadWriteAzureFileShare(fileShareEntry.getMiddle())
                            .withStorageAccountName(storageAccount.name())
                            .withStorageAccountKey(fileShareEntry.getRight())
                            .attach();
                    }
                    return self.manager().inner().containerGroups().createOrUpdateAsync(self.resourceGroupName(), self.name(), self.inner());
                }
            });
    }
}
项目:azure-libraries-for-java    文件:ExternalChildResourceCollectionImpl.java   
/**
 * Commits the changes in the external child resource childCollection.
 * <p/>
 * This method returns a observable stream, either its observer's onError will be called with
 * {@link CompositeException} if some resources failed to commit or onNext will be called if all resources
 * committed successfully.
 *
 * @return the observable stream
 */
public Observable<List<FluentModelTImpl>> commitAndGetAllAsync() {
    return commitAsync().collect(
            new Func0<List<FluentModelTImpl>>() {
                public List<FluentModelTImpl> call() {
                    return new ArrayList<>();
                }
            },
            new Action2<List<FluentModelTImpl>, FluentModelTImpl>() {
                public void call(List<FluentModelTImpl> state, FluentModelTImpl item) {
                    state.add(item);
                }
            });
}
项目:azure-libraries-for-java    文件:VirtualMachineExtensionsImpl.java   
/**
 * @return an observable emits extensions in this collection as a map indexed by name.
 */
public Observable<Map<String, VirtualMachineExtension>> asMapAsync() {
    return listAsync()
            .collect(new Func0<Map<String, VirtualMachineExtension>>() {
                @Override
                public Map<String, VirtualMachineExtension> call() {
                    return new HashMap<>();
                }
            }, new Action2<Map<String, VirtualMachineExtension>, VirtualMachineExtension>() {
                @Override
                public void call(Map<String, VirtualMachineExtension> map, VirtualMachineExtension extension) {
                    map.put(extension.name(), extension);
                }
            });
}
项目:boohee_v5.6    文件:AsyncOnSubscribe.java   
@Experimental
public static <T> Observable$OnSubscribe<T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next) {
    return new AsyncOnSubscribeImpl(new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {
        public Void call(Void state, Long requested, Observer<Observable<? extends T>> subscriber) {
            next.call(requested, subscriber);
            return state;
        }
    });
}