/**
 * Checks that a value passed to {@code onNext}, followed by {@code onCompleted}, is observed by a
 * {@link TestSubscriber} attached to the consumer returned from {@code getRxConsumer()}.
 */
@Test
public void onNextDelegates() {
    ClientCallStreamObserver<Object> callObserver = mock(ClientCallStreamObserver.class);
    Subscriber<Object> mockSubscriber = mock(Subscriber.class);
    RxConsumerStreamObserver streamObserver = new RxConsumerStreamObserver();

    streamObserver.beforeStart(callObserver);
    streamObserver.getRxConsumer().subscribe(mockSubscriber);

    // Attach a recording subscriber in addition to the mock one.
    Flowable<Object> consumer = (Flowable<Object>) streamObserver.getRxConsumer();
    TestSubscriber<Object> recorder = consumer.test();

    Object payload = new Object();
    streamObserver.onNext(payload);
    streamObserver.onCompleted();

    // Give the stream up to three seconds to terminate before inspecting what was recorded.
    recorder.awaitTerminalEvent(3, TimeUnit.SECONDS);
    recorder.assertValues(payload);
}
/**
 * Checks that a throwable passed to {@code onError} is observed by a {@link TestSubscriber}
 * attached to the consumer returned from {@code getRxConsumer()}.
 */
@Test
public void onErrorDelegates() {
    ClientCallStreamObserver<Object> callObserver = mock(ClientCallStreamObserver.class);
    Subscriber<Object> mockSubscriber = mock(Subscriber.class);
    RxConsumerStreamObserver streamObserver = new RxConsumerStreamObserver();

    streamObserver.beforeStart(callObserver);
    streamObserver.getRxConsumer().subscribe(mockSubscriber);

    // Attach a recording subscriber in addition to the mock one.
    Flowable<Object> consumer = (Flowable<Object>) streamObserver.getRxConsumer();
    TestSubscriber<Object> recorder = consumer.test();

    Throwable failure = new Exception();
    streamObserver.onError(failure);

    // Give the stream up to three seconds to terminate before inspecting what was recorded.
    recorder.awaitTerminalEvent(3, TimeUnit.SECONDS);
    recorder.assertError(failure);
}
/**
 * When the request stream reports {@code isReady() == true}, {@code onNext} must forward the
 * message to the stream and request one more item from the subscription.
 */
@Test
public void onNextKeepsPumpRunning() {
    ClientCallStreamObserver<Object> readyStream = mock(ClientCallStreamObserver.class);
    when(readyStream.isReady()).thenReturn(true);
    Subscription upstream = mock(Subscription.class);

    ReactivePublisherBackpressureOnReadyHandler<Object> handler =
            new ReactivePublisherBackpressureOnReadyHandler<Object>(readyStream);
    handler.onSubscribe(upstream);

    Object message = new Object();
    handler.onNext(message);

    verify(readyStream).onNext(message);
    verify(upstream).request(1);
}
/**
 * When the request stream reports {@code isReady() == false}, {@code onNext} must still forward
 * the message to the stream but must not request another item from the subscription.
 */
@Test
public void onNextStopsPump() {
    ClientCallStreamObserver<Object> busyStream = mock(ClientCallStreamObserver.class);
    when(busyStream.isReady()).thenReturn(false);
    Subscription upstream = mock(Subscription.class);

    ReactivePublisherBackpressureOnReadyHandler<Object> handler =
            new ReactivePublisherBackpressureOnReadyHandler<Object>(busyStream);
    handler.onSubscribe(upstream);

    Object message = new Object();
    handler.onNext(message);

    verify(busyStream).onNext(message);
    verify(upstream, never()).request(1);
}
/**
 * Checks, via {@link StepVerifier}, that a value passed to {@code onNext} is emitted by the
 * consumer returned from {@code getRxConsumer()} and that {@code onCompleted} completes it.
 */
@Test
public void onNextDelegates() {
    ClientCallStreamObserver<Object> callObserver = mock(ClientCallStreamObserver.class);
    Subscriber<Object> mockSubscriber = mock(Subscriber.class);
    ReactorConsumerStreamObserver streamObserver = new ReactorConsumerStreamObserver();

    streamObserver.beforeStart(callObserver);
    streamObserver.getRxConsumer().subscribe(mockSubscriber);

    Object payload = new Object();
    Duration timeout = Duration.ofSeconds(3);

    StepVerifier.create(streamObserver.getRxConsumer())
            .then(() -> streamObserver.onNext(payload))
            .expectNext(payload)
            .then(streamObserver::onCompleted)
            .expectComplete()
            .verify(timeout);
}
/**
 * Checks that {@code ForwardingClientResponseObserver} passes {@code onNext}, {@code onError} and
 * {@code onCompleted} through to its delegate, and that {@code beforeStart} installs the supplied
 * on-ready handler on the call stream observer, with no other interactions on either mock.
 */
@Test
public void testCallsAreForwardedAndOnReadyHandlerBound() {
    @SuppressWarnings("unchecked")
    StreamObserver<Object> delegate = mock(StreamObserver.class);
    @SuppressWarnings("unchecked")
    ClientCallStreamObserver<Object> callStream = mock(ClientCallStreamObserver.class);
    // The handler is never run by this test; only its identity matters.
    Runnable readyCallback = new Runnable() {
        @Override
        public void run() {
        }
    };

    ClientResponseObserver<Object, Object> forwarding =
            new ForwardingClientResponseObserver<>(delegate, readyCallback);

    forwarding.onNext("A");
    verify(delegate).onNext("A");

    Throwable failure = new RuntimeException();
    forwarding.onError(failure);
    verify(delegate).onError(failure);

    forwarding.onCompleted();
    verify(delegate).onCompleted();

    forwarding.beforeStart(callStream);
    verify(callStream).setOnReadyHandler(readyCallback);

    verifyNoMoreInteractions(delegate, callStream);
}
@Test public void flowControl() throws Exception { FlowControlClientResponseObserver clientResponseObserver = new FlowControlClientResponseObserver(); ClientCallStreamObserver<ServerReflectionRequest> requestObserver = (ClientCallStreamObserver<ServerReflectionRequest>) stub.serverReflectionInfo(clientResponseObserver); // ClientCalls.startCall() calls request(1) initially, so we should get an immediate response. requestObserver.onNext(flowControlRequest); assertEquals(1, clientResponseObserver.getResponses().size()); assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0)); // Verify we don't receive an additional response until we request it. requestObserver.onNext(flowControlRequest); assertEquals(1, clientResponseObserver.getResponses().size()); requestObserver.request(1); assertEquals(2, clientResponseObserver.getResponses().size()); assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1)); requestObserver.onCompleted(); assertTrue(clientResponseObserver.onCompleteCalled()); }
@Test public void flowControlOnCompleteWithPendingRequest() throws Exception { FlowControlClientResponseObserver clientResponseObserver = new FlowControlClientResponseObserver(); ClientCallStreamObserver<ServerReflectionRequest> requestObserver = (ClientCallStreamObserver<ServerReflectionRequest>) stub.serverReflectionInfo(clientResponseObserver); // ClientCalls.startCall() calls request(1) initially, so make additional request. requestObserver.onNext(flowControlRequest); requestObserver.onNext(flowControlRequest); requestObserver.onCompleted(); assertEquals(1, clientResponseObserver.getResponses().size()); assertFalse(clientResponseObserver.onCompleteCalled()); requestObserver.request(1); assertTrue(clientResponseObserver.onCompleteCalled()); assertEquals(2, clientResponseObserver.getResponses().size()); assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1)); }
/** Checks that {@code getRxConsumer()} is non-null once {@code beforeStart} has been called. */
@Test
public void rxConsumerIsSet() {
    RxConsumerStreamObserver streamObserver = new RxConsumerStreamObserver();
    ClientCallStreamObserver<Object> callObserver = mock(ClientCallStreamObserver.class);

    streamObserver.beforeStart(callObserver);

    assertThat(streamObserver.getRxConsumer()).isNotNull();
}
@Override public void beforeStart(ClientCallStreamObserver<TRequest> producerStream) { Preconditions.checkNotNull(producerStream); // Subscribe to the rxProducer with an adapter to a gRPC StreamObserver that respects backpressure // signals from the underlying gRPC client transport. onReadyHandler = new ReactivePublisherBackpressureOnReadyHandler<TRequest>(producerStream); }
/** Checks that invoking {@code run()} after {@code onSubscribe} requests one item from the subscription. */
@Test
public void runPrimesThePump() {
    ClientCallStreamObserver<Object> requestStream = mock(ClientCallStreamObserver.class);
    Subscription upstream = mock(Subscription.class);

    ReactivePublisherBackpressureOnReadyHandler<Object> handler =
            new ReactivePublisherBackpressureOnReadyHandler<Object>(requestStream);
    handler.onSubscribe(upstream);

    handler.run();

    verify(upstream).request(1);
}
/** Checks that {@code getRxConsumer()} is non-null once {@code beforeStart} has been called. */
@Test
public void rxConsumerIsSet() {
    ReactorConsumerStreamObserver streamObserver = new ReactorConsumerStreamObserver();
    ClientCallStreamObserver<Object> callObserver = mock(ClientCallStreamObserver.class);

    streamObserver.beforeStart(callObserver);

    assertThat(streamObserver.getRxConsumer()).isNotNull();
}
/**
 * Checks, via {@link StepVerifier}, that a throwable passed to {@code onError} terminates the
 * consumer returned from {@code getRxConsumer()} with the same error message.
 */
@Test
public void onErrorDelegates() {
    ClientCallStreamObserver<Object> callObserver = mock(ClientCallStreamObserver.class);
    Subscriber<Object> mockSubscriber = mock(Subscriber.class);
    ReactorConsumerStreamObserver streamObserver = new ReactorConsumerStreamObserver();

    streamObserver.beforeStart(callObserver);
    streamObserver.getRxConsumer().subscribe(mockSubscriber);

    Throwable failure = new Exception("test error");
    Duration timeout = Duration.ofSeconds(3);

    StepVerifier.create(streamObserver.getRxConsumer())
            .then(() -> streamObserver.onError(failure))
            .expectErrorMessage("test error")
            .verify(timeout);
}
/**
 * Stores the request stream and registers this object as the stream's on-ready handler.
 *
 * @param requestStream the gRPC request stream to write to; must not be null
 */
public ReactivePublisherBackpressureOnReadyHandler(ClientCallStreamObserver<T> requestStream) {
    Preconditions.checkNotNull(requestStream);
    this.requestStream = requestStream;
    // NOTE(review): `this` is handed to the stream before construction has finished.
    requestStream.setOnReadyHandler(this);
}
/**
 * Forwards the {@code beforeStart} callback to {@code delegate} with the same request stream.
 *
 * @param requestStream the request stream supplied by the caller
 */
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> requestStream) {
    delegate.beforeStart(requestStream);
}
@Override public void subscribe(Subscriber<? super T> subscriber) { Preconditions.checkNotNull(subscriber); subscriber.onSubscribe(new Subscription() { private static final int MAX_REQUEST_RETRIES = 20; @Override public void request(long l) { // Some Reactive Streams implementations use Long.MAX_VALUE to indicate "all messages"; gRPC uses Integer.MAX_VALUE. int i = (int) Math.min(l, Integer.MAX_VALUE); // Very rarely, request() gets called before the client has finished setting up its stream. If this // happens, wait momentarily and try again. for (int j = 0; j < MAX_REQUEST_RETRIES; j++) { try { callStreamObserver.request(i); break; } catch (IllegalStateException ex) { if (j == MAX_REQUEST_RETRIES - 1) { throw ex; } try { Thread.sleep(1); } catch (InterruptedException e) { // no-op } } } } @Override public void cancel() { // Don't cancel twice if the server is already canceled if (callStreamObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver) callStreamObserver).isCancelled()) { return; } isCanceled = true; if (callStreamObserver instanceof ClientCallStreamObserver) { ((ClientCallStreamObserver) callStreamObserver).cancel("Client canceled request", null); } else { callStreamObserver.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException()); } } }); this.subscriber = subscriber; subscribed.countDown(); }
/**
 * Runs the superclass {@code beforeStart} and then creates the backpressure on-ready handler for
 * the request stream, storing it in {@code onReadyHandler}.
 *
 * @param requestStream the gRPC request stream; must not be null
 */
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> requestStream) {
    // Reject null before anything else touches the stream.
    Preconditions.checkNotNull(requestStream);
    super.beforeStart(requestStream);
    onReadyHandler = new ReactivePublisherBackpressureOnReadyHandler<TRequest>(requestStream);
}
/**
 * Builds the response publisher around the request stream, derives {@code rxConsumer} from it,
 * and counts down {@code beforeStartCalled}.
 *
 * @param requestStream the gRPC request stream; must not be null
 */
@Override
public void beforeStart(ClientCallStreamObserver<TRequest> requestStream) {
    Preconditions.checkNotNull(requestStream);
    publisher = new ReactiveStreamObserverPublisher<TResponse>(requestStream);
    rxConsumer = getReactiveConsumerFromPublisher(publisher);

    // Count down only after both fields have been assigned.
    beforeStartCalled.countDown();
}
/**
 * Registers {@code onReadyHandler} as the on-ready handler of the supplied stream.
 *
 * @param stream the call stream observer to bind the handler to
 */
@Override
public void beforeStart(ClientCallStreamObserver<RespT> stream) {
    stream.setOnReadyHandler(onReadyHandler);
}
/**
 * Registers an on-ready handler on the request stream that calls {@code phaser.arrive()}.
 *
 * @param requestStream the call stream observer to bind the handler to
 */
@Override
public void beforeStart(ClientCallStreamObserver requestStream) {
    // Kept as a method reference so `phaser` is bound when the handler is created.
    Runnable arriveOnReady = phaser::arrive;
    requestStream.setOnReadyHandler(arriveOnReady);
}
/** * Tests client per-message compression for streaming calls. The Java API does not support * inspecting a message's compression level, so this is primarily intended to run against a gRPC * C++ server. */ public void clientCompressedStreaming() throws Exception { final StreamingInputCallRequest expectCompressedRequest = StreamingInputCallRequest.newBuilder() .setExpectCompressed(BoolValue.newBuilder().setValue(true)) .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))) .build(); final StreamingInputCallRequest expectUncompressedRequest = StreamingInputCallRequest.newBuilder() .setExpectCompressed(BoolValue.newBuilder().setValue(false)) .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))) .build(); final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build(); StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); StreamObserver<StreamingInputCallRequest> requestObserver = asyncStub.streamingInputCall(responseObserver); // Send a non-compressed message with expectCompress=true. Servers supporting this test case // should return INVALID_ARGUMENT. 
requestObserver.onNext(expectCompressedRequest); responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); Throwable e = responseObserver.getError(); assertNotNull("expected INVALID_ARGUMENT", e); assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode()); // Start a new stream responseObserver = StreamRecorder.create(); @SuppressWarnings("unchecked") ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver = (ClientCallStreamObserver) asyncStub.withCompression("gzip").streamingInputCall(responseObserver); clientCallStreamObserver.setMessageCompression(true); clientCallStreamObserver.onNext(expectCompressedRequest); clientCallStreamObserver.setMessageCompression(false); clientCallStreamObserver.onNext(expectUncompressedRequest); clientCallStreamObserver.onCompleted(); responseObserver.awaitCompletion(); assertSuccess(responseObserver); assertEquals(goldenResponse, responseObserver.firstValue().get()); }
/**
 * Calls {@code disableAutoInboundFlowControl()} on the request stream.
 *
 * @param requestStream the reflection request stream
 */
@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
    requestStream.disableAutoInboundFlowControl();
}