Java 类io.reactivex.flowables.ConnectableFlowable 实例源码

项目:streamingpool-core    文件:OverlapBufferStreamTest.java   
@Test
public void ifStartEmitsOnlyOnceBeforeDataStreamNeverEnds() throws InterruptedException {
    CountDownLatch sync = new CountDownLatch(1);

    ConnectableFlowable<?> sourceStream = just(0L).publish();
    ConnectableFlowable<?> startStream = just(new Object()).publish();

    sourceStream.buffer(startStream, opening -> never()).doOnTerminate(sync::countDown)
            .subscribe(System.out::println);

    sourceStream.connect();
    startStream.connect();

    sync.await(5, SECONDS);

    assertThat(sync.getCount()).isEqualTo(0L);
}
项目:reactivejournal    文件:HelloWorldRemote.java   
public static void main(String... args) throws IOException, InterruptedException {
    //Create the rxRecorder but don't delete the cache that has been created.
    ReactiveJournal reactiveJournal = new ReactiveJournal(HelloWorldApp_JounalAsObserver.FILE_NAME);
    //Get the input from the remote process
    RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
    PlayOptions options = new PlayOptions().filter(HelloWorldApp_JounalAsObserver.INPUT_FILTER)
            .playFromNow(true).replayRate(PlayOptions.ReplayRate.FAST);
    ConnectableFlowable<Byte> remoteInput = rxPlayer.play(options).publish();

    BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
    Flowable<String> flowableOutput = bytesToWords.process(remoteInput);


    flowableOutput.subscribe(
            s->LOG.info("Remote input [{}]", s),
            e-> LOG.error("Problem in remote [{}]", e),
            ()->{
                LOG.info("Remote input ended");
                System.exit(0);
            });

    remoteInput.connect();
}
项目:reactivejournal    文件:RxJournalBackPressureTestingFasterConsumer.java   
public static void main(String[] args) throws IOException {
    ReactiveJournal reactiveJournal = new ReactiveJournal("src/main/java/org/reactivejournal/examples/fastproducerslowconsumer/resources/");

    //Get the input from the recorder note that we have to set the replayRate to ACTUAL_TIME
    //to replicate the conditions in the 'real world'.
    PlayOptions options = new PlayOptions().filter("input").replayRate(ReplayRate.ACTUAL_TIME);
    ConnectableFlowable journalInput = new RxJavaPlayer(reactiveJournal).play(options).publish();

    //Reduce the latency of the consumer to 5ms - try reducing or increasing to study the effects.
    Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(3);

    long startTime = System.currentTimeMillis();
    journalInput.observeOn(Schedulers.io()).subscribe(onNextSlowConsumer::accept,
            e -> System.out.println("ReactiveRecorder " + " " + e),
            () -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "ms]")
    );

    journalInput.connect();

    DSUtil.sleep(1000);
}
项目:streamingpool-core    文件:OverlapBufferStreamFactory.java   
@SuppressWarnings("unchecked")
@Override
public <T> ErrorStreamPair<T> create(StreamId<T> id, DiscoveryService discoveryService) {
    if (!(id instanceof OverlapBufferStreamId)) {
        return ErrorStreamPair.empty();
    }

    OverlapBufferStreamId<?> analysisId = (OverlapBufferStreamId<?>) id;

    BufferSpecification bufferSpecification = analysisId.bufferSpecification();

    StreamId<?> startId = bufferSpecification.startId();
    StreamId<?> sourceId = analysisId.sourceId();

    Flowable<?> timeout = bufferSpecification.timeout();

    ConnectableFlowable<?> startStream = Flowable.fromPublisher(discoveryService.discover(startId)).publish();
    ConnectableFlowable<?> sourceStream = Flowable.fromPublisher(discoveryService.discover(sourceId)).publish();

    Set<EndStreamMatcher<?, ?>> matchers = bufferSpecification.endStreamMatchers();
    Map<EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams = matchers.stream()
            .collect(Collectors.toMap(m -> (EndStreamMatcher<Object, Object>) m,
                    m -> Flowable.fromPublisher(discoveryService.discover(m.endStreamId())).publish()));

    StreamConnector sourceStreamConnector = new StreamConnector(sourceStream);
    Flowable<?> bufferStream = sourceStream
            .compose(new DoAfterFirstSubscribe<>(() -> {
                endStreams.values().forEach(ConnectableFlowable::connect);
                startStream.connect();
            }))
            .buffer(startStream,
                    opening -> closingStreamFor(opening, endStreams, timeout, sourceStreamConnector));
    return ErrorStreamPair.ofData((Publisher<T>) bufferStream);
}
项目:streamingpool-core    文件:OverlapBufferStreamFactory.java   
private Flowable<?> closingStreamFor(Object opening,
        Map<EndStreamMatcher<Object, Object>, ConnectableFlowable<?>> endStreams, Flowable<?> timeout,
        StreamConnector sourceStreamConnector) {

    Set<Flowable<?>> matchingEndStreams = endStreams.entrySet().stream()
            .map(e -> e.getValue().filter(v -> e.getKey().matching().test(opening, v))).collect(Collectors.toSet());

    matchingEndStreams.add(timeout);

    return Flowable.merge(matchingEndStreams)
            .compose(new DoAfterFirstSubscribe<>(sourceStreamConnector::connect))
            .take(1);
}
项目:reactivejournal    文件:HelloWorldApp_JounalAsObserver.java   
public static void main(String[] args) throws IOException {
    ConnectableFlowable flowableInput =
            Flowable.fromArray(new Byte[]{72,101,108,108,111,32,87,111,114,108,100,32}).map(
                    i->{
                        DSUtil.sleep(INTERVAL_MS);
                        return i;
                    }).publish();

    //Create the reactiveRecorder and delete any previous content by clearing the cache
    ReactiveJournal reactiveJournal = new ReactiveJournal(FILE_NAME);
    reactiveJournal.clearCache();

    //Pass the input stream into the reactiveRecorder which will subscribe to it and record all events.
    //The subscription will not be activated until 'connect' is called on the input stream.
    ReactiveRecorder reactiveRecorder = reactiveJournal.createReactiveRecorder();
    reactiveRecorder.record(flowableInput, INPUT_FILTER);

    BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
    //Pass the input Byte stream into the BytesToWordsProcessor class which subscribes to the stream and returns
    //a stream of words.
    //The subscription will not be activated until 'connect' is called on the input stream.
    Flowable<String> flowableOutput = bytesToWords.process(flowableInput);

    //Pass the output stream (of words) into the reactiveRecorder which will subscribe to it and record all events.
    flowableOutput.subscribe(LOG::info);
    reactiveRecorder.record(flowableOutput, OUTPUT_FILTER);

    //Activate the subscriptions
    flowableInput.connect();

    reactiveJournal.writeToFile("/tmp/Demo/demo.txt",true);
}
项目:reactivejournal    文件:HelloWorldApp_JournalPlayThrough.java   
public static void main(String[] args) throws IOException {

        //Create the reactiveRecorder and delete any previous content by clearing the cache
        ReactiveJournal reactiveJournal = new ReactiveJournal(FILE_NAME);
        reactiveJournal.clearCache();

        //Pass the input stream into the reactiveRecorder which will subscribe to it and record all events.
        //The subscription will not be activated on a new thread which will allow this program to continue.
        ReactiveRecorder reactiveRecorder = reactiveJournal.createReactiveRecorder();
        reactiveRecorder.recordAsync(observableInput, INPUT_FILTER);

        BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();

        //Retrieve a stream of
        RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
        PlayOptions options = new PlayOptions().filter(INPUT_FILTER).playFromNow(true);
        ConnectableFlowable recordedObservable = rxPlayer.play(options).publish();
        //Pass the input Byte stream into the BytesToWordsProcessor class which subscribes to the stream and returns
        //a stream of words.
        Flowable<String> flowableOutput = bytesToWords.process(recordedObservable);

        //Pass the output stream (of words) into the reactiveRecorder which will subscribe to it and record all events.
        reactiveRecorder.record(flowableOutput, OUTPUT_FILTER);
        flowableOutput.subscribe(s -> LOG.info("HelloWorldHot->" + s),
                throwable -> LOG.error("", throwable),
                ()->LOG.info("HelloWorldHot Complete"));
        //Only start the recording now because we want to make sure that the BytesToWordsProcessor and the reactiveRecorder
        //are both setup up to receive subscriptions.
        recordedObservable.connect();
        //Sometimes useful to see the recording written to a file
        reactiveJournal.writeToFile("/tmp/Demo/demo.txt",true);
    }
项目:reactivejournal    文件:RxJournalBackPressureLatest.java   
public static void main(String[] args) throws IOException {

        ReactiveJournal reactiveJournal = new ReactiveJournal("/tmp/fastproducer");
        reactiveJournal.clearCache();
        Flowable<Long> fastProducer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 2500);

        ReactiveRecorder recorder = reactiveJournal.createReactiveRecorder();
        recorder.recordAsync(fastProducer,"input");
        //Set the replay strategy to ReplayRate.FAST as e want to process the event as soon as it is
        //received from the publisher.
        PlayOptions options = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
        ConnectableFlowable journalInput = new RxJavaPlayer(reactiveJournal).play(options).publish();

        Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);

        recorder.record(journalInput, "consumed");

        long startTime = System.currentTimeMillis();
        journalInput.observeOn(Schedulers.io()).subscribe(onNextSlowConsumer::accept,
                e -> System.out.println("ReactiveRecorder " + " " + e),
                () -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "]")
        );

        journalInput.connect();

        DSUtil.sleep(3000);
    }
项目:RxJava2Extensions    文件:FlowableRefCountTimeout.java   
FlowableRefCountTimeout(ConnectableFlowable<T> source, int n, long timeout, TimeUnit unit,
        Scheduler scheduler) {
    this.source = source;
    this.n = n;
    this.timeout = timeout;
    this.unit = unit;
    this.scheduler = scheduler;
}
项目:RxJava2Extensions    文件:FlowableRefCountTimeout.java   
@Override
public Publisher<T> apply(Flowable<T> upstream) {
    if (upstream instanceof ConnectableFlowable) {
        return new FlowableRefCountTimeout<T>((ConnectableFlowable<T>)upstream, n, timeout, unit, scheduler);
    }
    throw new IllegalArgumentException("This transformer requires an upstream ConnectableFlowable");
}
项目:Attendance    文件:ApproveListFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    String githubToken = Constants.ETEST_API_KEY;
    String urlx = "http:\\" + SettingsActivity.getServerName(getActivity());
    String usicox = SettingsActivity.getUsIco(getActivity());
    if( usicox.equals("44551142")) {
        urlx = "http:\\" + Constants.EDCOM_url;
    }
    _rfetestService = RfEtestService.createGithubService(githubToken, urlx);

    _disposables = new CompositeDisposable();

    _rxBus = getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof ApproveListFragment.TapEvent) {
                    ///_showTapText();
                }
                if (event instanceof Attendance) {
                    String keys = ((Attendance) event).getRok();
                    //Log.d("In FRGM longClick", keys);
                    getApproveDialog( keys, (Attendance) event);

                }
            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:AbsTypesListRxFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    String githubToken = Constants.ETEST_API_KEY;
    String urlx = SettingsActivity.getServerName(getActivity());

    _disposables = new CompositeDisposable();

    _rxBus = getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof AbsTypesListRxFragment.TapEvent) {
                    ///_showTapText();
                }
                if (event instanceof Abstype) {
                    String keys = ((Abstype) event).getRok();
                    //Log.d("In FRGM longClick", keys);
                    getAbsTypesDialog( keys, (Abstype) event);

                }
            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());


}
项目:Attendance    文件:RxBusDemo_Bottom3Fragment.java   
@Override
public void onStart() {
    super.onStart();
    _disposables = new CompositeDisposable();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
                .add(tapEventEmitter.subscribe(event -> {
              if (event instanceof RxBusDemoFragment.TapEvent) {
                  _showTapText();
              }
                    if (event instanceof EventRxBus.Message) {
                        tvContent = (TextView) getActivity().findViewById(R.id.tvContent);
                        tvContent.setText(((EventRxBus.Message) event).message);
                    }
          }));

    _disposables
          .add(tapEventEmitter.publish(stream ->
                stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                              .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                    _showTapCount(taps.size());
                }));

    _disposables.add(tapEventEmitter.connect());

}
项目:Attendance    文件:PostsFragment.java   
@Override public void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);

  _rxBus = getRxBusSingleton();

  _disposables = new CompositeDisposable();

  ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

  _disposables
          .add(tapEventEmitter.subscribe(event -> {
            if (event instanceof PostsFragment.TapEvent) {
              ///_showTapText();
            }
            if (event instanceof BlogPostEntity) {
              //saveAbsServer(((Attendance) event).daod + " / " + ((Attendance) event).dado, ((Attendance) event));
              String keys = ((BlogPostEntity) event).getAuthor();
              //blogPostsAdapter.remove(0);
              showProgress(true);
              Log.d("In FRGM shortClick", keys);
              BlogPostEntity postx = new BlogPostEntity(null, null, null);
              delBlogPostRx(postx,1, keys);

            }
          }));

  _disposables
          .add(tapEventEmitter.publish(stream ->
                  stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                  .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                    ///_showTapCount(taps.size()); OK
                  }));

  _disposables.add(tapEventEmitter.connect());

}
项目:StompProtocolAndroid    文件:StompClient.java   
public Flowable<Void> send(StompMessage stompMessage) {
    Flowable<Void> flowable = mConnectionProvider.send(stompMessage.compile());
    if (!mConnected) {
        ConnectableFlowable<Void> deferred = flowable.publish();
        mWaitConnectionFlowables.add(deferred);
        return deferred;
    } else {
        return flowable;
    }
}
项目:akarnokd-misc    文件:DoOnErrorFusion.java   
public static void main(String[] args) {
    ConnectableFlowable<Integer> f = Flowable.just(1)
            .doOnNext(i -> {
                throw new IllegalArgumentException();
            })
            .doOnError(e -> {
                throw new IllegalStateException(e);
            }).publish();
    f.subscribe(
            i -> { throw new AssertionError(); }, 
            e -> e.printStackTrace());
    f.connect();
}
项目:RxJava-Android-Samples    文件:RxBusDemo_Bottom3Fragment.java   
@Override
public void onStart() {
  super.onStart();
  _disposables = new CompositeDisposable();

  ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

  _disposables //
      .add(
      tapEventEmitter.subscribe(
          event -> {
            if (event instanceof RxBusDemoFragment.TapEvent) {
              _showTapText();
            }
          }));

  _disposables.add(
      tapEventEmitter
          .publish(stream -> stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(
              taps -> {
                _showTapCount(taps.size());
              }));

  _disposables.add(tapEventEmitter.connect());
}
项目:contentful.java    文件:Callbacks.java   
static <O extends CDAResource, C extends CDAResource> CDACallback<C> subscribeAsync(
    Flowable<O> flowable, CDACallback<C> callback, CDAClient client) {
  ConnectableFlowable<O> connectable = flowable.observeOn(Schedulers.io()).publish();

  callback.setSubscription(connectable.subscribe(
      new SuccessAction<O>(callback, client),
      new FailureAction(callback, client)));

  connectable.connect();

  return callback;
}
项目:streamingpool-core    文件:OverlapBufferStreamFactory.java   
private StreamConnector(ConnectableFlowable<?> stream) {
    this.stream = stream;
}
项目:reactivejournal    文件:HelloWorldTest.java   
@Test
public void testHelloWorld() throws IOException, InterruptedException {
    //Create the rxRecorder but don't delete the cache that has been created.
    ReactiveJournal reactiveJournal = new ReactiveJournal(HelloWorldApp_JounalAsObserver.FILE_NAME);
    //reactiveJournal.writeToFile("/tmp/Demo/demo.txt", true);

    //Get the input from the recorder
    RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
    //In this case we can play the data stream in FAST mode.
    PlayOptions options= new PlayOptions().filter(HelloWorldApp_JounalAsObserver.INPUT_FILTER)
            .replayRate(PlayOptions.ReplayRate.FAST).sameThread(true);
    //Use a ConnectableObservable as we only want to kick off the stream when all
    //connections have been wired together.
    ConnectableFlowable<Byte> observableInput = rxPlayer.play(options).publish();

    BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
    Flowable<String> flowableOutput = bytesToWords.process(observableInput);

    CountDownLatch latch = new CountDownLatch(1);
    //Send the output stream to the recorder to be validated against the recorded output
    ReactiveValidator reactiveValidator = reactiveJournal.createReactiveValidator();
    reactiveValidator.validate(HelloWorldApp_JounalAsObserver.FILE_NAME + "/.reactiveJournal",
            flowableOutput, HelloWorldApp_JounalAsObserver.OUTPUT_FILTER, new Subscriber() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Object o) {
                    LOG.info(o.toString());
                }

                @Override
                public void onError(Throwable throwable) {
                    LOG.error("Problem in process test [{}]", throwable);
                }

                @Override
                public void onComplete() {
                    LOG.info("Summary[" + reactiveValidator.getValidationResult().summaryResult()
                            + "] items compared[" + reactiveValidator.getValidationResult().summaryItemsCompared()
                            + "] items valid[" + reactiveValidator.getValidationResult().summaryItemsValid() +"]");
                    latch.countDown();
                }
            });

    observableInput.connect();
    boolean completedWithoutTimeout = latch.await(200, TimeUnit.SECONDS);
    Assert.assertEquals(ValidationResult.Result.OK, reactiveValidator.getValidationResult().getResult());
    Assert.assertTrue(completedWithoutTimeout);
}
项目:reactivejournal    文件:ReactiveRecorderTest.java   
@Test
public void recorderTest() throws IOException{
    //Flowable used to create the control run.
    Flowable<Byte> observableInput = HelloWorldApp_JournalPlayThrough.observableInput;

    //Create the rxRecorder and delete any previous content by clearing the cache
    ReactiveJournal reactiveJournal = new ReactiveJournal(tmpDir +"/playTest");
    reactiveJournal.clearCache();

    //Pass the input stream into the rxRecorder which will subscribe to it and record all events.
    //The subscription will happen on a new thread which will allow this program to continue.
    ReactiveRecorder reactiveRecorder = reactiveJournal.createReactiveRecorder();
    reactiveRecorder.recordAsync(observableInput, "input");

    BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();

    RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
    PlayOptions options = new PlayOptions().filter("input").playFromNow(true).sameThread(true);
    ConnectableFlowable recordedObservable = rxPlayer.play(options).publish();
    //Pass the input Byte stream into the BytesToWordsProcessor class which subscribes to the stream and returns
    //a stream of words.
    Flowable<String> flowableOutput = bytesToWords.process(recordedObservable);

    //Pass the output stream (of words) into the rxRecorder which will subscribe to it and record all events.
    reactiveRecorder.record(flowableOutput, "output");

    //Only start the recording now because we want to make sure that the BytesToWordsProcessor and the rxRecorder
    //are both setup up to receive subscriptions.
    recordedObservable.connect();
    reactiveJournal.writeToFile(tmpDir + "/playTest/playTest.txt",true);

    List<String> toBeTested = Files.readAllLines(Paths.get(tmpDir + "/playTest/playTest.txt"));
    List<String> controlSet = Files.readAllLines(Paths.get("src/test/resources/playTest.txt"));

    Assert.assertEquals(controlSet.size(), toBeTested.size());

    //Asert all the values are in both files - they might not be in exactly the same order

    String[] controlSetInput = getFilterLinesFromFiles(controlSet, "input");
    String[] toBeTestedInput= getFilterLinesFromFiles(toBeTested, "input");
    Assert.assertArrayEquals(controlSetInput, toBeTestedInput);

    String[] controlSetOutput = getFilterLinesFromFiles(controlSet, "output");
    String[] toBeTestedOutput= getFilterLinesFromFiles(toBeTested, "output");
    Assert.assertArrayEquals(controlSetOutput, toBeTestedOutput);

    String[] controlSetEOS = getFilterLinesFromFiles(controlSet, "endOfStream");
    String[] toBeTestedEOS= getFilterLinesFromFiles(toBeTested, "endOfStream");
    Assert.assertArrayEquals(controlSetEOS, toBeTestedEOS);
}
项目:reactivejournal    文件:ReactivePlayerTest.java   
@Test
public void testPlay() throws IOException, InterruptedException {
    //Create the rxRecorder but don't delete the cache that has been created.
    ReactiveJournal reactiveJournal = new ReactiveJournal("src/test/resources/", "");
    reactiveJournal.writeToFile(tmpDir +"/rctext.txt", true);

    //Get the input from the recorder
    RxJavaPlayer rxPlayer = new RxJavaPlayer(reactiveJournal);
    PlayOptions options= new PlayOptions()
            .filter(HelloWorldApp_JounalAsObserver.INPUT_FILTER)
            .replayRate(REPLAY_RATE_STRATEGY)
            .completeAtEndOfFile(false);
    ConnectableFlowable<Byte> observableInput = rxPlayer.play(options).publish();

    BytesToWordsProcessor bytesToWords = new BytesToWordsProcessor();
    Flowable<String> flowableOutput = bytesToWords.process(observableInput);

    CountDownLatch latch = new CountDownLatch(1);
    //Send the output stream to the recorder to be validated against the recorded output
    ReactiveValidator reactiveValidator = reactiveJournal.createReactiveValidator();
    reactiveValidator.validate("src/test/resources/",
            flowableOutput, HelloWorldApp_JounalAsObserver.OUTPUT_FILTER, new Subscriber() {
                @Override
                public void onSubscribe(Subscription subscription) {

                }

                @Override
                public void onNext(Object o) {
                    LOG.info(o.toString());
                }

                @Override
                public void onError(Throwable throwable) {
                    LOG.error("Problem in process test [{}]", throwable);
                }

                @Override
                public void onComplete() {
                    LOG.info("Summary[" + reactiveValidator.getValidationResult().summaryResult()
                            + "] items compared[" + reactiveValidator.getValidationResult().summaryItemsCompared()
                            + "] items valid[" + reactiveValidator.getValidationResult().summaryItemsValid() +"]");
                    latch.countDown();
                }
            });

    observableInput.connect();
    boolean completedWithoutTimeout = latch.await(2, TimeUnit.SECONDS);
    Assert.assertEquals(ValidationResult.Result.OK, reactiveValidator.getValidationResult().getResult());
    Assert.assertTrue(completedWithoutTimeout);
}
项目:RxJava2Debug    文件:FlowableOnAssemblyConnectable.java   
FlowableOnAssemblyConnectable(ConnectableFlowable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
项目:RxjavaExample    文件:ExampleUnitTest.java   
@Test
public void ConnectableObservable() throws Exception{
    ConnectableFlowable<Integer> connect = Flowable.just(1,2,3).publish();
    connect.subscribe(this::print);
    connect.connect();//此时开始发射数据
}
项目:RxJava2Extensions    文件:ConnectableFlowableValidator.java   
ConnectableFlowableValidator(ConnectableFlowable<T> source, PlainConsumer<ProtocolNonConformanceException> onViolation) {
    this.source = source;
    this.onViolation = onViolation;
}
项目:RxJava2Extensions    文件:FlowableOnAssemblyConnectable.java   
FlowableOnAssemblyConnectable(ConnectableFlowable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
项目:RxJava2Extensions    文件:RxJavaProtocolValidatorTest.java   
@Test
public void connectableFlowable() {
    ConnectableFlowable<Integer> source = new ConnectableFlowable<Integer>() {

        @Override
        protected void subscribeActual(Subscriber<? super Integer> s) {
            s.onComplete();
            s.onError(null);
            s.onError(new IOException());
            s.onNext(null);
            s.onNext(1);
            s.onSubscribe(null);
            s.onSubscribe(new BooleanSubscription());
            s.onSubscribe(new BooleanSubscription());
            s.onComplete();
            s.onNext(2);
        }

        @Override
        public void connect(Consumer<? super Disposable> connection) {
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks h = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ConnectableFlowable<Integer> c = RxJavaPlugins.onAssembly(source);

        c.test(0);

        c.connect();

        Assert.assertEquals(15, errors.size());
        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        Assert.assertTrue("" + errors.get(4).getCause(), errors.get(4).getCause() instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        h.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
项目:Attendance    文件:CompanyChooseBaseSearchActivity.java   
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.companychoose_activity);

  mActionBarToolbar = (Toolbar) findViewById(R.id.tool_bar);
  setSupportActionBar(mActionBarToolbar);
  getSupportActionBar().setTitle(getString(R.string.choosecompany));

  mSubscription = new CompositeSubscription();
  FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
  fab.setOnClickListener(v -> {

    mSubscription.add(getNewCompanyDialog(getString(R.string.newcompany), getString(R.string.fullfirma))
            .subscribeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
            .observeOn(Schedulers.computation())
            .subscribe(this::setBoolean)
    );

          }
  );

  mQueryEditText = (EditText) findViewById(R.id.query_edit_text);
  mSearchButton = (Button) findViewById(R.id.search_button);
  mProgressBar = (ProgressBar) findViewById(R.id.progress_bar);

  _rxBus = getRxBusSingleton();

  RecyclerView list = (RecyclerView) findViewById(R.id.list);
  list.setLayoutManager(new LinearLayoutManager(this));
  list.setAdapter(mAdapter = new CompanyChooseAdapter(_rxBus));

  _disposables = new CompositeDisposable();

  ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

  _disposables
          .add(tapEventEmitter.subscribe(event -> {
            if (event instanceof CompanyChooseBaseSearchActivity.OnItemClickEvent) {

            }
            if (event instanceof Company) {
              Company model = (Company) event;
              saveIcoId(model.cmico+ " " + model.cmname, model);

            }
          }));

  _disposables
          .add(tapEventEmitter.publish(stream ->
                  stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                  .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                    ///_showTapCount(taps.size()); OK
                  }));

  _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:DgAllEmpsAbsListFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    //create mvvm without dagger2
    //mViewModel = getAllEmpsAbsMvvmViewModel();

    _disposables = new CompositeDisposable();

    _rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof DgAllEmpsAbsListFragment.ClickFobEvent) {
                    Log.d("DgAllEmpsAbsActivity  ", " fobClick ");
                    String serverx = "DgAllEmpsAbsListFragment fobclick";
                    Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();


                }
                if (event instanceof RealmEmployee) {
                    String idemp = ((RealmEmployee) event).getKeyf();
                    RealmEmployee model= (RealmEmployee) event;

                    //Log.d("AllEmpsAbsListFragment ", icos);
                    //String serverx = "AllEmpsAbsListFragment shortclick";
                    //Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();

                    Intent i = new Intent(getActivity(), AbsenceActivity.class);
                    Bundle extras = new Bundle();
                    extras.putString("fromact", "1");
                    extras.putString("idemp", idemp);
                    i.putExtras(extras);
                    startActivity(i);

                }

            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:AbsenceListRxFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    String githubToken = Constants.ETEST_API_KEY;
    String urlx = SettingsActivity.getServerName(getActivity());

    fromact = getArguments().getString("fromact");
    Log.d("idemp inFrg fromact", fromact);
    idemp = getArguments().getString("idemp");
    Log.d("idemp inFrg idemp", idemp);

    gettimestramp = FirebaseDatabase.getInstance().getReference("gettimestamp");
    getTimeListener = new ValueEventListener() {
        public void onDataChange(DataSnapshot dataSnapshot) {
            timestampx=dataSnapshot.getValue().toString();
            Log.d(TAG, "ServerValue.TIMESTAMP oncreate " + timestampx);
        }
        public void onCancelled(DatabaseError databaseError) { }
    };

    gettimestramp.addValueEventListener(getTimeListener);
    gettimestramp.setValue(ServerValue.TIMESTAMP);

    _disposables = new CompositeDisposable();

    _rxBus = getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof AbsenceListRxFragment.TapEvent) {
                    ///_showTapText();
                }
                if (event instanceof Attendance) {
                    String keys = ((Attendance) event).getRok();
                    //Log.d("In FRGM longClick", keys);

                    Attendance model= (Attendance) event;
                    final String datsx = model.getDatsString();
                    //Log.d(TAG, "datsx " + datsx);

                    gettimestramp.setValue(ServerValue.TIMESTAMP);
                    //Log.d(TAG, "ServerValue.TIMESTAMP " + timestampx);

                    long timestampl = Long.parseLong(timestampx);
                    long datsl = Long.parseLong(datsx);
                    long rozdiel = timestampl - datsl;
                    //Log.d(TAG, "rozdiel " + rozdiel);

                    //Toast.makeText(getActivity(), "Longclick " + keys,Toast.LENGTH_SHORT).show();

                    if( model.aprv.equals("2")) {
                        rozdiel=1;
                    }

                    if( rozdiel < 180000 ) {
                        getAbsenceDialog( keys, (Attendance) event);
                    }else{
                        Toast.makeText(getActivity(), getResources().getString(R.string.cantdel),Toast.LENGTH_SHORT).show();
                    }


                }
            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());


}
项目:Attendance    文件:AttendanceListRxFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    String githubToken = Constants.ETEST_API_KEY;
    String urlx = SettingsActivity.getServerName(getActivity());

    gettimestramp = FirebaseDatabase.getInstance().getReference("gettimestamp");
    getTimeListener = new ValueEventListener() {
        public void onDataChange(DataSnapshot dataSnapshot) {
            timestampx=dataSnapshot.getValue().toString();
            Log.d("Att TIMES oncreate ", timestampx);

        }
        public void onCancelled(DatabaseError databaseError) { }
    };

    gettimestramp.addValueEventListener(getTimeListener);
    gettimestramp.setValue(ServerValue.TIMESTAMP);

    _disposables = new CompositeDisposable();

    _rxBus = getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof AttendanceListRxFragment.TapEvent) {
                    ///_showTapText();
                }
                if (event instanceof Attendance) {
                    String keys = ((Attendance) event).getRok();
                    //Log.d("In FRGM longClick", keys);
                    getAttendanceDialog( keys, (Attendance) event);

                }
            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:AllEmpsAbsListFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    //create mvvm without dagger2
    //mViewModel = getAllEmpsAbsMvvmViewModel();

    _disposables = new CompositeDisposable();

    _rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof AllEmpsAbsListFragment.ClickFobEvent) {
                    Log.d("AllEmpsAbsActivity  ", " fobClick ");
                    String serverx = "AllEmpsAbsListFragment fobclick";
                    Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();


                }
                if (event instanceof RealmEmployee) {
                    String idemp = ((RealmEmployee) event).getKeyf();
                    RealmEmployee model= (RealmEmployee) event;

                    //Log.d("AllEmpsAbsListFragment ", icos);
                    //String serverx = "AllEmpsAbsListFragment shortclick";
                    //Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();

                    Intent i = new Intent(getActivity(), AbsenceActivity.class);
                    Bundle extras = new Bundle();
                    extras.putString("fromact", "1");
                    extras.putString("idemp", idemp);
                    i.putExtras(extras);
                    startActivity(i);

                }

            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:AbsServerAsBaseSearchActivity.java   
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_absserver);

  //ActivityAbsserverDbindBinding binding = DataBindingUtil.setContentView(this, R.layout.activity_absserver_dbind);
  //binding.setServerabs(new Attendance());

  mActionBarToolbar = (Toolbar) findViewById(R.id.tool_bar);
  setSupportActionBar(mActionBarToolbar);
  getSupportActionBar().setTitle(getString(R.string.action_absmysql));

  mQueryEditText = (EditText) findViewById(R.id.query_edit_text);
  mSearchButton = (Button) findViewById(R.id.search_button);
  mProgressBar = (ProgressBar) findViewById(R.id.progress_bar);

  String githubToken = Constants.ETEST_API_KEY;
  String urlx = "http:\\" + SettingsActivity.getServerName(this);
  String usicox = SettingsActivity.getUsIco(this);
  if( usicox.equals("44551142")) {
    urlx = "http:\\" + Constants.EDCOM_url;
  }
  _githubService = RfEtestService.createGithubService(githubToken, urlx);

  //cheeses = Arrays.asList(getResources().getStringArray(R.array.cheeses3));

  _rxBus = getRxBusSingleton();

  RecyclerView list = (RecyclerView) findViewById(R.id.list);
  list.setLayoutManager(new LinearLayoutManager(this));
  list.setAdapter(mAdapter = new AbsServerAsAdapter(_rxBus));

  _disposables = new CompositeDisposable();

  ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

  _disposables
          .add(tapEventEmitter.subscribe(event -> {
            if (event instanceof AbsServerAsBaseSearchActivity.TapEvent) {
              ///_showTapText();
            }
            if (event instanceof Attendance) {
              ///tvContent = (TextView) findViewById(R.id.tvContent);
              ///tvContent.setText(((EventRxBus.Message) event).message); OK change event instanceof to EventRxBus.Message
              ///_showTapTextToast(((EventRxBus.Absence) event).daod + " / " + ((EventRxBus.Absence) event).dado); OK change event instanceof to EventRxBus.Absence
              saveAbsServer(((Attendance) event).daod + " / " + ((Attendance) event).dado, ((Attendance) event));
            }
          }));

  _disposables
          .add(tapEventEmitter.publish(stream ->
                  stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                  .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                    ///_showTapCount(taps.size()); OK
                  }));

  _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:EmployeeMvvmActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_mvvm_employees);

    coordinatorLayout = (CoordinatorLayout) findViewById(R.id
            .coordinatorLayout);

    mActionBarToolbar = (Toolbar) findViewById(R.id.tool_bar);
    setSupportActionBar(mActionBarToolbar);
    getSupportActionBar().setTitle(getString(R.string.action_myemployee));

    mViewModel = getEmployeeMvvmViewModel();

    _rxBus = ((AttendanceApplication) getApplication()).getRxBusSingleton();

    _disposables = new CompositeDisposable();
    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();
    _disposables
            .add(tapEventEmitter.subscribe(event -> {

                //Log.d("rxBus ", "tapEventEmitter");

                if (event instanceof EmployeeMvvmActivity.FobTapEvent) {
                    Log.d("EmpoloyeeActivity  ", " fobClick ");

                    //attention - activity leaked
                    //mSubscription.add(getNewEmployeeDialog(getString(R.string.newcompany), getString(R.string.fullfirma))
                    //        .subscribeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
                    //        .observeOn(Schedulers.computation())
                    //        .subscribe(this::setBoolean)
                    //);;
                }
                if (event instanceof Employee) {
                    String keys = ((Employee) event).getUsatw();
                    //Log.d("In FRGM longClick", keys);

                    Employee model= (Employee) event;

                    //Toast.makeText(this, "Longclick " + keys,Toast.LENGTH_SHORT).show();
                    getEditEmloyeeDialog(model);

                }
            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());

    setupViews();

    FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab);
    fab.setOnClickListener(new View.OnClickListener() {
        public void onClick(View v) {

            Toast.makeText(EmployeeMvvmActivity.this, R.string.createemployee, Toast.LENGTH_LONG).show();

        }
    });

}
项目:Attendance    文件:CompaniesListFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    mViewModel = getCompaniesMvvmViewModel();

    _disposables = new CompositeDisposable();

    _rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof CompaniesListFragment.ClickFobEvent) {
                    Log.d("CompaniesActivity  ", " fobClick ");

                    mSubscription.add(getNewCompanyDialog(getString(R.string.newcompany), getString(R.string.fullfirma))
                            .subscribeOn(rx.android.schedulers.AndroidSchedulers.mainThread())
                            .observeOn(Schedulers.computation())
                            .subscribe(this::setBoolean)
                    );



                }
                if (event instanceof Company) {
                    String icos = ((Company) event).getCmico();
                    Company model= (Company) event;

                    Log.d("CompaniesListFragment ", icos);
                    getEditCompanyDialog(model);

                }

            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());
}
项目:Attendance    文件:DgAeaListFragment.java   
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    //create mvvm without dagger2
    //mViewModel = getAllEmpsAbsMvvmViewModel();

    _disposables = new CompositeDisposable();

    _rxBus = ((AttendanceApplication) getActivity().getApplication()).getRxBusSingleton();

    ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

    _disposables
            .add(tapEventEmitter.subscribe(event -> {
                if (event instanceof DgAllEmpsAbsListFragment.ClickFobEvent) {
                    Log.d("DgAllEmpsAbsActivity  ", " fobClick ");
                    String serverx = "DgAllEmpsAbsListFragment fobclick";
                    Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();


                }
                if (event instanceof RealmEmployee) {
                    String idemp = ((RealmEmployee) event).getKeyf();
                    RealmEmployee model= (RealmEmployee) event;

                    //Log.d("AllEmpsAbsListFragment ", icos);
                    //String serverx = "AllEmpsAbsListFragment shortclick";
                    //Toast.makeText(getActivity(), serverx, Toast.LENGTH_SHORT).show();

                    Intent i = new Intent(getActivity(), AbsenceActivity.class);
                    Bundle extras = new Bundle();
                    extras.putString("fromact", "1");
                    extras.putString("idemp", idemp);
                    i.putExtras(extras);
                    startActivity(i);

                }

            }));

    _disposables
            .add(tapEventEmitter.publish(stream ->
                    stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> {
                        ///_showTapCount(taps.size()); OK
                    }));

    _disposables.add(tapEventEmitter.connect());
}
项目:StompProtocolAndroid    文件:StompClient.java   
/**
 * If already connected and reconnect=false - nope
 *
 * @param _headers might be null
 */
public void connect(List<StompHeader> _headers, boolean reconnect) {
    if (reconnect) disconnect();
    if (mConnected) return;
    mLifecycleDisposable = mConnectionProvider.getLifecycleReceiver()
            .subscribe(lifecycleEvent -> {
                switch (lifecycleEvent.getType()) {
                    case OPENED:
                        List<StompHeader> headers = new ArrayList<>();
                        headers.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS));
                        if (_headers != null) headers.addAll(_headers);
                        mConnectionProvider.send(new StompMessage(StompCommand.CONNECT, headers, null).compile())
                                .subscribe();
                        break;

                    case CLOSED:
                        mConnected = false;
                        isConnecting = false;
                        break;

                    case ERROR:
                        mConnected = false;
                        isConnecting = false;
                        break;
                }
            });

    isConnecting = true;
    mMessagesDisposable = mConnectionProvider.messages()
            .map(StompMessage::from)
            .subscribe(stompMessage -> {
                if (stompMessage.getStompCommand().equals(StompCommand.CONNECTED)) {
                    mConnected = true;
                    isConnecting = false;
                    for (ConnectableFlowable<Void> flowable : mWaitConnectionFlowables) {
                        flowable.connect();
                    }
                    mWaitConnectionFlowables.clear();
                }
                callSubscribers(stompMessage);
            });
}
项目:cyclops    文件:FlowableKind.java   
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public ConnectableFlowable<T> publish() {
    return boxed.publish();
}
项目:cyclops    文件:FlowableKind.java   
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public ConnectableFlowable<T> publish(int bufferSize) {
    return boxed.publish(bufferSize);
}
项目:cyclops    文件:FlowableKind.java   
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport("none")
public ConnectableFlowable<T> replay() {
    return boxed.replay();
}