Java 类io.reactivex.BackpressureStrategy 实例源码

项目:EasyHttp    文件:RxEasyHttpManager.java   
/**
 * Post请求的Rxjava方式.
 * @param url
 * @param requestParams
 * @return
 */
public <T> Flowable<T> post(String url, EasyRequestParams requestParams, RxEasyConverter<T> rxEasyConverter) {
    FormBody.Builder builder = new FormBody.Builder();
    ConcurrentHashMap<String, String> paramsMap = requestParams.getRequestParams();
    for (ConcurrentHashMap.Entry<String, String> entry : paramsMap.entrySet()) {
        builder.add(entry.getKey(), entry.getValue());
    }

    RequestBody requestBody = builder.build();
    final Request request = new Request.Builder()
            .url(url)
            .post(requestBody)
            .build();

    Call call = EasyHttpClientManager.getInstance().getOkHttpClient(EasyCacheType.CACHE_TYPE_DEFAULT).newCall(request);

    return Flowable.create(new CallFlowableOnSubscribe(call,rxEasyConverter), BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io());
}
项目:smart-asset-iot-android-demo    文件:RxLocationManager.java   
@SuppressWarnings("MissingPermission")
@RequiresPermission(anyOf = {
        Manifest.permission.ACCESS_COARSE_LOCATION,
        Manifest.permission.ACCESS_FINE_LOCATION
})
public void startLocationUpdates(boolean checkLocationSettings) {
    stopLocationUpdates();
    locationUpdatesDisposable = locationSettingsCheck(checkLocationSettings)
            .flatMapObservable(ignore -> locationUpdates()
                    .startWith(lastLocation()))
            .map(this::transformLocation)
            .toFlowable(BackpressureStrategy.LATEST)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::setLocation,
                    error -> Timber.e("Failed to get location updates", error));
}
项目:smart-lens    文件:BarcodeScannerFragment.java   
@Override
public void onImageCapture(@Nullable byte[] imageBytes) {
    //Process the image using Tf.
    Flowable<BarcodeInfo> flowable = Flowable.create(e -> {
        Bitmap bitmap = null;
        if (imageBytes != null) bitmap = CameraUtils.bytesToBitmap(imageBytes);
        if (bitmap != null) e.onNext(mBarcodeScanner.scan(bitmap));
        e.onComplete();
    }, BackpressureStrategy.DROP);

    final Subscription[] subscriptions = new Subscription[1];
    flowable.subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(subscription -> subscriptions[0] = subscription)
            .doOnError(t -> {
                Timber.e(t.getMessage());
                subscriptions[0].cancel();
            })
            .doOnComplete(() -> subscriptions[0].cancel())
            .subscribe(barcodeInfo -> {
                //TODO Display the info
            });
}
项目:SAF-AOP    文件:AsyncAspect.java   
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {

        Flowable.create(new FlowableOnSubscribe<Object>() {
                            @Override
                            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                                Looper.prepare();
                                try {
                                    joinPoint.proceed();
                                } catch (Throwable throwable) {
                                    throwable.printStackTrace();
                                }
                                Looper.loop();
                            }
                        }
                , BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }
项目:Reactive-Programming-With-Java-9    文件:DemoFlowable.java   
public static void main(String[] args) {
    // TODO Auto-generated method stub
    Flowable<String> month_maybe = Flowable.create(emitter -> {
        try {
            String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov",
                    "Dec" };

            List<String> months = Arrays.asList(monthArray);
            for (String month : months) {
                emitter.onNext(month);
            }
            emitter.onComplete();

        } catch (Exception e) {
            emitter.onError(e);
        }
    },BackpressureStrategy.MISSING);
    month_maybe.subscribe(s -> System.out.println(s));

}
项目:TurboChat    文件:MessageViewModelTest.java   
@Test
public void shouldPassMessageToLocalStreamWhenSendMessage() throws Exception {

    String message = "hello @alex http://youtube.com/q=look (love) there @yui you go http://twitter.com";

    final List<Link> expectedLinks = asList(
            new Link("http://youtube.com/q=look", ""),
            new Link("http://twitter.com", ""));

    when(userResolver.getLoggedInUser()).thenReturn(TestUtils.createMockUser());

    final TestSubscriber<Message> userTestSubscriber = new TestSubscriber<>();
    final Observable<Message> messageViewModelMessages = messageViewModel.localMessageStream();
    messageViewModelMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(userTestSubscriber);
    final Message m = new Message(id, message, Arrays.asList("alex", "yui"),
            Arrays.asList("love"), expectedLinks, TestUtils.createMockUser());

    messageViewModel.sendMessage(message, Arrays.asList("love"), sendScheduler);
    userTestSubscriber.assertValue(m);

}
项目:reactivejournal    文件:RxJournalBackPressureBuffer.java   
public static void main(String[] args) throws IOException {

        ReactiveJournal reactiveJournal = new ReactiveJournal("/tmp/fastproducer");
        reactiveJournal.clearCache();
        Flowable<Long> fastProducer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 500);

        reactiveJournal.createReactiveRecorder().recordAsync(fastProducer,"input");
        PlayOptions options = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
        Flowable journalInput = new RxJavaPlayer(reactiveJournal).play(options);

        Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);

        long startTime = System.currentTimeMillis();
        journalInput.subscribe(onNextSlowConsumer::accept,
                e -> System.out.println("ReactiveRecorder " + " " + e),
                () -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "]")
        );

    }
项目:buffer-slayer    文件:RxReporter.java   
private RxReporter(Builder<M, R> builder) {
  this.sender = builder.sender;
  this.metrics = builder.metrics;

  this.messageTimeoutNanos = builder.messageTimeoutNanos;
  this.bufferedMaxMessages = builder.bufferedMaxMessages;
  this.pendingMaxMessages = builder.pendingMaxMessages;
  this.overflowStrategy = builder.overflowStrategy;
  this.scheduler = builder.scheduler;

  Flowable<SendingTask<M>> flowable = Flowable.create(this, BackpressureStrategy.MISSING);
  initBackpressurePolicy(flowable)
      .observeOn(Schedulers.single())
      .groupBy(new MessagePartitioner())
      .subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
项目:RxPaper2    文件:RxPaperBookTest.java   
@Test
public void testUpdatesChecked() throws Exception {
    RxPaperBook book = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline());
    final String key = "hello";
    final ComplexObject value = ComplexObject.random();
    final TestSubscriber<ComplexObject> updatesSubscriber = TestSubscriber.create();
    book.observe(key, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updatesSubscriber);
    updatesSubscriber.assertValueCount(0);
    book.write(key, value).subscribe();
    updatesSubscriber.assertValueCount(1);
    updatesSubscriber.assertValues(value);
    final ComplexObject newValue = ComplexObject.random();
    book.write(key, newValue).subscribe();
    updatesSubscriber.assertValueCount(2);
    updatesSubscriber.assertValues(value, newValue);
    // Error value
    final int wrongValue = 3;
    book.write(key, wrongValue).test().assertComplete().assertNoErrors();
    updatesSubscriber.assertValueCount(2);
    updatesSubscriber.assertValues(value, newValue);
    updatesSubscriber.assertNoErrors();
}
项目:rxjavatraining    文件:TibcoObservableTest.java   
@Test
public void createYourOwnTibco() throws Exception {
    Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception {
            while (!e.isCancelled()) {
                long numberRecords = e.requested();
                System.out.println(numberRecords);
                if (numberRecords > 0) {
                }
            }
        }
    }, BackpressureStrategy.BUFFER);


    flowable.map(x -> x + "Yay!").subscribe(System.out::println);
}
项目:RxProgress    文件:RxProgress.java   
private <T> Flowable<T> forFlowable(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    return Flowable.using(this::makeDialog,
            new Function<ProgressDialog, Publisher<? extends T>>() {
                @Override
                public Publisher<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
                    return Flowable.create(emitter -> {
                        if (builder.cancelable) {
                            dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                        }
                        dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                        source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
                    }, backpressureStrategy);
                }
            }, Dialog::dismiss);
}
项目:TurboChat    文件:MessageViewModelTest.java   
@Test
public void shouldReturnApiStream() throws Exception {

    final TestSubscriber<Message> userTestSubscriber = new TestSubscriber<>();
    final Observable<Message> messageViewModelMessages = messageViewModel.apiSendMessageStream();
    messageViewModelMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(userTestSubscriber);

    userTestSubscriber.assertNoErrors();
    userTestSubscriber.assertNoValues();

}
项目:GitHub    文件:TasksLocalDataSource.java   
@Override
public Flowable<List<Task>> getTasks() {
    String[] projection = {
            TaskEntry.COLUMN_NAME_ENTRY_ID,
            TaskEntry.COLUMN_NAME_TITLE,
            TaskEntry.COLUMN_NAME_DESCRIPTION,
            TaskEntry.COLUMN_NAME_COMPLETED
    };
    String sql = String.format("SELECT %s FROM %s", TextUtils.join(",", projection), TaskEntry.TABLE_NAME);
    return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql)
            .mapToList(mTaskMapperFunction)
            .toFlowable(BackpressureStrategy.BUFFER);
}
项目:GitHub    文件:TasksLocalDataSource.java   
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
    String[] projection = {
            TaskEntry.COLUMN_NAME_ENTRY_ID,
            TaskEntry.COLUMN_NAME_TITLE,
            TaskEntry.COLUMN_NAME_DESCRIPTION,
            TaskEntry.COLUMN_NAME_COMPLETED
    };
    String sql = String.format("SELECT %s FROM %s WHERE %s LIKE ?",
            TextUtils.join(",", projection), TaskEntry.TABLE_NAME, TaskEntry.COLUMN_NAME_ENTRY_ID);
    return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql, taskId)
            .mapToOneOrDefault(cursor -> Optional.of(mTaskMapperFunction.apply(cursor)), Optional.<Task>absent())
            .toFlowable(BackpressureStrategy.BUFFER);
}
项目:GitHub    文件:RxJava2CallAdapter.java   
@Override public Object adapt(Call<R> call) {
  Observable<Response<R>> responseObservable = isAsync
      ? new CallEnqueueObservable<>(call)
      : new CallExecuteObservable<>(call);

  Observable<?> observable;
  if (isResult) {
    observable = new ResultObservable<>(responseObservable);
  } else if (isBody) {
    observable = new BodyObservable<>(responseObservable);
  } else {
    observable = responseObservable;
  }

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isFlowable) {
    return observable.toFlowable(BackpressureStrategy.LATEST);
  }
  if (isSingle) {
    return observable.singleOrError();
  }
  if (isMaybe) {
    return observable.singleElement();
  }
  if (isCompletable) {
    return observable.ignoreElements();
  }
  return observable;
}
项目:GitHub    文件:DownloadType.java   
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) {
    return Flowable.create(new FlowableOnSubscribe<DownloadStatus>() {
        @Override
        public void subscribe(FlowableEmitter<DownloadStatus> e) throws Exception {
            record.save(e, response);
        }
    }, BackpressureStrategy.LATEST);
}
项目:AndroidSensors    文件:SensorGathererTest.java   
@Before
public void setUp() throws Exception {
    if (sensorGatherer == null)
        throw new Error("sensorGatherer must be initialized before calling super.setUp() method");

    when(sensorConfig.getBackpressureStrategy(any(SensorType.class))).thenReturn(BackpressureStrategy.BUFFER);
    when(permissionChecker.isPermissionGranted()).thenReturn(true);
    when(sensorChecker.isReady(any(SensorType.class))).thenReturn(true);
}
项目:GitHub    文件:RxUtil.java   
/**
 * 生成Flowable
 * @param <T>
 * @return
 */
public static <T> Flowable<T> createData(final T t) {
    return Flowable.create(new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            try {
                emitter.onNext(t);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.BUFFER);
}
项目:GitHub    文件:ThrottleSearchActivity.java   
@Override
protected void onResume() {
    super.onResume();

    // Listen to key presses and only start search after user paused to avoid excessive redrawing on the screen.
    disposable = RxTextView.textChangeEvents(searchInputView)
            .debounce(200, TimeUnit.MILLISECONDS) // default Scheduler is Schedulers.computation()
            .observeOn(AndroidSchedulers.mainThread()) // Needed to access Realm data
            .toFlowable(BackpressureStrategy.BUFFER)
            .switchMap(textChangeEvent -> {
                // Use Async API to move Realm queries off the main thread.
                // Realm currently doesn't support the standard Schedulers.
                return realm.where(Person.class)
                        .beginsWith("name", textChangeEvent.text().toString())
                        .findAllSortedAsync("name")
                        .asFlowable();
            })
            // Only continue once data is actually loaded
            // RealmObservables will emit the unloaded (empty) list as its first item
            .filter(people -> people.isLoaded())
            .subscribe(people -> {
                searchResultsView.removeAllViews();
                for (Person person : people) {
                    TextView view = new TextView(ThrottleSearchActivity.this);
                    view.setText(person.getName());
                    searchResultsView.addView(view);
                }
            }, throwable -> throwable.printStackTrace());
}
项目:DailyStudy    文件:RxJavaActivity.java   
private void flowable() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
            Log.e(TAG, "start send data ");
            for (int i = 0; i < 100; i++) {
                e.onNext(i);
            }
            e.onComplete();
        }
    }, BackpressureStrategy.DROP)//指定背压策略
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(@NonNull Subscription s) {
                    //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
                    //2, 参数为  Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
                    //3,  必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
                    Log.e(TAG, "onSubscribe...");
                    s.request(10);
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "onNext:" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.e(TAG, "onError..." + t);
                }

                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete...");
                }
            });

}
项目:MBEStyle    文件:IconPresenter.java   
public void calcIconTotal() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            int total = 0;

            while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                if (xml.getEventType() == XmlPullParser.START_TAG) {
                    if (xml.getName().startsWith("item")) {
                        total++;
                    }
                }
                xml.next();
            }

            flowableEmitter.onNext(total);
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mView.setIconTotal(integer);
                }
            });
}
项目:vt-support    文件:StorageImpl.java   
@Override
public void putEntries(Observable<Entry> entries) {
  final String insert =
      "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
          + " values (?, ?, ?, ?);";

  final Observable<Object> params = entries.concatMap(entry -> {
    byte[] compressedMvt;
    try {
      compressedMvt = CompressUtil.getCompressedAsGzip(entry.getVector());
    } catch (final IOException ex) {
      throw Exceptions.propagate(ex);
    }

    return Observable.<Object>just(entry.getZoomLevel(), entry.getColumn(),
        flipY(entry.getRow(), entry.getZoomLevel()), compressedMvt);
  })
      // source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files
      .toList()
      .flattenAsObservable(objects -> objects);

  // TODO update when upstream is enhanced
  dataSource.update(insert)
      .parameterStream(params.toFlowable(BackpressureStrategy.BUFFER))
      .counts()
      .test() // TODO remove hack
      .awaitDone(5, TimeUnit.SECONDS)
      .assertComplete();
}
项目:ratpack-rx2    文件:RxRatpack.java   
/**
 * @param promise
 * @param strategy The {@link BackpressureStrategy} to use
 * @param <T>
 * @return
 * @see RxRatpack#observe(Promise)
 */
public static <T> Flowable<T> flow(Promise<T> promise, BackpressureStrategy strategy) {
  return Flowable.create(subscriber ->
      promise.onError(subscriber::onError).then(value -> {
        subscriber.onNext(value);
        subscriber.onComplete();
      }),
    strategy);
}
项目:Mix    文件:MainActivity.java   
/**
 * 水缸的更多处理
 */
private void flowableSize() {

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 10000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io())
            //DROP和LATEST的区别在于,当发出事件数量有限时,后者一定会接收到最后一条数据,如这里的9999,而DROP是连续性的
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    s.request(128);
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
项目:GetStartRxJava2.0    文件:BaseFlowableActivity.java   
private void doRxJavaWork() {
    Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception {
            e.onNext("事件1");
            e.onNext("事件2");
            e.onNext("事件3");
            e.onNext("事件4");
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER);

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe: ");
            s.request(Long.MAX_VALUE);

        }

        @Override
        public void onNext(String string) {
            Log.d(TAG, "onNext: " + string);
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError: " + t.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: ");

        }
    };

    flowable.subscribe(subscriber);
}
项目:EvolvingNetLib    文件:CCRequest.java   
/**
 * 获取内存缓存请求Flowable对象
 *
 * @return 内存缓存查询Flowable对象
 */
private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() {
    //内存缓存数据获取
    return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception {

            T response = null;

            try {

                if (ccCacheQueryCallback != null) {
                    response = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey);
                }

                CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, true, false);

                e.onNext(tccBaseResponse);
                e.onComplete();

            } catch (Exception exception) {

                switch (cacheQueryMode) {
                    case CCCacheMode.QueryMode.MODE_ONLY_MEMORY:
                        e.onError(new CCDiskCacheQueryException(exception));
                        break;
                    default:
                        e.onComplete();
                        break;
                }

            }
        }
    }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());

}
项目:ocraft-s2client    文件:S2Client.java   
private S2Client(Builder builder) {

        connectToIp = builder.connectToIp;
        connectToPort = builder.connectToPort;
        traced = builder.traced;
        tracer = builder.tracer;
        game = builder.game;

        log.info("Starting: {}", this);

        Channel channel = channelProvider.getChannel();
        responseStream = channel.outputStream().mergeWith(channel.errorStream())
                .map(this::prepareResponse)
                .toFlowable(BackpressureStrategy.ERROR)
                .onBackpressureBuffer(cfg().getInt(OcraftConfig.CLIENT_BUFFER_SIZE_RESPONSE_BACKPRESSURE))
                .observeOn(Schedulers.computation(), false, cfg().getInt(CLIENT_BUFFER_SIZE_RESPONSE_STREAM))
                .publish()
                .autoConnect()
                .doOnSubscribe(s -> await.register())
                .doOnCancel(await::arriveAndDeregister);

        responseStream().subscribe(this);
        await.arriveAndDeregister();

        Optional.ofNullable(game).ifPresent(s2Controller -> {
            responseStream().subscribe(s2Controller);
            await.arriveAndDeregister();
        });

        channelProvider.start(connectToIp, connectToPort);
    }
项目:AndroidMVPresenter    文件:AbstractPresenter.java   
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
    return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable throwable) throws Exception {
            if(++retryCount <= maxRetries){
                return todoBeforeRetry.apply(throwable).toFlowable(BackpressureStrategy.BUFFER);
            }
            return Flowable.error(throwable);
        }
    });
}
项目:RX_Demo    文件:Rx2Test2Activity.java   
private void flowableTest() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 128; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)//增加了一个参数
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
//                        s.request(Long.MAX_VALUE);  //注意这句代码
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);

                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }
项目:YiZhi    文件:RxHelper.java   
/**
 * 生成Flowable
 *
 * @param t
 * @return Flowable
 */
public static <T> Flowable<T> createFlowable(final T t) {
    return Flowable.create(new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            try {
                emitter.onNext(t);
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.BUFFER);
}
项目:KomaMusic    文件:LocalDataSource.java   
@Override
public Flowable<List<Song>> getAllSongs() {
    return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
            e.onNext(SongsPresenter.getAllSongs());
            e.onComplete();
        }
    }, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
@Override
public Flowable<List<Playlist>> getAllPlaylists() {
    return Flowable.create(new FlowableOnSubscribe<List<Playlist>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Playlist>> e) throws Exception {
            e.onNext(PlaylistsPresenter.getAllPlaylists());
            e.onComplete();
        }
    }, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
@Override
public Flowable<List<Album>> getAllAlbums() {
    return Flowable.create(new FlowableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Album>> e) throws Exception {
            e.onNext(AlbumsPresenter.getAllAlbums());
            e.onComplete();
        }
    }, BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
@Override
public Flowable<List<Song>> getAlbumSongs(final long albumId) {
    return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
            e.onNext(AlbumDetailPresenter.getAlbumSongs(albumId));
            e.onComplete();
        }
    }, BackpressureStrategy.LATEST);
}
项目:PosTrainer    文件:AlarmDatabase.java   
@Override
public Flowable<List<Alarm>> getAlarms() {
    return Flowable.create(
            new FlowableOnSubscribe<List<Alarm>>() {
                @Override
                public void subscribe(FlowableEmitter<List<Alarm>> e) throws Exception {
                    Realm realm = Realm.getDefaultInstance();

                    RealmQuery<RealmAlarm> query = realm.where(RealmAlarm.class);
                    RealmResults<RealmAlarm> result = query.findAll();

                    List<Alarm> alarmList = new ArrayList<>();

                    if (result.size() == 0) {
                        e.onComplete();
                    } else {
                        for (int i = 0; i < result.size(); i++) {
                            Alarm alarm = new Alarm();
                            RealmAlarm realmAlarm = result.get(i);

                            alarm.setActive(realmAlarm.isActive());
                            alarm.setRenewAutomatically(realmAlarm.isRenewAutomatically());
                            alarm.setVibrateOnly(realmAlarm.isVibrateOnly());
                            alarm.setHourOfDay(realmAlarm.getHourOfDay());
                            alarm.setMinute(realmAlarm.getMinute());
                            alarm.setAlarmTitle(realmAlarm.getAlarmTitle());
                            alarm.setAlarmId(realmAlarm.getAlarmId());

                            alarmList.add(
                                    alarm
                            );
                        }
                        e.onNext(alarmList);
                    }
                }

            },
            BackpressureStrategy.LATEST);
}
项目:KomaMusic    文件:LocalDataSource.java   
@Override
public Flowable<List<Song>> getMyFavoriteSongs() {
    return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
            e.onNext(MyFavoritePresenter.getFavoriteSongs());
            e.onComplete();
        }
    }, BackpressureStrategy.LATEST);
}
项目:filestack-java    文件:Upload.java   
/**
 * Start this upload asynchronously. Returns progress updates.
 *
 * @return {@link Flowable} that emits {@link Progress} events
 */
public Flowable<Progress<FileLink>> run() {
  Flowable<Prog<FileLink>> startFlow = Flowable
      .fromCallable(new UploadStartFunc(this))
      .subscribeOn(Schedulers.io());

  // Create multiple func instances to each upload a subrange of parts from the file
  // Merge each of these together into one so they're executed concurrently
  Flowable<Prog<FileLink>> transferFlow = Flowable.empty();
  for (int i = 0; i < CONCURRENCY; i++) {
    UploadTransferFunc func = new UploadTransferFunc(this);
    Flowable<Prog<FileLink>> temp = Flowable
        .create(func, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io());
    transferFlow = transferFlow.mergeWith(temp);
  }

  Flowable<Prog<FileLink>> completeFlow = Flowable
      .fromCallable(new UploadCompleteFunc(this))
      .subscribeOn(Schedulers.io());

  return startFlow
      .concatWith(transferFlow)
      .concatWith(completeFlow)
      .buffer(PROG_INTERVAL_SEC, TimeUnit.SECONDS)
      .flatMap(new ProgMapFunc(this));
}
项目:streamingpool-core    文件:DoAfterFirstSubscribe.java   
@Override
public Publisher<T> apply(Flowable<T> flowable) {
    return Flowable.create(flowableEmitter -> {
        flowable.subscribe(flowableEmitter::onNext,
                flowableEmitter::onError,
                flowableEmitter::onComplete
        );
        if (done.compareAndSet(false, true)) {
            afterFirstSubscribe.run();
        }
    }, BackpressureStrategy.MISSING);
}
项目:reactivejournal    文件:RxJournalBackPressureLatest.java   
public static void main(String[] args) throws IOException {

        ReactiveJournal reactiveJournal = new ReactiveJournal("/tmp/fastproducer");
        reactiveJournal.clearCache();
        Flowable<Long> fastProducer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 2500);

        ReactiveRecorder recorder = reactiveJournal.createReactiveRecorder();
        recorder.recordAsync(fastProducer,"input");
        //Set the replay strategy to ReplayRate.FAST as e want to process the event as soon as it is
        //received from the publisher.
        PlayOptions options = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
        ConnectableFlowable journalInput = new RxJavaPlayer(reactiveJournal).play(options).publish();

        Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);

        recorder.record(journalInput, "consumed");

        long startTime = System.currentTimeMillis();
        journalInput.observeOn(Schedulers.io()).subscribe(onNextSlowConsumer::accept,
                e -> System.out.println("ReactiveRecorder " + " " + e),
                () -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "]")
        );

        journalInput.connect();

        DSUtil.sleep(3000);
    }
项目:reactivejournal    文件:RxJavaBackPressure.java   
public static void main(String[] args) {
    run(BackpressureStrategy.BUFFER);
    run(BackpressureStrategy.LATEST);
    run(BackpressureStrategy.DROP);
    run(BackpressureStrategy.MISSING);
    run(BackpressureStrategy.ERROR);
}