Java 类io.reactivex.disposables.Disposables 实例源码

项目:GitHub    文件:HandlerScheduler.java   
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}
项目:Rx_java2_soussidev    文件:PreLollipopNetworkObservingStrategy.java   
private Disposable disposeInUiThread(final Action action) {
    return Disposables.fromAction(new Action() {
        @Override public void run() throws Exception {
            if (Looper.getMainLooper() == Looper.myLooper()) {
                action.run();
            } else {
                final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker();
                inner.schedule(new Runnable() {
                    @Override public void run() {
                        try {
                            action.run();
                        } catch (Exception e) {
                            onError("Could not unregister receiver in UI Thread", e);
                        }
                        inner.dispose();
                    }
                });
            }
        }
    });
}
项目:atlas    文件:HandlerScheduler.java   
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}
项目:RxSWT    文件:EclipseScheduler.java   
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null)
        throw new NullPointerException("run == null");
    if (unit == null)
        throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(run);

    executeRunnable(title, delay, unit, scheduled);

    // Re-check disposed state for removing in case we were racing a
    // call to dispose().
    if (disposed) {
        return Disposables.disposed();
    }

    return scheduled;
}
项目:RxSWT    文件:DisplayScheduler.java   
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null)
        throw new NullPointerException("run == null");
    if (unit == null)
        throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(run);

    executeRunnable(display, delay, unit, scheduled);

    // Re-check disposed state for removing in case we were racing a
    // call to dispose().
    if (disposed) {
        return Disposables.disposed();
    }

    return scheduled;
}
项目:RxFirestore    文件:DocumentSnapshotsOnSubscribe.java   
@Override
public void subscribe(final ObservableEmitter<DocumentSnapshot> emitter) throws Exception {
    final EventListener<DocumentSnapshot> listener = new EventListener<DocumentSnapshot>() {
        @Override
        public void onEvent(DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) {
            if (!emitter.isDisposed()) {
                if (e == null) {
                    emitter.onNext(documentSnapshot);
                } else {
                    emitter.onError(e);
                }
            }
        }

    };

    registration = documentReference.addSnapshotListener(listener);

    emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    }));
}
项目:simple-stack    文件:DatabaseManager.java   
public void openDatabase() {
    disposable = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmChangeListener<Realm> listener = realm -> {
            if(!emitter.isDisposed()) {
                emitter.onNext(observableRealm);
            }
        };
        observableRealm.addChangeListener(listener);
        emitter.setDisposable(Disposables.fromAction(() -> {
            observableRealm.removeChangeListener(listener);
            observableRealm.close();
        }));
        emitter.onNext(observableRealm);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()).subscribe();
}
项目:simple-stack    文件:TaskRepository.java   
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> dbTasks = querySelector.createQuery(realm);
        final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> {
            if(element.isLoaded() && !emitter.isDisposed()) {
                List<Task> tasks = mapFrom(element);
                if(!emitter.isDisposed()) {
                    emitter.onNext(tasks);
                }
            }
        };
        emitter.setDisposable(Disposables.fromAction(() -> {
            if(dbTasks.isValid()) {
                dbTasks.removeChangeListener(realmChangeListener);
            }
            realm.close();
        }));
        dbTasks.addChangeListener(realmChangeListener);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler());
}
项目:simple-stack    文件:DatabaseManager.java   
public void openDatabase() {
    disposable = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmChangeListener<Realm> listener = realm -> {
            if(!emitter.isDisposed()) {
                emitter.onNext(observableRealm);
            }
        };
        observableRealm.addChangeListener(listener);
        emitter.setDisposable(Disposables.fromAction(() -> {
            observableRealm.removeChangeListener(listener);
            observableRealm.close();
        }));
        emitter.onNext(observableRealm);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()).subscribe();
}
项目:simple-stack    文件:TaskRepository.java   
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> dbTasks = querySelector.createQuery(realm);
        final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> {
            if(element.isLoaded() && !emitter.isDisposed()) {
                List<Task> tasks = mapFrom(element);
                if(!emitter.isDisposed()) {
                    emitter.onNext(tasks);
                }
            }
        };
        emitter.setDisposable(Disposables.fromAction(() -> {
            if(dbTasks.isValid()) {
                dbTasks.removeChangeListener(realmChangeListener);
            }
            realm.close();
        }));
        dbTasks.addChangeListener(realmChangeListener);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler());
}
项目:ReactiveAirplaneMode    文件:ReactiveAirplaneMode.java   
/**
 * Disposes an action in UI Thread
 *
 * @param dispose action to be executed
 * @return Disposable object
 */
private Disposable disposeInUiThread(final Action dispose) {
  return Disposables.fromAction(new Action() {
    @Override public void run() throws Exception {
      if (Looper.getMainLooper() == Looper.myLooper()) {
        dispose.run();
      } else {
        final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker();
        inner.schedule(new Runnable() {
          @Override public void run() {
            try {
              dispose.run();
            } catch (Exception exception) {
              onError("Could not unregister receiver in UI Thread", exception);
            }
            inner.dispose();
          }
        });
      }
    }
  });
}
项目:RxFirebase2    文件:AuthStateChangesOnSubscribe.java   
@Override public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
  final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() {
    @Override public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
      if (!emitter.isDisposed()) {
        emitter.onNext(firebaseAuth);
      }
    }
  };

  instance.addAuthStateListener(listener);

  emitter.setDisposable(Disposables.fromAction(new Action() {
    @Override public void run() throws Exception {
      instance.removeAuthStateListener(listener);
    }
  }));
}
项目:RxFirebase2    文件:DataChangesOnSubscribe.java   
@Override public void subscribe(final ObservableEmitter<DataSnapshot> emitter) {
  final ValueEventListener listener = new ValueEventListener() {
    @Override public void onDataChange(DataSnapshot dataSnapshot) {
      if (!emitter.isDisposed()) {
        emitter.onNext(dataSnapshot);
      }
    }

    @Override public void onCancelled(DatabaseError databaseError) {
      if (!emitter.isDisposed()) {
        emitter.onError(databaseError.toException());
      }
    }
  };

  ref.addValueEventListener(listener);
  emitter.setDisposable(Disposables.fromAction(new Action() {
    @Override public void run() throws Exception {
      ref.removeEventListener(listener);
    }
  }));
}
项目:RxiOSMOE    文件:HandlerThreadScheduler.java   
@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isDisposed()) {
        return Disposables.empty();
    }

    final ScheduledAction scheduledAction = new ScheduledAction(action, operationQueue);
    final ScheduledExecutorService executor = IOSScheduledExecutorPool.getInstance();

    Future<?> future;
    if (delayTime <= 0) {
        future = executor.submit(scheduledAction);
    } else {
        future = executor.schedule(scheduledAction, delayTime, unit);
    }

    scheduledAction.add(Disposables.fromFuture(future));
    scheduledAction.addParent(innerSubscription);

    return scheduledAction;
}
项目:RxJava2Extensions    文件:ObservableIndexOfTest.java   
@Test
public void foundWithUnconditionalOnCompleteAfter() {
    new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> s) {
            s.onSubscribe(Disposables.empty());
            s.onNext(10);
            s.onComplete();
        }
    }
    .compose(ObservableTransformers.indexOf(new Predicate<Integer>() {
        @Override
        public boolean test(Integer v) throws Exception {
            return v == 10;
        }
    }))
    .test()
    .assertResult(0L);
}
项目:grpc-rx    文件:ClientCallsRx.java   
public SingleResponseReceiver(ClientCall<?, RespT> call) {
  this.call = call;

  this.source = new SingleSource<RespT>() {
    @Override
    public void subscribe(SingleObserver<? super RespT> observer) {
      responseObserver = observer;

      // todo which disposable should be used here
      observer.onSubscribe(Disposables.disposed());

      // start call until response gets subscribed
      startCall();

      if (error != null) {
        responseObserver.onError(error);
        error = null;
      }
    }
  };
}
项目:rxfirebase    文件:AuthStateChangesOnSubscribe.java   
/**
 * @param emitter
 */
@Override
public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
    final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() {
        @Override
        public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
            if (!emitter.isDisposed()) {
                emitter.onNext(firebaseAuth);
            }
        }
    };

    instance.addAuthStateListener(listener);

    emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            instance.removeAuthStateListener(listener);
        }
    }));
}
项目:buseta    文件:RxBroadcastReceiver.java   
@Override
public void subscribe(ObservableEmitter<Intent> emitter) throws Exception {
    BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
        @Override
        public void onReceive(Context context, Intent intent) {
            emitter.onNext(intent);
        }
    };
    emitter.setDisposable(Disposables.fromRunnable(() -> { // thank you Jake W.
        try {
            if (contextWeakReference != null && contextWeakReference.get() != null) {
                contextWeakReference.get().unregisterReceiver(broadcastReceiver);
            }
        } catch (IllegalArgumentException ignored) {}
    }));

    if (contextWeakReference != null && contextWeakReference.get() != null) {
        contextWeakReference.get().registerReceiver(broadcastReceiver, intentFilter);
    }
}
项目:durian-rx    文件:RxExecutor.java   
@Override
public <T> Disposable subscribeDisposable(ListenableFuture<? extends T> future, RxListener<T> untracedListener) {
    requireNonNull(untracedListener);
    RxListener<T> listener = Rx.getTracingPolicy().hook(future, untracedListener);
    // when we're unsubscribed, set the flag to false
    Disposable sub = Disposables.empty();
    // add a callback that guards on whether it is still subscribed
    future.addListener(() -> {
        try {
            T value = future.get();
            if (!sub.isDisposed()) {
                listener.onSuccess(value);
            }
        } catch (Throwable error) {
            if (!sub.isDisposed()) {
                listener.onFailure(error);
            }
        }
    }, executor);
    // return the subscription
    return sub;
}
项目:durian-rx    文件:RxExecutor.java   
@Override
public <T> Disposable subscribeDisposable(CompletionStage<? extends T> future, RxListener<T> untracedListener) {
    requireNonNull(untracedListener);
    RxListener<T> listener = Rx.getTracingPolicy().hook(future, untracedListener);

    // when we're unsubscribed, set the flag to false
    Disposable sub = Disposables.empty();
    future.whenCompleteAsync((value, exception) -> {
        if (!sub.isDisposed()) {
            if (exception == null) {
                listener.onSuccess(value);
            } else {
                listener.onFailure(exception);
            }
        }
    }, executor);
    // return the subscription
    return sub;
}
项目:PrefCompat    文件:PrefInternal.java   
@SuppressLint("CommitPrefEdits") public PrefInternal(final SharedPreferences pref) {
  this.pref = pref;
  editor = pref.edit();
  mObservable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
      final SharedPreferences.OnSharedPreferenceChangeListener listener =
          new SharedPreferences.OnSharedPreferenceChangeListener() {
            @Override public void onSharedPreferenceChanged(SharedPreferences sharedPreferences,
                String key) {
              emitter.onNext(key);
            }
          };
      pref.registerOnSharedPreferenceChangeListener(listener);

      emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override public void run() throws Exception {
          log.d("Un-registering PrefCompat");
          pref.unregisterOnSharedPreferenceChangeListener(listener);
        }
      }));
    }
  });
}
项目:durian-swt    文件:SwtExec.java   
@Override
public Disposable schedule(Runnable action) {
    if (unsubscribed) {
        return Disposables.disposed();
    }

    SwtScheduledAction a = new SwtScheduledAction(action, this);

    synchronized (this) {
        if (unsubscribed) {
            return Disposables.disposed();
        }

        tasks.add(a);
    }

    exec.execute(a);

    if (unsubscribed) {
        a.cancel();
        return Disposables.disposed();
    }

    return a;
}
项目:GitHub    文件:RealmHelper.java   
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) {
    return Flowable.create(new FlowableOnSubscribe<RealmResults<T>>() {
        @Override
        public void subscribe(FlowableEmitter<RealmResults<T>> emitter)
                throws Exception {
            Realm realm = Realm.getDefaultInstance();
            RealmQuery<T> query = realm.where(clazz);
            if (map != null) {
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    query.equalTo(entry.getKey(), entry.getValue());
                }
            }
            RealmResults<T> results = query.findAll();

            final RealmChangeListener<RealmResults<T>> listener = _realm -> {
                if (!emitter.isCancelled()) {
                    emitter.onNext(results);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                results.removeChangeListener(listener);
                realm.close();
            }));
            results.addChangeListener(listener);
            emitter.onNext(results);
        }
    }, BackpressureStrategy.LATEST);
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            resultsRefs.get().acquireReference(results);
            final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() {
                @Override
                public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet));
                    }
                }
            };
            results.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    results.removeChangeListener(listener);
                    observableRealm.close();
                    resultsRefs.get().releaseReference(results);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new CollectionChange<>(results, null));
        }
    });
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            listRefs.get().acquireReference(list);
            final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(list);
                    }
                }
            };
            list.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(listener);
                    observableRealm.close();
                    listRefs.get().releaseReference(list);
                }
            }));

            // Emit current value immediately
            emitter.onNext(list);

        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            listRefs.get().acquireReference(list);
            final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new CollectionChange<>(results, changeSet));
                    }
                }
            };
            list.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(listener);
                    observableRealm.close();
                    listRefs.get().releaseReference(list);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new CollectionChange<>(list, null));
        }
    });
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
            listRefs.get().acquireReference(list);
            final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(list);
                    }
                }
            };
            list.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(listener);
                    observableRealm.close();
                    listRefs.get().releaseReference(list);
                }
            }));

            // Emit current value immediately
            emitter.onNext(list);

        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(DynamicRealm realm, final RealmList<E> list) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
            listRefs.get().acquireReference(list);
            final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new CollectionChange<>(results, changeSet));
                    }
                }
            };
            list.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(listener);
                    observableRealm.close();
                    listRefs.get().releaseReference(list);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new CollectionChange<>(list, null));
        }
    });
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<E>() {
        @Override
        public void subscribe(final FlowableEmitter<E> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            objectRefs.get().acquireReference(object);
            final RealmChangeListener<E> listener = new RealmChangeListener<E>() {
                @Override
                public void onChange(E obj) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(obj);
                    }
                }
            };
            RealmObject.addChangeListener(object, listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, listener);
                    observableRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            }));

            // Emit current value immediately
            emitter.onNext(object);

        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E extends RealmModel> Observable<ObjectChange<E>> changesetsFrom(Realm realm, final E object) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<ObjectChange<E>>() {
        @Override
        public void subscribe(final ObservableEmitter<ObjectChange<E>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            objectRefs.get().acquireReference(object);
            final RealmObjectChangeListener<E> listener = new RealmObjectChangeListener<E>() {
                @Override
                public void onChange(E obj, ObjectChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new ObjectChange<>(obj, changeSet));
                    }
                }
            };
            RealmObject.addChangeListener(object, listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, listener);
                    observableRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new ObjectChange<>(object, null));
        }
    });
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<DynamicRealmObject>() {
        @Override
        public void subscribe(final FlowableEmitter<DynamicRealmObject> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
            objectRefs.get().acquireReference(object);
            final RealmChangeListener<DynamicRealmObject> listener = new RealmChangeListener<DynamicRealmObject>() {
                @Override
                public void onChange(DynamicRealmObject obj) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(obj);
                    }
                }
            };
            RealmObject.addChangeListener(object, listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, listener);
                    observableRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            }));

            // Emit current value immediately
            emitter.onNext(object);

        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public Observable<ObjectChange<DynamicRealmObject>> changesetsFrom(DynamicRealm realm, final DynamicRealmObject object) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<ObjectChange<DynamicRealmObject>>() {
        @Override
        public void subscribe(final ObservableEmitter<ObjectChange<DynamicRealmObject>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
            objectRefs.get().acquireReference(object);
            final RealmObjectChangeListener<DynamicRealmObject> listener = new RealmObjectChangeListener<DynamicRealmObject>() {
                @Override
                public void onChange(DynamicRealmObject obj, ObjectChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new ObjectChange<>(obj, changeSet));
                    }
                }
            };
            object.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    object.removeChangeListener(listener);
                    observableRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new ObjectChange<>(object, null));
        }
    });
}
项目:rxstate    文件:RxState.java   
private void setDisposable(ObservableEmitter<T> emitter) {
    emitter.setDisposable(Disposables.fromAction(() -> {
        synchronized (this) {
            emitters.remove(emitter);
        }
    }));
}
项目:CameraButton    文件:Preconditions.java   
static boolean checkMainThread(Observer<?> observer) {
    if (Looper.myLooper() != Looper.getMainLooper()) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new IllegalStateException(
                "Expected to be called on the main thread but was " + Thread.currentThread().getName()));
        return false;
    }
    return true;
}
项目:RxFirestore    文件:QuerySnapshotsOnSubscribe.java   
@Override
public void subscribe(final ObservableEmitter<QuerySnapshot> emitter) throws Exception {
    final EventListener<QuerySnapshot> listener = new EventListener<QuerySnapshot>() {
        @Override
        public void onEvent(QuerySnapshot querySnapshot, FirebaseFirestoreException e) {

            if (!emitter.isDisposed()) {
                if (e == null) {
                    emitter.onNext(querySnapshot);
                } else {
                    emitter.onError(e);
                }
            }

        }
    };

    registration = query.addSnapshotListener(listener);

    emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    }));

}
项目:RxFirestore    文件:DocumentChangesOnSubscribe.java   
@Override
public void subscribe(final ObservableEmitter<DocumentChange> emitter) throws Exception {
    final EventListener<QuerySnapshot> listener = new EventListener<QuerySnapshot>() {
        @Override
        public void onEvent(QuerySnapshot querySnapshot, FirebaseFirestoreException e) {

            if (!emitter.isDisposed()) {
                if (e == null) {
                    for (DocumentChange change : querySnapshot.getDocumentChanges()) {
                        emitter.onNext(change);
                    }

                } else {
                    emitter.onError(e);
                }
            }

        }
    };

    registration = query.addSnapshotListener(listener);

    emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    }));
}
项目:pl    文件:BukkitWorker.java   
@Override
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    if (delay <= 0) {
        if (canRunImmediately()) {
            run.run();
            return Disposables.disposed();
        }

        return new SingleDisposableTask(run);
    }

    return getGroup(delay, unit).add(new QueuedWork(run));
}
项目:RxTask    文件:Preconditions.java   
public static boolean checkMainThread(Observer<?> observer) {
    if (Looper.myLooper() != Looper.getMainLooper()) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new IllegalStateException(
                "Expected to be called on the main thread but was " + Thread.currentThread()
                        .getName()));
        return false;
    }
    return true;
}
项目:RxBroadcastReceiver    文件:Preconditions.java   
public static boolean checkLooperThread(final Observer observer) {
    if (Looper.myLooper() == null) {
        observer.onSubscribe(Disposables.empty());
        observer.onError(new IllegalStateException("Calling thread is not associated with Looper"));
        return false;
    } else {
        return true;
    }
}
项目:rxjava2-jdbc    文件:FlowableSingleDeferUntilRequest.java   
@Override
public void cancel() {
    if (disposable.compareAndSet(null, Disposables.disposed())) {
        return;
    } else {
        disposable.get().dispose();
        // clear for GC
        disposable.set(Disposables.disposed());
    }
}