Java 类io.grpc.stub.ClientCallStreamObserver 实例源码

项目:reactive-grpc    文件:RxConsumerStreamObserverTest.java   
@Test
public void onNextDelegates() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    RxConsumerStreamObserver rxObs = new RxConsumerStreamObserver();
    Subscriber<Object> sub = mock(Subscriber.class);

    rxObs.beforeStart(obs);
    rxObs.getRxConsumer().subscribe(sub);

    TestSubscriber<Object> testSubscriber = ((Flowable<Object>)rxObs.getRxConsumer()).test();

    Object obj = new Object();
    rxObs.onNext(obj);
    rxObs.onCompleted();

    testSubscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    testSubscriber.assertValues(obj);
}
项目:reactive-grpc    文件:RxConsumerStreamObserverTest.java   
@Test
public void onErrorDelegates() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    RxConsumerStreamObserver rxObs = new RxConsumerStreamObserver();
    Subscriber<Object> sub = mock(Subscriber.class);

    rxObs.beforeStart(obs);
    rxObs.getRxConsumer().subscribe(sub);

    TestSubscriber<Object> testSubscriber = ((Flowable<Object>)rxObs.getRxConsumer()).test();

    Throwable obj = new Exception();
    rxObs.onError(obj);

    testSubscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    testSubscriber.assertError(obj);
}
项目:reactive-grpc    文件:ReactivePublisherBackpressureOnReadyHandlerTest.java   
@Test
public void onNextKeepsPumpRunning() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    when(obs.isReady()).thenReturn(true);

    ReactivePublisherBackpressureOnReadyHandler<Object> handler = new ReactivePublisherBackpressureOnReadyHandler<Object>(obs);
    Subscription sub = mock(Subscription.class);

    handler.onSubscribe(sub);

    Object obj = new Object();
    handler.onNext(obj);

    verify(obs).onNext(obj);
    verify(sub).request(1);
}
项目:reactive-grpc    文件:ReactivePublisherBackpressureOnReadyHandlerTest.java   
@Test
public void onNextStopsPump() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    when(obs.isReady()).thenReturn(false);

    ReactivePublisherBackpressureOnReadyHandler<Object> handler = new ReactivePublisherBackpressureOnReadyHandler<Object>(obs);
    Subscription sub = mock(Subscription.class);

    handler.onSubscribe(sub);

    Object obj = new Object();
    handler.onNext(obj);

    verify(obs).onNext(obj);
    verify(sub, never()).request(1);
}
项目:reactive-grpc    文件:ReactorConsumerStreamObserverTest.java   
@Test
public void onNextDelegates() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    ReactorConsumerStreamObserver rxObs = new ReactorConsumerStreamObserver();
    Subscriber<Object> sub = mock(Subscriber.class);

    rxObs.beforeStart(obs);
    rxObs.getRxConsumer().subscribe(sub);

    Object obj = new Object();
    StepVerifier.create(rxObs.getRxConsumer())
            .then(() -> rxObs.onNext(obj))
            .expectNext(obj)
            .then(rxObs::onCompleted)
            .expectComplete()
            .verify(Duration.ofSeconds(3));
}
项目:beam    文件:ForwardingClientResponseObserverTest.java   
@Test
public void testCallsAreForwardedAndOnReadyHandlerBound() {
  @SuppressWarnings("unchecked")
  StreamObserver<Object> delegateObserver = mock(StreamObserver.class);
  @SuppressWarnings("unchecked")
  ClientCallStreamObserver<Object> callStreamObserver =
      mock(ClientCallStreamObserver.class);
  Runnable onReadyHandler = new Runnable() {
    @Override
    public void run() {
    }
  };
  ClientResponseObserver<Object, Object> observer =
      new ForwardingClientResponseObserver<>(delegateObserver, onReadyHandler);
  observer.onNext("A");
  verify(delegateObserver).onNext("A");
  Throwable t = new RuntimeException();
  observer.onError(t);
  verify(delegateObserver).onError(t);
  observer.onCompleted();
  verify(delegateObserver).onCompleted();
  observer.beforeStart(callStreamObserver);
  verify(callStreamObserver).setOnReadyHandler(onReadyHandler);
  verifyNoMoreInteractions(delegateObserver, callStreamObserver);
}
项目:grpc-java    文件:ProtoReflectionServiceTest.java   
@Test
public void flowControl() throws Exception {
  FlowControlClientResponseObserver clientResponseObserver =
      new FlowControlClientResponseObserver();
  ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
      (ClientCallStreamObserver<ServerReflectionRequest>)
          stub.serverReflectionInfo(clientResponseObserver);

  // ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
  requestObserver.onNext(flowControlRequest);
  assertEquals(1, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));

  // Verify we don't receive an additional response until we request it.
  requestObserver.onNext(flowControlRequest);
  assertEquals(1, clientResponseObserver.getResponses().size());

  requestObserver.request(1);
  assertEquals(2, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));

  requestObserver.onCompleted();
  assertTrue(clientResponseObserver.onCompleteCalled());
}
项目:grpc-java    文件:ProtoReflectionServiceTest.java   
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
  FlowControlClientResponseObserver clientResponseObserver =
      new FlowControlClientResponseObserver();
  ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
      (ClientCallStreamObserver<ServerReflectionRequest>)
          stub.serverReflectionInfo(clientResponseObserver);

  // ClientCalls.startCall() calls request(1) initially, so make additional request.
  requestObserver.onNext(flowControlRequest);
  requestObserver.onNext(flowControlRequest);
  requestObserver.onCompleted();
  assertEquals(1, clientResponseObserver.getResponses().size());
  assertFalse(clientResponseObserver.onCompleteCalled());

  requestObserver.request(1);
  assertTrue(clientResponseObserver.onCompleteCalled());
  assertEquals(2, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
}
项目:reactive-grpc    文件:RxConsumerStreamObserverTest.java   
@Test
public void rxConsumerIsSet() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    RxConsumerStreamObserver rxObs = new RxConsumerStreamObserver();

    rxObs.beforeStart(obs);

    assertThat(rxObs.getRxConsumer()).isNotNull();
}
项目:reactive-grpc    文件:ReactiveProducerStreamObserver.java   
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> producerStream) {
    Preconditions.checkNotNull(producerStream);
    // Subscribe to the rxProducer with an adapter to a gRPC StreamObserver that respects backpressure
    // signals from the underlying gRPC client transport.
    onReadyHandler = new ReactivePublisherBackpressureOnReadyHandler<TRequest>(producerStream);
}
项目:reactive-grpc    文件:ReactivePublisherBackpressureOnReadyHandlerTest.java   
@Test
public void runPrimesThePump() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    ReactivePublisherBackpressureOnReadyHandler<Object> handler = new ReactivePublisherBackpressureOnReadyHandler<Object>(obs);
    Subscription sub = mock(Subscription.class);

    handler.onSubscribe(sub);

    handler.run();
    verify(sub).request(1);
}
项目:reactive-grpc    文件:ReactorConsumerStreamObserverTest.java   
@Test
public void rxConsumerIsSet() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    ReactorConsumerStreamObserver rxObs = new ReactorConsumerStreamObserver();

    rxObs.beforeStart(obs);

    assertThat(rxObs.getRxConsumer()).isNotNull();
}
项目:reactive-grpc    文件:ReactorConsumerStreamObserverTest.java   
@Test
public void onErrorDelegates() {
    ClientCallStreamObserver<Object> obs = mock(ClientCallStreamObserver.class);
    ReactorConsumerStreamObserver rxObs = new ReactorConsumerStreamObserver();
    Subscriber<Object> sub = mock(Subscriber.class);

    rxObs.beforeStart(obs);
    rxObs.getRxConsumer().subscribe(sub);

    Throwable obj = new Exception("test error");
    StepVerifier.create(rxObs.getRxConsumer())
            .then(() -> rxObs.onError(obj))
            .expectErrorMessage("test error")
            .verify(Duration.ofSeconds(3));
}
项目:reactive-grpc    文件:ReactivePublisherBackpressureOnReadyHandler.java   
public ReactivePublisherBackpressureOnReadyHandler(ClientCallStreamObserver<T> requestStream) {
    this.requestStream = Preconditions.checkNotNull(requestStream);
    requestStream.setOnReadyHandler(this);
}
项目:reactive-grpc    文件:CancellableStreamObserver.java   
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> requestStream) {
    delegate.beforeStart(requestStream);
}
项目:reactive-grpc    文件:ReactiveStreamObserverPublisher.java   
@Override
public void subscribe(Subscriber<? super T> subscriber) {
    Preconditions.checkNotNull(subscriber);
    subscriber.onSubscribe(new Subscription() {
        private static final int MAX_REQUEST_RETRIES = 20;

        @Override
        public void request(long l) {
            // Some Reactive Streams implementations use Long.MAX_VALUE to indicate "all messages"; gRPC uses Integer.MAX_VALUE.
            int i = (int) Math.min(l, Integer.MAX_VALUE);

            // Very rarely, request() gets called before the client has finished setting up its stream. If this
            // happens, wait momentarily and try again.
            for (int j = 0; j < MAX_REQUEST_RETRIES; j++) {
                try {
                    callStreamObserver.request(i);
                    break;
                } catch (IllegalStateException ex) {
                    if (j == MAX_REQUEST_RETRIES - 1) {
                        throw ex;
                    }
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        // no-op
                    }
                }
            }
        }

        @Override
        public void cancel() {
            // Don't cancel twice if the server is already canceled
            if (callStreamObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver) callStreamObserver).isCancelled()) {
                return;
            }

            isCanceled = true;
            if (callStreamObserver instanceof ClientCallStreamObserver) {
                ((ClientCallStreamObserver) callStreamObserver).cancel("Client canceled request", null);
            } else {
                callStreamObserver.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException());
            }
        }
    });
    this.subscriber = subscriber;

    subscribed.countDown();
}
项目:reactive-grpc    文件:ReactiveProducerConsumerStreamObserver.java   
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> requestStream) {
    super.beforeStart(Preconditions.checkNotNull(requestStream));
    onReadyHandler = new ReactivePublisherBackpressureOnReadyHandler<TRequest>(requestStream);
}
项目:reactive-grpc    文件:ReactiveConsumerStreamObserver.java   
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> requestStream) {
    publisher = new ReactiveStreamObserverPublisher<TResponse>(Preconditions.checkNotNull(requestStream));
    rxConsumer = getReactiveConsumerFromPublisher(publisher);
    beforeStartCalled.countDown();
}
项目:beam    文件:ForwardingClientResponseObserver.java   
@Override
public void beforeStart(ClientCallStreamObserver<RespT> stream) {
  stream.setOnReadyHandler(onReadyHandler);
}
项目:beam    文件:BeamFnLoggingClient.java   
@Override
public void beforeStart(ClientCallStreamObserver requestStream) {
  requestStream.setOnReadyHandler(phaser::arrive);
}
项目:grpc-java    文件:AbstractInteropTest.java   
/**
 * Tests client per-message compression for streaming calls. The Java API does not support
 * inspecting a message's compression level, so this is primarily intended to run against a gRPC
 * C++ server.
 */
public void clientCompressedStreaming() throws Exception {
  final StreamingInputCallRequest expectCompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(true))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
          .build();
  final StreamingInputCallRequest expectUncompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(false))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
          .build();
  final StreamingInputCallResponse goldenResponse =
      StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();

  StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestObserver =
      asyncStub.streamingInputCall(responseObserver);

  // Send a non-compressed message with expectCompress=true. Servers supporting this test case
  // should return INVALID_ARGUMENT.
  requestObserver.onNext(expectCompressedRequest);
  responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
  Throwable e = responseObserver.getError();
  assertNotNull("expected INVALID_ARGUMENT", e);
  assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());

  // Start a new stream
  responseObserver = StreamRecorder.create();
  @SuppressWarnings("unchecked")
  ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
      (ClientCallStreamObserver)
          asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
  clientCallStreamObserver.setMessageCompression(true);
  clientCallStreamObserver.onNext(expectCompressedRequest);
  clientCallStreamObserver.setMessageCompression(false);
  clientCallStreamObserver.onNext(expectUncompressedRequest);
  clientCallStreamObserver.onCompleted();
  responseObserver.awaitCompletion();
  assertSuccess(responseObserver);
  assertEquals(goldenResponse, responseObserver.firstValue().get());
}
项目:grpc-java    文件:ProtoReflectionServiceTest.java   
@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
  requestStream.disableAutoInboundFlowControl();
}