Java 类io.reactivex.BackpressureOverflowStrategy 实例源码

项目:rx-twitter-stream-android    文件:ActivityModule.java   
@ActivityScope
@Provides
public Function<Flowable<Tweet>, Flowable<Tweet>> provideBackPressureStrategy() {
    Function<Flowable<Tweet>, Flowable<Tweet>> backPressureStrategyFunction;
    switch (backPressureStrategy) {
        case BUFFER:
            backPressureStrategyFunction = flowable -> flowable.onBackpressureBuffer(50, () -> Log.d("", "Buffer full, dropping tweets"), BackpressureOverflowStrategy.DROP_OLDEST);
            break;
        case DROP:
            backPressureStrategyFunction = flowable -> flowable.onBackpressureDrop();
            break;
        case LATEST:
            backPressureStrategyFunction = flowable -> flowable.onBackpressureLatest();
            break;
        default:
            backPressureStrategyFunction = flowable -> flowable;
            break;
    }

    return backPressureStrategyFunction;
}
项目:streamingpool-core    文件:TrackKeepingDiscoveryService.java   
private <T> Flowable<T> applyBackpressureStrategy(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    if(backpressureStrategy == null){
        return source;
    }
    if (backpressureStrategy instanceof BackpressureLatestStrategy) {
        return source.onBackpressureLatest();
    }
    if (backpressureStrategy instanceof BackpressureDropStrategy) {
        return source.onBackpressureDrop(i -> {});
    }
    if (backpressureStrategy instanceof BackpressureBufferStrategy) {
        BackpressureBufferStrategy bufferStrategy = (BackpressureBufferStrategy) backpressureStrategy;

        if (bufferStrategy.overflowStrategy() == BackpressureBufferStrategy.BackpressureBufferOverflowStrategy.DROP_LATEST) {
            return source.onBackpressureBuffer(bufferStrategy.bufferSize(), NOOP, BackpressureOverflowStrategy.DROP_LATEST);
        }
        if (bufferStrategy.overflowStrategy() == BackpressureBufferStrategy.BackpressureBufferOverflowStrategy.DROP_OLDEST) {
            return source.onBackpressureBuffer(bufferStrategy.bufferSize(), NOOP, BackpressureOverflowStrategy.DROP_OLDEST);
        }
        throw new IllegalArgumentException("Cannot determine the specified buffer overflow strategy: " + bufferStrategy);
    }
    if (backpressureStrategy instanceof BackpressureNoneStrategy) {
        return source;
    }
    throw new IllegalArgumentException("Cannot determine the specified backpressure strategy: " + backpressureStrategy);
}
项目:buffer-slayer    文件:RxReporter.java   
private Flowable<SendingTask<M>> initBackpressurePolicy(Flowable<SendingTask<M>> flowable) {
  Strategy strategy = this.overflowStrategy;
  if (strategy == Strategy.DropNew) {
    return flowable.onBackpressureDrop(new Consumer<SendingTask<M>>() {
      @Override
      public void accept(SendingTask<M> task) throws Exception {
        metricsCallback(1);
      }
    });
  } else {
    BackpressureOverflowStrategy rxStrategy = RxOverflowStrategyBridge.toRxStrategy(strategy);
    return flowable.onBackpressureBuffer(pendingMaxMessages, new Action() {
      @Override
      public void run() throws Exception {
        metricsCallback(1);
      }
    }, rxStrategy);
  }
}
项目:Learning-RxJava    文件:Ch8_13.java   
public static void main(String[] args) {
    Flowable.interval(1, TimeUnit.MILLISECONDS)
            .onBackpressureBuffer(10,
                    () -> System.out.println("overflow!"),
                    BackpressureOverflowStrategy.DROP_LATEST)
            .observeOn(Schedulers.io())
            .subscribe(i -> {
                sleep(5);
                System.out.println(i);
            });
    sleep(5000);
}
项目:buffer-slayer    文件:RxOverflowStrategyBridge.java   
/**
 * Convert a {@link OverflowStrategy.Strategy} to rx-java's {@link BackpressureOverflowStrategy}
 */
static BackpressureOverflowStrategy toRxStrategy(OverflowStrategy.Strategy strategy) {
  switch (strategy) {
    case Fail:
      return BackpressureOverflowStrategy.ERROR;
    case DropTail:
      return BackpressureOverflowStrategy.DROP_LATEST;
    case DropHead:
      return BackpressureOverflowStrategy.DROP_OLDEST;
    default:
      throw new UnsupportedOperationException(strategy + " not supported using rx-java.");
  }
}
项目:streamingpool-core    文件:ErrorDeflector.java   
public <T> ErrorStreamPair<T> stream(Publisher<T> dataPublisher) {
    return ErrorStreamPair.ofDataError(dataPublisher,
            errorStream.toSerialized().onBackpressureBuffer(Flowable.bufferSize(),
                    () -> LOGGER.error("Discarding exception due to backpressure buffer limit"),
                    BackpressureOverflowStrategy.DROP_OLDEST));
}