public static <T> Transformer<T, Set<T>> toSet() { return new Transformer<T, Set<T>>() { @Override public Observable<Set<T>> call(Observable<T> o) { return o.collect(new Func0<Set<T>>() { @Override public Set<T> call() { return new HashSet<T>(); } }, new Action2<Set<T>, T>() { @Override public void call(Set<T> set, T t) { set.add(t); } }); } }; }
private void initActions() { onReviewItemClick = new Action2<Boolean, Review>() { @Override public void call(Boolean lastOne, Review review) { // 如果是最后一项,则跳到评论列表,否则跳到评论详情 Intent intent; if (lastOne) { intent = DoubanMovieMoreReviewActivity.intentFor(getContext()); intent.putExtra(Constant.EXTRA_DOUBAN_MOVIE_ID, id); } else { intent = DoubanMovieReviewActivity.intentFor(getContext()); intent.putExtra(Constant.EXTRA_DOUBAN_MOVIE_REVIEW, review); } intent.putExtra(Constant.EXTRA_DOUBAN_MOVIE_TITLE, title); getContext().startActivity(intent); } }; }
@Override public Observable<T> adapt(final Call<T> call) { return Observable.<T>create(SyncOnSubscribe.createSingleState(new Func0<Call<T>>() { @Override public Call<T> call() { return call.clone(); } }, new Action2<Call<T>, Observer<? super T>>() { @Override public void call(Call<T> callClone, Observer<? super T> observer) { try { observer.onNext(callClone.execute()); observer.onCompleted(); } catch (Throwable throwable) { observer.onError(throwable); } } }, new Action1<Call<T>>() { @Override public void call(Call<T> tCall) { tCall.cancel(); } })); }
private <T> Observable<T> getQuery(Query query, Action2<Subscriber<? super T>, DataSnapshot> onNextAction, boolean subscribeForSingleEvent) { return Observable.create(subscriber -> { ValueEventListener eventListener = new ValueEventListener() { @Override public void onDataChange(DataSnapshot dataSnapshot) { onNextAction.call(subscriber, dataSnapshot); } @Override public void onCancelled(DatabaseError databaseError) { subscriber.onError(new FirebaseException(databaseError.getMessage())); } }; if (subscribeForSingleEvent) { query.addListenerForSingleValueEvent(eventListener); } else { query.addValueEventListener(eventListener); } subscriber.add(Subscriptions.create(() -> query.removeEventListener(eventListener))); }); }
@NonNull private static SyncOnSubscribe<ByteBuffer, byte[]> createSyncOnSubscribe(final byte[] bytes, final int maxBatchSize) { return SyncOnSubscribe.createSingleState( new Func0<ByteBuffer>() { @Override public ByteBuffer call() { return ByteBuffer.wrap(bytes); } }, new Action2<ByteBuffer, Observer<? super byte[]>>() { @Override public void call(ByteBuffer byteBuffer, Observer<? super byte[]> observer) { int nextBatchSize = Math.min(byteBuffer.remaining(), maxBatchSize); if (nextBatchSize == 0) { observer.onCompleted(); return; } final byte[] nextBatch = new byte[nextBatchSize]; byteBuffer.get(nextBatch); observer.onNext(nextBatch); } } ); }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6) { return new Action2<T1, T2>() { @Override public void call(T1 t1, T2 t2) { a1.call(t1, t2); a2.call(t1, t2); a3.call(t1, t2); a4.call(t1, t2); a5.call(t1, t2); a6.call(t1, t2); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @param a7 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6, @NonNull final Action2<? super T1, ? super T2> a7) { return new Action2<T1, T2>() { @Override public void call(T1 t1, T2 t2) { a1.call(t1, t2); a2.call(t1, t2); a3.call(t1, t2); a4.call(t1, t2); a5.call(t1, t2); a6.call(t1, t2); a7.call(t1, t2); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @param a7 Action * @param a8 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6, @NonNull final Action2<? super T1, ? super T2> a7, @NonNull final Action2<? super T1, ? super T2> a8) { return new Action2<T1, T2>() { @Override public void call(T1 t1, T2 t2) { a1.call(t1, t2); a2.call(t1, t2); a3.call(t1, t2); a4.call(t1, t2); a5.call(t1, t2); a6.call(t1, t2); a7.call(t1, t2); a8.call(t1, t2); } }; }
/** * Combines provided actions into a single action stream * * @param a1 Action * @param a2 Action * @param a3 Action * @param a4 Action * @param a5 Action * @param a6 Action * @param a7 Action * @param a8 Action * @param a9 Action * @return Single action stream combined from provided actions */ @CheckResult @NonNull public static <T1, T2> Action2<? super T1, ? super T2> combine(@NonNull final Action2<? super T1, ? super T2> a1, @NonNull final Action2<? super T1, ? super T2> a2, @NonNull final Action2<? super T1, ? super T2> a3, @NonNull final Action2<? super T1, ? super T2> a4, @NonNull final Action2<? super T1, ? super T2> a5, @NonNull final Action2<? super T1, ? super T2> a6, @NonNull final Action2<? super T1, ? super T2> a7, @NonNull final Action2<? super T1, ? super T2> a8, @NonNull final Action2<? super T1, ? super T2> a9) { return new Action2<T1, T2>() { @Override public void call(T1 t1, T2 t2) { a1.call(t1, t2); a2.call(t1, t2); a3.call(t1, t2); a4.call(t1, t2); a5.call(t1, t2); a6.call(t1, t2); a7.call(t1, t2); a8.call(t1, t2); a9.call(t1, t2); } }; }
private void updateEnergyValues(EnergyViewHolder holder, EnergyModel energy) { holder.linearLayoutEnergyValues.setVisibility(View.VISIBLE); holder.imageViewCardAlert.setVisibility(View.INVISIBLE); if (energy.getInstantaneousPower() == null && energy.getDailyPower() == null && energy.getMonthlyPower() == null) { log.warn("energy values are null or invalid: unable to update"); holder.linearLayoutEnergyValues.setVisibility(View.GONE); holder.imageViewCardAlert.setVisibility(View.VISIBLE); return; } Action2<TextView, String> updateEnergy = (textView, value) -> textView.setText(TextUtils.isEmpty(value) ? utilityService.getString(R.string.energy_none) : value + " " + utilityService.getString(R.string.energy_power_unit)); updateEnergy.call(holder.textViewEnergyInstantaneousPower, energy.getInstantaneousPower()); updateEnergy.call(holder.textViewEnergyDailyPower, energy.getDailyPower()); updateEnergy.call(holder.textViewEnergyMonthlyPower, energy.getMonthlyPower()); }
@Test public void testCollectToString() { String value = Observable.just(1, 2, 3).collect(new Supplier<StringBuilder>() { @Override public StringBuilder call() { return new StringBuilder(); } }, new Action2<StringBuilder, Integer>() { @Override public void call(StringBuilder sb, Integer v) { if (sb.length() > 0) { sb.append("-"); } sb.append(v); } }).toBlocking().last().toString(); assertEquals("1-2-3", value); }
/** * This uses the public API collect which uses scan under the covers. */ @Test public void testSeedFactory() { Observable<List<Integer>> o = Observable.range(1, 10) .collect(new Supplier<List<Integer>>() { @Override public List<Integer> call() { return new ArrayList<Integer>(); } }, new Action2<List<Integer>, Integer>() { @Override public void call(List<Integer> list, Integer t2) { list.add(t2); } }).takeLast(1); assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single()); assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single()); }
private <T> void testType(String methodName, T getValue, T setValue, Func1<SharedPreferences.Editor, SharedPreferences.Editor> whenPut, Action2<SharedPreferences.Editor, T> verifyPut, Func1<TestTypes, T> doGet, Action2<TestTypes, T> doSet) { SharedPreferences pref = mock(SharedPreferences.class); TestTypes test = ProxyPreferences.build(TestTypes.class, pref); when(pref.getAll()) .thenAnswer(answerMapValue(methodName, getValue)); assertEquals(getValue, doGet.call(test)); SharedPreferences.Editor editor = mock(SharedPreferences.Editor.class); when(pref.edit()) .thenReturn(editor); when(whenPut.call(editor)) .thenReturn(editor); doSet.call(test, setValue); InOrder order = inOrder(pref, editor); order.verify(pref, times(1)).edit(); verifyPut.call(order.verify(editor, times(1)), setValue); order.verify(editor, times(1)).apply(); }
/** * <p> * Returns a {@link Transformer} that returns an {@link Observable} that is * a buffering of the source Observable into lists of sequential items that * satisfy the condition {@code condition}. * * <p> * <img src= * "https://github.com/davidmoten/rxjava-extras/blob/master/src/docs/toListWhile.png?raw=true" * alt="marble diagram"> * * @param condition * condition function that must return true if an item is to be * part of the list being prepared for emission * @param <T> * the generic type of the source Observable * @return transformer as above */ public static <T> Transformer<T, List<T>> toListWhile( final Func2<? super List<T>, ? super T, Boolean> condition) { Func0<List<T>> initialState = new Func0<List<T>>() { @Override public List<T> call() { return new ArrayList<T>(); } }; Action2<List<T>, T> collect = new Action2<List<T>, T>() { @Override public void call(List<T> list, T n) { list.add(n); } }; return collectWhile(initialState, collect, condition); }
private static Observable<Observable<byte[]>> createServerSocketObservable( ServerSocket serverSocket, final long timeoutMs, final int bufferSize, final Action0 preAcceptAction, final Func1<? super Socket, Boolean> acceptSocket) { return Observable.create( // SyncOnSubscribe.<ServerSocket, Observable<byte[]>> createSingleState( // Functions.constant0(serverSocket), // new Action2<ServerSocket, Observer<? super Observable<byte[]>>>() { @Override public void call(ServerSocket ss, Observer<? super Observable<byte[]>> observer) { acceptConnection(timeoutMs, bufferSize, ss, observer, preAcceptAction, acceptSocket); } })); }
private static <T> Func1<Action1<T>, Action1<T>> chainAction( final Action2<Action1<T>, T> action2) { return new Func1<Action1<T>, Action1<T>>() { @Override public Action1<T> call(final Action1<T> action) { return new Action1<T>() { @Override public void call(T t) { action2.call(action, t); } }; } }; }
@Test public void testCollectToString() { String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() { @Override public StringBuilder call() { return new StringBuilder(); } }, new Action2<StringBuilder, Integer>() { @Override public void call(StringBuilder sb, Integer v) { if (sb.length() > 0) { sb.append("-"); } sb.append(v); } }).toBlocking().last().toString(); assertEquals("1-2-3", value); }
@Provides @ScreenScope public BundleablePresenterConfig providePresenterConfig( ItemClickListener itemClickListener, MenuHandler menuConfig ) { return BundleablePresenterConfig.builder() .setWantsGrid(true) .setAllowLongPressSelection(false) .setItemClickListener(itemClickListener) .setMenuConfig(menuConfig) .setFabClickAction(new Action2<Context, BundleablePresenter>() { @Override public void call(Context context, final BundleablePresenter presenter) { UtilsCommon.addTracksToQueue(context, Collections.singletonList(screen.genre.getTracksUri()), new Action1<List<Uri>>() { @Override public void call(List<Uri> uris) { presenter.getPlaybackController().playAll(uris, 0); } }); } }) .build(); }
@Provides @ScreenScope public BundleablePresenterConfig providePresenterConfig( ItemClickListener itemClickListener, MenuHandler menuConfig ) { return BundleablePresenterConfig.builder() .setWantsGrid(true) .setItemClickListener(itemClickListener) .setAllowLongPressSelection(false) .setMenuConfig(menuConfig) .setFabClickAction(new Action2<Context, BundleablePresenter>() { @Override public void call(Context context, final BundleablePresenter presenter) { UtilsCommon.addTracksToQueue(context, Collections.singletonList(screen.artist.getTracksUri()), new Action1<List<Uri>>() { @Override public void call(List<Uri> uris) { presenter.getPlaybackController().playAll(uris, 0); } }); } }) .build(); }
@Provides @Named("fab_action") Action2<Context, BundleablePresenter> provideFabAction() { return new Action2<Context, BundleablePresenter>() { @Override public void call(Context context, BundleablePresenter presenter) { boolean indexed = presenter.getIndexClient().isIndexed(screen.container); if (!indexed) { if (screen.container instanceof Playlist) { Toast.makeText(context, R.string.msg_playlist_import_not_implemented, Toast.LENGTH_LONG).show(); return; } presenter.getIndexClient().add(screen.container); } else { presenter.getFm().showDialog(LibraryOpScreenFragment.ni( LibraryOpScreen.unIndexOp(Collections.singletonList(screen.container)))); } } }; }
/** * This is a shortcut that can be used instead of combining together * {@link #restartable(int, Func0)}, * {@link #deliverFirst()}, * {@link #split(Action2, Action2)}. * * @param restartableId an id of the restartable. * @param observableFactory a factory that should return an Observable when the restartable should run. * @param onNext a callback that will be called when received data should be delivered to view. * @param onError a callback that will be called if the source observable emits onError. * @param <T> the type of the observable. */ public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) { restartable(restartableId, new Func0<Subscription>() { @Override public Subscription call() { return observableFactory.call() .compose(RxPresenter.this.<T>deliverFirst()) .subscribe(split(onNext, onError)); } }); }
/** * This is a shortcut that can be used instead of combining together * {@link #restartable(int, Func0)}, * {@link #deliverLatestCache()}, * {@link #split(Action2, Action2)}. * * @param restartableId an id of the restartable. * @param observableFactory a factory that should return an Observable when the restartable should run. * @param onNext a callback that will be called when received data should be delivered to view. * @param onError a callback that will be called if the source observable emits onError. * @param <T> the type of the observable. */ public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) { restartable(restartableId, new Func0<Subscription>() { @Override public Subscription call() { return observableFactory.call() .compose(RxPresenter.this.<T>deliverLatestCache()) .subscribe(split(onNext, onError)); } }); }
/** * This is a shortcut that can be used instead of combining together * {@link #restartable(int, Func0)}, * {@link #deliverReplay()}, * {@link #split(Action2, Action2)}. * * @param restartableId an id of the restartable. * @param observableFactory a factory that should return an Observable when the restartable should run. * @param onNext a callback that will be called when received data should be delivered to view. * @param onError a callback that will be called if the source observable emits onError. * @param <T> the type of the observable. */ public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) { restartable(restartableId, new Func0<Subscription>() { @Override public Subscription call() { return observableFactory.call() .compose(RxPresenter.this.<T>deliverReplay()) .subscribe(split(onNext, onError)); } }); }
private void testWithOnNextOnError(Action2<Action2, Action2> test) { Action2 onNext = mock(Action2.class); Action2 onError = mock(Action2.class); test.call(onNext, onError); verifyNoMoreInteractions(onNext); verifyNoMoreInteractions(onError); }
@Test public void testSplitOnNext() throws Exception { testWithOnNextOnError(new Action2<Action2, Action2>() { @Override public void call(Action2 onNext, Action2 onError) { new Delivery(1, Notification.createOnNext(2)).split(onNext, onError); verify(onNext, times(1)).call(1, 2); } }); }
@Test public void testSplitOnError() throws Exception { testWithOnNextOnError(new Action2<Action2, Action2>() { @Override public void call(Action2 onNext, Action2 onError) { Throwable throwable = new Throwable(); new Delivery(1, Notification.createOnError(throwable)).split(onNext, onError); verify(onError, times(1)).call(1, throwable); } }); }
@Test public void testSplitOnComplete() throws Exception { testWithOnNextOnError(new Action2<Action2, Action2>() { @Override public void call(Action2 onNext, Action2 onError) { new Delivery(1, Notification.createOnCompleted()).split(onNext, onError); } }); }
@Test(expected = OnErrorNotImplementedException.class) public void testThrowDuringOnNext() throws Exception { // Observable // .just(1) // .filter(new Func1<Integer, Boolean>() { // @Override // public Boolean call(Integer integer) { // return true; // } // }) // .first() // .subscribe(new Action1<Integer>() { // @Override // public void call(Integer integer) { // throw new RuntimeException(); // } // }); PublishSubject<Object> view = PublishSubject.create(); final PublishSubject<Integer> subject = PublishSubject.create(); new DeliverFirst<Object, Integer>(view) .call(subject) .subscribe(new Action1<Delivery<Object, Integer>>() { @Override public void call(Delivery<Object, Integer> delivery) { delivery.split( new Action2<Object, Integer>() { @Override public void call(Object o, Integer integer) { throw new RuntimeException(); } }, null ); } }); subject.onNext(3); view.onNext(100); }
@Override protected Observable<ContainerGroupInner> createInner() { final ContainerGroupImpl self = this; if (!isInCreateMode()) { throw new UnsupportedOperationException("Update on an existing container group resource is not supported"); } else if (newFileShares == null || creatableStorageAccountKey == null) { return this.manager().inner().containerGroups().createOrUpdateAsync(this.resourceGroupName(), this.name(), this.inner()); } else { final StorageAccount storageAccount = this.<StorageAccount>taskResult(this.creatableStorageAccountKey); return createFileShareAsync(storageAccount) .collect(new Func0<List<Triple<String, String, String>>>() { @Override public List<Triple<String, String, String>> call() { return new ArrayList<>(); } }, new Action2<List<Triple<String, String, String>>, Triple<String, String, String>>() { @Override public void call(List<Triple<String, String, String>> cloudFileShares, Triple<String, String, String> fileShare) { cloudFileShares.add(fileShare); } }) .flatMap(new Func1<List<Triple<String, String, String>>, Observable<? extends ContainerGroupInner>>() { @Override public Observable<? extends ContainerGroupInner> call(List<Triple<String, String, String>> fileShares) { for (Triple<String, String, String> fileShareEntry : fileShares) { self.defineVolume(fileShareEntry.getLeft()) .withExistingReadWriteAzureFileShare(fileShareEntry.getMiddle()) .withStorageAccountName(storageAccount.name()) .withStorageAccountKey(fileShareEntry.getRight()) .attach(); } return self.manager().inner().containerGroups().createOrUpdateAsync(self.resourceGroupName(), self.name(), self.inner()); } }); } }
/** * Commits the changes in the external child resource childCollection. * <p/> * This method returns a observable stream, either its observer's onError will be called with * {@link CompositeException} if some resources failed to commit or onNext will be called if all resources * committed successfully. * * @return the observable stream */ public Observable<List<FluentModelTImpl>> commitAndGetAllAsync() { return commitAsync().collect( new Func0<List<FluentModelTImpl>>() { public List<FluentModelTImpl> call() { return new ArrayList<>(); } }, new Action2<List<FluentModelTImpl>, FluentModelTImpl>() { public void call(List<FluentModelTImpl> state, FluentModelTImpl item) { state.add(item); } }); }
/** * @return an observable emits extensions in this collection as a map indexed by name. */ public Observable<Map<String, VirtualMachineExtension>> asMapAsync() { return listAsync() .collect(new Func0<Map<String, VirtualMachineExtension>>() { @Override public Map<String, VirtualMachineExtension> call() { return new HashMap<>(); } }, new Action2<Map<String, VirtualMachineExtension>, VirtualMachineExtension>() { @Override public void call(Map<String, VirtualMachineExtension> map, VirtualMachineExtension extension) { map.put(extension.name(), extension); } }); }
@Experimental public static <T> Observable$OnSubscribe<T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next) { return new AsyncOnSubscribeImpl(new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() { public Void call(Void state, Long requested, Observer<Observable<? extends T>> subscriber) { next.call(requested, subscriber); return state; } }); }