public <T> void addListener(T observer, OrderedRealmCollectionChangeListener<T> listener) { if (observerPairs.isEmpty()) { nativeStartListening(nativePtr); } CollectionObserverPair<T> collectionObserverPair = new CollectionObserverPair<T>(observer, listener); observerPairs.add(collectionObserverPair); }
public void onChange(T observer, @Nullable OrderedCollectionChangeSet changes) { if (listener instanceof OrderedRealmCollectionChangeListener) { //noinspection unchecked ((OrderedRealmCollectionChangeListener<T>) listener).onChange(observer, changes); } else if (listener instanceof RealmChangeListener) { //noinspection unchecked ((RealmChangeListener<T>) listener).onChange(observer); } else { throw new RuntimeException("Unsupported listener type: " + listener); } }
@Override public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); resultsRefs.get().acquireReference(results); final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() { @Override public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet)); } } }; results.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { results.removeChangeListener(listener); observableRealm.close(); resultsRefs.get().releaseReference(results); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(results, null)); } }); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(DynamicRealm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
public <T> void removeListener(T observer, OrderedRealmCollectionChangeListener<T> listener) { observerPairs.remove(observer, listener); if (observerPairs.isEmpty()) { nativeStopListening(nativePtr); } }