/**
 * Shows a text-input dialog titled with {@code R.string.text_name} and exposes the confirmed
 * text as an Observable.
 *
 * @param prefix      text pre-filled into the input field
 * @param textWatcher input callback handed to the dialog builder
 * @return an Observable that emits the entered text once the positive button is clicked and
 *         then completes; nothing is emitted if the positive button is never clicked
 */
private Observable<String> showNameInputDialog(String prefix, MaterialDialog.InputCallback textWatcher) {
    final PublishSubject<String> nameSubject = PublishSubject.create();

    // Forwards the current content of the input field when the user confirms.
    final MaterialDialog.SingleButtonCallback onConfirm = new MaterialDialog.SingleButtonCallback() {
        @Override
        public void onClick(@NonNull MaterialDialog dialog, @NonNull DialogAction which) {
            final String enteredName = dialog.getInputEditText().getText().toString();
            nameSubject.onNext(enteredName);
            nameSubject.onComplete();
        }
    };

    DialogUtils.showDialog(
            new ThemeColorMaterialDialogBuilder(mContext)
                    .title(R.string.text_name)
                    .inputType(InputType.TYPE_CLASS_TEXT)
                    .alwaysCallInputCallback()
                    .input(getString(R.string.text_please_input_name), prefix, false, textWatcher)
                    .onPositive(onConfirm)
                    .build());
    return nameSubject;
}
/** * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。 * 需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生), * 因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。 * 如果要确保来自原始Observable的所有数据都被分发: * 1.使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据) * 2.改用ReplaySubject。 * * 如果原始的Observable因为发生了一个错误而终止,PublishSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 */ private void doSomeWork() { PublishSubject<Integer> source = PublishSubject.create(); source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 and onComplete source.onNext(1); source.onNext(2); source.onNext(3); /* * it will emit 4 and onComplete for second observer also. */ source.subscribe(getSecondObserver()); source.onNext(4); source.onComplete(); }
void onRequestPermissionsResult(String permissions[], int[] grantResults, boolean[] shouldShowRequestPermissionRationale) { for (int i = 0, size = permissions.length; i < size; i++) { Log.i("","onRequestPermissionsResult " + permissions[i]); // Find the corresponding subject PublishSubject<Permission> subject = mSubjects.get(permissions[i]); if (subject == null) { // No subject found Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request."); return; } mSubjects.remove(permissions[i]); boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED; subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i])); subject.onComplete(); } }
/**
 * Pumps the character content of {@code input} into {@code subject} in chunks of up to
 * 4096 characters, decoded with the {@code encoding} field.
 *
 * The subject is completed when the stream reaches its end or when the current thread is
 * found to be interrupted (checked with {@code Thread.interrupted()}, which clears the flag);
 * an {@code IOException} while reading is forwarded through {@code onError}.
 *
 * @param input   stream to read from; it is not closed by this method
 * @param subject receiver of the decoded chunks and of the terminal event
 */
private void monitor(InputStream input, PublishSubject<String> subject) {
    BufferedReader source = new BufferedReader(new InputStreamReader(input, encoding));
    final int chunkSize = 4096;
    char[] chunk = new char[chunkSize];
    try {
        // Leave the loop on interruption or end of stream; both end with onComplete.
        while (!Thread.interrupted()) {
            int count = source.read(chunk, 0, chunkSize);
            if (count == -1) {
                break;
            }
            subject.onNext(new String(chunk, 0, count));
        }
        subject.onComplete();
    } catch (IOException e) {
        subject.onError(e);
    }
}
/**
 * Streams the products matching {@code request} to the gRPC {@code responseObserver}.
 *
 * A PublishSubject is used as the bridge: it is subscribed first, then handed to
 * {@code productDao.downloadProducts}, which pushes products and the terminal event into it.
 *
 * @param request          the download request forwarded to the DAO
 * @param responseObserver gRPC observer that receives every product and the terminal event
 */
@Override
public void downloadProducts(DownloadProductsRequest request, StreamObserver<Product> responseObserver) {
    PublishSubject<Product> productPublishSubject = PublishSubject.create();
    // Handle all three signals in subscribe(...) itself. With doOnError(...) followed by a
    // no-arg subscribe() there is no terminal error consumer, so every stream error would
    // additionally be reported to RxJavaPlugins.onError as an OnErrorNotImplementedException.
    productPublishSubject.subscribe(
            product -> {
                responseObserver.onNext(product);
                // NOTE(review): labels(...) is called without a follow-up such as inc(); if
                // `counter` is a Prometheus Counter this does not increment anything - confirm.
                counter.labels("downloadProducts", "success");
            },
            t -> {
                responseObserver.onError(t);
                counter.labels("downloadProducts", "failed");
            },
            () -> responseObserver.onCompleted());
    productDao.downloadProducts(request, productPublishSubject);
}
private void initView() { subject = PublishSubject.create(); subject.debounce(0, TimeUnit.MILLISECONDS) // .filter(new Predicate<Float>() { // @Override // public boolean test(Float brightness) throws Exception { // return true; // } // }) .distinctUntilChanged() .switchMap(new Function<Float, ObservableSource<ColorMatrixColorFilter>>() { @Override public ObservableSource<ColorMatrixColorFilter> apply(Float value) throws Exception { return postBrightness(value); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ColorMatrixColorFilter>() { @Override public void accept(ColorMatrixColorFilter colorMatrixColorFilter) throws Exception { setColorFilter(colorMatrixColorFilter); } }); }
/**
 * Stores the injected collaborators and sets up the internal task pipeline.
 *
 * The serialized {@code publisher} subject is filtered to {@code TranscodeTask} items; leading
 * tasks are skipped for as long as {@code isActive()} returns true, and the remaining ones are
 * handed to {@code prepareTranscode} on the computation scheduler.
 */
@Inject
TranscodingServiceImpl(ExternalProcessService externalProcessService, TranscoderSettings transcoderSettings, MediaScanSettings mediaScanSettings, OutputParser parser) {
    this.parser = parser;
    this.mediaScanSettings = mediaScanSettings;
    this.transcoderSettings = transcoderSettings;
    this.externalProcessService = externalProcessService;

    this.publisher = PublishSubject.create().toSerialized();
    publisher
            .ofType(TranscodeTask.class)
            .skipWhile(ignoredTask -> isActive())
            .observeOn(Schedulers.computation())
            .subscribeOn(Schedulers.io())
            .subscribe(this::prepareTranscode);
}
@Override public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) { // Inflate the layout for this fragment View view = inflater.inflate(R.layout.fragment_two_way_data_biding, container, false); unbinder = ButterKnife.bind(this, view); publishSubject = PublishSubject.create(); publishSubject.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { mTvAddResult.setText(s); } }); onNumChanged(); mEtAddLeft.requestFocus(); return view; }
@Test public void onUserQueryEvent_emptyQueryString_shouldDoNothingWithView() { // given PublishSubject<UserQueryEvent> userQuery$ = PublishSubject.create(); UserListPresenter presenter = new UserListPresenter( userQuery$, userService, () -> mock(UserView.class), () -> mock(UserPresenter.class), DEFAULT_PAGE_SIZE, DEFAULT_USER_SEARCH_LIMIT ); presenter.start(view); UserQueryEvent event = new UserQueryEvent(" "); // empty string // when userQuery$.onNext(event); // then verifyNoMoreInteractions(view, userService); }
void onRequestPermissionsResult(String permissions[], int[] grantResults, boolean[] shouldShowRequestPermissionRationale) { for (int i = 0, size = permissions.length; i < size; i++) { log("onRequestPermissionsResult " + permissions[i]); // Find the corresponding subject PublishSubject<Permission> subject = mSubjects.get(permissions[i]); if (subject == null) { // No subject found Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request."); return; } mSubjects.remove(permissions[i]); boolean granted = (grantResults[i] == PackageManager.PERMISSION_GRANTED) && (PermissionChecker.checkSelfPermission(getContext(), permissions[i]) == PermissionChecker.PERMISSION_GRANTED); subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i])); subject.onComplete(); } }
/**
 * Registers a callback for bus messages under {@code tag}, without any scheduler switching.
 *
 * If no event is registered for the tag yet, a serialized PublishSubject is created, filtered
 * to the type reported by {@code callBack.busOfType()}, subscribed, and stored in
 * {@code rxBusEventArrayMap}. If the tag is already registered, the existing subscription is
 * returned and {@code callBack} is not attached.
 *
 * @param tag      key identifying the registration
 * @param callBack receiver of the matching messages and of errors
 * @return the observer/disposable stored for {@code tag}
 */
public <T> DisposableObserver registerNoThread(@NonNull final Object tag, @NonNull final RxBusCallBack<T> callBack) {
    RxBusEvent rxBusEvent = rxBusEventArrayMap.get(tag);
    if (RxUtils.isEmpty(rxBusEvent)) {
        rxBusEvent = new RxBusEvent();
        rxBusEvent.subject = PublishSubject.create().toSerialized();
        rxBusEvent.disposable = rxBusEvent.subject
                .ofType(callBack.busOfType())
                .subscribeWith(new RxBusObserver<T>() {
                    @Override
                    public void onError(@io.reactivex.annotations.NonNull Throwable e) {
                        super.onError(e);
                        callBack.onBusError(e);
                    }

                    @Override
                    public void onNext(@io.reactivex.annotations.NonNull T t) {
                        super.onNext(t);
                        // Deliver the message to the registered callback; without this call
                        // the callback would never receive any event.
                        callBack.onBusNext(t);
                    }
                });
        rxBusEventArrayMap.put(tag, rxBusEvent);
    }
    return rxBusEvent.disposable;
}
@Test public void onUserSelected_view_shouldPostUserSelectedEvent() { // given TestObserver<UserSelectedEvent> userSelected$ = TestObserver.create(); User user = mock(User.class); given(user.getLogin()).willReturn("foo"); PublishSubject<Trigger> userSelectionIntent = PublishSubject.create(); given(view.userSelection$()).willReturn(userSelectionIntent); UserPresenter presenter = new UserPresenter(Sink.of(userSelected$)); presenter.start(user, view); // when fire(userSelectionIntent); // then userSelected$.assertValueCount(1); UserSelectedEvent event = userSelected$.values().get(0); assertThat(event.getUser().getLogin()).isEqualTo("foo"); }
/**
 * Fetches the current user on the IO scheduler and reports whether that succeeded.
 *
 * On success the user is stored via {@code setUser(user)} and {@code true} is emitted; on
 * failure the user is cleared via {@code setUser(null)} and {@code false} is emitted. In both
 * cases the returned Observable then completes. The request starts immediately, whether or
 * not anyone subscribes to the result.
 *
 * NOTE(review): the result is backed by a PublishSubject, so a caller that subscribes after
 * the request has already finished would presumably only see the completion - confirm that
 * callers always subscribe right away.
 *
 * @return an Observable emitting a single online flag followed by completion
 */
public Observable<Boolean> refreshOnlineStatus() {
    final PublishSubject<Boolean> status = PublishSubject.create();
    final UserApi userApi = mRetrofit.create(UserApi.class);
    userApi.me()
            .subscribeOn(Schedulers.io())
            .subscribe(
                    fetchedUser -> {
                        setUser(fetchedUser);
                        status.onNext(true);
                        status.onComplete();
                    },
                    failure -> {
                        setUser(null);
                        status.onNext(false);
                        status.onComplete();
                    });
    return status;
}
/**
 * Bytes pushed into the executor's stderr stream must be observable through
 * {@code jobManager.stderrUpdates(jobId)}.
 */
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    // A mock executor whose stderr is driven by a subject the test controls.
    final CancelablePromise<JobExecutionResult> executionPromise = new SimpleCancelablePromise<>();
    final Subject<byte[]> executorStderr = PublishSubject.create();
    final JobExecutor mockExecutor =
            MockJobExecutor.thatUses(executionPromise, Observable.just(TestHelpers.generateRandomBytes()), executorStderr);
    final JobManager manager = createManagerWith(mockExecutor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission = manager.submit(STANDARD_VALID_REQUEST);

    // Remember the most recent bytes delivered by the manager's stderr stream.
    final Observable<byte[]> managerStderr = manager.stderrUpdates(submission.getLeft()).get();
    final AtomicReference<byte[]> lastObserved = new AtomicReference<>();
    managerStderr.subscribe(lastObserved::set);

    final byte[] expected = TestHelpers.generateRandomBytes();
    executorStderr.onNext(expected);

    // Finish the job and wait for it to be finalized before asserting.
    executionPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastObserved.get()).isEqualTo(expected);
}
private void initView() { subject = PublishSubject.create(); subject.debounce(0, TimeUnit.MILLISECONDS) // .filter(new Predicate<Float>() { // @Override // public boolean test(Float contrast) throws Exception { // return true; // } // }) .distinctUntilChanged() .switchMap(new Function<Float, ObservableSource<ColorMatrixColorFilter>>() { @Override public ObservableSource<ColorMatrixColorFilter> apply(Float value) throws Exception { return postContrast(value); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ColorMatrixColorFilter>() { @Override public void accept(ColorMatrixColorFilter colorMatrixColorFilter) throws Exception { setColorFilter(colorMatrixColorFilter); } }); }
/**
 * Downloads {@code url} to {@code path}, reflecting progress in {@code progressDialog}.
 *
 * Progress values are applied to the dialog on the Android main thread. When the download
 * completes, the dialog is dismissed and a {@code ScriptFile} for {@code path} is emitted
 * followed by completion. On failure the error is logged, the dialog is dismissed, a failure
 * message is shown and the error is forwarded to the returned Observable.
 *
 * @param url            source to download from
 * @param path           destination path, also used to build the emitted ScriptFile
 * @param progressDialog dialog that displays the progress and is dismissed at the end
 * @return an Observable emitting the downloaded file once, or the download error
 */
public Observable<ScriptFile> download(String url, String path, MaterialDialog progressDialog) {
    final PublishSubject<ScriptFile> result = PublishSubject.create();

    final SimpleObserver<Integer> onFinished = new SimpleObserver<Integer>() {
        @Override
        public void onComplete() {
            progressDialog.dismiss();
            result.onNext(new ScriptFile(path));
            result.onComplete();
        }

        @Override
        public void onError(Throwable error) {
            Log.e(LOG_TAG, "Download failed", error);
            progressDialog.dismiss();
            showMessage(R.string.text_download_failed);
            result.onError(error);
        }
    };

    DownloadManager.getInstance()
            .download(url, path)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(progressDialog::setProgress)
            .subscribe(onFinished);
    return result;
}
/**
 * Disposing the handle returned by {@code appendStderr} must stop the DAO from reading the
 * stderr observable: data emitted after disposal must not reach the mapping stage.
 */
@Test
public void testPersistStderrReturnsADisposableThatStopsFurtherReads() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();

    // The map stage flips the flag whenever an item actually flows to a subscriber.
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
    final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
        stderrObsWasRead.set(true);
        return data;
    });

    final Disposable disposable = dao.appendStderr(jobId, stderrObs);
    disposable.dispose();
    stderrSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; the flag must still be false after disposal.
    assertThat(stderrObsWasRead.get()).isFalse();
}
@Test public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable { final JobExecutor jobExecutor = getInstance(); final AtomicBoolean completedCalled = new AtomicBoolean(false); final Subject<byte[]> stderrSubject = PublishSubject.create(); stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe(); final JobEventListeners listeners = createStderrListener(stderrSubject); final CancelablePromise<JobExecutionResult> ret = jobExecutor.execute(STANDARD_REQUEST, listeners); promiseAssert(ret, result -> { try { // The stderr thread can race with the exit thread Thread.sleep(50); assertThat(completedCalled.get()).isTrue(); } catch (InterruptedException ignored) {} }); }
@Test public void testExecuteEvaluatesJobInputsAsExpected() throws InterruptedException { final JobExecutor jobExecutor = getInstance(); final PersistedJob req = standardRequestWithCommand("echo", "${inputs.foo}"); final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{}); final Subject<byte[]> stdoutSubject = PublishSubject.create(); stdoutSubject.subscribe(bytes -> bytesEchoedToStdout.getAndUpdate(existingBytes -> Bytes.concat(existingBytes, bytes))); final Semaphore s = new Semaphore(1); s.acquire(); stdoutSubject.doOnComplete(s::release).subscribe(); final JobEventListeners listeners = createStdoutListener(stdoutSubject); jobExecutor.execute(req, listeners); s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS); final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim(); assertThat(stringFromStdout).isEqualTo("a"); // from spec }
/**
 * Pushes three strings into a PublishSubject and completes it BEFORE anything subscribes.
 *
 * Because the length-printing subscriber is attached only after the emissions and the
 * completion, it is not subscribed when the three strings are emitted.
 */
public static void main(String[] args) {
    Subject<String> words = PublishSubject.create();

    // Emissions happen while nobody is subscribed.
    for (String word : new String[] {"Alpha", "Beta", "Gamma"}) {
        words.onNext(word);
    }
    words.onComplete();

    // Subscribed after the subject has already completed.
    words.map(String::length)
            .subscribe(System.out::println);
}
@TargetApi(Build.VERSION_CODES.M) private Observable<Permission> requestImplementation(final String... permissions) { List<Observable<Permission>> list = new ArrayList<>(permissions.length); List<String> unrequestedPermissions = new ArrayList<>(); // In case of multiple permissions, we create an Observable for each of them. // At the end, the observables are combined to have a unique response. for (String permission : permissions) { mRxPermissionsFragment.log("Requesting permission " + permission); if (isGranted(permission)) { // Already granted, or not Android M // Return a granted Permission object. list.add(Observable.just(new Permission(permission, true, false))); continue; } if (isRevoked(permission)) { // Revoked by a policy, return a denied Permission object. list.add(Observable.just(new Permission(permission, false, false))); continue; } PublishSubject<Permission> subject = mRxPermissionsFragment.getSubjectByPermission(permission); // Create a new subject if not exists if (subject == null) { unrequestedPermissions.add(permission); subject = PublishSubject.create(); mRxPermissionsFragment.setSubjectForPermission(permission, subject); } list.add(subject); } if (!unrequestedPermissions.isEmpty()) { String[] unrequestedPermissionsArray = unrequestedPermissions.toArray(new String[unrequestedPermissions.size()]); requestPermissionsFromFragment(unrequestedPermissionsArray); } return Observable.concat(Observable.fromIterable(list)); }
/**
 * Subscribes a length-printing observer to a PublishSubject and THEN pushes three strings
 * into it, so the length of each string is printed before the subject completes.
 */
public static void main(String[] args) {
    Subject<String> words = PublishSubject.create();

    // Subscribe first so that the following emissions are observed.
    words.map(String::length)
            .subscribe(System.out::println);

    for (String word : new String[] {"Alpha", "Beta", "Gamma"}) {
        words.onNext(word);
    }
    words.onComplete();
}
/**
 * Creates the presenter from its three streams.
 *
 * @param selectedCountObservable  source of the selected-item count
 * @param clearSelectionRelay      subject stored for clear-selection signals
 * @param deleteSelectedItemsRelay subject stored for delete-selected-items signals
 */
public SelectedCountToolbarPresenter(
        Observable<Integer> selectedCountObservable,
        PublishSubject<Boolean> clearSelectionRelay,
        PublishSubject<Boolean> deleteSelectedItemsRelay) {
    this.deleteSelectedItemsRelay = deleteSelectedItemsRelay;
    this.clearSelectionRelay = clearSelectionRelay;
    this.selectedCountObservable = selectedCountObservable;
}
@Test public void start_openDrawerIntent_shouldOpenTheDrawer() { // given TestObserver<SnackbarMessageEvent> snackbarMessage$ = new TestObserver<>(); PublishSubject<Trigger> openDrawerIntent = PublishSubject.create(); given(view.openDrawerIntent$()).willReturn(openDrawerIntent); given(view.readAboutIntent$()).willReturn(noTriggers()); given(view.openProjectOnGitHubIntent$()).willReturn(noTriggers()); given(view.selectLanguageIntent$()).willReturn(noTriggers()); DrawerPresenter presenter = new DrawerPresenter( "http://foo.com", Sink.of(snackbarMessage$), urlOpener ); presenter.start(view); // when fire(openDrawerIntent); // then verify(view).openDrawerIntent$(); verify(view).readAboutIntent$(); verify(view).openProjectOnGitHubIntent$(); verify(view).selectLanguageIntent$(); verify(view).openDrawer(true); then(view).shouldHaveNoMoreInteractions(); then(urlOpener).shouldHaveZeroInteractions(); snackbarMessage$.assertNoValues(); }
@Test public void onEvent_presenterStopped_shouldIgnoreSubsequentEvent() { // given PublishSubject<String> subject = PublishSubject.create(); TestPresenter presenter = new TestPresenter(); presenter.on(subject).call(subscriber); presenter.stop(); // when subject.onNext("foo"); // then verifyZeroInteractions(subscriber); }
/**
 * Creates a SearchViewModel whose {@code search} is replaced by a stub that only counts down
 * {@code latch}, so callers can detect that a search was triggered.
 *
 * @return the view model built from the given collaborators, with {@code search} overridden
 */
@Override
public SearchViewModel get(NavigationController navigationController,
                           Observable<String> searchObservable,
                           PublishSubject<SearchAdapter> searchAdapterSubject,
                           SearchService searchService,
                           SearchAdapterFactory searchAdapterFactory,
                           Scheduler androidScheduler) {
    final SearchViewModel latchingViewModel = new SearchViewModel(
            navigationController,
            searchObservable,
            searchAdapterSubject,
            searchService,
            searchAdapterFactory,
            androidScheduler) {
        @Override
        public void search(String city) {
            // Signal the invocation instead of performing a real search.
            latch.countDown();
        }
    };
    return latchingViewModel;
}
@TargetApi(Build.VERSION_CODES.M) private Observable<Permission> requestImplementation(Context context,final String... permissions) { List<Observable<Permission>> list = new ArrayList<>(permissions.length); List<String> unrequestedPermissions = new ArrayList<>(); // In case of multiple permissions, we create an Observable for each of them. // At the end, the observables are combined to have a unique response. for (String permission : permissions) { XPermissionActivity.log("Requesting permission " + permission); if (isGranted(context,permission)) { // Already granted, or not Android M // Return a granted Permission object. list.add(Observable.just(new Permission(permission, true, false))); continue; } if (isRevoked(context,permission)) { // Revoked by a policy, return a denied Permission object. list.add(Observable.just(new Permission(permission, false, false))); continue; } PublishSubject<Permission> subject = XPermissionActivity.getSubjectByPermission(permission); // Create a new subject if not exists if (subject == null) { unrequestedPermissions.add(permission); subject = PublishSubject.create(); XPermissionActivity.setSubjectForPermission(permission, subject); } list.add(subject); } if (!unrequestedPermissions.isEmpty()) { String[] unrequestedPermissionsArray = unrequestedPermissions.toArray(new String[unrequestedPermissions.size()]); requestPermissionsFromActivity(context,unrequestedPermissionsArray); } return Observable.concat(Observable.fromIterable(list)); }
void onRequestPermissionsResult(String permissions[], int[] grantResults, boolean[] shouldShowRequestPermissionRationale) { for (int i = 0, size = permissions.length; i < size; i++) { log("onRequestPermissionsResult " + permissions[i]); // Find the corresponding subject PublishSubject<Permission> subject = mSubjects.get(permissions[i]); if (subject == null) { // No subject found log("XPermission.onRequestPermissionsResult invoked but didn't find the corresponding permission request."); return; } mSubjects.remove(permissions[i]); boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED; boolean showRequestPermissionRationale = shouldShowRequestPermissionRationale[i]; log("granted: " + granted + "; showRequestPermissionRationale: " + showRequestPermissionRationale); if(Manifest.permission.WRITE_SETTINGS.equals(permissions[i]) || Manifest.permission.SYSTEM_ALERT_WINDOW.equals(permissions[i])){ granted = PermissionsChecker.isPermissionGranted(this,permissions[i],false); }else{ if(granted){ if(PermissionsChecker.isPermissionGranted(this,permissions[i],true)){ granted = true; }else{ granted = false; showRequestPermissionRationale = false; } }else if(showRequestPermissionRationale){ if(PermissionsChecker.isPermissionGranted(this,permissions[i],false)){ granted = true; }else{ granted = false; } } } subject.onNext(new Permission(permissions[i], granted, showRequestPermissionRationale)); subject.onComplete(); } }
@Test public void start_noEventPosted_shouldDoNothingWithView() { // given PublishSubject<SnackbarMessageEvent> snackbarMessage$ = PublishSubject.create(); SnackbarPresenter presenter = new SnackbarPresenter(snackbarMessage$); // when presenter.start(view); // then verifyZeroInteractions(view); }
/**
 * Creates a store holding {@code initialState}.
 *
 * Actions flow through the {@code action$} subject, the state is kept in a BehaviorSubject
 * seeded with the initial state, and {@code result$} merges the output of every effect, each
 * applied to the action stream and a supplier of the current state.
 *
 * @param initialState state the store starts with
 * @param reducer      reducer stored for later use by the store
 * @param effects      effects whose outputs are merged into {@code result$}
 */
Store(@NonNull State initialState, @NonNull Reducer<State> reducer, @NonNull Effect<State>[] effects) {
    this.state$ = BehaviorSubject.createDefault(initialState);
    this.action$ = PublishSubject.create();
    this.reducer = reducer;
    // Built last: the lambda below reads action$, which must already be assigned.
    this.result$ = Observable
            .fromArray(effects)
            .flatMap(effect -> effect.apply(action$, this::currentState));
}
/**
 * Forwards every item emitted by {@code from} into the subject {@code to}.
 *
 * The subscription is added to {@code composite}. Only items are forwarded; completion and
 * error signals of {@code from} are not passed on to {@code to}.
 *
 * @param from source of the items
 * @param to   subject that receives each item via {@code onNext}
 */
protected <T> void bindAction(Observable<T> from, PublishSubject<T> to) {
    composite.add(
            from.doOnNext(item -> Timber.d("bind action called"))
                    .subscribe(to::onNext));
}
@TargetApi(Build.VERSION_CODES.M) private Observable<Permission> requestImplementation(final String... permissions) { List<Observable<Permission>> list = new ArrayList<>(permissions.length); List<String> unrequestedPermissions = new ArrayList<>(); // In case of multiple permissions, we create an Observable for each of them. // At the end, the observables are combined to have a unique response. for (String permission : permissions) { Log.i("","Requesting permission " + permission); if (isGranted(permission)) { // Already granted, or not Android M // Return a granted Permission object. list.add(Observable.just(new Permission(permission, true, false))); continue; } if (isRevoked(permission)) { // Revoked by a policy, return a denied Permission object. list.add(Observable.just(new Permission(permission, false, false))); continue; } PublishSubject<Permission> subject = mRxPermissionsFragment.getSubjectByPermission(permission); // Create a new subject if not exists if (subject == null) { unrequestedPermissions.add(permission); subject = PublishSubject.create(); mRxPermissionsFragment.setSubjectForPermission(permission, subject); } list.add(subject); } if (!unrequestedPermissions.isEmpty()) { String[] unrequestedPermissionsArray = unrequestedPermissions.toArray(new String[unrequestedPermissions.size()]); requestPermissionsFromFragment(unrequestedPermissionsArray); } return Observable.concat(Observable.fromIterable(list)); }
/**
 * Creates the dialogs store on top of {@code base}.
 *
 * Opens the private {@code "dialogs_prefs"} shared preferences and creates the three subjects
 * used for dialog updates, dialog deletions and the unread-dialogs counter.
 *
 * @param base parent stores object, passed to the superclass and used to obtain preferences
 */
DialogsStore(@NonNull AppStores base) {
    super(base);
    this.preferences = base.getSharedPreferences("dialogs_prefs", Context.MODE_PRIVATE);

    this.updatePublishSubject = PublishSubject.create();
    this.dialogsDeletingPublisher = PublishSubject.create();
    this.unreadDialogsCounter = PublishSubject.create();
}
/**
 * Runs {@code echo ${toJSON(inputs)}} and checks that stdout contains the JSON form of the
 * standard request's inputs.
 */
@Test
public void testExecuteEvaluatesToJSONFunctionAsExpected() throws InterruptedException {
    final JobExecutor executor = getInstance();
    final PersistedJob job = standardRequestWithCommand("echo", "${toJSON(inputs)}");

    // Accumulate everything written to stdout.
    final AtomicReference<byte[]> collectedStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdout = PublishSubject.create();
    stdout.subscribe(chunk ->
            collectedStdout.getAndUpdate(soFar -> Bytes.concat(soFar, chunk)));

    // The single permit is taken up front and only handed back once stdout completes.
    final Semaphore stdoutDone = new Semaphore(1);
    stdoutDone.acquire();
    stdout.doOnComplete(stdoutDone::release).subscribe();

    executor.execute(job, createStdoutListener(stdout));

    // NOTE(review): the tryAcquire result is ignored, so a timeout falls through to the assertion.
    stdoutDone.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS);

    final String echoed = new String(collectedStdout.get()).trim();
    assertThat(echoed).isEqualTo(toJSON(STANDARD_VALID_REQUEST.getInputs()));
}
/**
 * Creates the view model, logging its initialisation.
 *
 * Besides storing the injected collaborators it creates a fresh message identifier, the
 * internal PublishSubject and the LiveData that holds the messages.
 *
 * @param messageRepository   repository stored for message access
 * @param compositeDisposable the disposable container qualified as {@code "vm"}
 */
@Inject
public MessageViewModel(MessageRepository messageRepository,
                        @Named("vm") CompositeDisposable compositeDisposable) {
    Timber.d("Init MessageViewModel");

    // Injected collaborators.
    this.compositeDisposable = compositeDisposable;
    this.repository = messageRepository;

    // Internally owned state.
    this.messageIdentifier = new Identifier();
    this.subject = PublishSubject.create();
    this.messages = new MutableLiveData<>();
}
/**
 * Creates the processor, resolving its collaborators through {@code Injection} and
 * {@code InteractorFactory} and initialising its internal containers: the PublishSubject, the
 * work queue and the notification interceptors (initial capacity 3).
 */
RealtimeMessagesProcessor() {
    // Internally owned containers.
    this.queue = new LinkedList<>();
    this.notificationsInterceptors = new SparseArray<>(3);
    this.publishSubject = PublishSubject.create();

    // Collaborators, resolved in the same relative order as before.
    this.app = Injection.provideApplicationContext();
    this.repositories = Injection.provideStores();
    this.networker = Injection.provideNetworkInterfaces();
    this.ownersInteractor = InteractorFactory.createOwnerInteractor();
    this.messagesInteractor = InteractorFactory.createMessagesInteractor();
}
/** * Download product from given category. * * @param request which contains query category * @param productPublishSubject the subject which downloaded product should publish to */ public void downloadProducts(DownloadProductsRequest request, PublishSubject<Product> productPublishSubject) { QueryBuilder queryBuilder = QueryBuilders.termQuery("category", request.getCategory()); SearchResponse scrollResponse = esClient .prepareSearch(INDEX) .setScroll(DEFAULT_SCROLL_TIME_VALUE) .setTypes(TYPE) .setQuery(queryBuilder) .setSize(SCROLL_SIZE) .get(); do { scrollResponse.getHits().forEach(hit -> { try { Product.Builder builder = Product.newBuilder(); jsonParser.merge(hit.sourceAsString(), builder); productPublishSubject.onNext(builder.build()); } catch (IOException ioe) { // Don't fail the whole stream log.error("Unable to read product record", ioe); productPublishSubject.onError(ioe); throw new IllegalStateException(ioe); } }); // Fetch next batch of cite group records scrollResponse = esClient .prepareSearchScroll(scrollResponse.getScrollId()) .setScroll(DEFAULT_SCROLL_TIME_VALUE) .execute() .actionGet(); } while (scrollResponse.getHits().getHits().length != 0); productPublishSubject.onComplete(); }
/**
 * Calculating the score of a single product must publish exactly one response to the subject.
 */
@Test
public void calculateProductScore() throws Exception {
    // Collect every response published by the DAO.
    List<CalculateProductScoreResponse> collected = Lists.newArrayList();
    PublishSubject<CalculateProductScoreResponse> scoreSubject = PublishSubject.create();
    scoreSubject
            .doOnNext(score -> collected.add(score))
            .subscribe();

    Product sample = createProduct("category");
    productDao.calculateProductScore(sample, scoreSubject);

    assertThat(collected.size()).isEqualTo(1);
    scoreSubject.onComplete();
}
/**
 * Subjects can act as both observables and observers.
 * The moment of subscription matters: an observer is only notified about events that occur
 * after it has subscribed.
 *
 * The expected value in the final assertion is left as a placeholder to be filled in.
 */
@Test
public void useSubject() {
    Subject<String> words = PublishSubject.<String>create().toSerialized();

    // First observer: appends every received word to the result.
    words.subscribe(word -> result += word);
    words.onNext("Hello");
    words.onNext("Man");
    words.onNext("Test");

    // Second observer joins late and only sees the final emission.
    words.subscribe(word -> doIt());
    words.onNext("Test");

    assertThat(result).isEqualTo(_____);
}
/**
 * Registers a callback for bus messages under {@code tag}, delivering them on the Android
 * main thread.
 *
 * If the tag is already registered, the existing subscription is returned and
 * {@code callBack} is not attached. Otherwise a serialized PublishSubject is created, filtered
 * to the type reported by {@code callBack.busOfType()}, subscribed, and stored in
 * {@code rxBusEventArrayMap}.
 *
 * @param tag      key identifying the registration
 * @param callBack receiver of the matching messages and of errors
 * @return the observer/disposable stored for {@code tag}
 */
public <T> DisposableObserver register(@NonNull final Object tag, @NonNull final RxBusCallBack<T> callBack) {
    final RxBusEvent existing = rxBusEventArrayMap.get(tag);
    if (!RxUtils.isEmpty(existing)) {
        return existing.disposable;
    }

    final RxBusEvent created = new RxBusEvent();
    created.subject = PublishSubject.create().toSerialized();
    created.disposable = created.subject
            .ofType(callBack.busOfType())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new RxBusObserver<T>() {
                @Override
                public void onNext(@io.reactivex.annotations.NonNull T t) {
                    super.onNext(t);
                    callBack.onBusNext(t);
                }

                @Override
                public void onError(@io.reactivex.annotations.NonNull Throwable e) {
                    super.onError(e);
                    callBack.onBusError(e);
                }
            });
    rxBusEventArrayMap.put(tag, created);
    return created.disposable;
}