Java 类io.reactivex.Maybe 实例源码

项目:Reactive-Programming-With-Java-9    文件:DemoMaybe.java   
public static void main(String[] args) {
    // TODO Auto-generated method stub

    Maybe<List<String>> month_maybe = Maybe.create(emitter -> {
        try {
            String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov",
                    "Dec" };

            List<String> months = Arrays.asList(monthArray);
            if (months != null && !months.isEmpty()) {
                emitter.onSuccess(months);
            } else {
                emitter.onComplete();
            }
        } catch (Exception e) {
            emitter.onError(e);
        }
    });
    month_maybe.subscribe(s->System.out.println(s));

}
项目:NetDiscovery    文件:HttpClientDownloader.java   
@Override
public Maybe<Response> download(final Request request) {

    return Maybe.create(new MaybeOnSubscribe<CloseableHttpResponse>(){

        @Override
        public void subscribe(MaybeEmitter emitter) throws Exception {

            emitter.onSuccess(httpManager.getResponse(request));
        }
    }).map(new Function<CloseableHttpResponse, Response>() {

        @Override
        public Response apply(CloseableHttpResponse closeableHttpResponse) throws Exception {

            String html = EntityUtils.toString(closeableHttpResponse.getEntity(), "UTF-8");
            Response response = new Response();
            response.setContent(html);
            response.setStatusCode(closeableHttpResponse.getStatusLine().getStatusCode());
            return response;
        }
    });
}
项目:DisposableAttach    文件:DisposableAttachMaybeTest.java   
@Test public void test() {

        MaybeSubject<String> subject = MaybeSubject.create();
        Maybe<String> maybeSource = subject.hide();


        TestObserver testObserver = new TestObserver();
        CompositeDisposable composite = new CompositeDisposable();
        Disposable disposable = maybeSource
                .compose(DisposableAttach.<String>to(composite))
                .subscribeWith(testObserver);

        subject.onSuccess("Foo");
        testObserver.assertValue("Foo");
        assertTrue(composite.size() == 1);
        composite.dispose();
        assertTrue(composite.size() == 0);
        assertTrue(composite.isDisposed());
        assertTrue(disposable.isDisposed());
        assertTrue(testObserver.isDisposed());
    }
项目:Phoenix-for-VK    文件:KeysRamStore.java   
@Override
public Maybe<AesKeyPair> findKeyPairFor(int accountId, long sessionId) {
    return Maybe.create(e -> {
        List<AesKeyPair> pairs = mData.get(accountId);
        AesKeyPair result = null;
        if (Objects.nonNull(pairs)) {
            for (AesKeyPair pair : pairs) {
                if (pair.getSessionId() == sessionId) {
                    result = pair;
                    break;
                }
            }
        }

        if (Objects.nonNull(result)) {
            e.onSuccess(result);
        }

        e.onComplete();
    });
}
项目:wayf-cloud    文件:FacadePoliciesTest.java   
@Test
public void testMaybeToSingleEmpty() {
    final List<Object> results = new LinkedList<>();

    singleOrException(Maybe.empty(), HttpStatus.SC_BAD_REQUEST, "Too few elements")
            .subscribe((ignore) -> fail(), (e) -> results.add(e));

    assertEquals(1, results.size());

    Object result = results.get(0);

    if (result.getClass() != ServiceException.class) {
        fail();
    }

    ServiceException serviceException = (ServiceException) result;

    assertEquals(HttpStatus.SC_BAD_REQUEST, serviceException.getStatusCode());
    assertEquals("Too few elements", serviceException.getMessage());
}
项目:android-arch-mvvm    文件:ModuleCall.java   
public void enqueue(final ModuleCallback<T> callback) {
    synchronized (this) {
        if (mExecuted) {
            throw new IllegalStateException("每个ModuleCall只能enqueue一次");
        }
        mExecuted = true;
    }
    if (mCanceled || mDone) {
        return;
    }
    mModuleCallback = callback;

    if (mObservable instanceof Observable) {
        subscribeObservable((Observable<T>) mObservable);
    } else if (mObservable instanceof Single) {
        subscribeSingle((Single<T>) mObservable);
    } else if (mObservable instanceof Flowable) {
        subscribeFlowable((Flowable<T>) mObservable);
    } else {
        subscribeMaybe((Maybe<T>) mObservable);
    }
}
项目:NetDiscovery    文件:OkHttpDownloader.java   
@Override
public Maybe<Response> download(Request request) {

    okhttp3.Request okrequest = new okhttp3.Request.Builder()
            .url(request.getUrl())
            .build();

    return Maybe.create(new MaybeOnSubscribe<okhttp3.Response>(){

        @Override
        public void subscribe(MaybeEmitter emitter) throws Exception {

            emitter.onSuccess(client.newCall(okrequest).execute());
        }
    }).map(new Function<okhttp3.Response, Response>() {

        @Override
        public Response apply(okhttp3.Response resp) throws Exception {

            String html = resp.body().string();
            Response response = new Response();
            response.setContent(html);
            response.setStatusCode(resp.code());
            return response;
        }
    });
}
项目:RxRealm    文件:RxRealm.java   
public static <T extends RealmObject> Maybe<T> getElement(final Function<Realm, T> query) {
    return Maybe.create(emitter -> {
        final Realm realm = Realm.getDefaultInstance();
        final T result = query.apply(realm);
        if (result != null && result.isLoaded() && result.isValid()) {
            emitter.onSuccess(realm.copyFromRealm(result));
        } else {
            emitter.onComplete();
        }
        emitter.setCancellable(realm::close);
    });
}
项目:WeatherWeight    文件:ForecastPresenterTest.java   
@Test
public void loadLastForecastOnComplete() {
    when(lastForecastStore.get()).thenReturn(Maybe.<Channel>empty());

    presenter.loadLastForecast();
    verify(view).showContent();
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously downloads the object at this {@link StorageReference} to a specified system filepath.
 *
 * @param storageRef     represents a reference to a Google Cloud Storage object.
 * @param destinationUri a file system URI representing the path the object should be downloaded to.
 * @return a {@link Maybe} which emits an {@link FileDownloadTask.TaskSnapshot} if success.
 */
@NonNull
public static Maybe<FileDownloadTask.TaskSnapshot> getFile(@NonNull final StorageReference storageRef,
                                                           @NonNull final Uri destinationUri) {
    return Maybe.create(new MaybeOnSubscribe<FileDownloadTask.TaskSnapshot>() {
        @Override
        public void subscribe(MaybeEmitter<FileDownloadTask.TaskSnapshot> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.getFile(destinationUri));
        }
    });
}
项目:LifecycleAwareRx    文件:LifecycleTest.java   
@Test
public void viewsAreOnlyCalledWhenLifecycleActiveWithMaybe() throws Exception {
    Maybe.just("test")
        .compose(LifecycleBinder.bind(lifecycleOwner, new DisposableMaybeObserver<String>() {
            @Override
            public void onSuccess(final String value) {
                LifecycleTest.this.methodOnViewCalled = true;
            }

            @Override
            public void onError(final Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        }));

    // Need to wait to give it time to potentially fail
    TimeUnit.MILLISECONDS.sleep(100);
    assertEquals(false, methodOnViewCalled);

    lifecycleOwner.handleLifecycleEvent(Lifecycle.Event.ON_CREATE);
    TimeUnit.MILLISECONDS.sleep(100);
    assertEquals(false, methodOnViewCalled);

    lifecycleOwner.handleLifecycleEvent(Lifecycle.Event.ON_START);
    TimeUnit.MILLISECONDS.sleep(100);
    // At this point the views should now be called since the lifecycle is active
    assertEquals(true, methodOnViewCalled);
}
项目:wayf-cloud    文件:IdentityProviderDaoDbImpl.java   
@Override
public Maybe<IdentityProvider> read(Long id) {
    IdentityProviderQuery query = new IdentityProviderQuery().setId(id);

    return Single.just(query)
            .compose((single) -> DaoPolicies.applySingle(single))
            .flatMapMaybe((_query) -> dbExecutor.executeSelectFirst(readSql, _query, resultClass));
}
项目:showcase-android    文件:RxFirebaseDatabase.java   
/**
 * Method which retrieve a list of DataSnapshot from multiple {@link DatabaseReference}.
 *
 * @param whereRefs array of {@link DatabaseReference references.}
 * @return a {@link Flowable} which emmit {@link DataSnapshot} from the given queries.
 */
@NonNull
public static Flowable<DataSnapshot> observeMultipleSingleValueEvent(@NonNull DatabaseReference... whereRefs) {
   @SuppressWarnings("unchecked")
   Maybe<DataSnapshot>[] singleQueries = (Maybe<DataSnapshot>[]) Array.newInstance(Maybe.class, whereRefs.length);
   for (int i = 0; i < whereRefs.length; i++) {
      singleQueries[i] = (observeSingleValueEvent(whereRefs[i]));
   }
   return Maybe.mergeArray(singleQueries);
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously uploads a stream of data to this {@link StorageReference}.
 *
 * @param storageRef represents a reference to a Google Cloud Storage object.
 * @param metadata   {@link StorageMetadata} containing additional information (MIME type, etc.) about the object being uploaded.
 * @return a {@link Maybe} which emits an {@link StorageMetadata} if success.
 */
@NonNull
public static Maybe<StorageMetadata> updateMetadata(@NonNull final StorageReference storageRef,
                                                    @NonNull final StorageMetadata metadata) {
    return Maybe.create(new MaybeOnSubscribe<StorageMetadata>() {
        @Override
        public void subscribe(MaybeEmitter<StorageMetadata> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.updateMetadata(metadata));
        }
    });
}
项目:RIBs    文件:WorkerBinderTest.java   
@Test
public void bind_whenSubscribingWithWorkerLifecycle_shouldMapToWorkerStartEvent() {
  BehaviorRelay<InteractorEvent> lifecycle = BehaviorRelay.createDefault(InteractorEvent.ACTIVE);
  bind(lifecycle, worker);
  verify(worker).onStart(argumentCaptor.capture());

  Maybe observable = argumentCaptor.getValue().requestScope();
  WorkerEventCallback callback = new WorkerEventCallback();
  observable.subscribe(callback);
  lifecycle.accept(InteractorEvent.ACTIVE);
  assertThat(callback.getWorkerEvent()).isEqualTo(WorkerEvent.START);
}
项目:RxTask    文件:RxFusedLocationProviderClient.java   
@NonNull
@RequiresPermission(
        anyOf = {"android.permission.ACCESS_COARSE_LOCATION", "android.permission" +
                ".ACCESS_FINE_LOCATION"}
)
public Maybe<Location> getLastLocation() {
    return MaybeTask.create(() -> client.getLastLocation());
}
项目:trust-wallet-android    文件:TransactionRepository.java   
@Override
public Maybe<Transaction> findTransaction(Wallet wallet, String transactionHash) {
    return fetchTransaction(wallet)
            .firstElement()
               .flatMap(transactions -> {
                for (Transaction transaction : transactions) {
                    if (transaction.hash.equals(transactionHash)) {
                        return Maybe.just(transaction);
                    }
                }
                return null;
            });
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously downloads the object at this {@link StorageReference} via a InputStream.
 *
 * @param storageRef represents a reference to a Google Cloud Storage object.
 * @return a {@link Maybe} which emits an {@link StreamDownloadTask.TaskSnapshot} if success.
 */
@NonNull
public static Maybe<StreamDownloadTask.TaskSnapshot> getStream(@NonNull final StorageReference storageRef) {
    return Maybe.create(new MaybeOnSubscribe<StreamDownloadTask.TaskSnapshot>() {
        @Override
        public void subscribe(MaybeEmitter<StreamDownloadTask.TaskSnapshot> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.getStream());
        }
    });
}
项目:wayf-cloud    文件:PasswordCredentialsDaoDbImpl.java   
@Override
public Maybe<String> getSaltForEmail(String email) {
    LOG.debug("Getting salt for [{}]", email);

    PasswordCredentials credentials = new PasswordCredentials();
    credentials.setEmailAddress(email);

    return dbExecutor.executeSelectFirst(getSaltSql, credentials, PasswordCredentials.class)
            .map((_credentials) -> _credentials.getSalt());
}
项目:wayf-cloud    文件:PublisherRegistrationDaoDbImpl.java   
@Override
public Maybe<PublisherRegistration> read(Long id) {
    LOG.debug("Reading publisher registration with id [{}] in db", id);

    PublisherRegistration publisherRegistration = new PublisherRegistration();
    publisherRegistration.setId(id);

    return Single.just(publisherRegistration)
            .compose((single) -> DaoPolicies.applySingle(single))
            .flatMapMaybe((_publisherRegistration) -> dbExecutor.executeSelectFirst(readSql, _publisherRegistration, PublisherRegistration.class));
}
项目:rxtasks    文件:RxTask.java   
/**
 * @param callable
 * @param <R>
 * @return
 */
@CheckReturnValue
@NonNull
public static <R> Maybe<R> maybe(@NonNull final Callable<Task<R>> callable) {
    return Single.fromCallable(callable).flatMapMaybe(
            new Function<Task<R>, MaybeSource<? extends R>>() {
        @Override
        public MaybeSource<? extends R> apply(Task<R> task) throws Exception {
            return maybe(task);
        }
    });
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously uploads from a content URI to this {@link StorageReference}.
 *
 * @param storageRef        represents a reference to a Google Cloud Storage object.
 * @param uri               The source of the upload. This can be a file:// scheme or any content URI. A content resolver will be used to load the data.
 * @param metadata          {@link StorageMetadata} containing additional information (MIME type, etc.) about the object being uploaded.
 * @param existingUploadUri If set, an attempt is made to resume an existing upload session as defined by getUploadSessionUri().
 * @return a {@link Maybe} which emits an {@link UploadTask.TaskSnapshot} if success.
 */
@NonNull
public static Maybe<UploadTask.TaskSnapshot> putFile(@NonNull final StorageReference storageRef,
                                                     @NonNull final Uri uri,
                                                     @NonNull final StorageMetadata metadata,
                                                     @NonNull final Uri existingUploadUri) {
    return Maybe.create(new MaybeOnSubscribe<UploadTask.TaskSnapshot>() {
        @Override
        public void subscribe(MaybeEmitter<UploadTask.TaskSnapshot> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.putFile(uri, metadata, existingUploadUri));
        }
    });
}
项目:WeatherWeight    文件:ForecastPresenterTest.java   
@Test
public void loadLastForecastOnError() {
    Throwable error = new Exception("testing");
    when(lastForecastStore.get()).thenReturn(Maybe.<Channel>error(error));

    presenter.loadLastForecast();
    verify(view).showError(error, false);
}
项目:dztools    文件:TradeUtilityTest.java   
@Test
public void orderByIDReturnsValueFromOrderLookup() {
    stubOrderByID().thenReturn(Maybe.just(orderMockA));

    final IOrder order = tradeUtility
        .orderByID(orderID)
        .blockingGet();

    assertThat(order, equalTo(orderMockA));
}
项目:RIBs    文件:WorkerBinderTest.java   
@Test
public void unbind_whenSubscribingWithWorkerLifecycle_shouldMapToWorkerStopEvent() {
  BehaviorRelay<InteractorEvent> lifecycle = BehaviorRelay.createDefault(InteractorEvent.ACTIVE);
  WorkerUnbinder unbinder = bind(lifecycle, worker);
  verify(worker).onStart(argumentCaptor.capture());

  Maybe observable = argumentCaptor.getValue().requestScope();
  WorkerEventCallback callback = new WorkerEventCallback();
  observable.subscribe(callback);

  unbinder.unbind();

  assertThat(callback.getWorkerEvent()).isEqualTo(WorkerEvent.STOP);
}
项目:dztools    文件:OrderRepositoryTest.java   
@Test
public void whenOrderIsStoredBeforeTheIDIsFound() {
    stubIDFromOrder(orderMockA).thenReturn(Maybe.just(orderID));

    subscribeOrderStore();

    subscribeGetByID().assertValue(orderMockA);
}
项目:dztools    文件:OrderRepositoryTest.java   
@Test
public void storeOrdersFilterCorrect() {
    stubIDFromOrder(orderMockA).thenReturn(Maybe.empty());
    stubIDFromOrder(orderMockB).thenReturn(Maybe.just(orderID));

    subscribeOrdersStore().assertComplete();

    subscribeGetByID().assertValue(orderMockB);
}
项目:yabaking    文件:RecipesRepositoryImpl.java   
private Maybe<List<Recipe>> getFromRemoteDataSourceAndStoreToCache() {
    return recipesRemoteDataSource
            .list()
            .flatMapMaybe(recipes -> recipesLocalDataSource.save(recipes)
                    .andThen(recipesLocalDataSource.list())
            );
}
项目:android-rxmvp-sandbox    文件:RxSaveState.java   
/**
 * Gets the current state as a Maybe. The Maybe will emit a single bundle if there is a previous state
 * or if the state is missing it will emit no events and call the onComplete event
 */
@NonNull
public static Maybe<Bundle> getSavedState(@NonNull Activity activity) {
  PreConditions.throwIfNotOnMainThread();
  Bundle prevState = getSavedStateDirect(activity);
  if(prevState != null) {
    return Maybe.just(prevState);
  } else {
    return Maybe.empty();
  }
}
项目:yabaking    文件:DBFlowLocalDataSource.java   
@Override
public Maybe<Recipe> get(@NonNull Integer id) {
    return Maybe.defer(() -> {
        DBFlowRecipe dbFlowRecipe = SQLite.select().from(DBFlowRecipe.class)
                .where(DBFlowRecipe_Table.id.eq(id))
                .querySingle();
        if (dbFlowRecipe == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(DBFlowRecipe.Mapper.toDomain(dbFlowRecipe));
        }
    });
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously uploads byte data to this {@link StorageReference}.
 *
 * @param storageRef represents a reference to a Google Cloud Storage object.
 * @param bytes      The byte[] to upload.
 * @return a {@link Maybe} which emits an {@link UploadTask.TaskSnapshot} if success.
 */
@NonNull
public static Maybe<UploadTask.TaskSnapshot> putBytes(@NonNull final StorageReference storageRef,
                                                      @NonNull final byte[] bytes) {
    return Maybe.create(new MaybeOnSubscribe<UploadTask.TaskSnapshot>() {
        @Override
        public void subscribe(MaybeEmitter<UploadTask.TaskSnapshot> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.putBytes(bytes));
        }
    });
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously downloads the object from this {@link StorageReference} a byte array will be allocated large enough to hold the entire file in memory.
 *
 * @param storageRef           represents a reference to a Google Cloud Storage object.
 * @param maxDownloadSizeBytes the maximum allowed size in bytes that will be allocated. Set this parameter to prevent out of memory conditions from occurring.
 *                             If the download exceeds this limit, the task will fail and an IndexOutOfBoundsException will be returned.
 * @return a {@link Maybe} which emits an byte[] if success.
 */
@NonNull
public static Maybe<byte[]> getBytes(@NonNull final StorageReference storageRef,
                                     final long maxDownloadSizeBytes) {
    return Maybe.create(new MaybeOnSubscribe<byte[]>() {
        @Override
        public void subscribe(MaybeEmitter<byte[]> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.getBytes(maxDownloadSizeBytes));
        }
    });
}
项目:WeatherWeight    文件:SharedPrefLastForecastStore.java   
@Override
public Maybe<Channel> get() {
    return Maybe.fromCallable(new Callable<Channel>() {
        @Override
        public Channel call() throws Exception {
            String json = sharedPreferences.getString(KEY, null);
            if (json == null) {
                return null;
            }
            return gson.fromJson(json, Channel.class);
        }
    });
}
项目:showcase-android    文件:RxFirebaseAuth.java   
/**
 * Asynchronously signs in as an anonymous user.
 * If there is already an anonymous user signed in, that user will be returned; otherwise, a new anonymous user identity will be created and returned.
 *
 * @param firebaseAuth firebaseAuth instance.
 * @return a {@link Maybe} which emits an {@link AuthResult} if success.
 * @see <a href="https://firebase.google.com/docs/reference/android/com/google/firebase/auth/FirebaseAuth">Firebase Auth API</a>
 */
@NonNull
public static Maybe<AuthResult> signInAnonymously(@NonNull final FirebaseAuth firebaseAuth) {
   return Maybe.create(new MaybeOnSubscribe<AuthResult>() {
      @Override
      public void subscribe(MaybeEmitter<AuthResult> emitter) throws Exception {
         RxHandler.assignOnTask(emitter, firebaseAuth.signInAnonymously());
      }
   });
}
项目:wayf-cloud    文件:DeviceAccessDaoDbImpl.java   
@Override
public Maybe<DeviceAccess> read(Long id) {
    DeviceAccess session = new DeviceAccess();
    session.setId(id);

    return Maybe.just(session)
            .compose((maybe) -> DaoPolicies.applyMaybe(maybe))
            .flatMap((_session) -> dbExecutor.executeSelectFirst(readSql, _session, DeviceAccess.class));
}
项目:showcase-android    文件:RxFirebaseStorage.java   
/**
 * Asynchronously downloads the object at this {@link StorageReference} via a InputStream.
 *
 * @param storageRef represents a reference to a Google Cloud Storage object.
 * @param processor  A StreamDownloadTask.StreamProcessor that is responsible for reading data from the InputStream.
 *                   The StreamDownloadTask.StreamProcessor is called on a background thread and checked exceptions thrown
 *                   from this object will be returned as a failure to the OnFailureListener registered on the StreamDownloadTask.
 * @return a {@link Maybe} which emits an {@link StreamDownloadTask.TaskSnapshot} if success.
 */
@NonNull
public static Maybe<StreamDownloadTask.TaskSnapshot> getStream(@NonNull final StorageReference storageRef,
                                                               @NonNull final StreamDownloadTask.StreamProcessor processor) {
    return Maybe.create(new MaybeOnSubscribe<StreamDownloadTask.TaskSnapshot>() {
        @Override
        public void subscribe(MaybeEmitter<StreamDownloadTask.TaskSnapshot> emitter) throws Exception {
            RxHandler.assignOnTask(emitter, storageRef.getStream(processor));
        }
    });
}
项目:RIBs    文件:WorkerBinderTest.java   
@Test
public void bind_whenSubscribingWithWorkerLifecycle_shouldMapToWorkerStopEvent() {
  BehaviorRelay<InteractorEvent> lifecycle = BehaviorRelay.createDefault(InteractorEvent.ACTIVE);
  bind(lifecycle, worker);
  verify(worker).onStart(argumentCaptor.capture());

  Maybe observable = argumentCaptor.getValue().requestScope();
  WorkerEventCallback callback = new WorkerEventCallback();
  observable.subscribe(callback);
  lifecycle.accept(InteractorEvent.INACTIVE);
  assertThat(callback.getWorkerEvent()).isEqualTo(WorkerEvent.STOP);
}
项目:wayf-cloud    文件:LoadingCacheTest.java   
@Test
public void testLoad() {
    cache.setCacheLoader((key) -> Maybe.just(getLoaderRandomValue()));

    String randomKey = UUID.randomUUID().toString();

    String loaderValue = cache.get(randomKey).blockingGet();
    assertNotNull(loaderValue);

    String cachedValue = cache.get(randomKey).blockingGet();
    assertEquals(loaderValue, cachedValue);
}
项目:Phoenix-for-VK    文件:MessagesStore.java   
@Override
public Maybe<DraftMessage> findDraftMessage(int accountId, int peerId) {
    return Maybe.create(e -> {
        String[] columns = {MessageColumns._ID, MessageColumns.BODY};
        Uri uri = MessengerContentProvider.getMessageContentUriFor(accountId);

        Cursor cursor = getContext().getContentResolver().query(uri, columns,
                MessageColumns.PEER_ID + " = ? AND " + MessageColumns.STATUS + " = ?",
                new String[]{String.valueOf(peerId), String.valueOf(MessageStatus.EDITING)}, null);

        if (e.isDisposed()) return;

        DraftMessage message = null;
        if (cursor != null) {
            if (cursor.moveToNext()) {
                int id = cursor.getInt(cursor.getColumnIndex(MessageColumns._ID));
                String body = cursor.getString(cursor.getColumnIndex(MessageColumns.BODY));
                message = new DraftMessage(id, body);
            }

            cursor.close();
        }

        if (nonNull(message)) {
            Integer count = getStores().attachments()
                    .getCount(accountId, AttachToType.MESSAGE, message.getId())
                    .blockingGet();

            message.setAttachmentsCount(nonNull(count) ? count : 0);
            e.onSuccess(message);
        }

        e.onComplete();
    });
}
项目:Phoenix-for-VK    文件:OwnersRepositiry.java   
@Override
public Maybe<String> getLocalizedUserActivity(int accountId, int userId) {
    return Maybe.create(e -> {
        String[] uProjection = {UserColumns.LAST_SEEN, UserColumns.ONLINE, UserColumns.SEX};
        Uri uri = MessengerContentProvider.getUserContentUriFor(accountId);
        String where = UserColumns._ID + " = ?";
        String[] args = {String.valueOf(userId)};
        Cursor cursor = getContext().getContentResolver().query(uri, uProjection, where, args, null);

        if (cursor != null) {
            if (cursor.moveToNext()) {
                boolean online = cursor.getInt(cursor.getColumnIndex(UserColumns.ONLINE)) == 1;
                long lastSeen = cursor.getLong(cursor.getColumnIndex(UserColumns.LAST_SEEN));
                int sex = cursor.getInt(cursor.getColumnIndex(UserColumns.SEX));
                String userActivityLine = UserInfoResolveUtil.getUserActivityLine(getContext(), lastSeen, online, sex);

                if (nonNull(userActivityLine)) {
                    e.onSuccess(userActivityLine);
                }
            }

            cursor.close();
        }

        e.onComplete();
    });
}