Java 类rx.subjects.BehaviorSubject 实例源码

项目:Jockey    文件:LocalPlaylistStore.java   
private Observable<List<Song>> getAutoPlaylistSongs(AutoPlaylist playlist) {
    BehaviorSubject<List<Song>> subject;

    if (mPlaylistContents.containsKey(playlist)) {
        subject = mPlaylistContents.get(playlist);
    } else {
        subject = BehaviorSubject.create();
        mPlaylistContents.put(playlist, subject);

        playlist.generatePlaylist(mMusicStore, this, mPlayCountStore)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subject::onNext, subject::onError);

        subject.observeOn(Schedulers.io())
                .subscribe(contents -> {
                    MediaStoreUtil.editPlaylist(mContext, playlist, contents);
                }, throwable -> {
                    Timber.e(throwable, "Failed to save playlist contents");
                });
    }

    return subject.asObservable();
}
项目:RxSamplesPractice    文件:Samples.java   
public static void behaviourSubject() {
    BehaviorSubject<Integer> subject = BehaviorSubject.create();
    subject.onNext(5);

    Action1<Integer> action1 = integer -> Log.i("From action1", String.valueOf(integer));
    Subscription subscription1 = subject.subscribe(action1);
    subject.onNext(10);

    Action1<Integer> action2 = integer -> Log.i("From action2", String.valueOf(integer));
    Subscription subscription2 = subject.subscribe(action2);
    subject.onNext(20);

    subscription1.unsubscribe();
    subject.onNext(40);

    subscription2.unsubscribe();
    subject.onNext(80);
}
项目:arctor    文件:WaitViewReplayTransformerTest.java   
@Test
public void shouldEmitErrorAfterViewIsAttached() {
    TestScheduler testScheduler = Schedulers.test();
    BehaviorSubject<Boolean> view = BehaviorSubject.create();
    view.onNext(true);
    WaitViewReplayTransformer<Object> transformer = new WaitViewReplayTransformer<>(
            view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler));
    TestSubscriber<Object> testSubscriber = new TestSubscriber<>();
    Observable.error(new RuntimeException())
            .compose(transformer)
            .subscribe(testSubscriber);
    testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertError(RuntimeException.class);
}
项目:arctor    文件:WaitViewLatestTransformerTest.java   
@Test
public void shouldEmitValueAfterViewIsAttached() {
    TestScheduler testScheduler = Schedulers.test();
    BehaviorSubject<Boolean> view = BehaviorSubject.create();
    view.onNext(true);
    WaitViewLatestTransformer<Integer> transformer =
            new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler));
    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
    Observable.just(0)
            .compose(transformer)
            .subscribe(testSubscriber);
    testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertValue(0);
    testSubscriber.assertCompleted();
}
项目:nextop-client    文件:RxExceptionTest.java   
public void testExceptionSubjectAction() {
    // test the behavior of surfacing exceptions from a subject


    BehaviorSubject<Integer> subject = BehaviorSubject.create();

    final List<Notification<Integer>> notifications = new ArrayList<Notification<Integer>>(4);

    Subscription s = subject.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer t) {
            notifications.add(Notification.createOnNext(t));
            throw new RuntimeException("call " + t);
        }
    });

    try {
        subject.onNext(0);
        // (unreachable) expect an exception to be thrown
        fail();
    } catch (RuntimeException e) {
        assertEquals("call 0", e.getMessage());
    }
}
项目:spoofax    文件:ParseResultProcessor.java   
private BehaviorSubject<ParseChange<P>> getUpdates(I unit) {
    final FileObject resource = unit.source();
    final FileName name = resource.getName();

    // THREADING: it is possible that two different threads asking for a subject may do the parsing twice here, as
    // this is not an atomic operation. However, the chance is very low and it does not break anything (only
    // duplicates some work), so it is acceptable.
    BehaviorSubject<ParseChange<P>> updates = updatesPerResource.get(name);
    if(updates == null) {
        updates = BehaviorSubject.create();
        updatesPerResource.put(name, updates);
        try {
            logger.trace("Parsing for {}", resource);
            final P result = syntaxService.parse(unit);
            updates.onNext(ParseChange.update(result));
        } catch(ParseException e) {
            final String message = String.format("Parsing for %s failed", name);
            logger.error(message, e);
            updates.onNext(ParseChange.<P>error(e));
        }
    }
    return updates;
}
项目:GitHub    文件:DeliverReplayTest.java   
@Test
public void testPagingCapabilities() {
    PublishSubject<Object> view = PublishSubject.create();
    BehaviorSubject<Integer> nextPageRequests = BehaviorSubject.create();
    final TestObserver<Delivery<Object, String>> testObserver = new TestObserver<>();

    nextPageRequests
        .concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer targetPage) {
                return targetPage <= requestedPageCount ?
                    Observable.<Integer>never() :
                    Observable.range(requestedPageCount, targetPage - requestedPageCount);
            }
        })
        .doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer it) {
                requestedPageCount = it + 1;
            }
        })
        .startWith(Observable.range(0, requestedPageCount))
        .concatMap(new Func1<Integer, Observable<String>>() {
            @Override
            public Observable<String> call(final Integer page) {
                return requestPage(page, PAGE_SIZE);
            }
        })
        .compose(new DeliverReplay<Object, String>(view))
        .subscribe(testObserver);

    ArrayList<Delivery<Object, String>> onNext = new ArrayList<>();

    testObserver.assertReceivedOnNext(onNext);

    view.onNext(999);
    addOnNext(onNext, 999, 0, 1, 2);

    testObserver.assertReceivedOnNext(onNext);

    nextPageRequests.onNext(2);
    addOnNext(onNext, 999, 3, 4, 5);

    testObserver.assertReceivedOnNext(onNext);

    view.onNext(null);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);

    nextPageRequests.onNext(3);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);

    view.onNext(9999);
    addOnNext(onNext, 9999, 0, 1, 2, 3, 4, 5, 6, 7, 8);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);
}
项目:GitHub    文件:DeliverReplayTest.java   
@Test
public void testPagingCapabilities() {
    PublishSubject<Object> view = PublishSubject.create();
    BehaviorSubject<Integer> nextPageRequests = BehaviorSubject.create();
    final TestObserver<Delivery<Object, String>> testObserver = new TestObserver<>();

    nextPageRequests
        .concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer targetPage) {
                return targetPage <= requestedPageCount ?
                    Observable.<Integer>never() :
                    Observable.range(requestedPageCount, targetPage - requestedPageCount);
            }
        })
        .doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer it) {
                requestedPageCount = it + 1;
            }
        })
        .startWith(Observable.range(0, requestedPageCount))
        .concatMap(new Func1<Integer, Observable<String>>() {
            @Override
            public Observable<String> call(final Integer page) {
                return requestPage(page, PAGE_SIZE);
            }
        })
        .compose(new DeliverReplay<Object, String>(view))
        .subscribe(testObserver);

    ArrayList<Delivery<Object, String>> onNext = new ArrayList<>();

    testObserver.assertReceivedOnNext(onNext);

    view.onNext(999);
    addOnNext(onNext, 999, 0, 1, 2);

    testObserver.assertReceivedOnNext(onNext);

    nextPageRequests.onNext(2);
    addOnNext(onNext, 999, 3, 4, 5);

    testObserver.assertReceivedOnNext(onNext);

    view.onNext(null);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);

    nextPageRequests.onNext(3);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);

    view.onNext(9999);
    addOnNext(onNext, 9999, 0, 1, 2, 3, 4, 5, 6, 7, 8);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);
}
项目:MDRXL    文件:RxLoader.java   
@Override
protected void onStartLoading() {
    Log.d(getClass().getSimpleName(), "onStartLoading");
    super.onStartLoading();
    if (subject == null) {
        subject = BehaviorSubject.create();
    }

    subscription = subject.observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::onResult, this::onError);

    if (subjectSubscription == null) {
        subjectSubscription = searchQuerySubject.flatMap(this::create)
                .subscribeOn(Schedulers.io()).subscribe(subject);
    }
}
项目:disclosure-android-app    文件:AnalyseAppLibraryPermission.java   
@Inject @SuppressWarnings("checkstyle:parameternumber")
public AnalyseAppLibraryPermission(DecompileApp decompileApp,
    AnalyseLibraryMethodInvocations analyseLibraryMethodInvocations,
    AnalysePermissionsFromMethodInvocations analysePermissionsFromMethodInvocations,
    LibraryService libraryService,
    PermissionService permissionService,
    StorageProvider storageProvider,
    FileUtils fileUtils,
    AppService appService) {
  this.decompileApp = decompileApp;
  this.analyseLibraryMethodInvocations = analyseLibraryMethodInvocations;
  this.analysePermissionsFromMethodInvocations = analysePermissionsFromMethodInvocations;
  this.libraryService = libraryService;
  this.permissionService = permissionService;
  this.storageProvider = storageProvider;
  this.fileUtils = fileUtils;
  this.appService = appService;
  this.progressSubject = BehaviorSubject.create();
}
项目:RxBusLib    文件:SubscriberBehaviorEvent.java   
@Override
protected final void initObservable() {
    subject = BehaviorSubject.create();
    subject.onBackpressureBuffer()
            .observeOn(EventThread.getScheduler(observeThread))
            .subscribeOn(EventThread.getScheduler(subscribeThread))
            .subscribe(event -> {
                try {
                    if (valid) {
                        handleEvent(event);
                    }
                } catch (InvocationTargetException e) {
                    throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberBehaviorEvent.this, e);
                }
            });
}
项目:restinparse    文件:ParseTableInternal.java   
protected  Observable<ParseObject<T>> findAll(ParseQuery.Builder<DefaultParseColumn> queryBuilder) {
    ParseRestApi api = ParseRestClientFactory.masterClient();

    BehaviorSubject<Date> relay = BehaviorSubject.create();
    relay.onNext(new Date());
    return relay.flatMap(date -> {
        ParseQuery<DefaultParseColumn> queryOlders = queryBuilder.olderThan(DefaultParseColumn.createdAt, date).build();
        Observable<Response<QueryResults>> response = api.query(className, queryOlders.params);
        return response;
    }).flatMap((r) -> flatMapResponse(r, relay));

}
项目:restinparse    文件:ParseTableInternal.java   
protected Observable<ParseObject<T>> flatMapResponse(Response<QueryResults> r, @Nullable BehaviorSubject<Date> relay) {
    if (!r.isSuccessful()) {
        if (relay!= null) { relay.onCompleted();}
        throw errorFrom(r);
    }
    List<ParseObject<T>> objects = new ArrayList<>();
    List<ParseMap> found = r.body().results;
    if (found == null) {
        found = Collections.emptyList();
    }
    for (ParseMap parseMap : found) {
        ParseObject<T> o = new ParseObject<T>(parseMap);
        objects.add(o);
    }
    if (relay != null) {
        if (found.size() == QUERY_ALL_LIMIT) {
            ParseObject<DefaultParseColumn> object = new ParseObject<>(found.get(QUERY_ALL_LIMIT - 1));
            Date createdAt = object.createdAt();
            relay.onNext(createdAt);
        } else {
            relay.onCompleted();
        }
    }
    return Observable.from(objects);
}
项目:Asynchronous-Android-Programming    文件:SubjectActivity.java   
void behaviourSubject(){
    Action1<Integer> onNextFuc= new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "New Event received on behaviourSubject: " + integer);
        }
    };
    BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
    behaviorSubject.onNext(1);
    behaviorSubject.onNext(2);
    Subscription subscription = behaviorSubject.doOnSubscribe(new Action0() {
        @Override
        public void call() {
            Log.i(TAG, "Observer subscribed to behaviorSubject");
        }
    }).doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            Log.i(TAG, "Observer subscribed to behaviorSubject");
        }
    }).subscribe(onNextFuc);
    behaviorSubject.onNext(3);
    behaviorSubject.onNext(4);
    subscription.unsubscribe();
    behaviorSubject.onNext(5);
    behaviorSubject.onCompleted();
}
项目:erlymon-monitor-android    文件:MoreObservables.java   
@Nonnull
public static <T> Observable.Transformer<? super T, ? extends T> behaviorRefCount() {
    return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> tObservable) {
            return new OperatorMulticast<>(tObservable, new Func0<Subject<? super T, ? extends T>>() {

                @Override
                public Subject<? super T, ? extends T> call() {
                    return BehaviorSubject.<T>create();
                }

            }).refCount();
        }
    };
}
项目:MarketBot    文件:GroupsLoader.java   
private void updateMarketTypes() {
    BehaviorSubject<Map.Entry<Integer, Integer>> subject = BehaviorSubject.create();
    CompositeSubscription subscriptions = new CompositeSubscription();
    subscriptions.add(subject
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(progress -> {
            onProgressUpdate(progress.getKey(), progress.getValue(), "Storing items list...");

            if (progress.getKey().equals(progress.getValue())) {
                subscriptions.unsubscribe();
                decrementUpdatingCount();
                updateFinished();
            }
        }).subscribe());


    Observable.defer(() -> Observable.just(getAllMarketTypes()))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(orders -> MarketTypeEntry.addNewMarketTypes(orders, subject))
        .subscribe();
}
项目:MarketBot    文件:MarketTypeEntry.java   
public static void addNewMarketTypes(List<CrestMarketType> types, BehaviorSubject<Map.Entry<Integer, Integer>> subject) {
    int size = types.size();
    List<MarketTypeEntry> entries = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
        CrestMarketType type = types.get(i);

        MarketTypeEntry entry = new MarketTypeEntry();
        entry.id = type.getTypeId();
        entry.groupId = type.getGroupId();
        entry.href = type.getTypeHref();
        entry.icon = type.getTypeIcon();
        entry.name = type.getTypeName();
        entries.add(entry);
    }

    TransactionManager manager = TransactionManager.getInstance();
    ProcessModelTransaction transaction = new SaveModelTransaction<>(ProcessModelInfo.withModels(entries));

    transaction.setChangeListener((current, maxProgress, modifiedModel) -> {
        if (current % 25 == 0 || current == maxProgress) {
            subject.onNext(new AbstractMap.SimpleEntry<>((int) current, (int) maxProgress));
        }
    });

    manager.addTransaction(transaction);
}
项目:FlowGeek    文件:DeliverReplayTest.java   
@Test
public void testPagingCapabilities() {
    PublishSubject<Object> view = PublishSubject.create();
    BehaviorSubject<Integer> nextPageRequests = BehaviorSubject.create();
    final TestObserver<Delivery<Object, String>> testObserver = new TestObserver<>();

    nextPageRequests
        .concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer targetPage) {
                return targetPage <= requestedPageCount ?
                    Observable.<Integer>never() :
                    Observable.range(requestedPageCount, targetPage - requestedPageCount);
            }
        })
        .doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer it) {
                requestedPageCount = it + 1;
            }
        })
        .startWith(Observable.range(0, requestedPageCount))
        .concatMap(new Func1<Integer, Observable<String>>() {
            @Override
            public Observable<String> call(final Integer page) {
                return requestPage(page, PAGE_SIZE);
            }
        })
        .compose(new DeliverReplay<Object, String>(view))
        .subscribe(testObserver);

    ArrayList<Delivery<Object, String>> onNext = new ArrayList<>();

    testObserver.assertReceivedOnNext(onNext);

    view.onNext(999);
    addOnNext(onNext, 999, 0, 1, 2);

    testObserver.assertReceivedOnNext(onNext);

    nextPageRequests.onNext(2);
    addOnNext(onNext, 999, 3, 4, 5);

    testObserver.assertReceivedOnNext(onNext);

    view.onNext(null);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);

    nextPageRequests.onNext(3);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);

    view.onNext(9999);
    addOnNext(onNext, 9999, 0, 1, 2, 3, 4, 5, 6, 7, 8);

    assertEquals(0, testObserver.getOnCompletedEvents().size());
    testObserver.assertReceivedOnNext(onNext);
}
项目:android-oss    文件:LoginToutViewModel.java   
private void registerFacebookCallback() {
  final PublishSubject<String> fbAccessToken = this.facebookAccessToken;
  final BehaviorSubject<FacebookException> fbAuthError = this.facebookAuthorizationError;

  this.callbackManager = CallbackManager.Factory.create();

  LoginManager.getInstance().registerCallback(this.callbackManager, new FacebookCallback<LoginResult>() {
    @Override
    public void onSuccess(final @NonNull LoginResult result) {
      fbAccessToken.onNext(result.getAccessToken().getToken());
    }

    @Override
    public void onCancel() {
      // continue
    }

    @Override
    public void onError(final @NonNull FacebookException error) {
      if (error instanceof FacebookAuthorizationException) {
        fbAuthError.onNext(error);
      }
    }
  });
}
项目:RxTestWrapper    文件:RxTestWrapperTest.java   
@Test
public void shouldSucceedHasValuesAndClear() throws Exception {
    BehaviorSubject<String> subject = BehaviorSubject.create("first item");
    RxTestWrapper<String> assertion = assertThat(subject)
          .hasValuesAndClear("first item");
    subject.onNext("second value");
    assertion.hasValueCount(1);
}
项目:haven-platform    文件:ComposeMonitor.java   
public ComposeMonitor(DockerService dockerService, String fileName) {
    this.monitor = BehaviorSubject.create();
    this.dockerService = dockerService;
    this.fileName = fileName;
    this.observer = Observers.create(t -> {
        List<String> containerIds = getContainerIds();
        containerIds.forEach(s -> {
            ContainerDetails details = dockerService.getContainer(s);
            log.debug("get container {}", details);
            if (checkContainer(details)) {
                log.error("Container crashed {}", details);
                monitor.onNext(ResultCode.ERROR);
                monitor.onCompleted();
                return;
            }
        });
    });
}
项目:AndroidFlux    文件:TestDataStore.java   
public TestDataStore() {
  this.data = new TestData();
  this.nonNullName = BehaviorSubject.create();
  this.name = BehaviorSubject.create();
  this.passName = BehaviorSubject.create();
  this.defaultName = BehaviorSubject.create(this.data.getDefaultName());
  this.messages = BehaviorSubject.create();
  this.map = BehaviorSubject.create(this.data.getMap());
  this.i = BehaviorSubject.create();
  this.l = BehaviorSubject.create();
  this.f = BehaviorSubject.create();
  this.d = BehaviorSubject.create();
  this.b = BehaviorSubject.create();
  this.s = BehaviorSubject.create();
  this.bb = BehaviorSubject.create();
  this.c = BehaviorSubject.create();
  this.ia = BehaviorSubject.create();
  this.la = BehaviorSubject.create();
  this.fa = BehaviorSubject.create();
}
项目:carbon-gateway-framework    文件:Worker.java   
public Observable submit(UUID id, CarbonMessage cMsg, CarbonCallback cCallback) {

        RxContext rxContext = new RxContext(id.toString(), cMsg, cCallback);
        BehaviorSubject<RxContext> behaviorSubject = BehaviorSubject.create();

        Future f = WORKER_EXECUTOR_SERVICE.submit(() -> {
            try {
                log.debug("Thread using carbon message with UUID " + id);
                mediators.getFirstMediator().receive(cMsg, cCallback);
                log.debug("mediator receive returned");
                behaviorSubject.onNext(rxContext);
            } catch (Exception e) {
                log.error("Error while mediating", e);
                //behaviorSubject.onNext(rxContext);
            }
        });

        f.isDone();
        return behaviorSubject;
    }
项目:android-rxjava-demo    文件:ConstitutionServiceImpl.java   
@Inject
ConstitutionServiceImpl(final Gson gson,
                        @ConstitutionPref final StringPreference cachedConstitution) {

    // initialize constitution from a cached value
    final String json = cachedConstitution.get();
    LOGD(TAG, json);
    UsConstitution cached = gson.fromJson(json, UsConstitution.class);
    constitutionSubject = BehaviorSubject.create(cached);
    constitutionSubject.skip(1)
            .subscribe(new Action1<Constitution>() {
                @Override
                public void call(Constitution constitution) {
                    //save all subsequent updates to the constitution
                    final String newConstitution = gson.toJson(constitution);
                    cachedConstitution.set(newConstitution);
                }
            });

}
项目:RxRoboBase    文件:WriteTests.java   
@Test
public void testObserveChildren() throws Exception {
    BehaviorSubject<FirebaseChildEvent<DataSnapshot>> events = BehaviorSubject.create();

    observeChildren(reference)
            .subscribe(events);

    await(setValue(reference.child("foo"), "bar"));
    FirebaseChildEvent<DataSnapshot> add = events.getValue();
    assertThat(add.eventType, is(FirebaseChildEvent.TYPE_ADD));
    assertThat(add.value.getValue(String.class), is("bar"));
    assertThat(add.value.getKey(), is("foo"));

    await(setValue(reference.child("foo"), "baz"));
    FirebaseChildEvent<DataSnapshot> edit = events.getValue();
    assertThat(edit.eventType, is(FirebaseChildEvent.TYPE_CHANGE));
    assertThat(edit.value.getValue(String.class), is("baz"));

    await(setValue(reference.child("foo"), null));
    FirebaseChildEvent<DataSnapshot> remove = events.getValue();
    assertThat(remove.eventType, is(FirebaseChildEvent.TYPE_REMOVE));
}
项目:rxjava-and-swt    文件:EventVsFrpTwoWay.java   
public FrpBased(Composite parent, int initialValue) {
    super(parent, initialValue);
    value = BehaviorSubject.create(initialValue);
    RxBox<String> rwText = SwtRx.textImmediate(inputField);
    Rx.subscribe(rwText, text -> {
        try {
            int parsed = Integer.parseInt(text);
            value.onNext(parsed);
        } catch (Exception error) {
            outputField.setText(msgForError(error));
        }
    });
    scale.addListener(SWT.Selection, e -> {
        value.onNext(scale.getSelection());
    });
    Rx.subscribe(value.map(Object::toString), rwText::set);
    Rx.subscribe(value.map(this::msgForValue), outputField::setText);
    Rx.subscribe(value, scale::setSelection);
}
项目:FMTech    文件:DataService.java   
public Subscription subscribeData(@NonNull Observer<List<Image>> observer){
    if(null == mCache){
        mCache = BehaviorSubject.create();
        Observable.create(new Observable.OnSubscribe<List<Image>>(){
            @Override
            public void call(Subscriber<? super List<Image>> subscriber) {
                List<Image> images = DataBase.instance().readImagesInfos();
                if(null == images){
                    setDataSource(DATA_SOURCE_NETWORK);
                    loadDataFromNetwork();
                }else{
                    setDataSource(DATA_SOURCE_DISK);
                    subscriber.onNext(images);
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .subscribe(mCache);
    }else{
        setDataSource(DATA_SOURCE_MEMORY);
    }

    return mCache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
}
项目:RxRestSample    文件:ObservableGithubRepos.java   
public Observable<String> updateRepo(String userName) {
    BehaviorSubject<String> requestSubject = BehaviorSubject.create();

    Observable<List<Repo>> observable = mClient.getRepos(userName);
    observable.subscribeOn(Schedulers.io())
              .observeOn(Schedulers.io())
              .subscribe(l -> {
                                mDatabase.insertRepoList(l);
                                requestSubject.onNext(userName);},
                         e -> requestSubject.onError(e),
                         () -> requestSubject.onCompleted());
    return requestSubject.asObservable();
}
项目:mesos-rxjava    文件:AwaitableEventSubscriberDecoratorTest.java   
@Test
public void awaitEventWorks_onNext() throws Exception {
    final TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber);

    final BehaviorSubject<String> subject = BehaviorSubject.create();

    final Subscription subscription = subject.subscribe(sub);

    async.run(() -> subject.onNext("hello"));

    sub.awaitEvent();
    testSubscriber.assertValue("hello");
    testSubscriber.assertNoTerminalEvent();
    subscription.unsubscribe();
}
项目:mesos-rxjava    文件:AwaitableEventSubscriberDecoratorTest.java   
@Test
public void awaitEventWorks_onError() throws Exception {
    final TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber);

    final BehaviorSubject<String> subject = BehaviorSubject.create();

    final Subscription subscription = subject.subscribe(sub);

    final RuntimeException e = new RuntimeException("doesn't matter");
    async.run(() -> subject.onError(e));

    sub.awaitEvent();
    testSubscriber.assertNoValues();
    testSubscriber.assertError(e);
    subscription.unsubscribe();
}
项目:mesos-rxjava    文件:AwaitableEventSubscriberDecoratorTest.java   
@Test
public void awaitEventWorks_onCompleted() throws Exception {
    final TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber);

    final BehaviorSubject<String> subject = BehaviorSubject.create();

    final Subscription subscription = subject.subscribe(sub);

    async.run(subject::onCompleted);

    sub.awaitEvent();
    testSubscriber.assertNoValues();
    testSubscriber.assertCompleted();
    subscription.unsubscribe();
}
项目:mesos-rxjava    文件:AwaitableEventSubscriberDecoratorTest.java   
@Test
public void awaitEventWorks() throws Exception {
    final TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber);

    final BehaviorSubject<String> subject = BehaviorSubject.create();

    final Subscription subscription = subject.subscribe(sub);

    async.run(() -> {
        subject.onNext("hello");
        subject.onNext("world");
        subject.onNext("!");
        subject.onCompleted();
    });

    sub.awaitEvent(Integer.MAX_VALUE);
    testSubscriber.assertValues("hello", "world", "!");
    testSubscriber.assertCompleted();
    testSubscriber.assertNoErrors();
    subscription.unsubscribe();
}
项目:Jockey    文件:LocalMusicStore.java   
@Override
public Observable<Boolean> refresh() {
    mSongLoadingState.onNext(true);
    mArtistLoadingState.onNext(true);
    mAlbumLoadingState.onNext(true);
    mGenreLoadingState.onNext(true);

    BehaviorSubject<Boolean> result = BehaviorSubject.create();

    MediaStoreUtil.promptPermission(mContext)
            .observeOn(Schedulers.io())
            .map(granted -> {
                if (granted) {
                    if (mSongs != null) {
                        mSongs.onNext(getAllSongs());
                    }
                    if (mArtists != null) {
                        mArtists.onNext(getAllArtists());
                    }
                    if (mAlbums != null) {
                        mAlbums.onNext(getAllAlbums());
                    }
                    if (mGenres != null) {
                        mGenres.onNext(getAllGenres());
                    }
                }
                mSongLoadingState.onNext(false);
                mArtistLoadingState.onNext(false);
                mAlbumLoadingState.onNext(false);
                mGenreLoadingState.onNext(false);
                return granted;
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(result);

    return result.asObservable();
}
项目:Jockey    文件:LocalMusicStore.java   
@Override
public Observable<List<Song>> getSongs() {
    if (mSongs == null) {
        mSongs = BehaviorSubject.create();
        mSongLoadingState.onNext(true);

        MediaStoreUtil.getPermission(mContext)
                .observeOn(Schedulers.io())
                .subscribe(granted -> {
                    if (granted) {
                        mSongs.onNext(getAllSongs());
                    } else {
                        mSongs.onNext(Collections.emptyList());
                    }
                    mSongLoadingState.onNext(false);
                }, throwable -> {
                    Timber.e(throwable, "Failed to query MediaStore for songs");
                });
    }
    return mSongs.asObservable().observeOn(AndroidSchedulers.mainThread());
}
项目:Jockey    文件:LocalMusicStore.java   
@Override
public Observable<List<Genre>> getGenres() {
    if (mGenres == null) {
        mGenres = BehaviorSubject.create();
        mGenreLoadingState.onNext(true);

        MediaStoreUtil.getPermission(mContext)
                .observeOn(Schedulers.io())
                .subscribe(granted -> {
                    if (granted) {
                        mGenres.onNext(getAllGenres());
                    } else {
                        mGenres.onNext(Collections.emptyList());
                    }
                    mGenreLoadingState.onNext(false);
                }, throwable -> {
                    Timber.e(throwable, "Failed to query MediaStore for genres");
                });
    }
    return mGenres.asObservable().observeOn(AndroidSchedulers.mainThread());
}
项目:Jockey    文件:MediaStoreUtil.java   
@TargetApi(Build.VERSION_CODES.M)
public static Observable<Boolean> promptPermission(Context context) {
    if (sPermissionObservable == null) {
        sPermissionObservable = BehaviorSubject.create();
    }

    if (hasPermission(context)) {
        if (!sPermissionObservable.hasValue() || !sPermissionObservable.getValue()) {
            sPermissionObservable.onNext(true);
        }
        return getPermissionObservable();
    }

    RxPermissions.getInstance(context).request(READ_EXTERNAL_STORAGE, WRITE_EXTERNAL_STORAGE)
            .subscribe(sPermissionObservable::onNext, throwable -> {
                Timber.i(throwable, "Failed to get storage permission");
            });

    return getPermissionObservable();
}
项目:Jockey    文件:DemoMusicStore.java   
@Override
public Observable<List<Song>> getSongs() {
    if (mSongs == null) {
        BehaviorSubject<List<Song>> subject = BehaviorSubject.create();

        Observable.fromCallable(() -> this.<Song>parseJson(SONGS_FILENAME, Song[].class))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subject::onNext, subject::onError);

        mSongs = subject.asObservable();
    }

    return mSongs;
}
项目:Jockey    文件:DemoMusicStore.java   
@Override
public Observable<List<Album>> getAlbums() {
    if (mAlbums == null) {
        BehaviorSubject<List<Album>> subject = BehaviorSubject.create();

        Observable.fromCallable(() -> this.<Album>parseJson(ALBUMS_FILENAME, Album[].class))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subject::onNext, subject::onError);

        mAlbums = subject.asObservable();
    }

    return mAlbums;
}
项目:Jockey    文件:DemoMusicStore.java   
@Override
public Observable<List<Artist>> getArtists() {
    if (mArtists == null) {
        BehaviorSubject<List<Artist>> subject = BehaviorSubject.create();

        Observable.fromCallable(() -> this.<Artist>parseJson(ARTISTS_FILENAME, Artist[].class))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subject::onNext, subject::onError);

        mArtists = subject.asObservable();
    }

    return mArtists;
}
项目:Jockey    文件:DemoMusicStore.java   
@Override
public Observable<List<Genre>> getGenres() {
    if (mGenres == null) {
        BehaviorSubject<List<Genre>> subject = BehaviorSubject.create();

        Observable.fromCallable(() -> this.<Genre>parseJson(GENRES_FILENAME, Genre[].class))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subject::onNext, subject::onError);

        mGenres = subject.asObservable();
    }

    return mGenres;
}