Java 类io.reactivex.FlowableOnSubscribe 实例源码

项目:SaveImage2SystemAlbum    文件:AlbumManager.java   
/**
 * Inserts the image into the system photo album and then refreshes the album.
 *
 * <p>The work is subscribed on the RxJava IO scheduler. Failures are caught and logged
 * rather than propagated: the stream is subscribed without an error consumer, so an
 * error signal would otherwise end up as an unhandled RxJava error. The stream is
 * always completed so that the subscription terminates.
 *
 * @param imageUrl image location handed to {@code FileUtils.createFileFrom} and {@code syncAlbum}
 */
private static void insertSystemAlbumAndRefresh(final String imageUrl) {
    Flowable.create(new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> e) throws Exception {
            try {
                File file = FileUtils.createFileFrom(imageUrl);
                String imageUri = MediaStore.Images.Media.insertImage(ApplicationProvider.IMPL.getApp().getContentResolver(), file.getAbsolutePath(), file.getName(), "图片: " + file.getName());
                Log.d("_stone_", "insertSystemAlbumAndRefresh-subscribe: imageUri=" + imageUri);

                // Inserting into the system gallery makes the saved picture visible in the album app.
                // To be on the safe side, also trigger a sync of the system gallery.
                syncAlbum(imageUrl);
            } catch (Exception ex) {
                // No downstream error handler exists, so report the failure here.
                Log.e("_stone_", "insertSystemAlbumAndRefresh-subscribe: failed, imageUrl=" + imageUrl, ex);
            }
            // Nothing is emitted; signal termination so the subscription is released.
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
项目:SAF-AOP    文件:AsyncAspect.java   
/**
 * Proceeds the intercepted join point from a Flowable subscribed on the RxJava IO scheduler.
 *
 * <p>Any {@link Throwable} raised by the join point is printed and not propagated.
 *
 * @param joinPoint the intercepted invocation to proceed
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    final FlowableOnSubscribe<Object> task = new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            // A Looper is prepared for the worker thread before the join point runs.
            Looper.prepare();
            try {
                joinPoint.proceed();
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
            // NOTE(review): Looper.loop() blocks until the looper is quit, so this worker
            // thread presumably never returns to the scheduler - confirm this is intended.
            Looper.loop();
        }
    };

    Flowable.create(task, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
项目:AutoNet    文件:AutoNetRepoImpl.java   
/**
 * Builds a Flowable that POSTs {@code requestEntity} as JSON to {@code mUrl} and hands
 * the response to {@code recvFile} together with the target {@code file}.
 *
 * @param requestEntity entity serialized to JSON with Gson and sent as the request body
 * @param file          file passed through to {@code recvFile}
 * @return the (raw) Flowable produced by {@code DefaultFlowable.create}
 */
@Override
public Flowable doPullStreamPost(final IRequestEntity requestEntity, final File file) {
    return DefaultFlowable.create(new FlowableOnSubscribe() {
        @Override
        public void subscribe(FlowableEmitter emitter) throws Exception {
            final MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
            final String payload = new Gson().toJson(requestEntity);
            final RequestBody requestBody = RequestBody.create(jsonType, payload);
            final Request post = new Request.Builder()
                    .url(mUrl)
                    .post(requestBody)
                    .build();

            final Response reply = mClient.newCall(post).execute();
            if (reply != null) {
                recvFile(emitter, reply, file);
                emitter.onComplete();
            } else {
                emitter.onError(new EmptyException());
            }
        }
    });
}
项目:rxjavatraining    文件:TibcoObservableTest.java   
/**
 * Exercise skeleton: creates a Flowable whose source polls the outstanding demand
 * and prints it for as long as the emitter is not cancelled. Nothing is emitted yet.
 */
@Test
public void createYourOwnTibco() throws Exception {
    final FlowableOnSubscribe<String> source = emitter -> {
        while (!emitter.isCancelled()) {
            final long outstanding = emitter.requested();
            System.out.println(outstanding);
            // Placeholder: no records are emitted even when outstanding > 0.
        }
    };
    final Flowable<String> flowable = Flowable.create(source, BackpressureStrategy.BUFFER);

    flowable
            .map(value -> value + "Yay!")
            .subscribe(System.out::println);
}
项目:vt-support    文件:FilesystemUtil.java   
/**
 * Emits the files found by {@code walk} below {@code path}.
 *
 * @param path the root directory containing files to be emitted
 * @param depth offset, where 1 == data directory, 2 == zoom directory, 3 == row, 4 == column
 * @return a stream of file references to data structured as the Google tiling scheme.
 */
public static Observable<File> getTiles(String path, int depth) {
  // Background reading on bridging callback code and backpressure:
  // https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831
  // http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/
  // There are several ways to do this, e.g. onBackpressureBlock / reactive pull.
  final FlowableOnSubscribe<File> source = emitter -> {
    try {
      // warning: depth 4 is not really walking (semantics)
      walk(path, depth, emitter);
      if (!emitter.isCancelled()) {
        emitter.onComplete();
      }
    } catch (final Exception failure) {
      // Terminal signals are only sent while the downstream is still listening.
      if (!emitter.isCancelled()) {
        emitter.onError(failure);
      }
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER).toObservable();
}
项目:vt-support    文件:FilesystemUtil.java   
/**
 * Emits the files below {@code path}, starting at depth 1.
 *
 * <p>Delegates to {@link #getTiles(String, int)} instead of repeating its body, so
 * both overloads share one implementation.
 *
 * @param path the root directory containing files to be emitted
 * @return the stream returned by {@code getTiles(path, 1)}
 */
public static Observable<File> getTiles(String path) {
  return getTiles(path, 1);
}
项目:alchemy    文件:RxJava2Fetchable.java   
/**
 * Creates a Flowable that emits every element of the iterator returned by
 * {@code mCallable.call()} and then completes. The iterator is closed when the
 * emission loop ends, whether normally, by cancellation or by an exception.
 *
 * @param mode backpressure strategy passed to {@code Flowable.create}
 */
@Override
public Flowable<T> flowable(BackpressureStrategy mode) {
    final FlowableOnSubscribe<T> source = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> downstream) throws Exception {
            final CloseableIterator<T> rows = mCallable.call();
            try {
                for (;;) {
                    if (downstream.isCancelled() || !rows.hasNext()) {
                        break;
                    }
                    downstream.onNext(rows.next());
                }
                downstream.onComplete();
            } finally {
                rows.close();
            }
        }
    };
    return Flowable.create(source, mode);
}
项目:AxoloTL    文件:BaseRunTask.java   
/**
 * Wraps {@code run} in a Flowable: progress published through the {@code TaskAgent}
 * is emitted as progress messages, followed by one result message and completion.
 * Uses {@code BackpressureStrategy.ERROR}.
 */
@Override
public final Flowable<RxTaskMessage<T, U>> createFlowable() {
    final FlowableOnSubscribe<RxTaskMessage<T, U>> source = new FlowableOnSubscribe<RxTaskMessage<T, U>>() {
        @Override
        public void subscribe(final FlowableEmitter<RxTaskMessage<T, U>> emitter) throws Exception {
            TaskLogger.v(TAG, "subscribe start");

            // Agent handed to run(); forwards each progress object downstream.
            final TaskAgent<U> agent = new TaskAgent<U>() {
                @Override
                public void publishProgress(U progressObject) {
                    TaskLogger.v(TAG, "onProgress");
                    emitter.onNext(RxTaskMessage.<T, U>createProgress(progressObject));
                }
            };

            final T outcome = run(agent);
            emitter.onNext(RxTaskMessage.<T, U>createResult(outcome));
            emitter.onComplete();
            TaskLogger.v(TAG, "subscribe end");
        }
    };
    return Flowable.create(source, BackpressureStrategy.ERROR);
}
项目:SeeWeather    文件:ChoiceCityActivity.java   
/**
 * Queries all provinces (loaded from the database when the cached list is empty)
 * and shows their names in the list.
 */
private void queryProvinces() {
    getToolbar().setTitle("选择省份");

    final FlowableOnSubscribe<String> provinceNames = emitter -> {
        // Load from the database only when nothing is cached yet.
        if (provincesList.isEmpty()) {
            provincesList.addAll(WeatherDB.loadProvinces(DBManager.getInstance().getDatabase()));
        }
        dataList.clear();
        for (Province item : provincesList) {
            emitter.onNext(item.mProName);
        }
        emitter.onComplete();
    };

    Flowable.create(provinceNames, BackpressureStrategy.BUFFER)
        .compose(RxUtil.ioF())
        .compose(RxUtil.activityLifecycleF(this))
        .doOnNext(name -> dataList.add(name))
        .doOnComplete(() -> {
            mProgressBar.setVisibility(View.GONE);
            currentLevel = LEVEL_PROVINCE;
            mAdapter.notifyDataSetChanged();
        })
        .subscribe();
}
项目:SeeWeather    文件:ChoiceCityActivity.java   
/**
 * Queries all cities of the selected province from the database and shows
 * their names in the list.
 */
private void queryCities() {
    getToolbar().setTitle("选择城市");
    dataList.clear();
    mAdapter.notifyDataSetChanged();

    final FlowableOnSubscribe<String> cityNames = emitter -> {
        cityList = WeatherDB.loadCities(DBManager.getInstance().getDatabase(), selectedProvince.mProSort);
        for (City item : cityList) {
            emitter.onNext(item.mCityName);
        }
        emitter.onComplete();
    };

    Flowable.create(cityNames, BackpressureStrategy.BUFFER)
        .compose(RxUtil.ioF())
        .compose(RxUtil.activityLifecycleF(this))
        .doOnNext(name -> dataList.add(name))
        .doOnComplete(() -> {
            currentLevel = LEVEL_CITY;
            mAdapter.notifyDataSetChanged();
            mRecyclerView.smoothScrollToPosition(0);
        })
        .subscribe();
}
项目:RxLogin    文件:RxLogin.java   
/**
 * Builds the Google sign-in stream: connects the API client (blocking, on the IO
 * scheduler), fails with a {@code LoginException} when the connection is not
 * successful, and otherwise switches to the stream created from {@code GoogleSubscriber}.
 * On subscription the sign-in activity is started; on termination the client is
 * disconnected and the client/callback fields are cleared.
 *
 * @param activity activity used to start the sign-in intent for result
 */
private Flowable<GoogleSignInResult> googleObservable(Activity activity) {
    final Flowable<ConnectionResult> connection = Flowable.create(
            (FlowableOnSubscribe<ConnectionResult>) emitter -> {
                emitter.onNext(mGoogleApiClient.blockingConnect(
                    GOOGLE_API_CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
                emitter.onComplete();
            },
            BackpressureStrategy.DROP)
        .subscribeOn(Schedulers.io());

    return connection
        .flatMap(result -> {
            if (!result.isSuccess()) {
                return Flowable.error(new LoginException(LoginException.GOOGLE_ERROR,
                    result.getErrorCode(), result.getErrorMessage()));
            }
            return Flowable.create(new GoogleSubscriber(this), BackpressureStrategy.DROP);
        })
        .doOnSubscribe(ignored ->
            activity.startActivityForResult(getGoogleSingInIntent(), RC_SIGN_IN))
        .doOnTerminate(() -> {
            mGoogleApiClient.disconnect();
            mGoogleApiClient = null;
            mGoogleCallback = null;
        });
}
项目:GitHub    文件:RealmHelper.java   
/**
 * Emits the results of a Realm query on {@code clazz}, filtered by the equality
 * conditions in {@code map}, and re-emits them whenever the change listener fires.
 * The listener is removed and the Realm instance closed when the stream is disposed.
 *
 * @param clazz model class to query
 * @param map   field name to required value; may be {@code null} for no filtering
 */
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) {
    return Flowable.create((FlowableOnSubscribe<RealmResults<T>>) emitter -> {
        final Realm realm = Realm.getDefaultInstance();
        final RealmQuery<T> query = realm.where(clazz);
        if (map != null) {
            for (Map.Entry<String, String> condition : map.entrySet()) {
                query.equalTo(condition.getKey(), condition.getValue());
            }
        }
        final RealmResults<T> results = query.findAll();

        final RealmChangeListener<RealmResults<T>> onChange = changed -> {
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onNext(results);
        };

        // Register the cleanup before attaching the listener.
        emitter.setDisposable(Disposables.fromRunnable(() -> {
            results.removeChangeListener(onChange);
            realm.close();
        }));
        results.addChangeListener(onChange);

        // Emit the current results right away.
        emitter.onNext(results);
    }, BackpressureStrategy.LATEST);
}
项目:GitHub    文件:RxUtil.java   
/**
 * Creates a Flowable that emits the given value once and then completes.
 *
 * @param t   the value to emit
 * @param <T> type of the emitted value
 * @return a Flowable created with {@code BackpressureStrategy.BUFFER}
 */
public static <T> Flowable<T> createData(final T t) {
    final FlowableOnSubscribe<T> single = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> downstream) throws Exception {
            try {
                downstream.onNext(t);
                downstream.onComplete();
            } catch (Exception failure) {
                downstream.onError(failure);
            }
        }
    };
    return Flowable.create(single, BackpressureStrategy.BUFFER);
}
项目:GitHub    文件:DownloadType.java   
/**
 * Creates a stream whose emissions are produced by {@code record.save} for the given response.
 *
 * @param response response handed to {@code record.save}
 * @return a Flowable created with {@code BackpressureStrategy.LATEST}
 */
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) {
    final FlowableOnSubscribe<DownloadStatus> writer = new FlowableOnSubscribe<DownloadStatus>() {
        @Override
        public void subscribe(FlowableEmitter<DownloadStatus> emitter) throws Exception {
            record.save(emitter, response);
        }
    };
    return Flowable.create(writer, BackpressureStrategy.LATEST);
}
项目:GitHub    文件:DownloadType.java   
/**
 * Saves a range-download part together with its download progress.
 *
 * <p>The source is shared through {@code replay(1).autoConnect()}; the returned stream
 * merges a copy throttled to the first item per 100 ms with the last item of the source.
 *
 * @param index    download part number
 * @param response response body
 * @return Flowable
 */
private Publisher<DownloadStatus> save(final int index, final ResponseBody response) {
    final FlowableOnSubscribe<DownloadStatus> writer = new FlowableOnSubscribe<DownloadStatus>() {
        @Override
        public void subscribe(FlowableEmitter<DownloadStatus> downstream) throws Exception {
            record.save(downstream, index, response);
        }
    };

    final Flowable<DownloadStatus> shared = Flowable.create(writer, BackpressureStrategy.LATEST)
            .replay(1)
            .autoConnect();

    final Flowable<DownloadStatus> throttled = shared.throttleFirst(100, TimeUnit.MILLISECONDS);
    final Flowable<DownloadStatus> finalStatus = shared.takeLast(1);
    return throttled
            .mergeWith(finalStatus)
            .subscribeOn(Schedulers.newThread());
}
项目:GitHub    文件:RxUtil.java   
/**
 * Builds a Flowable that emits {@code t} once and then completes.
 *
 * @param t   the single item to emit
 * @param <T> item type
 * @return a Flowable created with {@code BackpressureStrategy.BUFFER}
 */
public static <T> Flowable<T> createData(final T t) {
    final FlowableOnSubscribe<T> onSubscribe = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> sink) throws Exception {
            try {
                sink.onNext(t);
                sink.onComplete();
            } catch (Exception error) {
                sink.onError(error);
            }
        }
    };
    return Flowable.create(onSubscribe, BackpressureStrategy.BUFFER);
}
项目:GitHub    文件:RealmObservableFactory.java   
/**
 * Creates a Flowable that emits {@code list} immediately and again on every change
 * notification, until the stream is disposed.
 *
 * @param realm Realm whose configuration is used to open the instance held by the stream
 * @param list  the list to observe
 */
@Override
public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) {
    final RealmConfiguration config = realm.getConfiguration();
    final FlowableOnSubscribe<RealmList<E>> source = new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> downstream) throws Exception {
            // Hold an open Realm instance for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            listRefs.get().acquireReference(list);

            final RealmChangeListener<RealmList<E>> changeListener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results) {
                    if (downstream.isCancelled()) {
                        return;
                    }
                    downstream.onNext(list);
                }
            };
            list.addChangeListener(changeListener);

            // Undo the registrations above once the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(changeListener);
                    heldRealm.close();
                    listRefs.get().releaseReference(list);
                }
            };
            downstream.setDisposable(Disposables.fromRunnable(cleanup));

            // Deliver the current value right away.
            downstream.onNext(list);
        }
    };
    return Flowable.create(source, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
/**
 * Creates a Flowable that emits {@code list} immediately and again on every change
 * notification, until the stream is disposed.
 *
 * @param realm DynamicRealm whose configuration is used to open the instance held by the stream
 * @param list  the list to observe
 */
@Override
public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) {
    final RealmConfiguration config = realm.getConfiguration();
    final FlowableOnSubscribe<RealmList<E>> source = new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> downstream) throws Exception {
            // Hold an open DynamicRealm instance for as long as the stream is subscribed.
            final DynamicRealm heldRealm = DynamicRealm.getInstance(config);
            listRefs.get().acquireReference(list);

            final RealmChangeListener<RealmList<E>> changeListener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results) {
                    if (downstream.isCancelled()) {
                        return;
                    }
                    downstream.onNext(list);
                }
            };
            list.addChangeListener(changeListener);

            // Undo the registrations above once the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(changeListener);
                    heldRealm.close();
                    listRefs.get().releaseReference(list);
                }
            };
            downstream.setDisposable(Disposables.fromRunnable(cleanup));

            // Deliver the current value right away.
            downstream.onNext(list);
        }
    };
    return Flowable.create(source, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
/**
 * Creates a Flowable that emits {@code object} immediately and then emits the object
 * passed to the change listener on every change, until the stream is disposed.
 *
 * @param realm  Realm whose configuration is used to open the instance held by the stream
 * @param object the object to observe
 */
@Override
public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) {
    final RealmConfiguration config = realm.getConfiguration();
    final FlowableOnSubscribe<E> source = new FlowableOnSubscribe<E>() {
        @Override
        public void subscribe(final FlowableEmitter<E> downstream) throws Exception {
            // Hold an open Realm instance for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            objectRefs.get().acquireReference(object);

            final RealmChangeListener<E> changeListener = new RealmChangeListener<E>() {
                @Override
                public void onChange(E obj) {
                    if (downstream.isCancelled()) {
                        return;
                    }
                    downstream.onNext(obj);
                }
            };
            RealmObject.addChangeListener(object, changeListener);

            // Undo the registrations above once the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, changeListener);
                    heldRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            };
            downstream.setDisposable(Disposables.fromRunnable(cleanup));

            // Deliver the current value right away.
            downstream.onNext(object);
        }
    };
    return Flowable.create(source, BACK_PRESSURE_STRATEGY);
}
项目:GitHub    文件:RealmObservableFactory.java   
/**
 * Creates a Flowable that emits {@code object} immediately and then emits the object
 * passed to the change listener on every change, until the stream is disposed.
 *
 * @param realm  DynamicRealm whose configuration is used to open the instance held by the stream
 * @param object the object to observe
 */
@Override
public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) {
    final RealmConfiguration config = realm.getConfiguration();
    final FlowableOnSubscribe<DynamicRealmObject> source = new FlowableOnSubscribe<DynamicRealmObject>() {
        @Override
        public void subscribe(final FlowableEmitter<DynamicRealmObject> downstream) throws Exception {
            // Hold an open DynamicRealm instance for as long as the stream is subscribed.
            final DynamicRealm heldRealm = DynamicRealm.getInstance(config);
            objectRefs.get().acquireReference(object);

            final RealmChangeListener<DynamicRealmObject> changeListener = new RealmChangeListener<DynamicRealmObject>() {
                @Override
                public void onChange(DynamicRealmObject obj) {
                    if (downstream.isCancelled()) {
                        return;
                    }
                    downstream.onNext(obj);
                }
            };
            RealmObject.addChangeListener(object, changeListener);

            // Undo the registrations above once the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, changeListener);
                    heldRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            };
            downstream.setDisposable(Disposables.fromRunnable(cleanup));

            // Deliver the current value right away.
            downstream.onNext(object);
        }
    };
    return Flowable.create(source, BACK_PRESSURE_STRATEGY);
}
项目:ObjectBoxRxJava    文件:RxQuery.java   
/**
 * Returns a Flowable that emits the Query results one at a time and calls onComplete
 * after the last one. Emission is delegated to {@code createListItemEmitter}.
 *
 * @param query    the query whose results are emitted
 * @param strategy backpressure strategy passed to {@code Flowable.create}
 */
public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) {
    final FlowableOnSubscribe<T> source = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(final FlowableEmitter<T> downstream) throws Exception {
            createListItemEmitter(query, downstream);
        }
    };
    return Flowable.create(source, strategy);
}
项目:MBEStyle    文件:IconPresenter.java   
/**
 * Counts the start tags whose name starts with {@code "item"} in the
 * {@code R.xml.drawable} resource on the IO scheduler and passes the total to
 * {@code mView.setIconTotal} on the Android main thread.
 *
 * <p>The parser is closed once counting ends, and the stream completes after the
 * single value so that the subscription terminates.
 */
public void calcIconTotal() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            int total = 0;

            try {
                while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                    if (xml.getEventType() == XmlPullParser.START_TAG
                            && xml.getName().startsWith("item")) {
                        total++;
                    }
                    xml.next();
                }
            } finally {
                // Release the parser even when parsing fails part-way.
                xml.close();
            }

            flowableEmitter.onNext(total);
            flowableEmitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mView.setIconTotal(integer);
                }
            });
}
项目:GetStartRxJava2.0    文件:BaseFlowableActivity.java   
/**
 * Demonstrates a basic Flowable: four string events are emitted and then the stream
 * completes, while a Subscriber requesting unbounded demand logs every callback.
 */
private void doRxJavaWork() {
    final Subscriber<String> logger = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe: ");
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String string) {
            Log.d(TAG, "onNext: " + string);
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError: " + t.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: ");
        }
    };

    final FlowableOnSubscribe<String> events = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
            final String[] payloads = {"事件1", "事件2", "事件3", "事件4"};
            for (String payload : payloads) {
                emitter.onNext(payload);
            }
            emitter.onComplete();
        }
    };

    Flowable.create(events, BackpressureStrategy.BUFFER).subscribe(logger);
}
项目:EvolvingNetLib    文件:CCRequest.java   
/**
 * Builds the Flowable that queries the memory cache.
 *
 * <p>The value returned by {@code ccCacheQueryCallback.onQueryFromMemory} (or
 * {@code null} when no callback is set) is wrapped in a {@code CCBaseResponse} and
 * emitted, then the stream completes. If the query throws, the stream errors only in
 * {@code MODE_ONLY_MEMORY}; in every other mode it completes without emitting.
 *
 * @return the memory-cache query Flowable, subscribed on the IO scheduler
 */
private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() {
    final FlowableOnSubscribe<CCBaseResponse<T>> query = new FlowableOnSubscribe<CCBaseResponse<T>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> emitter) throws Exception {
            try {
                final T cached = (ccCacheQueryCallback != null)
                        ? ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey)
                        : null;

                emitter.onNext(new CCBaseResponse<T>(cached, true, true, false));
                emitter.onComplete();
            } catch (Exception failure) {
                switch (cacheQueryMode) {
                    case CCCacheMode.QueryMode.MODE_ONLY_MEMORY:
                        // NOTE(review): a memory-cache failure is reported with the
                        // disk-cache exception type - confirm this is intended.
                        emitter.onError(new CCDiskCacheQueryException(failure));
                        break;
                    default:
                        emitter.onComplete();
                        break;
                }
            }
        }
    };

    return Flowable.create(query, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.io());
}
项目:EvolvingNetLib    文件:CCRequest.java   
/**
 * Builds the Flowable that queries the disk cache (any form of disk cache).
 *
 * <p>The value returned by {@code ccCacheQueryCallback.onQueryFromDisk} (or
 * {@code null} when no callback is set) is wrapped in a {@code CCBaseResponse} and
 * emitted, then the stream completes. If the query throws, the stream errors in
 * {@code MODE_ONLY_DISK} and {@code MODE_MEMORY_THEN_DISK}; in every other mode it
 * completes without emitting.
 *
 * @return the disk-cache query Flowable, subscribed on the IO scheduler
 */
private Flowable<CCBaseResponse<T>> getDiskCacheQueryFlowable() {
    final FlowableOnSubscribe<CCBaseResponse<T>> query = new FlowableOnSubscribe<CCBaseResponse<T>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> emitter) throws Exception {
            try {
                final T cached = (ccCacheQueryCallback != null)
                        ? ccCacheQueryCallback.<T>onQueryFromDisk(cacheKey)
                        : null;

                emitter.onNext(new CCBaseResponse<T>(cached, true, false, true));
                emitter.onComplete();
            } catch (Exception failure) {
                switch (cacheQueryMode) {
                    case CCCacheMode.QueryMode.MODE_ONLY_DISK:
                    case CCCacheMode.QueryMode.MODE_MEMORY_THEN_DISK:
                        emitter.onError(new CCDiskCacheQueryException(failure));
                        break;
                    default:
                        emitter.onComplete();
                        break;
                }
            }
        }
    };

    return Flowable.create(query, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.io());
}
项目:AssistantBySDK    文件:AlarmActivity.java   
/**
 * Loads the alarm data and refreshes the view.
 *
 * <p>The alarms are read and turned into task cards inside the emitter; one item is
 * then emitted to trigger the UI refresh and the stream completes, so the one-shot
 * load terminates instead of staying subscribed.
 **/
private void loadData() {
    Flowable.create(new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> e) throws Exception {
            reset();
            /* Load the database records on the IO thread. */
            List<AlarmClock> alarmClocks = mAssistDao.findAllAlarmAsc(false);
            week = Calendar.getInstance().get(Calendar.DAY_OF_WEEK);
            week = week - 1 == 0 ? 7 : week - 1;    // work out which day of the week today is
            week = week + 1 == 8 ? 1 : week + 1;    // work out which day of the week tomorrow is
            for (AlarmClock alarm : alarmClocks) {
                TaskCard<AlarmClock> card = new TaskCard<>(alarm, TaskCard.TaskState.ACTIVE);
                if (alarm.getValid() == 0) {
                    card.taskState = TaskCard.TaskState.INVALID;
                }
                updateAlarmCount(AccountingActivity.TYPE_ADD, alarm);
                alarmDatas.add(card);
            }
            e.onNext(0);
            // Single-shot load: signal completion so the subscription is released.
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())   // thread on which the emitter's subscribe() runs
            .doOnSubscribe(new Consumer<Subscription>() {
                @Override
                public void accept(Subscription subscription) throws Exception {
                    mCpbLoad.setVisibility(View.VISIBLE);
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())  // thread on which the subscriber is notified
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    /* Back on the main thread: refresh the list. */
                    mCpbLoad.setVisibility(View.GONE);
                    mAdapter.notifyDataSetChanged();
                }
            });
}
项目:RX_Demo    文件:Rx2Test2Activity.java   
/**
 * Backpressure demo: emits 128 integers with {@code BackpressureStrategy.ERROR} on the
 * IO scheduler while the subscriber only stores its Subscription and requests nothing
 * in {@code onSubscribe}.
 */
private void flowableTest() {
    final Subscriber<Integer> observer = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
//            s.request(Long.MAX_VALUE);  // pay attention to this line
            mSubscription = s;
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    final FlowableOnSubscribe<Integer> numbers = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            int next = 0;
            while (next < 128) {
                Log.d(TAG, "emit " + next);
                emitter.onNext(next);
                next++;
            }
        }
    };

    // Flowable.create takes the backpressure strategy as an extra argument.
    Flowable.create(numbers, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
项目:YiZhi    文件:RxHelper.java   
/**
 * Creates a Flowable that emits the given value once and then completes.
 *
 * @param t the value to emit
 * @return Flowable created with {@code BackpressureStrategy.BUFFER}
 */
public static <T> Flowable<T> createFlowable(final T t) {
    final FlowableOnSubscribe<T> single = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> downstream) throws Exception {
            try {
                downstream.onNext(t);
                downstream.onComplete();
            } catch (Exception failure) {
                downstream.onError(failure);
            }
        }
    };
    return Flowable.create(single, BackpressureStrategy.BUFFER);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code SongsPresenter.getAllSongs()} once, then completes. */
@Override
public Flowable<List<Song>> getAllSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> emitter) throws Exception {
            final List<Song> songs = SongsPresenter.getAllSongs();
            emitter.onNext(songs);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code PlaylistsPresenter.getAllPlaylists()} once, then completes. */
@Override
public Flowable<List<Playlist>> getAllPlaylists() {
    final FlowableOnSubscribe<List<Playlist>> loader = new FlowableOnSubscribe<List<Playlist>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Playlist>> emitter) throws Exception {
            final List<Playlist> playlists = PlaylistsPresenter.getAllPlaylists();
            emitter.onNext(playlists);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code AlbumsPresenter.getAllAlbums()} once, then completes. */
@Override
public Flowable<List<Album>> getAllAlbums() {
    final FlowableOnSubscribe<List<Album>> loader = new FlowableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Album>> emitter) throws Exception {
            final List<Album> albums = AlbumsPresenter.getAllAlbums();
            emitter.onNext(albums);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/**
 * Emits the list returned by {@code AlbumsPresenter.getArtistAlbums(artistId)} once, then completes.
 *
 * @param artistId id passed to {@code AlbumsPresenter.getArtistAlbums}
 */
@Override
public Flowable<List<Album>> getArtistAlbums(final long artistId) {
    final FlowableOnSubscribe<List<Album>> loader = new FlowableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Album>> emitter) throws Exception {
            final List<Album> albums = AlbumsPresenter.getArtistAlbums(artistId);
            emitter.onNext(albums);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code ArtistsPresenter.getAllArtists()} once, then completes. */
@Override
public Flowable<List<Artist>> getAllArtists() {
    final FlowableOnSubscribe<List<Artist>> loader = new FlowableOnSubscribe<List<Artist>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Artist>> emitter) throws Exception {
            final List<Artist> artists = ArtistsPresenter.getAllArtists();
            emitter.onNext(artists);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code PlayQueuePresenter.getQueueSongs()} once, then completes. */
@Override
public Flowable<List<Song>> getQueueSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> emitter) throws Exception {
            final List<Song> queued = PlayQueuePresenter.getQueueSongs();
            emitter.onNext(queued);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/**
 * Emits the list returned by {@code AlbumDetailPresenter.getAlbumSongs(albumId)} once, then completes.
 *
 * @param albumId id passed to {@code AlbumDetailPresenter.getAlbumSongs}
 */
@Override
public Flowable<List<Song>> getAlbumSongs(final long albumId) {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> emitter) throws Exception {
            final List<Song> songs = AlbumDetailPresenter.getAlbumSongs(albumId);
            emitter.onNext(songs);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code RecentlyAddedPresenter.getRecentlyAddedSongs()} once, then completes. */
@Override
public Flowable<List<Song>> getRecentlyAddedSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> emitter) throws Exception {
            final List<Song> songs = RecentlyAddedPresenter.getRecentlyAddedSongs();
            emitter.onNext(songs);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code RecentlyPlayPresenter.getRecentlyPlaySongs()} once, then completes. */
@Override
public Flowable<List<Song>> getRecentlyPlayedSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> emitter) throws Exception {
            final List<Song> songs = RecentlyPlayPresenter.getRecentlyPlaySongs();
            emitter.onNext(songs);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
/** Emits the list returned by {@code MyFavoritePresenter.getFavoriteSongs()} once, then completes. */
@Override
public Flowable<List<Song>> getMyFavoriteSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> emitter) throws Exception {
            final List<Song> favorites = MyFavoritePresenter.getFavoriteSongs();
            emitter.onNext(favorites);
            emitter.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
项目:XSnow    文件:DownloadRequest.java   
/**
 * Downloads the file at {@code suffixUrl} and streams {@code DownProgress} updates.
 *
 * <p>The response body is written by {@code saveFile} to {@code fileName} inside the
 * directory returned by {@code getDiskCacheDir(rootName, dirName)}. Updates are
 * sampled once per second, observed on the Android main thread, and retried through
 * {@code ApiRetryFunc}.
 *
 * @param type not used by this implementation
 */
@Override
protected <T> Observable<T> execute(Type type) {
    // Maps the response body to a progress stream that writes it to disk.
    final Function<ResponseBody, Publisher<?>> writeToDisk = new Function<ResponseBody, Publisher<?>>() {
        @Override
        public Publisher<?> apply(final ResponseBody responseBody) throws Exception {
            final FlowableOnSubscribe<DownProgress> writer = new FlowableOnSubscribe<DownProgress>() {
                @Override
                public void subscribe(FlowableEmitter<DownProgress> emitter) throws Exception {
                    final File targetDir = getDiskCacheDir(rootName, dirName);
                    if (!targetDir.exists()) {
                        targetDir.mkdirs();
                    }
                    final File target = new File(targetDir.getPath() + File.separator + fileName);
                    saveFile(emitter, target, responseBody);
                }
            };
            return Flowable.create(writer, BackpressureStrategy.LATEST);
        }
    };

    return (Observable<T>) apiService
            .downFile(suffixUrl, params)
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .toFlowable(BackpressureStrategy.LATEST)
            .flatMap(writeToDisk)
            .sample(1, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .toObservable()
            .retryWhen(new ApiRetryFunc(retryCount, retryDelayMillis));
}
项目:showcase-android    文件:RxFirebaseDatabase.java   
/**
 * Listens for changes in the data at the given query location.
 *
 * <p>A {@code ValueEventListener} is attached to the query on subscription and removed
 * again when the stream is cancelled. A database cancellation is reported as an
 * {@code RxFirebaseDataException} error.
 *
 * @param query    reference to a particular location in the database
 * @param strategy {@link BackpressureStrategy} associated with this {@link Flowable}
 * @return a {@link Flowable} which emits each {@code DataSnapshot} delivered to the listener.
 */
@NonNull
public static Flowable<DataSnapshot> observeValueEvent(@NonNull final Query query,
                                                       @NonNull BackpressureStrategy strategy) {
   final FlowableOnSubscribe<DataSnapshot> source = new FlowableOnSubscribe<DataSnapshot>() {
      @Override
      public void subscribe(final FlowableEmitter<DataSnapshot> downstream) throws Exception {
         final ValueEventListener listener = new ValueEventListener() {
            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
               downstream.onNext(dataSnapshot);
            }

            @Override
            public void onCancelled(final DatabaseError error) {
               downstream.onError(new RxFirebaseDataException(error));
            }
         };

         // Register the detach action first, then attach the listener.
         final Cancellable detach = new Cancellable() {
            @Override
            public void cancel() throws Exception {
               query.removeEventListener(listener);
            }
         };
         downstream.setCancellable(detach);
         query.addValueEventListener(listener);
      }
   };
   return Flowable.create(source, strategy);
}