Java 类io.reactivex.flowables.GroupedFlowable 实例源码

项目:javarx-study    文件:ObservableFunctionalTest.java   
/**
 * Splits the integers 1..100 into an "Even" and an "Odd" group and prints every
 * value together with the key of the group it belongs to.
 *
 * <p>Neither {@code range} nor {@code groupBy} is given a Scheduler here, so all
 * items are delivered on the calling thread during {@code subscribe}; the test
 * therefore does not need to sleep and wait for output.
 *
 * @throws InterruptedException never thrown by the current body; retained so the
 *         method signature stays unchanged
 */
@Test
public void testBasicGroupByFlowable() throws InterruptedException {
    Flowable<GroupedFlowable<String, Integer>> groupedFlowable =
            Flowable.range(1, 100).groupBy(integer -> integer % 2 == 0 ? "Even" : "Odd");

    // Each group is subscribed as soon as it is emitted, so values are printed in upstream order.
    groupedFlowable.subscribe(g -> g.subscribe(x -> System.out.println("g:" + g.getKey() + ", value:" + x)));
}
项目:javarx-study    文件:ObservableFunctionalTest.java   
/**
 * Groups the integers 1..100 by parity, stores a {@code Single<List<Integer>>}
 * for each group in a map keyed by "Even"/"Odd", then prints both lists.
 *
 * <p>The {@code toList()} singles are only stored while the groups are emitted;
 * they are first subscribed by the {@code blockingGet()} calls at the end.
 */
@Test
public void testBasicGroupByFlowableReduceIntoMultiMap() {
    Flowable<GroupedFlowable<String, Integer>> parityGroups =
            Flowable.range(1, 100).groupBy(number -> number % 2 == 0 ? "Even" : "Odd");

    Map<String, Single<List<Integer>>> listsByKey = new HashMap<>();

    // Register one (not yet subscribed) list collector per emitted group.
    parityGroups.subscribe(group -> {
        Single<List<Integer>> members = group.toList();
        listsByKey.put(group.getKey(), members);
    });

    Single<List<Integer>> evens = listsByKey.get("Even");
    System.out.println(evens.blockingGet());

    Single<List<Integer>> odds = listsByKey.get("Odd");
    System.out.println(odds.blockingGet());
}
项目:buffer-slayer    文件:RxReporter.java   
/**
 * Handles one message-key group: batches its sending tasks with
 * {@code buffer(messageTimeoutNanos, NANOSECONDS, scheduler, bufferedMaxMessages)}
 * and subscribes the consumer built from {@code sender} to the batches, with the
 * subscription performed on {@code scheduler}.
 *
 * @param group the grouped stream of sending tasks that share a {@code MessageKey}
 * @throws Exception declared by the overridden method's signature
 */
@Override
public void accept(GroupedFlowable<MessageKey, SendingTask<M>> group) throws Exception {
  group
      .buffer(messageTimeoutNanos, TimeUnit.NANOSECONDS, scheduler, bufferedMaxMessages)
      .subscribeOn(scheduler)
      .subscribe(SenderConsumerBridge.toConsumer(sender));
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Groups the elements by key by delegating to {@code boxed.groupBy(keySelector)}.
 *
 * @param keySelector function forwarded to {@code boxed} to derive the key of each element
 * @param <K> the key type
 * @return whatever {@code boxed.groupBy(keySelector)} returns
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector) {
    final Flowable<GroupedFlowable<K, T>> grouped = boxed.groupBy(keySelector);
    return grouped;
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Groups the elements by key by delegating to
 * {@code boxed.groupBy(keySelector, delayError)}.
 *
 * @param keySelector function forwarded to {@code boxed} to derive the key of each element
 * @param delayError flag forwarded unchanged to {@code boxed.groupBy}
 * @param <K> the key type
 * @return whatever {@code boxed.groupBy(keySelector, delayError)} returns
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector, boolean delayError) {
    final Flowable<GroupedFlowable<K, T>> grouped = boxed.groupBy(keySelector, delayError);
    return grouped;
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Groups mapped elements by key by delegating to
 * {@code boxed.groupBy(keySelector, valueSelector)}.
 *
 * @param keySelector function forwarded to {@code boxed} to derive the key of each element
 * @param valueSelector function forwarded to {@code boxed} to derive the grouped value
 * @param <K> the key type
 * @param <V> the grouped value type
 * @return whatever {@code boxed.groupBy(keySelector, valueSelector)} returns
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector) {
    final Flowable<GroupedFlowable<K, V>> grouped = boxed.groupBy(keySelector, valueSelector);
    return grouped;
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Groups mapped elements by key by delegating to
 * {@code boxed.groupBy(keySelector, valueSelector, delayError)}.
 *
 * @param keySelector function forwarded to {@code boxed} to derive the key of each element
 * @param valueSelector function forwarded to {@code boxed} to derive the grouped value
 * @param delayError flag forwarded unchanged to {@code boxed.groupBy}
 * @param <K> the key type
 * @param <V> the grouped value type
 * @return whatever {@code boxed.groupBy(keySelector, valueSelector, delayError)} returns
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, boolean delayError) {
    final Flowable<GroupedFlowable<K, V>> grouped =
            boxed.groupBy(keySelector, valueSelector, delayError);
    return grouped;
}
项目:cyclops    文件:FlowableKind.java   
/**
 * Groups mapped elements by key by delegating to
 * {@code boxed.groupBy(keySelector, valueSelector, delayError, bufferSize)}.
 *
 * @param keySelector function forwarded to {@code boxed} to derive the key of each element
 * @param valueSelector function forwarded to {@code boxed} to derive the grouped value
 * @param delayError flag forwarded unchanged to {@code boxed.groupBy}
 * @param bufferSize size hint forwarded unchanged to {@code boxed.groupBy}
 * @param <K> the key type
 * @param <V> the grouped value type
 * @return whatever the delegated {@code boxed.groupBy(...)} call returns
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, boolean delayError, int bufferSize) {
    final Flowable<GroupedFlowable<K, V>> grouped =
            boxed.groupBy(keySelector, valueSelector, delayError, bufferSize);
    return grouped;
}