Java 类io.reactivex.FlowableSubscriber 实例源码

项目:Reactive-Programming-With-Java-9    文件:Demo_average.java   
/**
 * Computes the average of the values emitted by {@code Flowable.range(10, 9)}
 * and prints it to standard output.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    MathFlowable.averageDouble(Flowable.range(10, 9)).subscribe(new FlowableSubscriber<Double>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            // The average is delivered as a single item, so one request is enough.
            subscription.request(1);
        }

        @Override
        public void onNext(Double value) {
            System.out.println("average:-" + value);
        }

        @Override
        public void onError(Throwable error) {
            // Report the failure instead of silently dropping it.
            System.err.println("average failed: " + error);
        }

        @Override
        public void onComplete() {
            System.out.println("completed successfully");
        }
    });
}
项目:DailyStudy    文件:RxJavaActivity.java   
/**
 * Demo: emits the integers 0..99 from a {@code Flowable} created with the DROP
 * backpressure strategy, subscribes on the IO scheduler, observes on the
 * Android main thread, and logs every callback. The subscriber requests 10 items.
 */
private void flowable() {
    FlowableOnSubscribe<Integer> producer = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Exception {
            Log.e(TAG, "start send data ");
            int next = 0;
            while (next < 100) {
                emitter.onNext(next);
                next++;
            }
            emitter.onComplete();
        }
    };

    FlowableSubscriber<Integer> consumer = new FlowableSubscriber<Integer>() {
        @Override
        public void onSubscribe(@NonNull Subscription subscription) {
            // 1. onSubscribe is new in 2.x; it is called before any data is emitted,
            //    comparable to onStart in 1.x.
            // 2. The Subscription argument can be used to request a number of items
            //    from upstream, and also to cancel the request.
            // 3. Subscription.request must be called to ask for data; otherwise
            //    upstream will not emit anything.
            Log.e(TAG, "onSubscribe...");
            subscription.request(10);
        }

        @Override
        public void onNext(Integer item) {
            Log.e(TAG, "onNext:" + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.e(TAG, "onError..." + error);
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete...");
        }
    };

    Flowable.create(producer, BackpressureStrategy.DROP) // select the backpressure strategy
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}
项目:ratpack-rx2    文件:RxRatpack.java   
/**
 * Lifts the given flowable so that each item it emits is passed to the downstream
 * subscriber from inside a newly forked {@code Execution}.
 * <p>
 * Completion is forwarded downstream only once the upstream has completed and every
 * forked execution has completed; this is tracked with a work-in-progress counter.
 * An error from upstream or from a forked execution is forwarded downstream, and the
 * upstream subscription is cancelled, at most once.
 *
 * @param flowable the upstream flowable to lift
 * @param doWithRegistrySpec action registered on every forked execution via {@code register(...)}
 * @param <T> the item type
 * @return the lifted flowable
 * @see RxRatpack#forkEach(Observable, Action)
 */
public static <T> Flowable<T> forkEach(Flowable<T> flowable, Action<? super RegistrySpec> doWithRegistrySpec) {
  return flowable.lift(downstream -> new FlowableSubscriber<T>() {

    // Outstanding work: starts at 1 for the upstream itself, plus 1 per item being processed.
    private final AtomicInteger wip = new AtomicInteger(1);
    // Set once a terminal signal has been forwarded downstream.
    private final AtomicBoolean closed = new AtomicBoolean();
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
      this.subscription = s;
      // NOTE(review): the first item is requested before downstream receives onSubscribe,
      // and downstream is handed the same upstream Subscription, so its own request()
      // calls reach upstream directly — confirm this ordering and sharing are intended.
      s.request(1);
      downstream.onSubscribe(s);
    }

    @Override
    public void onComplete() {
      // Releases the initial unit of work taken for the upstream.
      maybeDone();
    }

    @Override
    public void onError(final Throwable e) {
      terminate(() -> downstream.onError(e));
    }

    // Drops one unit of work; when none remain, completes downstream.
    private void maybeDone() {
      if (wip.decrementAndGet() == 0) {
        terminate(downstream::onComplete);
      }
    }

    // Cancels upstream and runs the terminal action; the CAS lets only the first caller through.
    private void terminate(Runnable runnable) {
      if (closed.compareAndSet(false, true)) {
        subscription.cancel();
        runnable.run();
      }
    }

    @Override
    public void onNext(final T t) {
      // Avoid the overhead of creating executions if downstream is no longer interested
      if (closed.get()) {
        return;
      }

      // Count this item as in-flight until its forked execution completes.
      wip.incrementAndGet();
      Execution.fork()
        .register(doWithRegistrySpec)
        .onComplete(e -> this.maybeDone())
        .onError(this::onError)
        .start(e -> {
          // Re-check: a terminal signal may have been forwarded since the fork was created.
          if (!closed.get()) {
            // Ask upstream for the next item, then hand this one downstream.
            subscription.request(1);
            downstream.onNext(t);
          }
        });
    }
  });
}
项目:EasyHttp    文件:RxGetActivity.java   
/**
 * Click handler for the "go" button: issues a GET request to the URL typed into
 * {@code urlView} and shows the response text (or the error) in {@code body}.
 * A progress dialog is shown while the request is in flight.
 */
@OnClick(R.id.go)
public void go() {
    Editable url = urlView.getText();

    if (TextUtils.isEmpty(url)) {
        // makeText only builds the Toast; show() is required to display it.
        Toast.makeText(this, "url is empty", Toast.LENGTH_SHORT).show();
        return;
    }

    RxEasyHttp.get(url.toString(), new RxEasyStringConverter())
            .doOnSubscribe(new Consumer<Subscription>() {
                @Override
                public void accept(@NonNull Subscription subscription) throws Exception {
                    dialog.show();
                    body.setText("");
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                    dialog.show();
                    body.setText("");
                }

                @Override
                public void onNext(String response) {
                    body.setText(response);
                }

                @Override
                public void onError(Throwable t) {
                    body.setText(t.toString());
                    // onComplete is not delivered after an error, so dismiss the dialog here too.
                    dialog.cancel();
                }

                @Override
                public void onComplete() {
                    dialog.cancel();
                }
            });
}
项目:EasyHttp    文件:RxPostActivity.java   
/**
 * Click handler for the "submit" button: posts the text typed into {@code comment}
 * as the {@code content} parameter and shows the returned status and message in
 * {@code result}. A progress dialog is shown while the request is in flight.
 */
@OnClick(R.id.submit)
public void submit() {
    Editable content = comment.getText();

    if (TextUtils.isEmpty(content)) {
        // makeText only builds the Toast; show() is required to display it.
        Toast.makeText(this, "comment is empty", Toast.LENGTH_SHORT).show();
        return;
    }

    EasyRequestParams params = new EasyRequestParams();
    params.put("content", content.toString());

    String url = "http://book.km.com/app/index.php?c=version&a=feedback";

    RxEasyHttp.post(url, params, new RxEasyCustomConverter<PostEntity>() {
                @Override
                public void doNothing() {
                    // Anonymous subclass keeps the generic type argument available
                    // despite type erasure, so it can be resolved correctly.
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<PostEntity>() {

                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                    dialog.show();
                }

                @Override
                public void onNext(PostEntity entity) {
                    Toast.makeText(RxPostActivity.this, "提交成功", Toast.LENGTH_LONG).show();
                    result.setText("status : " + entity.getStatus() + "\n" +
                            "message : " + entity.getMessage());

                }

                @Override
                public void onError(Throwable t) {
                    Toast.makeText(RxPostActivity.this, "提交失败", Toast.LENGTH_LONG).show();
                    result.setText(t.getMessage());
                    dialog.cancel();
                }

                @Override
                public void onComplete() {
                    dialog.cancel();
                }
            });
}