/**
 * Issues an HTTP POST request and exposes the call as an RxJava {@link Flowable}.
 *
 * @param url             target URL of the request
 * @param requestParams   holder whose key/value pairs are sent as form fields
 * @param rxEasyConverter converter handed to {@code CallFlowableOnSubscribe} together with the call
 * @param <T>             element type of the returned stream
 * @return a {@code Flowable} created with {@code BackpressureStrategy.BUFFER} and subscribed on
 *         {@code Schedulers.io()}
 */
public <T> Flowable<T> post(String url, EasyRequestParams requestParams, RxEasyConverter<T> rxEasyConverter) {
    // Copy every request parameter into an OkHttp form body.
    FormBody.Builder formBuilder = new FormBody.Builder();
    ConcurrentHashMap<String, String> fields = requestParams.getRequestParams();
    for (ConcurrentHashMap.Entry<String, String> field : fields.entrySet()) {
        formBuilder.add(field.getKey(), field.getValue());
    }
    RequestBody formBody = formBuilder.build();

    final Request postRequest = new Request.Builder()
            .url(url)
            .post(formBody)
            .build();

    Call call = EasyHttpClientManager.getInstance()
            .getOkHttpClient(EasyCacheType.CACHE_TYPE_DEFAULT)
            .newCall(postRequest);

    return Flowable.create(new CallFlowableOnSubscribe(call, rxEasyConverter), BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io());
}
/**
 * Restarts location updates.
 *
 * <p>Any running updates are stopped first. The new pipeline runs
 * {@code locationSettingsCheck}, emits {@code lastLocation()} ahead of {@code locationUpdates()},
 * maps every item through {@code transformLocation} and hands it to {@code setLocation} on the
 * Android main thread. The resulting disposable is kept in {@code locationUpdatesDisposable}.
 *
 * @param checkLocationSettings forwarded to {@code locationSettingsCheck}
 */
@SuppressWarnings("MissingPermission")
@RequiresPermission(anyOf = {
        Manifest.permission.ACCESS_COARSE_LOCATION,
        Manifest.permission.ACCESS_FINE_LOCATION
})
public void startLocationUpdates(boolean checkLocationSettings) {
    stopLocationUpdates();
    locationUpdatesDisposable = locationSettingsCheck(checkLocationSettings)
            .flatMapObservable(ignore -> locationUpdates()
                    .startWith(lastLocation()))
            .map(this::transformLocation)
            .toFlowable(BackpressureStrategy.LATEST)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    this::setLocation,
                    // The throwable must be the first argument: passed after the message it is
                    // treated as an unused format argument and is never logged.
                    error -> Timber.e(error, "Failed to get location updates"));
}
@Override public void onImageCapture(@Nullable byte[] imageBytes) { //Process the image using Tf. Flowable<BarcodeInfo> flowable = Flowable.create(e -> { Bitmap bitmap = null; if (imageBytes != null) bitmap = CameraUtils.bytesToBitmap(imageBytes); if (bitmap != null) e.onNext(mBarcodeScanner.scan(bitmap)); e.onComplete(); }, BackpressureStrategy.DROP); final Subscription[] subscriptions = new Subscription[1]; flowable.subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(subscription -> subscriptions[0] = subscription) .doOnError(t -> { Timber.e(t.getMessage()); subscriptions[0].cancel(); }) .doOnComplete(() -> subscriptions[0].cancel()) .subscribe(barcodeInfo -> { //TODO Display the info }); }
/**
 * Runs the intercepted join point on a {@code Schedulers.io()} thread that has a {@code Looper}.
 *
 * <p>A throwable raised by {@code joinPoint.proceed()} is printed and not propagated. After the
 * join point returns, the worker enters {@code Looper.loop()}; the emitter is never signalled.
 *
 * @param joinPoint the join point to proceed with
 * @throws Throwable declared by the signature; the body itself catches what the join point throws
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    final FlowableOnSubscribe<Object> looperTask = new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            // Give the worker thread a Looper before the join point runs.
            Looper.prepare();
            try {
                joinPoint.proceed();
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
            // Enter the message loop of this thread.
            Looper.loop();
        }
    };

    Flowable.create(looperTask, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
public static void main(String[] args) { // TODO Auto-generated method stub Flowable<String> month_maybe = Flowable.create(emitter -> { try { String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov", "Dec" }; List<String> months = Arrays.asList(monthArray); for (String month : months) { emitter.onNext(month); } emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } },BackpressureStrategy.MISSING); month_maybe.subscribe(s -> System.out.println(s)); }
/**
 * Verifies that sending a message publishes the expected {@code Message} on the local stream.
 */
@Test
public void shouldPassMessageToLocalStreamWhenSendMessage() throws Exception {
    final String text = "hello @alex http://youtube.com/q=look (love) there @yui you go http://twitter.com";
    final List<Link> links = asList(
            new Link("http://youtube.com/q=look", ""),
            new Link("http://twitter.com", ""));
    when(userResolver.getLoggedInUser()).thenReturn(TestUtils.createMockUser());

    // Subscribe before sending so the emission is captured.
    final TestSubscriber<Message> subscriber = new TestSubscriber<>();
    final Observable<Message> localMessages = messageViewModel.localMessageStream();
    localMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(subscriber);

    final Message expected = new Message(
            id,
            text,
            Arrays.asList("alex", "yui"),
            Arrays.asList("love"),
            links,
            TestUtils.createMockUser());

    messageViewModel.sendMessage(text, Arrays.asList("love"), sendScheduler);

    subscriber.assertValue(expected);
}
/**
 * Demo: records a fast producer into a journal asynchronously and replays the recorded
 * {@code "input"} entries to a slow consumer.
 *
 * @param args unused
 * @throws IOException declared by the signature
 */
public static void main(String[] args) throws IOException {
    ReactiveJournal journal = new ReactiveJournal("/tmp/fastproducer");
    journal.clearCache();

    Flowable<Long> producer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 500);
    journal.createReactiveRecorder().recordAsync(producer, "input");

    PlayOptions playOptions = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
    Flowable replayed = new RxJavaPlayer(journal).play(playOptions);

    Consumer slowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);

    long startedAt = System.currentTimeMillis();
    replayed.subscribe(
            slowConsumer::accept,
            e -> System.out.println("ReactiveRecorder " + " " + e),
            () -> System.out.println(
                    "ReactiveRecorder complete [" + (System.currentTimeMillis() - startedAt) + "]"));
}
/**
 * Copies the configuration from the builder and wires up the sending pipeline.
 *
 * <p>This instance is used as the {@code FlowableOnSubscribe} source. The flow is passed through
 * {@code initBackpressurePolicy}, observed on {@code Schedulers.single()}, grouped by a
 * {@code MessagePartitioner} and consumed by a {@code MessageGroupSubscriber}.
 *
 * @param builder source of all configuration values
 */
private RxReporter(Builder<M, R> builder) {
    // Collaborators.
    this.sender = builder.sender;
    this.metrics = builder.metrics;
    this.scheduler = builder.scheduler;

    // Limits and policies.
    this.messageTimeoutNanos = builder.messageTimeoutNanos;
    this.bufferedMaxMessages = builder.bufferedMaxMessages;
    this.pendingMaxMessages = builder.pendingMaxMessages;
    this.overflowStrategy = builder.overflowStrategy;

    // All fields are assigned before the pipeline is created and subscribed.
    Flowable<SendingTask<M>> tasks = Flowable.create(this, BackpressureStrategy.MISSING);
    initBackpressurePolicy(tasks)
            .observeOn(Schedulers.single())
            .groupBy(new MessagePartitioner())
            .subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
@Test public void testUpdatesChecked() throws Exception { RxPaperBook book = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline()); final String key = "hello"; final ComplexObject value = ComplexObject.random(); final TestSubscriber<ComplexObject> updatesSubscriber = TestSubscriber.create(); book.observe(key, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updatesSubscriber); updatesSubscriber.assertValueCount(0); book.write(key, value).subscribe(); updatesSubscriber.assertValueCount(1); updatesSubscriber.assertValues(value); final ComplexObject newValue = ComplexObject.random(); book.write(key, newValue).subscribe(); updatesSubscriber.assertValueCount(2); updatesSubscriber.assertValues(value, newValue); // Error value final int wrongValue = 3; book.write(key, wrongValue).test().assertComplete().assertNoErrors(); updatesSubscriber.assertValueCount(2); updatesSubscriber.assertValues(value, newValue); updatesSubscriber.assertNoErrors(); }
/**
 * Demo of a hand-written source that polls the emitter for outstanding demand.
 *
 * <p>NOTE(review): the source never emits or completes, and the subscription below has no
 * scheduler, so the loop appears to run on the calling thread until the emitter is cancelled.
 * Confirm this is intended.
 */
@Test
public void createYourOwnTibco() throws Exception {
    Flowable<String> flowable = Flowable.create(emitter -> {
        while (!emitter.isCancelled()) {
            long outstanding = emitter.requested();
            System.out.println(outstanding);
            if (outstanding > 0) {
                // Left empty in the original: no records are produced yet.
            }
        }
    }, BackpressureStrategy.BUFFER);

    flowable.map(x -> x + "Yay!").subscribe(System.out::println);
}
/**
 * Wraps {@code source} so that a progress dialog created by {@code makeDialog} lives for the
 * duration of the subscription and is dismissed when the flow terminates or is cancelled.
 *
 * <p>Dismissing the dialog completes the returned flow. Cancelling the dialog does the same when
 * {@code builder.cancelable} is set.
 *
 * @param source               the upstream flow to relay
 * @param backpressureStrategy strategy used for the relaying {@code Flowable.create}
 * @param <T>                  element type
 * @return a flow relaying {@code source} while the dialog is managed by {@code Flowable.using}
 */
private <T> Flowable<T> forFlowable(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    return Flowable.using(this::makeDialog, new Function<ProgressDialog, Publisher<? extends T>>() {
        @Override
        public Publisher<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
            return Flowable.create(emitter -> {
                if (builder.cancelable) {
                    dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                }
                dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                // Tie the upstream subscription to the emitter so it is disposed when the
                // downstream cancels or the dialog completes the flow; otherwise it keeps running.
                emitter.setDisposable(
                        source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
            }, backpressureStrategy);
        }
    }, Dialog::dismiss);
}
/**
 * Verifies that the API send-message stream can be subscribed to and initially carries neither
 * errors nor values.
 */
@Test
public void shouldReturnApiStream() throws Exception {
    final TestSubscriber<Message> subscriber = new TestSubscriber<>();

    messageViewModel.apiSendMessageStream()
            .toFlowable(BackpressureStrategy.LATEST)
            .subscribe(subscriber);

    subscriber.assertNoErrors();
    subscriber.assertNoValues();
}
/**
 * Queries all rows of the task table and maps them to a list of tasks.
 *
 * @return a flow of task lists produced by the database helper's query, converted with
 *         {@code BackpressureStrategy.BUFFER}
 */
@Override
public Flowable<List<Task>> getTasks() {
    final String[] columns = {
            TaskEntry.COLUMN_NAME_ENTRY_ID,
            TaskEntry.COLUMN_NAME_TITLE,
            TaskEntry.COLUMN_NAME_DESCRIPTION,
            TaskEntry.COLUMN_NAME_COMPLETED
    };
    final String columnList = TextUtils.join(",", columns);
    final String query = String.format("SELECT %s FROM %s", columnList, TaskEntry.TABLE_NAME);

    return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, query)
            .mapToList(mTaskMapperFunction)
            .toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Queries the task whose entry id matches {@code taskId}.
 *
 * @param taskId value bound to the {@code LIKE ?} placeholder on the entry-id column
 * @return a flow emitting the mapped task wrapped in {@code Optional}, or {@code Optional.absent()}
 *         as the default, converted with {@code BackpressureStrategy.BUFFER}
 */
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
    final String[] columns = {
            TaskEntry.COLUMN_NAME_ENTRY_ID,
            TaskEntry.COLUMN_NAME_TITLE,
            TaskEntry.COLUMN_NAME_DESCRIPTION,
            TaskEntry.COLUMN_NAME_COMPLETED
    };
    final String columnList = TextUtils.join(",", columns);
    final String query = String.format("SELECT %s FROM %s WHERE %s LIKE ?",
            columnList, TaskEntry.TABLE_NAME, TaskEntry.COLUMN_NAME_ENTRY_ID);

    return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, query, taskId)
            .mapToOneOrDefault(
                    row -> Optional.of(mTaskMapperFunction.apply(row)),
                    Optional.<Task>absent())
            .toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Adapts a call into the reactive type selected by this adapter's flags.
 *
 * <p>The call is wrapped as an enqueueing or executing observable depending on {@code isAsync},
 * optionally unwrapped to a result or body observable, optionally subscribed on
 * {@code scheduler}, and finally converted to a Flowable, Single, Maybe or Completable when the
 * corresponding flag is set.
 *
 * @param call the call to adapt
 * @return the adapted reactive object; a plain {@code Observable} when no conversion flag is set
 */
@Override
public Object adapt(Call<R> call) {
    Observable<Response<R>> responses;
    if (isAsync) {
        responses = new CallEnqueueObservable<>(call);
    } else {
        responses = new CallExecuteObservable<>(call);
    }

    Observable<?> adapted;
    if (isResult) {
        adapted = new ResultObservable<>(responses);
    } else if (isBody) {
        adapted = new BodyObservable<>(responses);
    } else {
        adapted = responses;
    }

    if (scheduler != null) {
        adapted = adapted.subscribeOn(scheduler);
    }

    // The first matching flag decides the returned type.
    if (isFlowable) {
        return adapted.toFlowable(BackpressureStrategy.LATEST);
    } else if (isSingle) {
        return adapted.singleOrError();
    } else if (isMaybe) {
        return adapted.singleElement();
    } else if (isCompletable) {
        return adapted.ignoreElements();
    }
    return adapted;
}
/**
 * Creates a publisher that, on subscription, delegates to {@code record.save} with the emitter
 * and the given response.
 *
 * @param response the HTTP response handed to {@code record.save}
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) {
    final FlowableOnSubscribe<DownloadStatus> saver = new FlowableOnSubscribe<DownloadStatus>() {
        @Override
        public void subscribe(FlowableEmitter<DownloadStatus> emitter) throws Exception {
            record.save(emitter, response);
        }
    };
    return Flowable.create(saver, BackpressureStrategy.LATEST);
}
/**
 * Common test setup. Requires {@code sensorGatherer} to be assigned beforehand, then stubs the
 * sensor config, permission checker and sensor checker mocks with default answers.
 *
 * @throws Error if {@code sensorGatherer} has not been initialized
 */
@Before
public void setUp() throws Exception {
    if (sensorGatherer == null) {
        throw new Error("sensorGatherer must be initialized before calling super.setUp() method");
    }

    when(sensorConfig.getBackpressureStrategy(any(SensorType.class)))
            .thenReturn(BackpressureStrategy.BUFFER);
    when(permissionChecker.isPermissionGranted())
            .thenReturn(true);
    when(sensorChecker.isReady(any(SensorType.class)))
            .thenReturn(true);
}
/**
 * Creates a {@code Flowable} that emits the given value once and then completes.
 *
 * @param t   the value to emit
 * @param <T> element type
 * @return a {@code Flowable} created with {@code BackpressureStrategy.BUFFER}
 */
public static <T> Flowable<T> createData(final T t) {
    final FlowableOnSubscribe<T> singleValue = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> sink) throws Exception {
            try {
                sink.onNext(t);
                sink.onComplete();
            } catch (Exception failure) {
                sink.onError(failure);
            }
        }
    };
    return Flowable.create(singleValue, BackpressureStrategy.BUFFER);
}
@Override protected void onResume() { super.onResume(); // Listen to key presses and only start search after user paused to avoid excessive redrawing on the screen. disposable = RxTextView.textChangeEvents(searchInputView) .debounce(200, TimeUnit.MILLISECONDS) // default Scheduler is Schedulers.computation() .observeOn(AndroidSchedulers.mainThread()) // Needed to access Realm data .toFlowable(BackpressureStrategy.BUFFER) .switchMap(textChangeEvent -> { // Use Async API to move Realm queries off the main thread. // Realm currently doesn't support the standard Schedulers. return realm.where(Person.class) .beginsWith("name", textChangeEvent.text().toString()) .findAllSortedAsync("name") .asFlowable(); }) // Only continue once data is actually loaded // RealmObservables will emit the unloaded (empty) list as its first item .filter(people -> people.isLoaded()) .subscribe(people -> { searchResultsView.removeAllViews(); for (Person person : people) { TextView view = new TextView(ThrottleSearchActivity.this); view.setText(person.getName()); searchResultsView.addView(view); } }, throwable -> throwable.printStackTrace()); }
private void flowable() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { Log.e(TAG, "start send data "); for (int i = 0; i < 100; i++) { e.onNext(i); } e.onComplete(); } }, BackpressureStrategy.DROP)//指定背压策略 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber<Integer>() { @Override public void onSubscribe(@NonNull Subscription s) { //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法 //2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求 //3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。 Log.e(TAG, "onSubscribe..."); s.request(10); } @Override public void onNext(Integer integer) { Log.e(TAG, "onNext:" + integer); } @Override public void onError(Throwable t) { Log.e(TAG, "onError..." + t); } @Override public void onComplete() { Log.e(TAG, "onComplete..."); } }); }
/**
 * Counts the start tags in {@code R.xml.drawable} whose name starts with {@code "item"} on the IO
 * scheduler and passes the total to {@code mView.setIconTotal} on the main thread.
 *
 * <p>The XML parser is always closed, and the flow completes after emitting the total.
 */
public void calcIconTotal() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            int total = 0;
            try {
                while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                    if (xml.getEventType() == XmlPullParser.START_TAG
                            && xml.getName().startsWith("item")) {
                        total++;
                    }
                    xml.next();
                }
            } finally {
                // Release the parser even when parsing fails.
                xml.close();
            }
            flowableEmitter.onNext(total);
            // Signal completion so the subscription is released after the single value.
            flowableEmitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mView.setIconTotal(integer);
                }
            });
}
@Override public void putEntries(Observable<Entry> entries) { final String insert = "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)" + " values (?, ?, ?, ?);"; final Observable<Object> params = entries.concatMap(entry -> { byte[] compressedMvt; try { compressedMvt = CompressUtil.getCompressedAsGzip(entry.getVector()); } catch (final IOException ex) { throw Exceptions.propagate(ex); } return Observable.<Object>just(entry.getZoomLevel(), entry.getColumn(), flipY(entry.getRow(), entry.getZoomLevel()), compressedMvt); }) // source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files .toList() .flattenAsObservable(objects -> objects); // TODO update when upstream is enhanced dataSource.update(insert) .parameterStream(params.toFlowable(BackpressureStrategy.BUFFER)) .counts() .test() // TODO remove hack .awaitDone(5, TimeUnit.SECONDS) .assertComplete(); }
/**
 * Converts a promise into a {@code Flowable} that emits the promised value and then completes,
 * or signals the promise's error.
 *
 * @param promise  the promise to convert
 * @param strategy The {@link BackpressureStrategy} to use
 * @param <T>      the type of the promised value
 * @return a {@code Flowable} backed by the promise
 * @see RxRatpack#observe(Promise)
 */
public static <T> Flowable<T> flow(Promise<T> promise, BackpressureStrategy strategy) {
    return Flowable.create(emitter -> {
        promise
                .onError(emitter::onError)
                .then(result -> {
                    emitter.onNext(result);
                    emitter.onComplete();
                });
    }, strategy);
}
/** * 水缸的更多处理 */ private void flowableSize() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 10000; i++) { emitter.onNext(i); } } }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()) //DROP和LATEST的区别在于,当发出事件数量有限时,后者一定会接收到最后一条数据,如这里的9999,而DROP是连续性的 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; s.request(128); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/**
 * Demo: emits four string events followed by completion and logs every callback of a subscriber
 * that requests {@code Long.MAX_VALUE} items.
 */
private void doRxJavaWork() {
    final FlowableOnSubscribe<String> producer = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> sink) throws Exception {
            sink.onNext("事件1");
            sink.onNext("事件2");
            sink.onNext("事件3");
            sink.onNext("事件4");
            sink.onComplete();
        }
    };
    Flowable<String> events = Flowable.create(producer, BackpressureStrategy.BUFFER);

    Subscriber<String> logger = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            Log.d(TAG, "onSubscribe: ");
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String event) {
            Log.d(TAG, "onNext: " + event);
        }

        @Override
        public void onError(Throwable failure) {
            Log.d(TAG, "onError: " + failure.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: ");
        }
    };

    events.subscribe(logger);
}
/** * 获取内存缓存请求Flowable对象 * * @return 内存缓存查询Flowable对象 */ private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() { //内存缓存数据获取 return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() { @Override public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception { T response = null; try { if (ccCacheQueryCallback != null) { response = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey); } CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, true, false); e.onNext(tccBaseResponse); e.onComplete(); } catch (Exception exception) { switch (cacheQueryMode) { case CCCacheMode.QueryMode.MODE_ONLY_MEMORY: e.onError(new CCDiskCacheQueryException(exception)); break; default: e.onComplete(); break; } } } }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()); }
/**
 * Creates the client from the builder, wires the shared response stream, subscribes this client
 * (and the game, when one is set) to it, and finally starts the channel provider.
 *
 * <p>The response stream merges the channel's output and error streams, maps each item through
 * {@code prepareResponse}, applies a bounded backpressure buffer and is shared via
 * {@code publish().autoConnect()}. Every subscription registers on {@code await} and every
 * cancellation deregisters from it.
 *
 * @param builder source of connection and tracing settings
 */
private S2Client(Builder builder) {
    connectToIp = builder.connectToIp;
    connectToPort = builder.connectToPort;
    traced = builder.traced;
    tracer = builder.tracer;
    game = builder.game;

    log.info("Starting: {}", this);

    Channel clientChannel = channelProvider.getChannel();
    responseStream = clientChannel.outputStream().mergeWith(clientChannel.errorStream())
            .map(this::prepareResponse)
            .toFlowable(BackpressureStrategy.ERROR)
            .onBackpressureBuffer(cfg().getInt(OcraftConfig.CLIENT_BUFFER_SIZE_RESPONSE_BACKPRESSURE))
            .observeOn(Schedulers.computation(), false, cfg().getInt(CLIENT_BUFFER_SIZE_RESPONSE_STREAM))
            .publish()
            .autoConnect()
            .doOnSubscribe(subscription -> await.register())
            .doOnCancel(await::arriveAndDeregister);

    // The client itself always listens to the responses.
    responseStream().subscribe(this);
    await.arriveAndDeregister();

    // The game listens as well when one was supplied.
    if (game != null) {
        responseStream().subscribe(game);
        await.arriveAndDeregister();
    }

    channelProvider.start(connectToIp, connectToPort);
}
/**
 * Retry handler: for each upstream error, runs {@code todoBeforeRetry} and retries while the
 * retry budget lasts, then propagates the error.
 *
 * @param throwableFlowable flow of errors raised by the retried source
 * @return a publisher whose emissions trigger a retry and whose error ends retrying
 */
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
    final Function<Throwable, Publisher<?>> retryDecision = new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable failure) throws Exception {
            // Count this attempt; once the budget is exceeded, give up with the error.
            if (++retryCount > maxRetries) {
                return Flowable.error(failure);
            }
            return todoBeforeRetry.apply(failure).toFlowable(BackpressureStrategy.BUFFER);
        }
    };
    return throwableFlowable.flatMap(retryDecision);
}
private void flowableTest() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 128; i++) { Log.d(TAG, "emit " + i); emitter.onNext(i); } } }, BackpressureStrategy.ERROR)//增加了一个参数 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); // s.request(Long.MAX_VALUE); //注意这句代码 mSubscription = s; } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/**
 * Creates a {@code Flowable} that emits the given value once and then completes.
 *
 * @param t   the value to emit
 * @param <T> element type
 * @return Flowable created with {@code BackpressureStrategy.BUFFER}
 */
public static <T> Flowable<T> createFlowable(final T t) {
    final FlowableOnSubscribe<T> singleValue = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> sink) throws Exception {
            try {
                sink.onNext(t);
                sink.onComplete();
            } catch (Exception failure) {
                sink.onError(failure);
            }
        }
    };
    return Flowable.create(singleValue, BackpressureStrategy.BUFFER);
}
/**
 * Emits the list returned by {@code SongsPresenter.getAllSongs()} once, then completes.
 *
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<List<Song>> getAllSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> sink) throws Exception {
            sink.onNext(SongsPresenter.getAllSongs());
            sink.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
/**
 * Emits the list returned by {@code PlaylistsPresenter.getAllPlaylists()} once, then completes.
 *
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<List<Playlist>> getAllPlaylists() {
    final FlowableOnSubscribe<List<Playlist>> loader = new FlowableOnSubscribe<List<Playlist>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Playlist>> sink) throws Exception {
            sink.onNext(PlaylistsPresenter.getAllPlaylists());
            sink.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
/**
 * Emits the list returned by {@code AlbumsPresenter.getAllAlbums()} once, then completes.
 *
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<List<Album>> getAllAlbums() {
    final FlowableOnSubscribe<List<Album>> loader = new FlowableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Album>> sink) throws Exception {
            sink.onNext(AlbumsPresenter.getAllAlbums());
            sink.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
/**
 * Emits the list returned by {@code AlbumDetailPresenter.getAlbumSongs(albumId)} once, then
 * completes.
 *
 * @param albumId id passed to {@code AlbumDetailPresenter.getAlbumSongs}
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<List<Song>> getAlbumSongs(final long albumId) {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> sink) throws Exception {
            sink.onNext(AlbumDetailPresenter.getAlbumSongs(albumId));
            sink.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
/**
 * Loads all stored alarms from Realm and emits them as one list of plain {@code Alarm} objects.
 *
 * <p>Nothing is emitted when no alarms are stored. The flow completes in both cases, and the
 * Realm instance opened for the query is always closed. The emitted alarms are copies populated
 * through setters, so they do not reference the closed Realm.
 *
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<List<Alarm>> getAlarms() {
    return Flowable.create(
            new FlowableOnSubscribe<List<Alarm>>() {
                @Override
                public void subscribe(FlowableEmitter<List<Alarm>> e) throws Exception {
                    Realm realm = Realm.getDefaultInstance();
                    try {
                        RealmResults<RealmAlarm> result = realm.where(RealmAlarm.class).findAll();

                        if (result.size() != 0) {
                            List<Alarm> alarmList = new ArrayList<>(result.size());
                            for (int i = 0; i < result.size(); i++) {
                                RealmAlarm realmAlarm = result.get(i);

                                Alarm alarm = new Alarm();
                                alarm.setActive(realmAlarm.isActive());
                                alarm.setRenewAutomatically(realmAlarm.isRenewAutomatically());
                                alarm.setVibrateOnly(realmAlarm.isVibrateOnly());
                                alarm.setHourOfDay(realmAlarm.getHourOfDay());
                                alarm.setMinute(realmAlarm.getMinute());
                                alarm.setAlarmTitle(realmAlarm.getAlarmTitle());
                                alarm.setAlarmId(realmAlarm.getAlarmId());

                                alarmList.add(alarm);
                            }
                            e.onNext(alarmList);
                        }
                        // Complete on the non-empty path too, so subscribers always see a
                        // terminal event.
                        e.onComplete();
                    } finally {
                        // Every Realm.getDefaultInstance() must be paired with close().
                        realm.close();
                    }
                }
            },
            BackpressureStrategy.LATEST);
}
/**
 * Emits the list returned by {@code MyFavoritePresenter.getFavoriteSongs()} once, then completes.
 *
 * @return a {@code Flowable} created with {@code BackpressureStrategy.LATEST}
 */
@Override
public Flowable<List<Song>> getMyFavoriteSongs() {
    final FlowableOnSubscribe<List<Song>> loader = new FlowableOnSubscribe<List<Song>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<List<Song>> sink) throws Exception {
            sink.onNext(MyFavoritePresenter.getFavoriteSongs());
            sink.onComplete();
        }
    };
    return Flowable.create(loader, BackpressureStrategy.LATEST);
}
/** * Start this upload asynchronously. Returns progress updates. * * @return {@link Flowable} that emits {@link Progress} events */ public Flowable<Progress<FileLink>> run() { Flowable<Prog<FileLink>> startFlow = Flowable .fromCallable(new UploadStartFunc(this)) .subscribeOn(Schedulers.io()); // Create multiple func instances to each upload a subrange of parts from the file // Merge each of these together into one so they're executed concurrently Flowable<Prog<FileLink>> transferFlow = Flowable.empty(); for (int i = 0; i < CONCURRENCY; i++) { UploadTransferFunc func = new UploadTransferFunc(this); Flowable<Prog<FileLink>> temp = Flowable .create(func, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()); transferFlow = transferFlow.mergeWith(temp); } Flowable<Prog<FileLink>> completeFlow = Flowable .fromCallable(new UploadCompleteFunc(this)) .subscribeOn(Schedulers.io()); return startFlow .concatWith(transferFlow) .concatWith(completeFlow) .buffer(PROG_INTERVAL_SEC, TimeUnit.SECONDS) .flatMap(new ProgMapFunc(this)); }
/**
 * Relays {@code flowable} unchanged and runs {@code afterFirstSubscribe} once, right after the
 * first subscription to the upstream has been made.
 *
 * <p>The upstream subscription is bound to the emitter, so cancelling the returned publisher
 * also disposes the upstream.
 *
 * @param flowable the upstream to relay
 * @return a publisher relaying the upstream's signals with {@code BackpressureStrategy.MISSING}
 */
@Override
public Publisher<T> apply(Flowable<T> flowable) {
    return Flowable.create(flowableEmitter -> {
        // Without setDisposable the upstream subscription outlives a downstream cancel.
        flowableEmitter.setDisposable(
                flowable.subscribe(
                        flowableEmitter::onNext,
                        flowableEmitter::onError,
                        flowableEmitter::onComplete));
        // Only the first subscriber triggers the callback.
        if (done.compareAndSet(false, true)) {
            afterFirstSubscribe.run();
        }
    }, BackpressureStrategy.MISSING);
}
public static void main(String[] args) throws IOException { ReactiveJournal reactiveJournal = new ReactiveJournal("/tmp/fastproducer"); reactiveJournal.clearCache(); Flowable<Long> fastProducer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 2500); ReactiveRecorder recorder = reactiveJournal.createReactiveRecorder(); recorder.recordAsync(fastProducer,"input"); //Set the replay strategy to ReplayRate.FAST as e want to process the event as soon as it is //received from the publisher. PlayOptions options = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST); ConnectableFlowable journalInput = new RxJavaPlayer(reactiveJournal).play(options).publish(); Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10); recorder.record(journalInput, "consumed"); long startTime = System.currentTimeMillis(); journalInput.observeOn(Schedulers.io()).subscribe(onNextSlowConsumer::accept, e -> System.out.println("ReactiveRecorder " + " " + e), () -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "]") ); journalInput.connect(); DSUtil.sleep(3000); }
/**
 * Invokes {@code run} once per backpressure strategy, in the order BUFFER, LATEST, DROP, MISSING,
 * ERROR.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final BackpressureStrategy[] strategies = {
            BackpressureStrategy.BUFFER,
            BackpressureStrategy.LATEST,
            BackpressureStrategy.DROP,
            BackpressureStrategy.MISSING,
            BackpressureStrategy.ERROR
    };
    for (BackpressureStrategy strategy : strategies) {
        run(strategy);
    }
}