Java 类io.reactivex.functions.BiConsumer 实例源码

项目:android-contact-extractor    文件:CQuery.java   
public Disposable build(final IContact iContact) {

        if (!PermissionWrapper.hasContactsPermissions(mContext)) {
            throw new SecurityException("Contact Permission Missing");
        }

        return new CListExtractorAbstract(mContext).getList(mListFilterType, orderBy, limit, skip)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new BiConsumer<List<CList>, Throwable>() {
                    @Override
                    public void accept(List<CList> genericCLists, Throwable throwable) throws Exception {
                        if (iContact == null)
                            return;

                        if (throwable == null)
                            iContact.onContactSuccess(genericCLists);
                        else
                            iContact.onContactError(throwable);


                    }
                });
    }
项目:rxjava2-jdbc    文件:Update.java   
private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters,
        Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> initialState = () -> {
        Util.convertAndSetParameters(ps.ps, parameters, ps.names);
        ps.ps.execute();
        return ps.ps.getGeneratedKeys();
    };
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        if (rs.next()) {
            emitter.onNext(mapper.apply(rs));
        } else {
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposer = Util::closeSilently;
    return Flowable.generate(initialState, generator, disposer);
}
项目:rxjava2-jdbc    文件:Call.java   
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
        Function<? super ResultSet, ? extends T> f) throws SQLException {
    ResultSet rsActual = stmt.stmt.getResultSet();
    Callable<ResultSet> initialState = () -> rsActual;
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
        if (rs.next()) {
            T v = f.apply(rs);
            log.debug("emitting {}", v);
            emitter.onNext(v);
        } else {
            log.debug("completed");
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposeState = Util::closeSilently;
    return Flowable.generate(initialState, generator, disposeState);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerSignalsErrorCancel() {
    BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1);

    pp
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doError(new IOException());
        }
    }))
    .test()
    .assertFailure(IOException.class);

    assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerThrowsCancel() {
    BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1);

    pp
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            throw new IOException();
        }
    }))
    .test()
    .assertFailure(IOException.class);

    assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerCompleteCancel() {
    BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1);

    pp
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    }))
    .test()
    .assertResult();

    assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void mapFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void mapAsyncFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    UnicastProcessor<Integer> up = UnicastProcessor.create();
    TestHelper.emit(up, 1, 2, 3, 4, 5);

    up
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.ASYNC))
    .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void filterFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            if (t % 2 == 0) {
                e.doNext(t * 2);
            }
        }
    }))
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertResult(4, 8);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerThrowsFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            throw new IOException();
        }
    }))
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerSignalsErrorFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doError(new IOException());
        }
    }))
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerCompleteFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    }))
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertResult();
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void consumerSignalsErrorCancel() {
    BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1);

    pp
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doError(new IOException());
        }
    }))
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(IOException.class);

    assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void consumerThrowsCancel() {
    BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1);

    pp
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            throw new IOException();
        }
    }))
    .filter(Functions.alwaysTrue())
    .test()
    .assertFailure(IOException.class);

    assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void consumerCompleteCancel() {
    BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1);

    pp
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    }))
    .filter(Functions.alwaysTrue())
    .test()
    .assertResult();

    assertFalse(pp.hasSubscribers());
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void mapFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .filter(Functions.alwaysTrue())
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void mapAsyncFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    UnicastProcessor<Integer> up = UnicastProcessor.create();
    TestHelper.emit(up, 1, 2, 3, 4, 5);

    up
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .filter(Functions.alwaysTrue())
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.ASYNC))
    .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void filterFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            if (t % 2 == 0) {
                e.doNext(t * 2);
            }
        }
    }))
    .filter(Functions.alwaysTrue())
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertResult(4, 8);
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void consumerThrowsFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            throw new IOException();
        }
    }))
    .filter(Functions.alwaysTrue())
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void consumerSignalsErrorFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doError(new IOException());
        }
    }))
    .filter(Functions.alwaysTrue())
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableMapFilterConditionalTest.java   
@Test
public void consumerCompleteFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    }))
    .filter(Functions.alwaysTrue())
    .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
    .assertResult();
}
项目:rxjava2-extras    文件:FlowableStateMachine.java   
public FlowableStateMachine(Flowable<In> source, //
        Callable<? extends State> initialState, //
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    Preconditions.checkArgument(requestBatchSize > 0,
            "initialRequest must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
项目:rxjava2-extras    文件:FlowableStateMachine.java   
StateMachineSubscriber( //
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize, //
        Subscriber<? super Out> child) {
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    this.child = child;
    this.count = requestBatchSize;
}
项目:GankGirl    文件:ReadTadPagePresenter.java   
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    restartableFirst(RequestCommand.REQUEST_READ_TYPE,
            new Function0<Observable<List<ReadTypeBean>>>() {
                @Override
                public Observable<List<ReadTypeBean>> apply() {
                    return HttpRetrofit.getInstance().providers.getTypeList(observable, new DynamicKey("闲读分类"), new EvictDynamicKey(false))
                            .map(new HttpRetrofit.HttpResultFuncCcche<List<ReadTypeBean>>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            },
            new BiConsumer<ReadTadPageFragment, List<ReadTypeBean>>() {
                @Override
                public void accept(@NonNull ReadTadPageFragment readTadPageFragment, @NonNull List<ReadTypeBean> readTypeBeen) throws Exception {
                    readTadPageFragment.onData(readTypeBeen);
                }
            });
}
项目:GankGirl    文件:ReadMorePresenter.java   
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    restartableFirst(RequestCommand.REQUEST_READ_CHILD_LIST,
            new Function0<Observable<ReadTypeBean>>() {
                @Override
                public Observable<ReadTypeBean> apply() {
                    return HttpRetrofit.getInstance().providers.getStackTypeList(observable, new DynamicKey(requestContext.getType()), new EvictDynamicKey(true))
                            .map(new HttpRetrofit.HttpResultFuncCcche<ReadTypeBean>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            },
            new BiConsumer<ReadMoreActivity, ReadTypeBean>() {
                @Override
                public void accept(@NonNull ReadMoreActivity readMoreActivity, @NonNull ReadTypeBean readTypeBean) throws Exception {
                    if (readTypeBean.getReadListBeanList() != null)
                        readMoreActivity.onDataList(readTypeBean.getReadListBeanList());
                    if(!TextUtils.isEmpty(readTypeBean.getPage()))
                        readMoreActivity.setUrl_page(readTypeBean.getPage());
                }
            });
}
项目:rxlint    文件:SubscriberTest.java   
public void rx2WithSingleBiConsumer() {
    io.reactivex.Single.just(1).subscribe(new BiConsumer<Integer, Throwable>() {
        @Override public void accept(Integer integer, Throwable throwable) throws Exception {

        }
    });
}
项目:J-Chain    文件:LoggerTest.java   
private BiConsumer<Object, Object> booleanBiConsumer(final boolean[] result) {
    return new BiConsumer<Object, Object>() {
        @Override
        public void accept(Object o, Object o2) throws Exception {
            result[0] = (boolean) o2;
        }
    };
}
项目:RetrofitRxErrorHandler    文件:BaseBackoffStrategy.java   
private void callAction(Throwable throwable, Integer retry) throws Exception {
    BiConsumer<Throwable, Integer> action = doOnRetry(throwable, retry);
    if (action == null) {
        return;
    }
    action.accept(throwable, retry);
}
项目:RxJava2Extensions    文件:FlowableCoalesce.java   
CoalesceSubscriber(Subscriber<? super R> actual, Callable<R> containerSupplier,
        BiConsumer<R, T> coalescer, int bufferSize) {
    this.actual = actual;
    this.containerSupplier = containerSupplier;
    this.coalescer = coalescer;
    this.requested = new AtomicLong();
    this.bufferSize = bufferSize;
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void map() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .test()
    .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void take() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .take(3)
    .test()
    .assertResult(2, 4, 6);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void filter() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            if (t % 2 == 0) {
                e.doNext(t * 2);
            }
        }
    }))
    .test()
    .assertResult(4, 8);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void mapAndComplete() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
            e.doComplete();
        }
    }))
    .test()
    .assertResult(2);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void mapTwice() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
            e.doNext(t * 2);
        }
    }))
    .test()
    .assertFailure(IllegalStateException.class, 2);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void mapHidden() {
    Flowable.range(1, 5).hide()
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    }))
    .test()
    .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void filterHidden() {
    Flowable.range(1, 5).hide()
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            if (t % 2 == 0) {
                e.doNext(t * 2);
            }
        }
    }))
    .test()
    .assertResult(4, 8);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerThrows() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            throw new IOException();
        }
    }))
    .test()
    .assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerSignalsError() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doError(new IOException());
        }
    }))
    .test()
    .assertFailure(IOException.class);
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void consumerCompletes() {
    Flowable.range(1, 5)
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    }))
    .test()
    .assertResult();
}
项目:RxJava2Extensions    文件:FlowableMapFilterTest.java   
@Test
public void error() {
    Flowable.<Integer>error(new IOException())
    .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    }))
    .test()
    .assertFailure(IOException.class);
}