/** * Implements a unary -> stream call as {@link Single} -> {@link Flowable}, where the server responds with a * stream of messages. */ public static <TRequest, TResponse> void oneToMany( TRequest request, StreamObserver<TResponse> responseObserver, Function<Single<TRequest>, Flowable<TResponse>> delegate) { try { Single<TRequest> rxRequest = Single.just(request); Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); rxResponse.subscribe(new ReactivePublisherBackpressureOnReadyHandler<TResponse>( (ServerCallStreamObserver<TResponse>) responseObserver)); } catch (Throwable throwable) { responseObserver.onError(prepareError(throwable)); } }
public ReactivePublisherBackpressureOnReadyHandler(ServerCallStreamObserver<T> requestStream) { this.requestStream = Preconditions.checkNotNull(requestStream); requestStream.setOnReadyHandler(this); requestStream.setOnCancelHandler(new Runnable() { @Override public void run() { subscription.cancel(); } }); }
/** * Implements a unary -> stream call as {@link Mono} -> {@link Flux}, where the server responds with a * stream of messages. */ public static <TRequest, TResponse> void oneToMany( TRequest request, StreamObserver<TResponse> responseObserver, Function<Mono<TRequest>, Flux<TResponse>> delegate) { try { Mono<TRequest> rxRequest = Mono.just(request); Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); rxResponse.subscribe(new ReactivePublisherBackpressureOnReadyHandler<>( (ServerCallStreamObserver<TResponse>) responseObserver)); } catch (Throwable throwable) { responseObserver.onError(prepareError(throwable)); } }
BlockingStreamObserver(String id, ServerCallStreamObserver<T> delegate) { this.id = id; this.delegate = delegate; this.delegate.setOnReadyHandler(this::wakeup); this.delegate.setOnCancelHandler(this::wakeup); delegate.setCompression("gzip"); delegate.setMessageCompression(true); }
@Override public void subscribe(@NonNull MSG_SubscriptionRequest request, @NonNull StreamObserver<MSG_Notification> responseObserver) { SubscriptionRequestTO req = converter.fromProto(request); resetDebugInfo(req); BlockingStreamObserver<MSG_Notification> resp = new BlockingStreamObserver<>(req.toString(), (ServerCallStreamObserver) responseObserver); final boolean idOnly = req.idOnly(); store.subscribe(req, new GrpcObserverAdapter(req.toString(), resp, f -> idOnly ? converter .createNotificationFor(f.id()) : converter.createNotificationFor(f))); }
@Test public void testFetchById() throws Exception { UUID id = UUID.randomUUID(); uut.fetchById(protoConverter.toProto(id), mock(ServerCallStreamObserver.class)); verify(backend).fetchById(eq(id)); }
@Test public void testSubscribeFacts() throws Exception { SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.forMark()).fromNowOn(); when(backend.subscribe(this.reqCaptor.capture(), any())).thenReturn(null); uut.subscribe(new ProtoConverter().toProto(SubscriptionRequestTO.forFacts(req)), mock( ServerCallStreamObserver.class)); verify(backend).subscribe(any(), any()); assertFalse(reqCaptor.getValue().idOnly()); }
@Test public void testSubscribeIds() throws Exception { SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.forMark()).fromNowOn(); when(backend.subscribe(this.reqCaptor.capture(), any())).thenReturn(null); uut.subscribe(new ProtoConverter().toProto(SubscriptionRequestTO.forIds(req)), mock( ServerCallStreamObserver.class)); verify(backend).subscribe(any(), any()); assertTrue(reqCaptor.getValue().idOnly()); }
@Override public void watch(Request req, StreamObserver<Result> responseObserver) { logger.info("Start watching: " + req.getQuery()); int responseNo = 0; final ServerCallStreamObserver responseObserver2 = (ServerCallStreamObserver) responseObserver; while (!responseObserver2.isCancelled()) { sleepUpToMiilis(1000); responseObserver.onNext(Result.newBuilder().setTitle(format("result %d for [%s] from backend %d", responseNo++, req.getQuery(), id)).build()); } responseObserver2.setOnCancelHandler(() -> logger.warning("Request canceled!")); }
@Override public void staticUnaryCallSetsMessageCompression(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) { if (!request.equals(REQUEST_MESSAGE)) { responseObserver.onError(new IllegalArgumentException("Unexpected request: " + request)); return; } ServerCallStreamObserver<SimpleResponse> callObserver = (ServerCallStreamObserver<SimpleResponse>) responseObserver; callObserver.setCompression("gzip"); callObserver.setMessageCompression(true); responseObserver.onNext(RESPONSE_MESSAGE); responseObserver.onCompleted(); }
@VisibleForTesting GrpcSink( final String rpcCommandName, ServerCallStreamObserver<RunResponse> observer, ExecutorService executor) { // This queue is intentionally unbounded: we always act on it fairly quickly so filling up // RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers. this.actionQueue = new LinkedBlockingQueue<>(); this.exchanger = new Exchanger<>(); this.observer = observer; this.observer.setOnCancelHandler( () -> { Thread commandThread = GrpcSink.this.commandThread.get(); if (commandThread != null) { logger.info( String.format( "Interrupting thread %s due to the streaming %s call being cancelled " + "(likely client hang up or explicit gRPC-level cancellation)", commandThread.getName(), rpcCommandName)); commandThread.interrupt(); } actionQueue.offer(SinkThreadAction.DISCONNECT); }); this.observer.setOnReadyHandler(() -> actionQueue.offer(SinkThreadAction.READY)); this.future = executor.submit(GrpcSink.this::call); }
@Override public StreamObserver<ServerReflectionRequest> serverReflectionInfo( final StreamObserver<ServerReflectionResponse> responseObserver) { final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver = (ServerCallStreamObserver<ServerReflectionResponse>) responseObserver; ProtoReflectionStreamObserver requestObserver = new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver); serverCallStreamObserver.setOnReadyHandler(requestObserver); serverCallStreamObserver.disableAutoInboundFlowControl(); serverCallStreamObserver.request(1); return requestObserver; }
@Override public StreamObserver<Messages.SimpleRequest> streamingCall( final StreamObserver<Messages.SimpleResponse> observer) { final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) observer; // TODO(spencerfang): flow control to stop reading when !responseObserver.isReady return new StreamObserver<Messages.SimpleRequest>() { @Override public void onNext(Messages.SimpleRequest value) { if (shutdown.get()) { responseObserver.onCompleted(); return; } responseObserver.onNext(Utils.makeResponse(value)); } @Override public void onError(Throwable t) { // other side closed with non OK responseObserver.onError(t); } @Override public void onCompleted() { // other side closed with OK responseObserver.onCompleted(); } }; }
@Override public void streamingFromServer( final Messages.SimpleRequest request, final StreamObserver<Messages.SimpleResponse> observer) { // send forever, until the client cancels or we shut down final Messages.SimpleResponse response = Utils.makeResponse(request); final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) observer; // If the client cancels, copyWithFlowControl takes care of calling // responseObserver.onCompleted() for us StreamObservers.copyWithFlowControl( new Iterator<Messages.SimpleResponse>() { @Override public boolean hasNext() { return !shutdown.get() && !responseObserver.isCancelled(); } @Override public Messages.SimpleResponse next() { return response; } @Override public void remove() { throw new UnsupportedOperationException(); } }, responseObserver); }
@Override public void subscribe(Subscriber<? super T> subscriber) { Preconditions.checkNotNull(subscriber); subscriber.onSubscribe(new Subscription() { private static final int MAX_REQUEST_RETRIES = 20; @Override public void request(long l) { // Some Reactive Streams implementations use Long.MAX_VALUE to indicate "all messages"; gRPC uses Integer.MAX_VALUE. int i = (int) Math.min(l, Integer.MAX_VALUE); // Very rarely, request() gets called before the client has finished setting up its stream. If this // happens, wait momentarily and try again. for (int j = 0; j < MAX_REQUEST_RETRIES; j++) { try { callStreamObserver.request(i); break; } catch (IllegalStateException ex) { if (j == MAX_REQUEST_RETRIES - 1) { throw ex; } try { Thread.sleep(1); } catch (InterruptedException e) { // no-op } } } } @Override public void cancel() { // Don't cancel twice if the server is already canceled if (callStreamObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver) callStreamObserver).isCancelled()) { return; } isCanceled = true; if (callStreamObserver instanceof ClientCallStreamObserver) { ((ClientCallStreamObserver) callStreamObserver).cancel("Client canceled request", null); } else { callStreamObserver.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException()); } } }); this.subscriber = subscriber; subscribed.countDown(); }
/** * Immediately responds with a payload of the type and size specified in the request. */ @Override public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) { ServerCallStreamObserver<SimpleResponse> obs = (ServerCallStreamObserver<SimpleResponse>) responseObserver; SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder(); try { switch (req.getResponseCompression()) { case DEFLATE: // fallthrough, just use gzip case GZIP: obs.setCompression("gzip"); break; case NONE: obs.setCompression("identity"); break; case UNRECOGNIZED: // fallthrough default: obs.onError(Status.INVALID_ARGUMENT .withDescription("Unknown: " + req.getResponseCompression()) .asRuntimeException()); return; } } catch (IllegalArgumentException e) { obs.onError(Status.UNIMPLEMENTED .withDescription("compression not supported.") .withCause(e) .asRuntimeException()); return; } if (req.getResponseSize() != 0) { boolean compressable = compressableResponse(req.getResponseType()); ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer; // For consistency with the c++ TestServiceImpl, use a random offset for unary calls. // TODO(wonderfly): whether or not this is a good approach needs further discussion. int offset = random.nextInt( compressable ? compressableBuffer.size() : uncompressableBuffer.size()); ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize()); responseBuilder.getPayloadBuilder() .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE) .setBody(payload); } if (req.hasResponseStatus()) { obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode()) .withDescription(req.getResponseStatus().getMessage()) .asRuntimeException()); return; } responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); }
/** * Immediately responds with a payload of the type and size specified in the request. */ @Override public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) { ServerCallStreamObserver<SimpleResponse> obs = (ServerCallStreamObserver<SimpleResponse>) responseObserver; SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder(); try { if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) { obs.setCompression("gzip"); } else { obs.setCompression("identity"); } } catch (IllegalArgumentException e) { obs.onError(Status.UNIMPLEMENTED .withDescription("compression not supported.") .withCause(e) .asRuntimeException()); return; } if (req.getResponseSize() != 0) { boolean compressable = compressableResponse(req.getResponseType()); ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer; // For consistency with the c++ TestServiceImpl, use a random offset for unary calls. // TODO(wonderfly): whether or not this is a good approach needs further discussion. int offset = random.nextInt( compressable ? compressableBuffer.size() : uncompressableBuffer.size()); ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize()); responseBuilder.getPayloadBuilder() .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE) .setBody(payload); } if (req.hasResponseStatus()) { obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode()) .withDescription(req.getResponseStatus().getMessage()) .asRuntimeException()); return; } responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); }
/** * Create a chain of client to server calls which can be cancelled top down. * * @return a Future that completes when call chain is created */ private Future<?> startChainingServer(final int depthThreshold) throws IOException { final AtomicInteger serversReady = new AtomicInteger(); final SettableFuture<Void> chainReady = SettableFuture.create(); class ChainingService extends TestServiceGrpc.TestServiceImplBase { @Override public void unaryCall(final SimpleRequest request, final StreamObserver<SimpleResponse> responseObserver) { ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(new Runnable() { @Override public void run() { receivedCancellations.countDown(); } }); if (serversReady.incrementAndGet() == depthThreshold) { // Stop recursion chainReady.set(null); return; } Context.currentContextExecutor(otherWork).execute(new Runnable() { @Override public void run() { try { blockingStub.unaryCall(request); } catch (StatusRuntimeException e) { Status status = e.getStatus(); if (status.getCode() == Status.Code.CANCELLED) { observedCancellations.countDown(); } else { responseObserver.onError(e); } } } }); } } server = InProcessServerBuilder.forName("channel").executor(otherWork) .addService(new ChainingService()) .build().start(); return chainReady; }
ProtoReflectionStreamObserver( ServerReflectionIndex serverReflectionIndex, ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver) { this.serverReflectionIndex = serverReflectionIndex; this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer"); }
@Override public StreamObserver<Messages.SimpleRequest> streamingBothWays( final StreamObserver<Messages.SimpleResponse> observer) { // receive data forever and send data forever until client cancels or we shut down. final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) observer; // If the client cancels, copyWithFlowControl takes care of calling // responseObserver.onCompleted() for us StreamObservers.copyWithFlowControl( new Iterator<Messages.SimpleResponse>() { @Override public boolean hasNext() { return !shutdown.get() && !responseObserver.isCancelled(); } @Override public Messages.SimpleResponse next() { return BIDI_RESPONSE; } @Override public void remove() { throw new UnsupportedOperationException(); } }, responseObserver ); return new StreamObserver<Messages.SimpleRequest>() { @Override public void onNext(final Messages.SimpleRequest request) { // noop } @Override public void onError(Throwable t) { // other side cancelled } @Override public void onCompleted() { // Should never happen, because clients should cancel this call in order to stop // the operation. Also because copyWithFlowControl hogs the inbound network thread // via the handler for onReady, we would never expect this callback to be able to // run anyways. log.severe("clients should CANCEL the call to stop bidi streaming"); } }; }