Java 类io.reactivex.ObservableOnSubscribe 实例源码

项目:RxBeacon    文件:RxBeacon.java   
public Observable<RxBeaconRange> beaconsInRegion() {
    return startup()
            .flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
                            beaconManager.addRangeNotifier(new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
                                    objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
                                }
                            });
                            beaconManager.startRangingBeaconsInRegion(getRegion());
                        }
                    });
                }
            });
}
项目:GitHub    文件:ThrottleLastExampleActivity.java   
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}
项目:BakingApp    文件:RecipeDatabaseHelper.java   
public void insertRecipes(@NonNull final ArrayList<Recipe> recipes, Observer<Integer> observer){
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            ContentValues[] contentValues = new ContentValues[recipes.size()];
            for (int i = 0; i < recipes.size(); ++i) {
                contentValues[i] = buildContentValuesFromRecipe(recipes.get(i));
            }
            int recipesAdded = mContext.getContentResolver().bulkInsert(RecipesContract.RecipeEntry.CONTENT_URI, contentValues);
            if (recipesAdded != 0){
                e.onNext(recipesAdded);
            } else {
                e.onError(new NullPointerException("Failed to insert"));
            }
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
项目:simple-stack    文件:TaskRepository.java   
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> dbTasks = querySelector.createQuery(realm);
        final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> {
            if(element.isLoaded() && !emitter.isDisposed()) {
                List<Task> tasks = mapFrom(element);
                if(!emitter.isDisposed()) {
                    emitter.onNext(tasks);
                }
            }
        };
        emitter.setDisposable(Disposables.fromAction(() -> {
            if(dbTasks.isValid()) {
                dbTasks.removeChangeListener(realmChangeListener);
            }
            realm.close();
        }));
        dbTasks.addChangeListener(realmChangeListener);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler());
}
项目:RxJava2-Android-Sample    文件:DebounceExampleActivity.java   
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            //1(drop)--(400s<500s)---2(pass)---(600s>500s)---3(drop)---(100s<500s)---4(pass)---(605s>500s)---5(pass)---510s
            emitter.onNext(1); // skip
            Thread.sleep(400);
            emitter.onNext(2); // deliver
            Thread.sleep(600);
            emitter.onNext(3); // skip
            Thread.sleep(100);
            emitter.onNext(4); // deliver
            Thread.sleep(605);
            emitter.onNext(5); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}
项目:MBEStyle    文件:IconShowPresenter.java   
public Disposable getAllIcons() {
    return Observable.create(new ObservableOnSubscribe<IconBean>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<IconBean> e) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);

            while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                if (xml.getEventType() == XmlPullParser.START_TAG) {
                    if (xml.getName().startsWith("item")) {
                        IconBean bean = new IconBean();

                        String iconName = xml.getAttributeValue(null, "drawable");
                        bean.id = mView.getResources().getIdentifier(
                                iconName, "drawable", BuildConfig.APPLICATION_ID);
                        bean.name = iconName;

                        e.onNext(bean);
                    }
                }
                xml.next();
            }
            e.onComplete();
        }
    }).toList().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<IconBean>>() {
                @Override
                public void accept(List<IconBean> list) throws Exception {
                    mView.onLoadData(list);
                }
            });
}
项目:Rxjava2.0Demo    文件:MapActivity.java   
private void map() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
            info += s + "\n";
            tv.setText(info);
        }
    });
}
项目:OKHttpLoggingInterceptor    文件:NetTest.java   
public void test()
{
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception
        {
            e.onNext(1);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception
        {
            System.out.println(integer);
        }
    });
}
项目:GetStartRxJava2.0    文件:UnlimitPostActivity.java   
private void doRxJavaWork() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (;;) { // 无限循环发送事件
                emitter.onNext(Integer.MAX_VALUE);
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "" + integer);
                }
            });
}
项目:Android-Code-Demos    文件:BasicTest.java   
public Observable<String> getObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Today's news update");
                e.onNext("Today's topic is Study");
                e.onComplete();
            }
        });
        /* 下面两个方法作用类似,just 的内部调用的就是 fromArray */
//     return Observable.just("Topic 1", "Heat 1", "News");
//     return Observable.fromArray("Topic 1", "Heat 1", "News");
        /* 只能发送一个数据 */
        /*return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Topic is Study";
            }
        });*/
    }
项目:NeteaseCloudMusic    文件:ConfigPresenter.java   
public void requestLoadingList() {
    Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
            mModel = ConfigModel.getInstance(configView.getContext());
            e.onNext(mModel.getConfigList());
            mModel.setConfigCallback(ConfigPresenter.this);
        }
    })
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ConfigBean>>() {
                @Override
                public void accept(List<ConfigBean> list) throws Exception {
                    configView.displayConfigList(list);
                }
            });

}
项目:AndroidMVPresenter    文件:RxClick.java   
public static Observable<View> with(final View view) {
    return Observable.create(new ObservableOnSubscribe<View>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    view.setOnClickListener(new View.OnClickListener() {
                        @Override
                        public void onClick(View value) {
                            e.onNext(value);
                        }
                    });
                }
            });
        }
    });
}
项目:AssistantBySDK    文件:TingPlayProcessor.java   
/**
 * 按一级分类查找专辑
 **/
private Observable<List<Album>> getAlbumByCate(String cateId, int calc_dimension) {
    final Map<String, String> params = new HashMap<>();
    params.put(DTransferConstants.CATEGORY_ID, cateId);
    params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
    return Observable.create(new ObservableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
            CommonRequest.getAlbumList(params, new IDataCallBack<AlbumList>() {
                @Override
                public void onSuccess(AlbumList albumList) {
                    //onNext的参数不允许为null
                    e.onNext(albumList.getAlbums());
                    e.onComplete();
                }

                @Override
                public void onError(int i, String s) {
                    e.onError(new Throwable(i + " " + s));
                }
            });
        }
    }).subscribeOn(Schedulers.io());
}
项目:AssistantBySDK    文件:TingPlayProcessor.java   
/**
 * 按关键词查找专辑
 **/
private Observable<List<Album>> getAlbumByKeyWord(String keyword, String cateId, int calc_dimension) {
    final Map<String, String> params = new HashMap<>();
    params.put(DTransferConstants.SEARCH_KEY, keyword);
    params.put(DTransferConstants.CATEGORY_ID, cateId);
    params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
    return Observable.create(new ObservableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
            CommonRequest.getSearchedAlbums(params, new IDataCallBack<SearchAlbumList>() {
                @Override
                public void onSuccess(SearchAlbumList searchAlbumList) {
                    e.onNext(searchAlbumList.getAlbums());
                    e.onComplete();
                }

                @Override
                public void onError(int i, String s) {
                    e.onError(new Throwable(i + " " + s));
                }
            });
        }
    })
            .subscribeOn(Schedulers.io());
}
项目:starcraft-2-build-player    文件:StandardBuildsService.java   
/**
 * Returns an observable on the progress of loading stock build orders into the local SQLite DB.
 * Should be scheduled on a worker thread.
 *
 * @param c context
 * @param forceLoad if false, builds are only copied if an upgrade is required. If true,
 *                  standard builds are always copied.
 * @return observable on load progress (percentage)
 */
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception {
            try {
                if (!emitter.isDisposed()) {
                    loadStandardBuildsIntoDB(c, forceLoad, new DbAdapter.ProgressListener() {
                        @Override
                        public void onProgressUpdate(int percent) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(percent);
                            }
                        }
                    });
                    emitter.onComplete();
                }
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    });
}
项目:ReactiveAirplaneMode    文件:ReactiveAirplaneMode.java   
/**
 * Observes Airplane Mode state of the device with BroadcastReceiver.
 * RxJava2 Observable emits true if the airplane mode turns on and false otherwise.
 *
 * @param context of the Application or Activity
 * @return RxJava2 Observable with Boolean value indicating state of the airplane mode
 */
public Observable<Boolean> observe(final Context context) {
  checkContextIsNotNull(context);
  final IntentFilter filter = createIntentFilter();

  return Observable.create(new ObservableOnSubscribe<Boolean>() {
    @Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter)
        throws Exception {
      final BroadcastReceiver receiver = createBroadcastReceiver(emitter);
      context.registerReceiver(receiver, filter);

      final Disposable disposable = disposeInUiThread(new Action() {
        @Override public void run() throws Exception {
          tryToUnregisterReceiver(receiver, context);
        }
      });

      emitter.setDisposable(disposable);
    }
  });
}
项目:AssistantBySDK    文件:AccountingActivity.java   
/**
 * 更新余额、当日收支
 **/
public void updateBalance(final int type, final List<TaskCard<Accounting>> taskcards) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            CountToday();
            CountBalance(type, taskcards);
            e.onNext(0);
        }
    })
            .subscribeOn(Schedulers.io())   //执行订阅(subscribe())所在线程
            .observeOn(AndroidSchedulers.mainThread())  //响应订阅(Sbscriber)所在线程
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    if (AppConfig.dPreferences.getBoolean(AppConfig.HAS_AMOUNT, false))
                        mTvBalance.setText("¥" + AssistUtils.formatAmount(balance));
                    mAdapter.notifyItemChanged(0);
                }
            });
}
项目:RxGps    文件:RxGps.java   
private Observable<Boolean> checkPlayServicesAvailable() {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
            final Activity activity = activityReference.get();
            if (activity != null) {
                final GoogleApiAvailability apiAvailability = GoogleApiAvailability.getInstance();
                final int status = apiAvailability.isGooglePlayServicesAvailable(activity);

                if (status != ConnectionResult.SUCCESS) {
                    e.onError(new PlayServicesNotAvailableException());
                } else {
                    e.onNext(true);
                    e.onComplete();
                }
            }
        }
    });
}
项目:code-examples-android-expert    文件:RxJavaUnitTest.java   
@Test public void test(){
    Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
        @Override
        public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
            try {
                List<Todo> todos = RxJavaUnitTest.this.getTodos();
                if (todos!=null){
                    throw new NullPointerException("todos was null");
                }
                for (Todo todo : todos) {
                    emitter.onNext(todo);
                }
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    });
    TestObserver<Object> testObserver = new TestObserver<>();
    todoObservable.subscribeWith(testObserver);
    testObserver.assertError(NullPointerException.class);

}
项目:SAF-AOP    文件:DemoForTraceActivity.java   
@Trace(enable = false)
private void initData() {

    Observable.create(new ObservableOnSubscribe<String>() {

        @Trace
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {

            e.onNext("111");
            e.onNext("222");
            e.onNext("333");

        }
    }).subscribe(new Consumer<String>() {

        @Trace
        @Override
        public void accept(@NonNull String str) throws Exception {

        }
    });
}
项目:simple-stack    文件:DatabaseManager.java   
public void openDatabase() {
    disposable = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmChangeListener<Realm> listener = realm -> {
            if(!emitter.isDisposed()) {
                emitter.onNext(observableRealm);
            }
        };
        observableRealm.addChangeListener(listener);
        emitter.setDisposable(Disposables.fromAction(() -> {
            observableRealm.removeChangeListener(listener);
            observableRealm.close();
        }));
        emitter.onNext(observableRealm);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()).subscribe();
}
项目:RxRetroJsoup    文件:RxJsoup.java   
public Observable<String> text(final Element element, final String expression) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
            final Elements elements = element.select(expression);
            if (elements.isEmpty() && exceptionIfNotFound) {
                observableEmitter.onError(new NotFoundException(expression, element.toString()));
            } else {
                if (elements.isEmpty()) {
                    observableEmitter.onNext("");
                } else {
                    for (Element e : elements) {
                        observableEmitter.onNext(e.text());
                    }
                }
                observableEmitter.onComplete();
            }
        }


    });
}
项目:RxJava4AndroidDemos    文件:Create.java   
@Override
public void test3() {
    Log.i(TAG, "test3() Create simple demo, onNext() twice");
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
            for (int i = 0; i < 3; i++) {
                e.onNext(String.valueOf(i));
            }
        }
    });
    for (int time = 0; time < 2; time++) {
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Consumer<String> accept() s: " + s);
            }
        });
    }
}
项目:Reactive-Programming-With-Java-9    文件:DemoObservable.java   
public static void main(String[] args) {
 Observable<String> month_observable = Observable.create(new 
      ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter)    
           throws Exception {
            // TODO Auto-generated method stub
            try {
                String[] monthArray = { "Jan", "Feb", "Mar",    
                       "Apl", "May", "Jun", "July", "Aug",  
                       "Sept", "Oct","Nov", "Dec" };

                List<String> months = Arrays.asList(monthArray);

                for (String month : months) {
                    emitter.onNext(month);
                }
                emitter.onComplete();
            } catch (Exception e) {
                // TODO: handle exception
                emitter.onError(e);
            }
        }
    });
     month_observable.subscribe(s -> System.out.println(s));
}
项目:RxJava2-Android-Sample    文件:ThrottleLastExampleActivity.java   
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}
项目:PXLSRT    文件:TempStorageUtils.java   
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos);
            bitmap.recycle();
            try {
                fos.flush();
                fos.close();
                emitter.onNext(true);
            } catch (IOException e) {
                e.printStackTrace();
                emitter.onError(e);
            }
        }
    });
}
项目:Aurora    文件:HistoryModel.java   
@Override
public Observable<List<VideoDaoEntity>> getListFromNet(int start, String userid) {
    return Observable.create((ObservableOnSubscribe<List<VideoDaoEntity>>) emitter -> {

        BmobQuery<VideoDaoEntity> query = new BmobQuery<VideoDaoEntity>();
        query.addWhereEqualTo("userId", userid);
        query.setLimit(10);
        query.order("-updatedAt");
        query.setSkip(start);
        query.findObjects(new FindListener<VideoDaoEntity>() {
            @Override
            public void done(List<VideoDaoEntity> list, BmobException e) {
                List<VideoDaoEntity> infolist = new ArrayList<VideoDaoEntity>();
                if (!StringUtils.isEmpty(list)) {
                    for (VideoDaoEntity entity1 : list) {
                        entity1.setVideo(mGson.fromJson(entity1.getBody(), VideoListInfo.Video.VideoData.class));
                        infolist.add(entity1);
                    }
                }
                emitter.onNext(infolist);
            }
        });
    });
}
项目:Mount    文件:MountReceiver.java   
private void onActionPackageFullyRemoved(final Intent intent) {
    Observable.create(
            new ObservableOnSubscribe<Boolean>() {
                @Override
                public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
                    // prefix "package:"
                    String packageName = intent.getData().toString().substring(8);
                    List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class);
                    for (PackageRecord record : list) {
                        if (TextUtils.equals(record.name, packageName)) {
                            record.delete();
                        }
                    }

                    e.onNext(true);
                    e.onComplete();
                }
            })
            .subscribeOn(Schedulers.newThread())
            .subscribe();
}
项目:android-study    文件:RxJavaFragment.java   
/**
 * sample操作符每隔指定的时间就从上游中取出一个事件发送给下游.
 */
private void doSample() {
  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
      for (int i = 0; i < 1000; i++) {  //模拟无限循环发送事件
        emitter.onNext(i);
      }
    }
  })
      .subscribeOn(Schedulers.io())
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<Integer>() {
        @Override public void accept(Integer integer) throws Exception {
          Log.d(TAG, "" + integer);
        }
      });
}
项目:Dalaran    文件:CourseStore.java   
/**
 * 查找数据
 *
 * @param cacheId
 * @return
 */
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
    Observable<Course> courseObservable = Observable.create(new ObservableOnSubscribe<Course>() {
        @Override
        public void subscribe(ObservableEmitter<Course> e) throws Exception {
            Util.logMethodThreadId("findDataByIdentifier");
            long time = System.currentTimeMillis();
            try {
                Course result = (Course) mIDBEngine.find(cacheId, Course.class);
                time = System.currentTimeMillis() - time;
                Util.log("<-- End getCache2Disk(" + time + "):" + "[identifier] = " + cacheId + " [data] = " + (result != null ? result.getData() : "null"));
                if (result != null) {
                    e.onNext(result);
                }
                e.onComplete();
            } catch (XDBException d) {
                e.onError(d);
            }
        }
    });
    return courseObservable;

}
项目:RxEasyHttp    文件:MainActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
            FileUtils.getFileFromAsset(MainActivity.this, "1.jpg");
        }
    }).compose(RxUtil.<String>io_main()).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {

        }
    });
}
项目:Reactive-Programming-With-Java-9    文件:Demo_Schedulers_IO.java   
public static void main(String[] args) {
    // TODO Auto-generated method stub
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("Thread:-"+Thread.currentThread().getName());
            emitter.onNext(getNum());
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer value) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("Thread for subscription:-"+Thread.currentThread().getName());
            System.out.println(value);

        }

    });
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    System.out.println("end of main:-"+Thread.currentThread().getName());
}
项目:Aurora    文件:HistoryModel.java   
@Override
public Observable<Boolean> deleteFromNet(VideoDaoEntity entity) {
    return Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
        entity.delete(new UpdateListener() {
            @Override
            public void done(BmobException e) {
                if (e == null) {
                    emitter.onNext(true);
                }
            }
        });
    });
}
项目:RxSharedPreferences    文件:RxSharedPreferences.java   
public Observable<Integer> getInt(final String key, final int defaultValue) {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            e.onNext(sharedPreferences.getInt(key, defaultValue));
            e.onComplete();
        }
    });
}
项目:geotemporal    文件:BTree.java   
private Observable<Value> range(Node<Key, Value> x, Key lowerInclusive, Key upperExclusive,
        int height) {
    return Observable.create(new ObservableOnSubscribe<Value>() {

        @Override
        public void subscribe(ObservableEmitter<Value> emitter) throws Exception {
            range(x, lowerInclusive, upperExclusive, height, emitter);
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        }
    });
}
项目:GitHub    文件:MapExampleActivity.java   
private Observable<List<ApiUser>> getObservable() {
    return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext(Utils.getApiUserList());
                e.onComplete();
            }
        }
    });
}
项目:screen-share-to-browser    文件:RecordService.java   
private Observable<ImageInfo> getByteBufferObservable() {
    return Observable.create(new ObservableOnSubscribe<ImageInfo>() {
        @Override
        public void subscribe(ObservableEmitter<ImageInfo> e) throws Exception {
            imageInfoObservableEmitter = e;
            Log.d(TAG, "subscribe: " + Process.myTid());
        }
    });
}
项目:GitHub    文件:ZipExampleActivity.java   
private Observable<List<User>> getFootballFansObservable() {
    return Observable.create(new ObservableOnSubscribe<List<User>>() {
        @Override
        public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext(Utils.getUserListWhoLovesFootball());
                e.onComplete();
            }
        }
    });
}
项目:Assembler    文件:HotelMainDataSourceLocal.java   
@Override
public Observable<String> getContent() {
    return Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter)
                throws Exception {
                emitter.onNext("酒店页面");
                emitter.onComplete();
            }
        }
    );
}
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            resultsRefs.get().acquireReference(results);
            final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() {
                @Override
                public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet));
                    }
                }
            };
            results.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    results.removeChangeListener(listener);
                    observableRealm.close();
                    resultsRefs.get().releaseReference(results);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new CollectionChange<>(results, null));
        }
    });
}