Java 类io.reactivex.subjects.SingleSubject 实例源码

项目:async-sqs    文件:BaseSqsRequestSender.java   
/**
 * Executes the given action over HTTP and exposes the parsed result as a {@link Single}.
 *
 * <p>The action is turned into an HTTP request using the current credentials and passed to
 * {@code httpClient}. When the exchange completes, the response is parsed inside
 * {@code Single.fromCallable} and the outcome is relayed to the returned subject. Failures
 * reported through {@code onThrowable} are relayed as errors.
 *
 * @param action the action to execute
 * @param <T> the type produced by parsing the HTTP response
 * @return a subject-backed Single carrying the parsed response or the failure
 */
@Override
public <T> Single<T> sendRequest(SqsAction<T> action) {
    final SingleSubject<T> result = SingleSubject.create();
    final Request httpRequest = action.toHttpRequest(credentialsProvider.getCredentials());

    AsyncCompletionHandler<Response> handler = new AsyncCompletionHandler<Response>() {
        @Override
        public void onThrowable(Throwable throwable) {
            result.onError(throwable);
        }

        @Override
        public Response onCompleted(Response httpResponse) {
            // Parse inside fromCallable so the parse outcome is relayed through the subject.
            Single<T> parsed = Single.fromCallable(() -> action.parseHttpResponse(httpResponse));
            parsed.subscribe(result);
            return httpResponse;
        }
    };

    httpClient.executeRequest(httpRequest, handler);
    return result;
}
项目:async-sqs    文件:SqsConsumerTest.java   
/**
 * Shutdown must not complete while a processed message has never been acknowledged.
 */
@Test
public void testShutdownWithPendingPermits() {
    SingleSubject<List<SqsMessage<String>>> pendingReceive = SingleSubject.create();

    when(sqsQueueMock.receiveMessages(anyInt(), any(Optional.class))).thenReturn(pendingReceive);
    when(sqsQueueMock.deleteMessage(any(String.class))).thenReturn(Completable.complete());

    consumer.setMessageBuffer(messageBufferSmall);
    consumer.processNextMessage(consumer.getNextMessage());

    // The handler never acks in this test, so its permit stays pending forever.
    Completable shutdown = consumer.shutdownAsync();
    pendingReceive.onSuccess(Collections.emptyList());

    shutdown.test().assertNotComplete();
}
项目:async-sqs    文件:SqsConsumerTest.java   
/**
 * When the handler deletes the message, shutdown completes and the queue sees one delete call.
 */
@Test
public void testHandlerDeleteAndShutdown() {
    SingleSubject<List<SqsMessage<String>>> pendingReceive = SingleSubject.create();

    when(sqsQueueMock.receiveMessages(anyInt(), any(Optional.class))).thenReturn(pendingReceive);
    when(sqsQueueMock.deleteMessage(any(String.class))).thenReturn(Completable.complete());

    // The handler acknowledges every message by deleting it.
    doAnswer((call -> {
        MessageAcknowledger acknowledger = (MessageAcknowledger) call.getArgument(1);
        acknowledger.delete();
        return null;
    })).when(consumerHandlerMock).handleMessage(any(), any());

    consumer.setMessageBuffer(messageBufferSmall);
    consumer.processNextMessage(consumer.getNextMessage());
    Completable shutdown = consumer.shutdownAsync();
    pendingReceive.onSuccess(Collections.emptyList());

    shutdown.test().assertComplete();
    verify(sqsQueueMock).deleteMessage(any(String.class));
}
项目:async-sqs    文件:SqsConsumerTest.java   
/**
 * When the handler ignores the message, shutdown completes and the queue never sees a delete.
 */
@Test
public void testHandlerIgnoreAndShutdown() {
    SingleSubject<List<SqsMessage<String>>> pendingReceive = SingleSubject.create();

    when(sqsQueueMock.receiveMessages(anyInt(), any(Optional.class))).thenReturn(pendingReceive);
    when(sqsQueueMock.deleteMessage(any(String.class))).thenReturn(Completable.complete());

    // The handler acknowledges every message by ignoring it.
    doAnswer((call -> {
        MessageAcknowledger acknowledger = (MessageAcknowledger) call.getArgument(1);
        acknowledger.ignore();
        return null;
    })).when(consumerHandlerMock).handleMessage(any(), any());

    consumer.setMessageBuffer(messageBufferSmall);
    consumer.processNextMessage(consumer.getNextMessage());
    Completable shutdown = consumer.shutdownAsync();
    pendingReceive.onSuccess(Collections.emptyList());

    shutdown.test().assertComplete();
    verify(sqsQueueMock, never()).deleteMessage(any(String.class));
}
项目:DisposableAttach    文件:DisposableAttachSingleTest.java   
/**
 * Subscribes to a Single composed with {@code DisposableAttach.to(composite)} and checks that
 * the value is delivered, that the composite holds one disposable afterwards, and that
 * disposing the composite empties it and disposes the subscription.
 */
@Test public void test() {
    SingleSubject<String> subject = SingleSubject.create();
    Single<String> singleSource = subject.hide();

    // Parameterized rather than raw, so subscribeWith/assertValue are type-checked.
    TestObserver<String> testObserver = new TestObserver<String>();
    CompositeDisposable composite = new CompositeDisposable();
    Disposable disposable = singleSource
            .compose(DisposableAttach.<String>to(composite))
            .subscribeWith(testObserver);

    subject.onSuccess("Foo");
    testObserver.assertValue("Foo");
    assertTrue(composite.size() == 1);
    composite.dispose();
    assertTrue(composite.size() == 0);
    assertTrue(composite.isDisposed());
    assertTrue(disposable.isDisposed());
    assertTrue(testObserver.isDisposed());
}
项目:RxJava2Jdk8Interop    文件:SingleInterop.java   
/**
 * Returns a Single that emits the value of the CompletionStage, its error or
 * NoSuchElementException if it signals null.
 * @param <T> the value type
 * @param future the source CompletionStage instance
 * @return the new Single instance
 */
public static <T> Single<T> fromFuture(CompletionStage<T> future) {
    SingleSubject<T> cs = SingleSubject.create();

    future.whenComplete((v, e) -> {
        if (e != null) {
            cs.onError(e);
        } else if (v != null) {
            cs.onSuccess(v);
        } else {
            // A stage that completed with null is reported as an error.
            cs.onError(new NoSuchElementException());
        }
    });

    return cs;
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/** A value emitted while the Maybe scope is still open is delivered, then both sides detach. */
@Test public void autoDispose_withMaybe_normal() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  MaybeSubject<Integer> scope = MaybeSubject.create();
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  upstream.as(AutoDispose.<Integer>autoDisposable(scope))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scope.hasObservers()).isTrue();

  // The value reaches the observer
  upstream.onSuccess(1);
  assertThat(recorder.takeSuccess()).isEqualTo(1);

  // No further events, and the scope subscription is gone as well
  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scope.hasObservers()).isFalse();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/** Once the Maybe scope emits, both subjects lose their observers and later values are dropped. */
@Test public void autoDispose_withMaybe_interrupted() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  MaybeSubject<Integer> scope = MaybeSubject.create();
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  upstream.as(AutoDispose.<Integer>autoDisposable(scope))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scope.hasObservers()).isTrue();

  // The scope ends
  scope.onSuccess(2);
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scope.hasObservers()).isFalse();

  // Even if upstream emits now, no one is listening
  upstream.onSuccess(2);
  recorder.assertNoMoreEvents();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/** A value emitted while the provider's scope is open is delivered, then both sides detach. */
@Test public void autoDispose_withProvider() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  MaybeSubject<Integer> scopeSubject = MaybeSubject.create();
  ScopeProvider scopeProvider = makeProvider(scopeSubject);
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  upstream.as(AutoDispose.<Integer>autoDisposable(scopeProvider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scopeSubject.hasObservers()).isTrue();

  upstream.onSuccess(3);
  recorder.takeSuccess();

  // Everything has been released
  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scopeSubject.hasObservers()).isFalse();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/** Once the provider's scope emits, both subjects lose their observers and later values are dropped. */
@Test public void autoDispose_withProvider_interrupted() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  MaybeSubject<Integer> scopeSubject = MaybeSubject.create();
  ScopeProvider scopeProvider = makeProvider(scopeSubject);
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  upstream.as(AutoDispose.<Integer>autoDisposable(scopeProvider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scopeSubject.hasObservers()).isTrue();

  // The scope ends
  scopeSubject.onSuccess(3);
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scopeSubject.hasObservers()).isFalse();

  // A late upstream emission reaches nobody
  upstream.onSuccess(3);
  recorder.assertNoMoreEvents();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/**
 * With a lifecycle provider, the subscription survives the lifecycle event emitted here and is
 * released once the upstream value has been delivered.
 */
@Test public void autoDispose_withLifecycleProvider() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.createDefault(0);
  LifecycleScopeProvider<Integer> scopeProvider = makeLifecycleProvider(lifecycleEvents);
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  upstream.as(AutoDispose.<Integer>autoDisposable(scopeProvider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycleEvents.hasObservers()).isTrue();

  // This lifecycle event leaves the subscription in place
  lifecycleEvents.onNext(1);

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycleEvents.hasObservers()).isTrue();

  upstream.onSuccess(3);
  recorder.takeSuccess();

  // Everything has been released
  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycleEvents.hasObservers()).isFalse();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/**
 * With a lifecycle provider, the subscription survives the first lifecycle event emitted here
 * and is torn down by the second; a later upstream value is dropped.
 */
@Test public void autoDispose_withLifecycleProvider_interrupted() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.createDefault(0);
  LifecycleScopeProvider<Integer> scopeProvider = makeLifecycleProvider(lifecycleEvents);
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  upstream.as(AutoDispose.<Integer>autoDisposable(scopeProvider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycleEvents.hasObservers()).isTrue();

  // This lifecycle event leaves the subscription in place
  lifecycleEvents.onNext(1);

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycleEvents.hasObservers()).isTrue();

  // The lifecycle ends
  lifecycleEvents.onNext(3);
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycleEvents.hasObservers()).isFalse();

  // A late upstream emission reaches nobody
  upstream.onSuccess(3);
  recorder.assertNoMoreEvents();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/**
 * With a no-op outside-lifecycle handler installed and a lifecycle that has emitted nothing,
 * subscribing attaches to neither subject and yields no values and no errors.
 */
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
  Consumer<OutsideLifecycleException> noOpHandler = new Consumer<OutsideLifecycleException>() {
    @Override public void accept(OutsideLifecycleException e) {
      // Noop
    }
  };
  AutoDisposePlugins.setOutsideLifecycleHandler(noOpHandler);

  BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.create();
  LifecycleScopeProvider<Integer> scopeProvider = TestUtil.makeLifecycleProvider(lifecycleEvents);
  SingleSubject<Integer> upstream = SingleSubject.create();
  TestObserver<Integer> observer = upstream
          .as(AutoDispose.<Integer>autoDisposable(scopeProvider))
          .test();

  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycleEvents.hasObservers()).isFalse();
  observer.assertNoValues();
  observer.assertNoErrors();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/**
 * With a no-op outside-lifecycle handler installed and a lifecycle already advanced through
 * events 1, 2 and 3, subscribing attaches to neither subject and yields no values and no errors.
 */
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() {
  Consumer<OutsideLifecycleException> noOpHandler = new Consumer<OutsideLifecycleException>() {
    @Override public void accept(OutsideLifecycleException e) {
      // Noop
    }
  };
  AutoDisposePlugins.setOutsideLifecycleHandler(noOpHandler);

  BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.createDefault(0);
  // Advance the lifecycle before anything subscribes
  lifecycleEvents.onNext(1);
  lifecycleEvents.onNext(2);
  lifecycleEvents.onNext(3);
  LifecycleScopeProvider<Integer> scopeProvider = TestUtil.makeLifecycleProvider(lifecycleEvents);
  SingleSubject<Integer> upstream = SingleSubject.create();
  TestObserver<Integer> observer = upstream
          .as(AutoDispose.<Integer>autoDisposable(scopeProvider))
          .test();

  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycleEvents.hasObservers()).isFalse();
  observer.assertNoValues();
  observer.assertNoErrors();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/**
 * With an outside-lifecycle handler that rethrows, subscribing against a lifecycle that has
 * emitted nothing surfaces the handler's exception as the observer's error.
 */
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() {
  Consumer<OutsideLifecycleException> rethrowingHandler =
      new Consumer<OutsideLifecycleException>() {
        @Override public void accept(OutsideLifecycleException e) {
          // Wrapped in an IllegalStateException so the assertion below can recognise that the
          // error came through this handler
          throw new IllegalStateException(e);
        }
      };
  AutoDisposePlugins.setOutsideLifecycleHandler(rethrowingHandler);

  BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.create();
  LifecycleScopeProvider<Integer> scopeProvider = TestUtil.makeLifecycleProvider(lifecycleEvents);
  SingleSubject<Integer> upstream = SingleSubject.create();
  TestObserver<Integer> observer = upstream
          .as(AutoDispose.<Integer>autoDisposable(scopeProvider))
          .test();

  Predicate<Throwable> isWrappedOutsideLifecycle = new Predicate<Throwable>() {
    @Override public boolean test(Throwable throwable) {
      if (!(throwable instanceof IllegalStateException)) {
        return false;
      }
      return throwable.getCause() instanceof OutsideLifecycleException;
    }
  };

  observer.assertNoValues();
  observer.assertError(isWrappedOutsideLifecycle);
}
项目:async-sqs    文件:MessageAcknowledger.java   
/**
 * Creates an acknowledger for one received message and arms a timer that invokes
 * {@code ignore()} once the time remaining until {@code expirationTime} has elapsed.
 *
 * @param sqsQueue the queue the message came from
 * @param receiptId the receipt id of the message
 * @param expirationTime the instant at which the timer fires
 */
public MessageAcknowledger(SqsQueue<T> sqsQueue, String receiptId, Instant expirationTime) {
    this.sqsQueue = sqsQueue;
    this.receiptId = receiptId;
    this.expirationTime = expirationTime;
    this.ackingComplete = CompletableSubject.create();
    this.ackModeSingle = SingleSubject.create();

    long millisUntilExpiry = Duration.between(Instant.now(), expirationTime).toMillis();
    Completable.timer(millisUntilExpiry, TimeUnit.MILLISECONDS).subscribe(this::ignore);
}
项目:async-sqs    文件:MappingSqsQueue.java   
/**
 * Converts {@code body} with {@code inverseMap} and publishes it through the delegate queue.
 *
 * <p>The deferred publish is subscribed to immediately via a {@link SingleSubject}, so the work
 * starts when this method is called and the subject relays the outcome.
 *
 * @param body the message body to convert and publish
 * @param maybeDelay optional delay passed through to the delegate
 * @return a subject carrying the delegate's result
 */
@Override
public Single<String> publishMessage(U body, Optional<Duration> maybeDelay) {
    Single<String> deferredPublish = Single.defer(
            () -> delegate.publishMessage(inverseMap.apply(body), maybeDelay));
    return deferredPublish.subscribeWith(SingleSubject.create());
}
项目:async-sqs    文件:RetryingSqsRequestSender.java   
/**
 * Sends the request through the delegate, re-subscribing on failure according to the retry
 * rules below, and returns a subject that was subscribed immediately.
 *
 * <p>A failure is retried only while the error count does not exceed {@code retryCount} and the
 * request is not a batch action. An {@code AmazonSQSException} is retried only when its error
 * type is {@code Service}; any other throwable is retried.
 *
 * @param request the action to send
 * @param <T> the result type of the action
 * @return a subject carrying the final result or error
 */
@Override
public <T> Single<T> sendRequest(SqsAction<T> request) {
    return Single.defer(() -> delegate.sendRequest(request))
            .retry((attempt, failure) -> {
                boolean retryAllowed = attempt <= retryCount && !request.isBatchAction();
                if (!retryAllowed) {
                    return false;
                }
                if (!(failure instanceof AmazonSQSException)) {
                    return true;
                }
                AmazonSQSException sqsFailure = (AmazonSQSException) failure;
                return sqsFailure.getErrorType() == AmazonServiceException.ErrorType.Service;
            })
            .subscribeWith(SingleSubject.create()); // subscribe now: converts to a hot Single
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/**
 * Subscribing with {@code ScopeProvider.UNBOUND} yields no values and no errors on the
 * observer, and {@code rule} records no errors either.
 */
@Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() {
  SingleSubject<Object> upstream = SingleSubject.create();
  TestObserver<Object> observer = upstream
          .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND))
          .test();

  observer.assertNoValues();
  observer.assertNoErrors();

  rule.assertNoErrors();
}
项目:AutoDispose    文件:AutoDisposeSingleObserverTest.java   
/** With {@code ScopeProvider.UNBOUND}, a value emitted by the source reaches the observer. */
@Test public void unbound_shouldStillPassValues() {
  SingleSubject<Integer> upstream = SingleSubject.create();
  TestObserver<Integer> observer = upstream
          .as(AutoDispose.<Integer>autoDisposable(ScopeProvider.UNBOUND))
          .test();

  upstream.onSuccess(1);

  observer.assertValue(1);
}
项目:yarpc-java    文件:HttpOutbound.java   
/**
 * Opens a new connection, writes the request once connected, and returns a Single that is
 * driven by the response-side handlers installed in the channel pipeline.
 *
 * <p>A connect failure (other than cancellation) is reported through the returned Single.
 *
 * @param req the request to send
 * @return a subject-backed Single for the response
 */
@Override
public Single<TransportResponse> callUnary(TransportRequest req) {
  // TODO: This will establish a new connection for each request. We should pool Channels.
  // Maybe we should just use AsyncHttpClient for this.

  SingleSubject<TransportResponse> responseSubject = SingleSubject.create();
  // Configure a clone so this call's initializer (which captures responseSubject) is never
  // installed on the shared bootstrap; otherwise a concurrent call could replace the handler
  // between handler(...) and connect(...) and route a response to the wrong subject.
  ChannelFuture channelFuture =
      bootstrap
          .clone()
          .handler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  ch.pipeline()
                      .addLast(new HttpClientCodec())
                      .addLast(new HttpTransportEncoder<>(requestEncoderConfig))
                      .addLast(new HttpTransportDecoder<>(ErrorResponseDecoder.CONFIGURATION))
                      .addLast(new HttpTransportDecoder<>(TransportResponseDecoder.CONFIGURATION))
                      .addLast(Channels.channelReader(responseSubject, TransportResponse.class))
                      .addLast(new TransportErrorObserver(responseSubject));
                }
              })
          .connect(url.getHost(), url.getPort() == -1 ? url.getDefaultPort() : url.getPort());

  Channel channel = channelFuture.channel();
  channelFuture.addListener(
      future -> {
        if (future.isSuccess()) {
          // NOTE(review): the write future is not observed here; confirm that write failures
          // are surfaced to responseSubject by a pipeline handler.
          channel.writeAndFlush(req);
        } else if (!future.isCancelled()) {
          responseSubject.onError(future.cause());
        }
      });

  return responseSubject;
}
项目:yarpc-java    文件:HttpOutbound.java   
/**
 * Opens a new connection, writes the request once connected, and returns a Single that is
 * driven by the {@code WriteAck} handler installed in the channel pipeline.
 *
 * <p>A connect failure (other than cancellation) is reported through the returned Single.
 *
 * @param req the request to send
 * @return a subject-backed Single for the acknowledgement
 */
@Override
public Single<Ack> callOneway(TransportRequest req) {
  SingleSubject<Ack> ackSubject = SingleSubject.create();

  // Configure a clone so this call's initializer (which captures ackSubject) is never
  // installed on the shared bootstrap; otherwise a concurrent call could replace the handler
  // between handler(...) and connect(...) and signal the wrong subject.
  ChannelFuture channelFuture =
      bootstrap
          .clone()
          .handler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  ch.pipeline()
                      .addLast(new HttpClientCodec())
                      .addLast(new WriteAck<>(ackSubject, LastHttpContent.class))
                      .addLast(new HttpTransportEncoder<>(requestEncoderConfig));
                }
              })
          .connect(url.getHost(), url.getPort() == -1 ? url.getDefaultPort() : url.getPort());

  Channel channel = channelFuture.channel();
  channelFuture.addListener(
      future -> {
        if (future.isSuccess()) {
          channel.writeAndFlush(req);
        } else if (!future.isCancelled()) {
          ackSubject.onError(future.cause());
        }
      });

  return ackSubject;
}
项目:async-sqs    文件:MappingSqsMessagePublisher.java   
/**
 * Converts {@code body} with {@code map} and publishes it through the delegate.
 *
 * @param body the message body to convert and publish
 * @param maybeDelay optional delay passed through to the delegate
 * @return a subject, subscribed immediately, carrying the delegate's result
 */
@Override
public Single<String> publishMessage(T body, Optional<Duration> maybeDelay) {
    Single<String> coldPublish =
            Single.defer(() -> delegate.publishMessage(map.apply(body), maybeDelay));
    // Subscribing right away makes it hot
    return coldPublish.subscribeWith(SingleSubject.create());
}
项目:async-sqs    文件:SendMessageEntry.java   
/**
 * Supplies the default result subject: a newly created {@link SingleSubject}.
 *
 * @return a new subject on every call
 */
@Default
public SingleSubject<String> getResultSubject() {
    final SingleSubject<String> freshSubject = SingleSubject.create();
    return freshSubject;
}
项目:async-sqs    文件:RetryingSqsQueue.java   
/**
 * Publishes through the delegate, re-subscribing on failure whenever {@code shouldRetry}
 * allows it.
 *
 * @param body the message body to publish
 * @param maybeDelay optional delay passed through to the delegate
 * @return a subject, subscribed immediately, carrying the final result or error
 */
@Override
public Single<String> publishMessage(T body, Optional<Duration> maybeDelay) {
    Single<String> publishWithRetry =
            Single.defer(() -> delegate.publishMessage(body, maybeDelay))
                    .retry(this::shouldRetry);
    // Subscribing right away converts it to a hot Single
    return publishWithRetry.subscribeWith(SingleSubject.create());
}
项目:akarnokd-misc    文件:SingleObserveOnRaceTest.java   
/**
 * Runs 1000 rounds in which {@code subj.onSuccess(1)} on a dedicated worker thread is started
 * at about the same time as a task on {@code Schedulers.single()} that cancels the observer and
 * then nulls {@code value[0]}.
 *
 * <p>The observer copies {@code value[0]} into {@code value[1]} whenever {@code onSuccess} is
 * delivered, and each round asserts {@code value[1]} is non-null. A round therefore fails if
 * {@code onSuccess} is delivered after the cancelling task has nulled {@code value[0]}.
 */
@Test
public void race() throws Exception {
    Worker w = Schedulers.newThread().createWorker();
    try {
        for (int i = 0; i < 1000; i++) {
            // value[0]: nulled by the cancelling task; value[1]: snapshot of value[0] taken
            // when onSuccess is delivered.
            Integer[] value = { 0, 0 };

            TestObserver<Integer> to = new TestObserver<Integer>() {
                @Override
                public void onSuccess(Integer v) {
                    value[1] = value[0];
                    super.onSuccess(v);
                }
            };

            SingleSubject<Integer> subj = SingleSubject.create();

            subj.observeOn(Schedulers.single())
            .onTerminateDetach()
            .subscribe(to);

            // wip: start gate shared by the two tasks; cdl: lets the test wait for both.
            AtomicInteger wip = new AtomicInteger(2);
            CountDownLatch cdl = new CountDownLatch(2);

            w.schedule(() -> {
                // Spin until the other task has also arrived, so both proceed together.
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0);
                }
                subj.onSuccess(1);
                cdl.countDown();
            });

            Schedulers.single().scheduleDirect(() -> {
                // Same start gate as above.
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0);
                }
                to.cancel();
                // From here on, any delivered onSuccess would copy null into value[1].
                value[0] = null;
                cdl.countDown();
            });

            cdl.await();

            Assert.assertNotNull(value[1]);
        }
    } finally {
        w.dispose();
    }
}