/** * Start Netty Grpc Server. * * @return Server gRPC Server * @throws IOException - when something went wrong starting the grpc server */ final Server start() throws IOException { final int port = 8080; log.info("Starting grpc server on port '{}'...", port); final Server server = NettyServerBuilder .forPort(port) .addService(productReadService) .addService(productUpdateService) .addService(ServerInterceptors.intercept(echoService, serviceInterceptor)) .build(); server.start(); log.info("grpc (port={}) server started successfully.", port); return server; }
public void start(File certChainFile, File privateKeyFile, int port) throws IOException { ServerBuilder builder = ServerBuilder.forPort(port).useTransportSecurity(certChainFile, privateKeyFile); if (mAuthenticator != null) { builder.addService(ServerInterceptors.intercept( mSdkService, new AuthenticationInterceptor(this.mAuthenticator))); } else { builder.addService(mSdkService); } mGrpc = builder.build(); mGrpc.start(); }
@Test public void interceptorShouldFreezeContext() { TestService svc = new TestService(); // Plumbing serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, new AmbientContextServerInterceptor("ctx-"), new AmbientContextFreezeServerInterceptor())); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc .newBlockingStub(serverRule.getChannel()) .withInterceptors(new AmbientContextClientInterceptor("ctx-")); // Test Metadata.Key<String> key = Metadata.Key.of("ctx-k", Metadata.ASCII_STRING_MARSHALLER); AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(key, "value"); stub.sayHello(HelloRequest.newBuilder().setName("World").build()); }); assertThat(svc.frozen).isTrue(); }
private void start() throws IOException { server = ServerBuilder.forPort(port) .addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderServerInterceptor())) .build() .start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); CustomHeaderServer.this.stop(); System.err.println("*** server shut down"); } }); }
/** * Starts the Abelana GRPC server. * @throws Exception runtime Abelana GRPC server exception */ private void start() throws Exception { server = NettyServerBuilder.forPort(BackendConstants.PORT).addService(ServerInterceptors .intercept(AbelanaGrpc.bindService(new AbelanaGrpcImpl()), new AuthHeaderServerInterceptor())).build().start(); LOGGER.info("Server started, listening on " + BackendConstants.PORT); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("*** shutting down gRPC server" + " since JVM is shutting down"); AbelanaGrpcServer.this.stop(); System.err.println("*** server shut down"); } }); }
public Server startServer() throws IOException { ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor(); NettyServerBuilder b = NettyServerBuilder.forPort(workerOptions.listenPort) .addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor)) .addService(ServerInterceptors.intercept(bsServer, headersInterceptor)) .addService(ServerInterceptors.intercept(casServer, headersInterceptor)); if (execServer != null) { b.addService(ServerInterceptors.intercept(execServer, headersInterceptor)); b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor)); } else { logger.info("Execution disabled, only serving cache requests."); } Server server = b.build(); logger.log(INFO, "Starting gRPC server on port {0,number,#}.", workerOptions.listenPort); server.start(); return server; }
private void start() throws IOException { server = ServerBuilder.forPort(PORT) .addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderServerInterceptor())) .build() .start(); logger.info("Server started, listening on " + PORT); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); CustomHeaderServer.this.stop(); System.err.println("*** server shut down"); } }); }
@Test public void clientHeaderDeliveredToServer() { grpcServerRule.getServiceRegistry() .addService(ServerInterceptors.intercept(new GreeterImplBase() {}, mockServerInterceptor)); GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub( ClientInterceptors.intercept(grpcServerRule.getChannel(), new HeaderClientInterceptor())); ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); try { blockingStub.sayHello(HelloRequest.getDefaultInstance()); fail(); } catch (StatusRuntimeException expected) { // expected because the method is not implemented at server side } verify(mockServerInterceptor).interceptCall( Matchers.<ServerCall<HelloRequest, HelloReply>>any(), metadataCaptor.capture(), Matchers.<ServerCallHandler<HelloRequest, HelloReply>>any()); assertEquals( "customRequestValue", metadataCaptor.getValue().get(HeaderClientInterceptor.CUSTOM_HEADER_KEY)); }
@VisibleForTesting void start() throws Exception { executor = Executors.newSingleThreadScheduledExecutor(); SslContext sslContext = null; if (useTls) { sslContext = GrpcSslContexts.forServer( TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key")).build(); } server = NettyServerBuilder.forPort(port) .sslContext(sslContext) .maxMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .addService(ServerInterceptors.intercept( new TestServiceImpl(executor), TestServiceImpl.interceptors())) .build().start(); }
private void start() throws IOException { server = NettyServerBuilder.forPort(port) .addService(ServerInterceptors.intercept( new SimpleServiceImpl())) .addService(ServerInterceptors.intercept( new LessSimpleServiceImpl("localhost", port))).build().start(); }
@Before public void setUp() throws Exception { faker = new Faker(); Injector injector = Guice.createInjector(); EchoService echoService = injector.getInstance(EchoService.class); ServiceInterceptor serviceInterceptor = injector.getInstance(ServiceInterceptor.class); CallerInterceptor callerInterceptor = injector.getInstance(CallerInterceptor.class); grpcServerRule.getServiceRegistry().addService(ServerInterceptors.intercept(echoService, serviceInterceptor)); Channel channel = ClientInterceptors.intercept( grpcServerRule.getChannel(), callerInterceptor); stub = EchoServiceGrpc.newBlockingStub(channel); }
public void startInsecure(int port) throws IOException { ServerBuilder builder = ServerBuilder.forPort(port); if (mAuthenticator != null) { builder.addService(ServerInterceptors.intercept( mSdkService, new AuthenticationInterceptor(this.mAuthenticator))); } else { builder.addService(mSdkService); } mGrpc = builder.build(); mGrpc.start(); }
@Test public void contextTransfersOneHopSync() throws Exception { Metadata.Key<String> ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); String expectedCtxValue = "context-value"; AtomicReference<String> ctxValue = new AtomicReference<>(); // Service GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { ctxValue.set(AmbientContext.current().get(ctxKey)); responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); responseObserver.onCompleted(); } }; // Plumbing serverRule1.getServiceRegistry().addService(ServerInterceptors .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc .newBlockingStub(serverRule1.getChannel()) .withInterceptors(new AmbientContextClientInterceptor("ctx-")); // Test AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(ctxKey, expectedCtxValue); stub.sayHello(HelloRequest.newBuilder().setName("world").build()); }); assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); }
@Test public void multiValueContextTransfers() throws Exception { Metadata.Key<String> ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); String expectedCtxValue1 = "context-value1"; String expectedCtxValue2 = "context-value2"; String expectedCtxValue3 = "context-value3"; AtomicReference<Iterable<String>> ctxValue = new AtomicReference<>(); // Service GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { ctxValue.set(AmbientContext.current().getAll(ctxKey)); responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); responseObserver.onCompleted(); } }; // Plumbing serverRule1.getServiceRegistry().addService(ServerInterceptors .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc .newBlockingStub(serverRule1.getChannel()) .withInterceptors(new AmbientContextClientInterceptor("ctx-")); // Test AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(ctxKey, expectedCtxValue1); AmbientContext.current().put(ctxKey, expectedCtxValue2); AmbientContext.current().put(ctxKey, expectedCtxValue3); stub.sayHello(HelloRequest.newBuilder().setName("world").build()); }); assertThat(ctxValue.get()).containsExactlyInAnyOrder(expectedCtxValue1, expectedCtxValue2, expectedCtxValue3); }
@Test public void contextTransfersOneHopAsync() throws Exception { Metadata.Key<String> ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); String expectedCtxValue = "context-value"; AtomicReference<String> ctxValue = new AtomicReference<>(); // Service GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { ctxValue.set(AmbientContext.current().get(ctxKey)); responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); responseObserver.onCompleted(); } }; // Plumbing serverRule1.getServiceRegistry().addService(ServerInterceptors .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); GreeterGrpc.GreeterFutureStub stub = GreeterGrpc .newFutureStub(serverRule1.getChannel()) .withInterceptors(new AmbientContextClientInterceptor("ctx-")); // Test AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(ctxKey, expectedCtxValue); ListenableFuture<HelloResponse> futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build()); // Verify response callbacks still have context MoreFutures.onSuccess( futureResponse, response -> assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue), Context.currentContextExecutor(Executors.newSingleThreadExecutor())); await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone); }); assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); }
@Test public void multipleContextTransfersOneHopSync() throws Exception { Metadata.Key<String> ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); Metadata.Key<String> l5dKey = Metadata.Key.of("l5d-context-key", Metadata.ASCII_STRING_MARSHALLER); String expectedCtxValue = "context-value"; AtomicReference<String> ctxValue = new AtomicReference<>(); AtomicReference<String> l5dValue = new AtomicReference<>(); // Service GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { ctxValue.set(AmbientContext.current().get(ctxKey)); l5dValue.set(AmbientContext.current().get(l5dKey)); responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); responseObserver.onCompleted(); } }; // Plumbing serverRule1.getServiceRegistry().addService(ServerInterceptors.intercept(svc, new AmbientContextServerInterceptor("ctx-"), new AmbientContextServerInterceptor("l5d-"))); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc .newBlockingStub(serverRule1.getChannel()) .withInterceptors( new AmbientContextClientInterceptor("ctx-"), new AmbientContextClientInterceptor("l5d-")); // Test AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(ctxKey, expectedCtxValue); AmbientContext.current().put(l5dKey, expectedCtxValue); stub.sayHello(HelloRequest.newBuilder().setName("world").build()); }); assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); assertThat(l5dValue.get()).isEqualTo(expectedCtxValue); }
@Test public void binaryContextValueTransfers() throws Exception { Metadata.Key<byte[]> ctxKey = Metadata.Key.of( "ctx-context-key" + Metadata.BINARY_HEADER_SUFFIX, Metadata.BINARY_BYTE_MARSHALLER); byte[] expectedCtxValue = BaseEncoding.base16().decode("DEADBEEF"); AtomicReference<byte[]> ctxValue = new AtomicReference<>(); // Service GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { ctxValue.set(AmbientContext.current().get(ctxKey)); responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); responseObserver.onCompleted(); } }; // Plumbing serverRule.getServiceRegistry().addService(ServerInterceptors .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc .newBlockingStub(serverRule.getChannel()) .withInterceptors(new AmbientContextClientInterceptor("ctx-")); // Test AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(ctxKey, expectedCtxValue); stub.sayHello(HelloRequest.newBuilder().setName("world").build()); }); assertThat(ctxValue.get()).containsExactly(expectedCtxValue); }
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder,DeploymentsHandler deploymentsHandler, int port) { this.appProperties = appProperties; this.deploymentStore = deploymentStore; this.tokenStore = tokenStore; this.grpcDeploymentsListener = new grpcDeploymentsListener(this); this.deploymentsHandler = deploymentsHandler; deploymentsHandler.addListener(this.grpcDeploymentsListener); this.port = port; server = serverBuilder .addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this))) .build(); }
static public void main(String [] args) throws IOException, InterruptedException { Brave brave = Constant.brave("greeting-service"); Server greetingServer = ServerBuilder.forPort(8080) .addService(ServerInterceptors.intercept(new GreetingServiceImpl(), new BraveGrpcServerInterceptor(brave), MonitoringServerInterceptor.create(Configuration.allMetrics()))) .build(); greetingServer.start(); System.out.println("Server started!"); greetingServer.awaitTermination(); }
static public void main(String[] args) throws IOException, InterruptedException { UnknownStatusDescriptionInterceptor unknownStatusDescriptionInterceptor = new UnknownStatusDescriptionInterceptor(Arrays.asList( IllegalArgumentException.class )); Server server = ServerBuilder.forPort(8080) .addService(ServerInterceptors.intercept(new ErrorServiceImpl(), unknownStatusDescriptionInterceptor)) .build(); System.out.println("Starting server..."); server.start(); System.out.println("Server started!"); server.awaitTermination(); }
static public void main(String [] args) throws IOException, InterruptedException { JwtServerInterceptor jwtInterceptor = new JwtServerInterceptor(Constant.JWT_SECRET); Server greetingServer = ServerBuilder.forPort(8080) .addService(ServerInterceptors.intercept(new GreetingServiceImpl(), jwtInterceptor, new TraceIdServerInterceptor())) .build(); greetingServer.start(); System.out.println("Server started!"); greetingServer.awaitTermination(); }
/** * Load gRPC specified service definition and then add to gRPC registry. * * @param serverBuilder NettyServerBuilder * @return the instance of NettyServerBuilder */ @SuppressWarnings("unchecked") protected T bindService(T serverBuilder) { List<ServiceDefinitionAdapter<?>> services = ServiceDefinitionLoader.loader().getServiceList(); for (ServiceDefinitionAdapter<?> service : services) { serverBuilder .addService(ServerInterceptors.intercept((ServerServiceDefinition)service.getServiceDefinition(), adaptedInterceptors)); } serverBuilder.addService(ProtoReflectionService.newInstance()); return serverBuilder; }
private void startGrpcServer(Configuration monitoringConfig) { MonitoringServerInterceptor interceptor = MonitoringServerInterceptor.create( monitoringConfig.withCollectorRegistry(collectorRegistry)); grpcPort = Utils.pickUnusedPort(); grpcServer = ServerBuilder.forPort(grpcPort) .addService(ServerInterceptors.intercept(new HelloServiceImpl().bindService(), interceptor)) .build(); try { grpcServer.start(); } catch (IOException e) { throw new RuntimeException("Exception while running grpc server", e); } }
@Autowired public ExternalRpcServer(PredictionService predictionService) { logger.info("Initializing RPC server..."); this.predictionService = predictionService; ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port); server = serverBuilder.addService(ServerInterceptors.intercept(this, this)).build(); }
@Before public void setUp() throws Exception { GreeterImplBase greeterImplBase = new GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) { responseObserver.onNext(HelloReply.getDefaultInstance()); responseObserver.onCompleted(); } }; grpcServerRule.getServiceRegistry() .addService(ServerInterceptors.intercept(greeterImplBase, new HeaderServerInterceptor())); }
@Before public void setUp() throws Exception { grpcServerRule .getServiceRegistry() .addService( ServerInterceptors.intercept(greeterServiceImpl, injectCacheControlInterceptor)); grpcServerRule.getServiceRegistry().addService(anotherGreeterServiceImpl); baseChannel = grpcServerRule.getChannel(); SafeMethodCachingInterceptor interceptor = SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache); channelToUse = ClientInterceptors.intercept(baseChannel, interceptor); }
private void startServer() { AbstractServerImplBuilder<?> builder = getServerBuilder(); if (builder == null) { server = null; return; } testServiceExecutor = Executors.newScheduledThreadPool(2); List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder() .add(recordServerCallInterceptor(serverCallCapture)) .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture)) .add(recordContextInterceptor(contextCapture)) .addAll(TestServiceImpl.interceptors()) .build(); builder .addService( ServerInterceptors.intercept( new TestServiceImpl(testServiceExecutor), allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); io.grpc.internal.TestingAccessor.setStatsImplementation( builder, new CensusStatsModule( tagger, tagContextBinarySerializer, serverStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true)); try { server = builder.build().start(); } catch (IOException ex) { throw new RuntimeException(ex); } }
private void startServer(int serverFlowControlWindow) { ServerBuilder<?> builder = NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0)) .flowControlWindow(serverFlowControlWindow); builder.addService(ServerInterceptors.intercept( new TestServiceImpl(Executors.newScheduledThreadPool(2)), ImmutableList.<ServerInterceptor>of())); try { server = builder.build().start(); } catch (IOException e) { throw new RuntimeException(e); } }
/** * Get the current service definition * @return A service definition instance */ @Override public ServerServiceDefinition getServiceDefinition() { return ServerInterceptors.intercept(bindService(), interceptor); }
/** * Get password service definition * @return A service definition instance */ @Override public ServerServiceDefinition getServiceDefinition() { return ServerInterceptors.intercept(bindService(), interceptor); }
@VisibleForTesting ServerServiceDefinition applyServiceInterceptor( ServerServiceDefinition serverServiceDefinition, List<ServerInterceptor> serverInterceptors) { return ServerInterceptors.intercept(serverServiceDefinition, serverInterceptors); }
@Test public void compression() throws Exception { if (clientAcceptEncoding) { clientDecompressors = clientDecompressors.with(clientCodec, true); } if (clientEncoding) { clientCompressors.register(clientCodec); } if (serverAcceptEncoding) { serverDecompressors = serverDecompressors.with(serverCodec, true); } if (serverEncoding) { serverCompressors.register(serverCodec); } server = ServerBuilder.forPort(0) .addService( ServerInterceptors.intercept(new LocalServer(), new ServerCompressorInterceptor())) .compressorRegistry(serverCompressors) .decompressorRegistry(serverDecompressors) .build() .start(); channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()) .decompressorRegistry(clientDecompressors) .compressorRegistry(clientCompressors) .intercept(new ClientCompressorInterceptor()) .usePlaintext(true) .build(); stub = TestServiceGrpc.newBlockingStub(channel); stub.unaryCall(REQUEST); if (clientAcceptEncoding && serverEncoding) { assertEquals("fzip", clientResponseHeaders.get(MESSAGE_ENCODING_KEY)); if (enableServerMessageCompression) { assertTrue(clientCodec.anyRead); assertTrue(serverCodec.anyWritten); } else { assertFalse(clientCodec.anyRead); assertFalse(serverCodec.anyWritten); } } else { // Either identity or null is accepted. assertThat(clientResponseHeaders.get(MESSAGE_ENCODING_KEY)) .isAnyOf(Codec.Identity.NONE.getMessageEncoding(), null); assertFalse(clientCodec.anyRead); assertFalse(serverCodec.anyWritten); } if (serverAcceptEncoding) { assertEqualsString("fzip", clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } else { assertNull(clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } if (clientAcceptEncoding) { assertEqualsString("fzip", serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } else { assertNull(serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } // Second call, once the client knows what the server supports. if (clientEncoding && serverAcceptEncoding) { assertEquals("fzip", serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); if (enableClientMessageCompression) { assertTrue(clientCodec.anyWritten); assertTrue(serverCodec.anyRead); } else { assertFalse(clientCodec.anyWritten); assertFalse(serverCodec.anyRead); } } else { assertNull(serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); assertFalse(clientCodec.anyWritten); assertFalse(serverCodec.anyRead); } }
/** * Create a tree of client to server calls where each received call on the server * fans out to two downstream calls. Uses SimpleRequest.response_size to limit the nodeCount * of the tree. One of the leaves will ABORT to trigger cancellation back up to tree. */ private void startCallTreeServer(int depthThreshold) throws IOException { final AtomicInteger nodeCount = new AtomicInteger((2 << depthThreshold) - 1); server = InProcessServerBuilder.forName("channel").executor(otherWork).addService( ServerInterceptors.intercept(service, new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { // Respond with the headers but nothing else. call.sendHeaders(new Metadata()); call.request(1); return new ServerCall.Listener<ReqT>() { @Override public void onMessage(final ReqT message) { Messages.SimpleRequest req = (Messages.SimpleRequest) message; if (nodeCount.decrementAndGet() == 0) { // we are in the final leaf node so trigger an ABORT upwards Context.currentContextExecutor(otherWork).execute(new Runnable() { @Override public void run() { call.close(Status.ABORTED, new Metadata()); } }); } else if (req.getResponseSize() != 0) { // We are in a non leaf node so fire off two requests req = req.toBuilder().setResponseSize(req.getResponseSize() - 1).build(); for (int i = 0; i < 2; i++) { asyncStub.unaryCall(req, new StreamObserver<Messages.SimpleResponse>() { @Override public void onNext(Messages.SimpleResponse value) { } @Override public void onError(Throwable t) { Status status = Status.fromThrowable(t); if (status.getCode() == Status.Code.CANCELLED) { observedCancellations.countDown(); } // Propagate closure upwards. try { call.close(status, new Metadata()); } catch (IllegalStateException t2) { // Ignore error if already closed. } } @Override public void onCompleted() { } }); } } } @Override public void onCancel() { receivedCancellations.countDown(); } }; } }) ).build(); server.start(); }
@Test public void statusRuntimeExceptionTransmitter() { final Status expectedStatus = Status.UNAVAILABLE; final Metadata expectedMetadata = new Metadata(); FakeServerCall<Void, Void> call = new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata); final StatusRuntimeException exception = new StatusRuntimeException(expectedStatus, expectedMetadata); listener = new ServerCall.Listener<Void>() { @Override public void onMessage(Void message) { throw exception; } @Override public void onHalfClose() { throw exception; } @Override public void onCancel() { throw exception; } @Override public void onComplete() { throw exception; } @Override public void onReady() { throw exception; } }; ServerServiceDefinition intercepted = ServerInterceptors.intercept( serviceDefinition, Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance())); // The interceptor should have handled the error by directly closing the ServerCall // and the exception should not propagate to the method's caller getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onMessage(null); getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onCancel(); getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onComplete(); getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onHalfClose(); getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onReady(); assertEquals(5, call.numCloses); }