Java 类io.reactivex.functions.Action 实例源码

项目:Rx_java2_soussidev    文件:LollipopNetworkObservingStrategy.java   
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
    final String service = Context.CONNECTIVITY_SERVICE;
    final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);

    return Observable.create(new ObservableOnSubscribe<Connectivity>() {
        @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
            networkCallback = createNetworkCallback(subscriber, context);
            final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
            manager.registerNetworkCallback(networkRequest, networkCallback);
        }
    }).doOnDispose(new Action() {
        @Override public void run() {
            tryToUnregisterCallback(manager);
        }
    }).startWith(Connectivity.create(context)).distinctUntilChanged();
}
项目:Rx_java2_soussidev    文件:MarshmallowNetworkObservingStrategy.java   
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
    final String service = Context.CONNECTIVITY_SERVICE;
    final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
    networkCallback = createNetworkCallback(context);

    registerIdleReceiver(context);

    final NetworkRequest request =
            new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
                    .addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
                    .build();

    manager.registerNetworkCallback(request, networkCallback);

    return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() {
        @Override public void run() {
            tryToUnregisterCallback(manager);
            tryToUnregisterReceiver(context);
        }
    }).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable();
}
项目:GitHub    文件:RxJavaTests.java   
@Test
@UiThreadTest
public void realmList_closeInDoOnUnsubscribe() {
    realm.beginTransaction();
    RealmList<Dog> list = realm.createObject(AllTypes.class).getColumnRealmList();
    realm.commitTransaction();

    Flowable<RealmList<Dog>> observable = list.asFlowable().doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            realm.close();
        }
    });
    subscription = observable.subscribe(new Consumer<RealmList<Dog>>() {
        @Override
        public void accept(RealmList<Dog> ignored) throws Exception {
        }
    });

    subscription.dispose();
    assertTrue(realm.isClosed());
}
项目:GitHub    文件:RxJavaTests.java   
@Test
@UiThreadTest
public void dynamicRealmResults_closeInDoOnUnsubscribe() {
    final DynamicRealm dynamicRealm = DynamicRealm.getInstance(realm.getConfiguration());

    Flowable<RealmResults<DynamicRealmObject>> flowable = dynamicRealm.where(AllTypes.CLASS_NAME).findAll().asFlowable()
            .doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    dynamicRealm.close();
                }
            });

    subscription = flowable.subscribe(new Consumer<RealmResults<DynamicRealmObject>>() {
        @Override
        public void accept(RealmResults<DynamicRealmObject> ignored) throws Exception {
        }
    });

    subscription.dispose();
    assertTrue(dynamicRealm.isClosed());
}
项目:GitHub    文件:RxJavaTests.java   
@Test
@UiThreadTest
public void realmObject_closeInDoOnUnsubscribe() {
    realm.beginTransaction();
    realm.createObject(AllTypes.class);
    realm.commitTransaction();

    Flowable<AllTypes> flowable = realm.where(AllTypes.class).findFirst().<AllTypes>asFlowable()
            .doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    realm.close();
                }
            });

    subscription = flowable.subscribe(new Consumer<AllTypes>() {
        @Override
        public void accept(AllTypes ignored) throws Exception {
        }
    });

    subscription.dispose();
    assertTrue(realm.isClosed());
}
项目:FCM-for-Mojo    文件:ServerSettingsFragment.java   
private void restart() {
    mCompositeDisposable.add(FFMService.restart()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    findPreference("restart_webqq").setEnabled(true);
                }
            })
            .subscribe(new Consumer<FFMResult>() {
                @Override
                public void accept(FFMResult ffmResult) throws Exception {
                    Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show();
                }
            }));
}
项目:FCM-for-Mojo    文件:ServerSettingsFragment.java   
private void stop() {
    mCompositeDisposable.add(FFMService.stop()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    findPreference("stop_webqq").setEnabled(true);
                }
            })
            .subscribe(new Consumer<FFMResult>() {
                @Override
                public void accept(FFMResult ffmResult) throws Exception {
                    Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show();
                }
            }));
}
项目:reactive-architectures-playground    文件:ToogleRefreshTests.java   
@Before public void beforeEachTest() {
    MockitoAnnotations.initMocks(this);

    ToogleRefreshView view = new ToogleRefreshView() {

        @Override public Action disableRefresh() {
            return disableRefresh;
        }

        @Override public Action enableRefresh() {
            return enableRefresh;
        }
    };

    toogler = new RefreshToogle<>(view, uiScheduler);
}
项目:AssistantBySDK    文件:TingPlayProcessor.java   
/**
 * 合成并显示回复文本
 **/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));
    SynthesizerBase.get().startSpeakAbsolute(content)
            .doOnNext(new Consumer<SpeechMsg>() {
                @Override
                public void accept(SpeechMsg speechMsg) throws Exception {
                    if (speechMsg.state() == SpeechMsg.State.OnBegin)
                        EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
                    if (tracks != null)
                        XmlyManager.get().getPlayer().playList(tracks, finalPlayIndex);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();

}
项目:AssistantBySDK    文件:NaviProcessor.java   
/**
 * 合成文本并提前返回
 **/

private void speakAndAheadReturn(String text, SpeechMsgBuilder msgBuilder) {
/* 将回复文本发送到聊天列表 */
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));
    /* 合成回复文本 */
    msgBuilder.setText(text).setForceLocalEngine(true);
    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new NavigateEvent(NavigateEvent.START_NAVI));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
项目:RxJava2-Android-Sample    文件:TakeUntilExampleActivity.java   
/**
 * takeUntil与skipUntil操作符作用相反,订阅并开始发射原始Observable,它还监视你提供的第二个Observable。
 * 如果第二个Observable发射了一项数据或者发射了一个终止通知( onError通知或一个onComplete通知),
 * TakeUntil返回的Observable会停止发射原始Observable并终止。
 */
private void doSomeWork() {
    if (!isRunning) {
        Observable
                .interval(1, TimeUnit.SECONDS)
                .take(6)
                .takeUntil(Observable.just(10).delay(3, TimeUnit.SECONDS))
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        isRunning = true;
                    }
                })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        isRunning = false;
                    }
                })
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }
}
项目:RxLifeCycle    文件:MainActivity.java   
@Override
protected void onStart() {
    super.onStart();

    Log.d(TAG, "onStart()");

    // Using automatic unsubscription, this should determine that the correct time to
    // unsubscribe is onStop (the opposite of onStart).
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "Unsubscribing subscription from onStart()");
            }
        })
        .compose(this.<Long>bindToLifecycle())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long num) throws Exception {
                Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
            }
        });
}
项目:AssistantBySDK    文件:VoiceMediator.java   
private void speak(SpeechMsgBuilder msgBuilder, final boolean isAnim) {
    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnNext(new Consumer<SpeechMsg>() {
                @Override
                public void accept(SpeechMsg speechMsg) throws Exception {
                    if (speechMsg.state() == SpeechMsg.State.OnBegin && isAnim)
                        EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
项目:AssistantBySDK    文件:ChatAlbumListAdapter.java   
/**
 * 合成并显示回复文本
 **/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));
    IflySynthesizer.get().startSpeakAbsolute(content)
            .doOnNext(new Consumer<SpeechMsg>() {
                @Override
                public void accept(SpeechMsg speechMsg) throws Exception {
                    if (speechMsg.state() == SpeechMsg.State.OnBegin)
                        EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
                    if (tracks != null)
                        XmPlayerManager.getInstance(mContext).playList(tracks, finalPlayIndex);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();

}
项目:AssistantBySDK    文件:AssistPresenter.java   
/**
 * 发送回复文本文本视图并合成声音
 **/
private void showAndSpeak(SpeechMsgBuilder builder) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(builder.getText()), null, null, null));
    IflySynthesizer.getInstance().startSpeakAbsolute(builder.build())
            .doOnNext(new Consumer<SpeechMsg>() {
                @Override
                public void accept(SpeechMsg speechMsg) throws Exception {
                    if (speechMsg.state() == SpeechMsg.State.OnBegin)
                        EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
项目:MVPArmsTest1    文件:RxUtils.java   
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            view.showLoading();//显示进度条
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doAfterTerminate(new Action() {
                        @Override
                        public void run() {
                            view.hideLoading();//隐藏进度条
                        }
                    }).compose(RxUtils.bindToLifecycle(view));
        }
    };
}
项目:RxJava4AndroidDemos    文件:FlatMap.java   
@Override
public void test0() {
    Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9");
    Observable.just(1, 2, 3).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
            return Observable.just(integer * 2 + "", integer * 3 + "");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    });
}
项目:RxJava4AndroidDemos    文件:Repeat.java   
@Override
public void test0() {
    Log.i(TAG, "test0() Range simple demo, repeat is 2");
    Observable.just("1", "2", "3").repeat(2).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    });
}
项目:RxPaper2    文件:RxPaperBook.java   
/**
 * Saves most types of POJOs or collections in {@link Book} storage.
 * <p/>
 * To deserialize correctly it is recommended to have an all-args constructor, but other types
 * may be available.
 *
 * @param key object key is used as part of object's file name
 * @param value object to save, must have no-arg constructor, can't be null.
 * @return this Book instance
 */
public <T> Completable write(final String key, final T value) {
    return Completable.fromAction(new Action() {
        @Override
        public void run() {
            book.write(key, value);
        }
    })
    // FIXME in RxJava1 the error would be propagated to updates.
    // In RxJava2 the error happens on the Completable this method returns.
    // This andThen block reproduces the behavior in RxJava1.
    .andThen(Completable.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            try {
                updates.onNext(Pair.create(key, value));
            } catch (Throwable t) {
                updates.onError(t);
            }
        }
    })).subscribeOn(scheduler);
}
项目:RxEasyHttp    文件:RxUtil.java   
public static <T> ObservableTransformer<ApiResult<T>, T> _io_main() {
    return new ObservableTransformer<ApiResult<T>, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(new HandleFuc<T>())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
                        }
                    })
                    .doFinally(new Action() {
                        @Override
                        public void run() throws Exception {
                            HttpLog.i("+++doFinally+++");
                        }
                    })
                    .onErrorResumeNext(new HttpResponseFunc<T>());
        }
    };
}
项目:RxEasyHttp    文件:RxUtil.java   
public static <T> ObservableTransformer<ApiResult<T>, T> _main() {
    return new ObservableTransformer<ApiResult<T>, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
            return upstream
                    //.observeOn(AndroidSchedulers.mainThread())
                    .map(new HandleFuc<T>())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
                        }
                    })
                    .doFinally(new Action() {
                        @Override
                        public void run() throws Exception {
                            HttpLog.i("+++doFinally+++");
                        }
                    })
                    .onErrorResumeNext(new HttpResponseFunc<T>());
        }
    };
}
项目:ZhaZhaShop    文件:HotMovieListPresenter.java   
@Override
public void getHotMovieList(int limit) {
    mView.showLoading();

    mHotMovieListManager.getHotMovieList(limit)
            .subscribe(new Consumer<HotMovieBean>() {
                @Override
                public void accept(@NonNull HotMovieBean hotMovieBean) throws Exception {
                    mView.addHotMovieList(hotMovieBean.getData().getHot());
                    mView.addMovieIds(hotMovieBean.getData().getMovieIds());
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    Log.e(TAG, "accept: " + throwable.getLocalizedMessage().toString());
                    mView.showError(throwable.getMessage().toString());
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    mView.showContent();
                }
            });
}
项目:ReactiveAirplaneMode    文件:ReactiveAirplaneMode.java   
/**
 * Disposes an action in UI Thread
 *
 * @param dispose action to be executed
 * @return Disposable object
 */
private Disposable disposeInUiThread(final Action dispose) {
  return Disposables.fromAction(new Action() {
    @Override public void run() throws Exception {
      if (Looper.getMainLooper() == Looper.myLooper()) {
        dispose.run();
      } else {
        final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker();
        inner.schedule(new Runnable() {
          @Override public void run() {
            try {
              dispose.run();
            } catch (Exception exception) {
              onError("Could not unregister receiver in UI Thread", exception);
            }
            inner.dispose();
          }
        });
      }
    }
  });
}
项目:rxtools    文件:SubjectMapTest.java   
@Test
public void testErrorHandlingInValueProvider()
{
    // setup
    final AtomicBoolean missHandlerCalled = new AtomicBoolean(false);
    TestSubscriber<Integer> testSubscriber1 = new TestSubscriber<>();

    subscribe(source.get("hello"), testSubscriber1);

    source.onNext("hello", new Callable<Integer>() {
        @Override
        public Integer call() throws Exception
        {
            throw new RuntimeException("Boom");
        }
    }, new Action() {
        @Override
        public void run()
        {
            missHandlerCalled.set(true);
        }
    });

    testSubscriber1.assertError(RuntimeException.class);
}
项目:RxJava4AndroidDemos    文件:Map.java   
@Override
public void test0() {
    Log.i(TAG, "test0() Map simple demo, integer 1,2,3 transform to string 2,4,6");
    Observable.just(1, 2, 3).map(new Function<Integer, String>() {
        @Override
        public String apply(@NonNull Integer integer) throws Exception {
            return Integer.toString(integer * 2);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    });
}
项目:EazyBaseMVP    文件:RxUtils.java   
public static <T> FlowableTransformer<T, T> applySchedules(final IView view) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Subscription>() {
                        @Override
                        public void accept(Subscription subscription) throws Exception {
                            view.showLoading();
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnTerminate(new Action() {
                        @Override
                        public void run() throws Exception {
                            view.hideLoading();
                        }
                    });
        }
    };
}
项目:RxJava4AndroidDemos    文件:Buffer.java   
@Override
public void test0() {
    Log.i(TAG, "test0() Buffer simple demo, 1,2,3 buffer(2)");
    Observable.just("1", "2", "3").buffer(2).subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> strings) throws Exception {
            Log.d(TAG, "Consumer<String> accept() strings: " + strings);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    });
}
项目:Learning-RxJava    文件:Ch9_7.java   
public static <T> ObservableOperator<T, T> doOnEmpty(Action
                                                             action) {
    return new ObservableOperator<T, T>() {
        @Override
        public Observer<? super T> apply(Observer<? super T>
                                                 observer) throws Exception {
            return new DisposableObserver<T>() {
                boolean isEmpty = true;

                @Override
                public void onNext(T value) {
                    isEmpty = false;
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable t) {
                    observer.onError(t);
                }

                @Override
                public void onComplete() {
                    if (isEmpty) {
                        try {
                            action.run();
                        } catch (Exception e) {
                            onError(e);
                            return;
                        }
                    }
                    observer.onComplete();
                }
            };
        }
    };
}
项目:GitHub    文件:DownloadViewHolder.java   
private void delete() {
    dispose(data.disposable);
    mRxDownload.deleteServiceDownload(data.record.getUrl(), true)
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    mAdapter.remove(getAdapterPosition());
                }
            })
            .subscribe();

}
项目:FCM-for-Mojo    文件:NotificationSettingsFragment.java   
private void uploadNotificationsToggle(final Preference preference) {
    final NotificationToggle newNotificationToggle = NotificationToggle.create(mFriendToggle.isChecked(), mGroupToggle.isChecked());
    if (newNotificationToggle.equals(mServerNotificationToggle)) {
        return;
    }

    preference.setEnabled(false);

    mCompositeDisposable.add(FFMService
            .updateNotificationsToggle(NotificationToggle.create(mFriendToggle.isChecked(), mGroupToggle.isChecked()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    preference.setEnabled(true);
                }
            })
            .subscribe(new Consumer<FFMResult>() {
                @Override
                public void accept(FFMResult result) throws Exception {
                    mServerNotificationToggle = newNotificationToggle;

                    //Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();

                    Log.d("Sync", "updateNotificationsToggle success, new state: " + newNotificationToggle);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show();

                    Log.w("Sync", "updateNotificationsToggle failed", throwable);
                }
            }));
}
项目:KotlinKomparisons    文件:PersistedLoginDetailsRepository.java   
@Override
public @NonNull Completable updateLoginMethod(@NonNull final LoginMethod method) {
    checkNotNull(method);
    return persistence.write(LOGIN_METHOD, method)
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    methodRelay.accept(method);
                }
            });
}
项目:AssistantBySDK    文件:MobileCommProcessor.java   
/**
 * 合成并在聊天视图中显示回复文本
 *
 * @param text       回复文本
 * @param inputType  用户输入类型。只有语音录入才会合成回复文本
 * @param msgBuilder 合成信息对象
 **/
private void speakAndShowResp(String text, int inputType, SpeechMsgBuilder msgBuilder) {
    /* 发送回复文本到聊天视图 */
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));
    if (inputType == AssistantService.INPUT_VOICE) {
        msgBuilder.setText(text);
        SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
                    /* 合成是在Observable的subscribe()开始的,所以要在这之前通知动画播放。
                     *  doOnSubscribe 执行在离它最近的 subscribeOn() 所指定的线程。*/
                .doOnNext(new Consumer<SpeechMsg>() {
                    @Override
                    public void accept(SpeechMsg speechMsg) throws Exception {
                        if (speechMsg.state() == SpeechMsg.State.OnBegin)
                            EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribe();
    }
}
项目:AssistantBySDK    文件:ChatListPresenter.java   
@Override
public void synthesize(SpeechMsg msg, final View speakervView) {
    if (SynthesizerBase.isInited()) {
        SpeechMsgBuilder builder = new SpeechMsgBuilder(msg.text)
                .setOrigin(com.lingju.audio.engine.base.SpeechMsg.ORIGIN_COMMON);
        if (msg instanceof ResponseSetionsMsg) {
            SynthesizerBase.get().setForceLocalEngine(false);
            builder.setSections(((ResponseSetionsMsg) msg).getSetions());
        }

        SynthesizerBase.get().startSpeakAbsolute(builder.build())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<com.lingju.audio.engine.base.SpeechMsg>() {
                    @Override
                    public void accept(com.lingju.audio.engine.base.SpeechMsg speechMsg) throws Exception {
                        if (speechMsg.state() == com.lingju.audio.engine.base.SpeechMsg.State.Idle) {
                            Log.i("LingJu", "doOnNext>>>startSpeakerAnimation>>>" + speakervView);
                            chatListView.startSpeakerAnimation(speakervView);
                        }
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.i("ChatListPresenter", "doOnComplete>>stopSpeakerAnimation");
                        chatListView.stopSpeakerAnimation(speakervView);
                    }
                })
                .subscribe();
    }
}
项目:webtrekk-android-sdk    文件:MainActivity.java   
private void permissionRequest(String permissions[], final String permissionUINames[], @Nullable  final Runnable onCompletes[]){
    final List<Completable> completes = permissionRequest.
            requestPermission(this, permissions);

    for (int i = 0; i < completes.size(); i++) {
        final Completable complete = completes.get(i);
        final String permissionUIName = permissionUINames[i];
        final Runnable onComplete = onCompletes == null ? null : onCompletes[i];

        complete.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d("Permission", permissionUIName);
                        if (onCompletes != null && onComplete != null) {
                            onComplete.run();
                        }
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d("Permission", "Permission Error: " + throwable.getLocalizedMessage());
                    }
                });
    }
}
项目:Melophile    文件:HistoryTrackInteractor.java   
@Override
public void remove(Track track, Action onSuccess, Consumer<? super Throwable> onError) {
    personalRepository.removeTrack(track)
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onSuccess,onError);
}
项目:reactive-architectures-playground    文件:AssignErrorStateTests.java   
@Before public void beforeEachTest() {
    MockitoAnnotations.initMocks(this);

    ErrorStateView view = new ErrorStateView() {
        @Override public Action showErrorState() {
            return show;
        }

        @Override public Action hideErrorState() {
            return hide;
        }
    };

    assignErrorState = new AssignErrorState<>(view, uiScheduler);
}
项目:Melophile    文件:FavoritePlaylistInteractor.java   
@Override
public void clear(Action onSuccess, Consumer<? super Throwable> onError) {
    personalRepository.clearFavoritePlaylists()
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onSuccess,onError);
}
项目:Melophile    文件:FavoritePlaylistInteractor.java   
@Override
public void add(Playlist playlist, Action onComplete, Consumer<? super Throwable> onError) {
    personalRepository.likePlaylist(playlist)
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onComplete,onError);
}
项目:Melophile    文件:FavoriteTrackInteractor.java   
@Override
public void add(Track track, Action onComplete, Consumer<? super Throwable> onError) {
    personalRepository.likeTrack(track)
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onComplete,onError);
}
项目:RxOptional    文件:RxOptional.java   
@Nonnull
public RxOptional<T> ifNotPresent(@Nonnull Action action) {
    if (value == null) {
        requireNonNull(action);
        try {
            action.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    return this;
}