Java 类io.reactivex.processors.FlowableProcessor 实例源码

项目:GitHub    文件:MultiMission.java   
/**
 * Registers this mission under its URL, binds it to the event processor for that URL,
 * and then initializes every child mission with the same maps.
 *
 * @param missionMap   registry of missions keyed by URL
 * @param processorMap registry of event processors keyed by URL
 * @throws IllegalArgumentException if a mission that is not canceled is already
 *                                  registered under this mission's URL
 */
@Override
public void init(Map<String, DownloadMission> missionMap,
                 Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
    DownloadMission existing = missionMap.get(getUrl());
    // Only an absent or canceled entry may be replaced by this mission.
    if (existing != null && !existing.isCanceled()) {
        throw new IllegalArgumentException(formatStr(Constant.DOWNLOAD_URL_EXISTS, getUrl()));
    }
    missionMap.put(getUrl(), this);

    this.processor = createProcessor(getUrl(), processorMap);

    for (SingleMission child : missions) {
        child.init(missionMap, processorMap);
    }
}
项目:GitHub    文件:DownloadService.java   
/**
 * Receive the download events for a url.
 * <p>
 * The following events may be delivered:
 * {@link DownloadFlag#NORMAL}, {@link DownloadFlag#WAITING},
 * {@link DownloadFlag#STARTED}, {@link DownloadFlag#PAUSED},
 * {@link DownloadFlag#COMPLETED}, {@link DownloadFlag#FAILED}.
 * <p>
 * Every event carries a {@link DownloadStatus} that can be read and shown on the interface.
 * <p>
 * When no mission is registered for the url yet, an initial event is pushed right away:
 * the stored record's flag and status if a record exists and its file is on disk,
 * otherwise a normal event.
 *
 * @param url url
 * @return the processor that emits the DownloadEvent items for this url
 */
public FlowableProcessor<DownloadEvent> receiveDownloadEvent(String url) {
    FlowableProcessor<DownloadEvent> processor = createProcessor(url, processorMap);
    if (missionMap.get(url) != null) {
        // A mission is already registered for this url; nothing is emitted from here.
        return processor;
    }

    // Not yet added as a mission: derive the initial event from the stored record, if any.
    DownloadRecord record = dataBaseHelper.readSingleRecord(url);
    boolean fileOnDisk = record != null
            && getFiles(record.getSaveName(), record.getSavePath())[0].exists();
    if (fileOnDisk) {
        processor.onNext(createEvent(record.getFlag(), record.getStatus()));
    } else {
        processor.onNext(normal(null));
    }
    return processor;
}
项目:reduxfx    文件:ReduxFXStore.java   
/**
 * Creates the store and wires actions, state and commands into a feedback loop.
 *
 * @param initialState the first value emitted by {@code statePublisher}
 * @param updater      function that maps the current state and an action to an {@code Update}
 * @param middlewares  middlewares combined with {@code updater} via {@code applyMiddlewares}
 */
@SafeVarargs
public ReduxFXStore(S initialState, BiFunction<S, Object, Update<S>> updater, Middleware<S>... middlewares) {
    final BiFunction<S, Object, Update<S>> chainedUpdater = applyMiddlewares(updater, middlewares);

    // The emitter handed to the create-callback is stored in this.actionEmitter;
    // presumably other members of this class push actions through it (not visible here).
    final Publisher<Object> actionPublisher =
            Flowable.create(actionEmitter -> this.actionEmitter = actionEmitter, BackpressureStrategy.BUFFER);

    // Receives every Update produced by the updater (subscribed below).
    final FlowableProcessor<Update<S>> updateProcessor = BehaviorProcessor.create();

    // State stream: the initial state followed by the state of each Update.
    statePublisher = updateProcessor.map(Update::getState)
            .startWith(initialState);

    // Feedback loop: pair each state with an action, run the updater on the pair,
    // and feed the resulting Update back into updateProcessor.
    statePublisher.zipWith(actionPublisher, chainedUpdater::apply)
            .subscribe(updateProcessor);

    // Command stream: the commands of each Update, flattened into individual items.
    commandPublisher = updateProcessor
            .map(Update::getCommands)
            .flatMapIterable(commands -> commands);
}
项目:GitHub    文件:SingleMission.java   
/**
 * Registers this mission under its URL and binds it to the event processor for that URL.
 *
 * @param missionMap   registry of missions keyed by URL
 * @param processorMap registry of event processors keyed by URL
 * @throws IllegalArgumentException if a mission that is not canceled is already
 *                                  registered under this mission's URL
 */
@Override
public void init(Map<String, DownloadMission> missionMap,
                 Map<String, FlowableProcessor<DownloadEvent>> processorMap) {
    DownloadMission existing = missionMap.get(getUrl());
    // Only an absent or canceled entry may be replaced by this mission.
    if (existing != null && !existing.isCanceled()) {
        throw new IllegalArgumentException(formatStr(Constant.DOWNLOAD_URL_EXISTS, getUrl()));
    }
    missionMap.put(getUrl(), this);
    this.processor = createProcessor(getUrl(), processorMap);
}
项目:GitHub    文件:Utils.java   
/**
 * Returns the processor stored under {@code missionId}, first storing a new serialized
 * {@code BehaviorProcessor} there when the map has no (non-null) entry for that id.
 *
 * @param missionId    key of the processor
 * @param processorMap map of processors keyed by mission id; may be modified by this call
 * @return the processor found in {@code processorMap} under {@code missionId}
 */
public static FlowableProcessor<DownloadEvent> createProcessor(
        String missionId, Map<String, FlowableProcessor<DownloadEvent>> processorMap) {

    boolean missing = processorMap.get(missionId) == null;
    if (missing) {
        processorMap.put(missionId, BehaviorProcessor.<DownloadEvent>create().toSerialized());
    }
    return processorMap.get(missionId);
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * Creates a new {@code PublishProcessor}, adds it to the list kept for {@code tag}
 * (creating and storing that list first if none exists), and returns it.
 *
 * @param tag key under which the new processor is registered
 * @param <T> element type of the returned stream
 * @return the newly created processor, exposed as a {@code Flowable}
 */
public <T> Flowable<T> register(@NonNull Object tag) {
    List<FlowableProcessor> registered = mProcessorMapper.get(tag);
    if (registered == null) {
        registered = new ArrayList<FlowableProcessor>();
        mProcessorMapper.put(tag, registered);
    }
    FlowableProcessor<T> created = PublishProcessor.create();
    registered.add(created);
    return created;
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * Removes the entry for {@code tag} from the processor map when a list is stored for it.
 *
 * @param tag key whose processor list is dropped
 */
@SuppressWarnings("rawtypes")
public void unregister(@NonNull Object tag) {
    List<FlowableProcessor> registered = mProcessorMapper.get(tag);
    if (registered == null) {
        return;
    }
    mProcessorMapper.remove(tag);
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * Stops listening: removes {@code flowable} from the list registered under {@code tag},
 * and drops the tag's entry once that list is empty.
 * <p>
 * A {@code flowable} that is not in the list (including one that is not a
 * {@code FlowableProcessor} at all) is ignored.
 *
 * @param tag      key the flowable was registered under
 * @param flowable the stream to remove; a {@code null} value is ignored
 * @return the {@code RxBus} obtained from {@code getInstance()}
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Flowable<?> flowable) {
    if (null == flowable)
        return getInstance();
    List<FlowableProcessor> processors = mProcessorMapper.get(tag);
    if (null != processors) {
        // List.remove(Object) needs no downcast; casting to FlowableProcessor would throw
        // ClassCastException for any Flowable that is not a processor.
        processors.remove(flowable);
        if (isEmpty(processors)) {
            mProcessorMapper.remove(tag);
        }
    }
    return getInstance();
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * Fires an event: passes {@code content} to {@code onNext} of every processor
 * registered under {@code tag}. Does nothing when no processor is registered.
 *
 * @param tag     key selecting the processors to notify
 * @param content the value to emit
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void post(@NonNull Object tag, @NonNull Object content) {
    List<FlowableProcessor> targets = mProcessorMapper.get(tag);
    if (isEmpty(targets)) {
        return;
    }
    for (FlowableProcessor target : targets) {
        target.onNext(content);
    }
}
项目:RxJava2Jdk9Interop    文件:FlowInterop.java   
/**
 * Exposes an identity Flow.Processor as a FlowableProcessor.
 * <p>
 * A source that already is a FlowableProcessor is returned as-is (cast only);
 * any other source is wrapped in a new adapter.
 * @param source the source Flow.Processor, not null
 * @param <T> the input and output type of the Flow.Processor
 * @return the FlowableProcessor view of {@code source}
 * @throws  NullPointerException if source is null
 */
@SuppressWarnings("unchecked")
public static <T> FlowableProcessor<T> fromFlowProcessor(Flow.Processor<T, T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    if (!(source instanceof FlowableProcessor)) {
        return new FlowableProcessorFromFlowProcessor<>(source);
    }
    return (FlowableProcessor<T>)source;
}
项目:RxJava2Jdk9Interop    文件:FlowInteropTest.java   
/**
 * Checks the state flags of a wrapped Flow.Processor before subscription, after
 * subscription, and after normal completion, plus the items seen by the subscriber.
 */
@Test
public void flowProcessorToFlowableProcessor() {
    TestFlowProcessor<Integer> tfp = new TestFlowProcessor<>();

    FlowableProcessor<Integer> fp = FlowInterop.fromFlowProcessor(tfp);

    // Fresh processor: no subscribers, not terminated.
    assertFalse(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    TestSubscriber<Integer> ts = fp.test();

    // Subscribed, still not terminated.
    assertTrue(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    for (int value = 1; value <= 5; value++) {
        fp.onNext(value);
    }
    fp.onComplete();

    // Completed: subscribers are gone and no error is recorded.
    assertFalse(fp.hasSubscribers());
    assertTrue(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    ts.assertResult(1, 2, 3, 4, 5);
}
项目:RxJava2Jdk9Interop    文件:FlowInteropTest.java   
/**
 * Checks the state flags of a wrapped Flow.Processor when the only subscriber is
 * limited by {@code take(3)}: after three items the subscriber is gone while the
 * processor itself is not yet complete.
 */
@Test
public void flowProcessorToFlowableProcessorTake() {
    TestFlowProcessor<Integer> tfp = new TestFlowProcessor<>();

    FlowableProcessor<Integer> fp = FlowInterop.fromFlowProcessor(tfp);

    // Fresh processor: no subscribers, not terminated.
    assertFalse(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    TestSubscriber<Integer> ts = fp.take(3).test();

    // Subscribed, still not terminated.
    assertTrue(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    for (int value = 1; value <= 3; value++) {
        fp.onNext(value);
    }

    // The take(3) subscriber has left, but the processor has not terminated.
    assertFalse(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    for (int value = 4; value <= 5; value++) {
        fp.onNext(value);
    }
    fp.onComplete();

    // Completed without error.
    assertFalse(fp.hasSubscribers());
    assertTrue(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    ts.assertResult(1, 2, 3);
}
项目:RxJava2Jdk9Interop    文件:FlowInteropTest.java   
/**
 * Checks the state flags of a wrapped Flow.Processor before subscription, after
 * subscription, and after termination with an error, plus what the subscriber saw.
 */
@Test
public void flowProcessorToFlowableProcessorError() {
    TestFlowProcessor<Integer> tfp = new TestFlowProcessor<>();

    FlowableProcessor<Integer> fp = FlowInterop.fromFlowProcessor(tfp);

    // Fresh processor: no subscribers, not terminated.
    assertFalse(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    TestSubscriber<Integer> ts = fp.test();

    // Subscribed, still not terminated.
    assertTrue(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertFalse(fp.hasThrowable());
    assertNull(fp.getThrowable());

    for (int value = 1; value <= 5; value++) {
        fp.onNext(value);
    }
    fp.onError(new IOException());

    // Failed: subscribers are gone, the error is recorded, and it is not a completion.
    assertFalse(fp.hasSubscribers());
    assertFalse(fp.hasComplete());
    assertTrue(fp.hasThrowable());
    assertNotNull(fp.getThrowable());

    ts.assertFailure(IOException.class, 1, 2, 3, 4, 5);
}
项目:reduxfx    文件:ComponentDriver.java   
/**
 * Returns the stream of {@code IntegerChangedCommand}s, creating it on first use:
 * a new {@code PublishProcessor} is subscribed to the matching items of
 * {@code commandProcessor} and stored in {@code integerChangedCommandFlowable}.
 *
 * @return the stored stream of integer-changed commands
 */
private Flowable<IntegerChangedCommand> getIntegerChangedCommandFlowable() {
    if (integerChangedCommandFlowable != null) {
        return integerChangedCommandFlowable;
    }
    final FlowableProcessor<IntegerChangedCommand> relay = PublishProcessor.create();
    commandProcessor
            .filter(candidate -> candidate instanceof IntegerChangedCommand)
            .map(candidate -> (IntegerChangedCommand) candidate)
            .subscribe(relay);
    integerChangedCommandFlowable = relay;
    return integerChangedCommandFlowable;
}
项目:reduxfx    文件:ComponentDriver.java   
/**
 * Returns the stream of {@code ObjectChangedCommand}s, creating it on first use:
 * a new {@code PublishProcessor} is subscribed to the matching items of
 * {@code commandProcessor} and stored in {@code objectChangedCommandFlowable}.
 *
 * @return the stored stream of object-changed commands
 */
private Flowable<ObjectChangedCommand<?>> getObjectChangedCommandFlowable() {
    if (objectChangedCommandFlowable != null) {
        return objectChangedCommandFlowable;
    }
    final FlowableProcessor<ObjectChangedCommand<?>> relay = PublishProcessor.create();
    commandProcessor
            .filter(candidate -> candidate instanceof ObjectChangedCommand)
            .map(candidate -> (ObjectChangedCommand<?>) candidate)
            .subscribe(relay);
    objectChangedCommandFlowable = relay;
    return objectChangedCommandFlowable;
}
项目:reduxfx    文件:ComponentDriver.java   
/**
 * Returns the stream of {@code FireEventCommand}s, creating it on first use:
 * a new {@code PublishProcessor} is subscribed to the matching items of
 * {@code commandProcessor} and stored in {@code fireEventCommandFlowable}.
 *
 * @return the stored stream of fire-event commands
 */
private Flowable<FireEventCommand<? extends Event>> getFireEventCommandFlowable() {
    if (fireEventCommandFlowable != null) {
        return fireEventCommandFlowable;
    }
    final FlowableProcessor<FireEventCommand<? extends Event>> relay = PublishProcessor.create();
    commandProcessor
            .filter(candidate -> candidate instanceof FireEventCommand)
            .map(candidate -> (FireEventCommand<? extends Event>) candidate)
            .subscribe(relay);
    fireEventCommandFlowable = relay;
    return fireEventCommandFlowable;
}
项目:GitHub    文件:DownloadMission.java   
/**
 * Initializes this mission against the shared registries.
 * <p>
 * NOTE(review): the exact contract is defined by the implementations; presumably each
 * one registers itself in {@code missionMap} and obtains its event processor from
 * {@code processorMap} - confirm against the subclasses.
 *
 * @param missionMap   missions keyed by a String (presumably the download URL)
 * @param processorMap event processors keyed by a String (presumably the download URL)
 */
public abstract void init(Map<String, DownloadMission> missionMap,
Map<String, FlowableProcessor<DownloadEvent>> processorMap);
项目:RxShell    文件:Harvester.java   
/**
 * Stores the given collaborators in the corresponding fields.
 *
 * @param tag       value stored in {@code tag}
 * @param customer  downstream subscriber stored in {@code customer}
 * @param buffer    optional list stored in {@code buffer}; may be {@code null}
 * @param processor optional processor stored in {@code processor}; may be {@code null}
 */
BaseSub(String tag, Subscriber<? super T> customer, @Nullable List<String> buffer, @Nullable FlowableProcessor<String> processor) {
    this.customer = customer;
    this.tag = tag;
    // Both of the following may be null, as declared by @Nullable.
    this.buffer = buffer;
    this.processor = processor;
}
项目:RxShell    文件:Cmd.java   
/**
 * @return the processor held in the {@code outputProcessor} field
 */
public FlowableProcessor<String> getOutputProcessor() {
    return this.outputProcessor;
}
项目:RxShell    文件:Cmd.java   
/**
 * @return the processor held in the {@code errorProcessor} field
 */
public FlowableProcessor<String> getErrorProcessor() {
    return this.errorProcessor;
}
项目:MVPtemplate    文件:RxBus.java   
/**
 * Tells whether a processor collection holds nothing.
 *
 * @param collection the collection to inspect; may be {@code null}
 * @return {@code true} when {@code collection} is {@code null} or has no elements
 */
@SuppressWarnings("rawtypes")
public static boolean isEmpty(Collection<FlowableProcessor> collection) {
    if (collection == null) {
        return true;
    }
    return collection.isEmpty();
}
项目:richeditor    文件:RequestBodyWrapper.java   
/**
 * @return the processor held in the {@code mUploadProcessor} field
 */
public FlowableProcessor<BaseUploadBean> getUploadProcessor() {
    return this.mUploadProcessor;
}
项目:RHub    文件:RxJava2ProcProxy.java   
/**
 * Creates the proxy by passing both arguments straight to the superclass constructor.
 *
 * @param proc     the processor handed to the superclass
 * @param tePolicy the policy handed to the superclass
 */
public RxJava2ProcProxy(FlowableProcessor proc, TePolicy tePolicy) {
    super(proc, tePolicy);
}
项目:RxJava2Extensions    文件:RefCountProcessor.java   
/**
 * Wraps the given processor and sets up the initial bookkeeping: an empty upstream
 * reference and a subscriber-array reference that starts at {@code EMPTY}.
 *
 * @param actual the processor stored in {@code actual}
 */
@SuppressWarnings("unchecked")
RefCountProcessor(FlowableProcessor<T> actual) {
    this.subscribers = new AtomicReference<RefCountSubscriber<T>[]>(EMPTY);
    this.upstream = new AtomicReference<Subscription>();
    this.actual = actual;
}
项目:viska-android    文件:WebRtcPlugin.java   
/**
 * Creates the plugin with a serialized {@code PublishProcessor} as its event stream.
 */
public WebRtcPlugin() {
  this.eventStream = PublishProcessor.<EventObject>create().toSerialized();
}
项目:resilience4j    文件:RxJava2Adapter.java   
/**
 * Exposes the events of an EventPublisher as a Flowable.
 * <p>
 * A consumer is registered on the publisher that forwards every event to a
 * serialized {@code PublishProcessor}; that processor is the returned Flowable.
 *
 * @param eventPublisher the event publisher
 * @param <T> the type of the event
 * @return the Flowable
 */
public static <T> Flowable<T> toFlowable(EventPublisher<T> eventPublisher) {
    FlowableProcessor<T> relay = PublishProcessor.<T>create().toSerialized();
    eventPublisher.onEvent(event -> relay.onNext(event));
    return relay;
}