@Override public Subscription subscribe(@NonNull SubscriptionRequestTO req, @NonNull FactObserver observer) { SubscriptionImpl<Fact> subscription = SubscriptionImpl.on(observer); StreamObserver<FactStoreProto.MSG_Notification> responseObserver = new ClientStreamObserver( subscription); ClientCall<MSG_SubscriptionRequest, MSG_Notification> call = stub.getChannel().newCall( RemoteFactStoreGrpc.METHOD_SUBSCRIBE, stub.getCallOptions() .withWaitForReady() .withCompression("gzip")); asyncServerStreamingCall(call, converter.toProto(req), responseObserver); return subscription.onClose(() -> { cancel(call); }); }
@Override public void failPlease(FailWithProbabilityOrSucceedEchoRequest request, StreamObserver<EchoResponse> responseObserver) { EchoRequest echoRequest = request.getEchoRequest(); int failProbability = request.getFailProbability(); Preconditions.checkArgument(failProbability >= 0 && failProbability <= 100, "fail probability not [" + failProbability + "] not in range [0, 100] inclusive"); logger.info("fail please - p(" + failProbability + " / 100)" + " echo: " + echoRequest.getEcho() + " with " + "repetitions: " + echoRequest.getRepeatEcho() + " received at " + DATE_FORMAT.format(new Date(System.currentTimeMillis()))); int randomFail = RANDOM.nextInt(100); if (randomFail < failProbability) { Status status = Status.INTERNAL; status = status.withCause(new FailPleaseException("Looks like you hit jackpot - we failed!")); responseObserver.onError(status.asRuntimeException()); } else { responseObserver.onNext(buildEchoResponseFromEchoRequest(echoRequest)); responseObserver.onCompleted(); } }
@Test public void manyToOne() { AtomicBoolean called = new AtomicBoolean(false); GreeterGrpc.GreeterStub stub = GreeterGrpc.newStub(channel); StreamObserver<HelloRequest> requestStream = stub.sayHelloReqStream(new LambdaStreamObserver<>( response -> { assertThat(response.getMessage()).isEqualTo("Hello A and B and C"); called.set(true); } )); requestStream.onNext(HelloRequest.newBuilder().setName("A").build()); requestStream.onNext(HelloRequest.newBuilder().setName("B").build()); requestStream.onNext(HelloRequest.newBuilder().setName("C").build()); requestStream.onCompleted(); await().atMost(1, TimeUnit.SECONDS).untilTrue(called); }
/** * Implements a bidirectional stream -> stream call as {@link Flowable} -> {@link Flowable}, where both the client * and the server independently stream to each other. */ public static <TRequest, TResponse> Flowable<TResponse> manyToMany( Flowable<TRequest> rxRequest, Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) { try { final RxProducerConsumerStreamObserver<TRequest, TResponse> consumerStreamObserver = new RxProducerConsumerStreamObserver<TRequest, TResponse>(rxRequest); delegate.apply(new CancellableStreamObserver<TRequest, TResponse>(consumerStreamObserver, new Runnable() { @Override public void run() { consumerStreamObserver.cancel(); } })); consumerStreamObserver.rxSubscribe(); return ((Flowable<TResponse>) consumerStreamObserver.getRxConsumer()) .lift(new SubscribeOnlyOnceFlowableOperator<TResponse>()); } catch (Throwable throwable) { return Flowable.error(throwable); } }
/** * Implements a stream -> unary call as {@link Flux} -> {@link Mono}, where the client transits a stream of * messages. */ public static <TRequest, TResponse> Mono<TResponse> manyToOne( Flux<TRequest> rxRequest, Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate) { try { return Mono .<TResponse>create(emitter -> { ReactiveProducerStreamObserver<TRequest, TResponse> reactiveProducerStreamObserver = new ReactiveProducerStreamObserver<>( rxRequest, emitter::success, emitter::error, Runnables.doNothing()); delegate.apply( new CancellableStreamObserver<>(reactiveProducerStreamObserver, reactiveProducerStreamObserver::cancel)); reactiveProducerStreamObserver.rxSubscribe(); }) .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>())); } catch (Throwable throwable) { return Mono.error(throwable); } }
@Override public void execute( ExecuteRequest request, StreamObserver<Operation> responseObserver) { Instance instance; try { instance = instances.get(request.getInstanceName()); } catch (InstanceNotFoundException ex) { responseObserver.onError(BuildFarmInstances.toStatusException(ex)); return; } instance.execute( request.getAction(), request.getSkipCacheLookup(), request.getTotalInputFileCount(), request.getTotalInputFileBytes(), (operation) -> { responseObserver.onNext(operation); responseObserver.onCompleted(); }); }
@Override public void downloadProducts(DownloadProductsRequest request, StreamObserver<Product> responseObserver) { PublishSubject<Product> productPublishSubject = PublishSubject.create(); productPublishSubject .doOnNext(product -> { responseObserver.onNext(product); counter.labels("downloadProducts", "success"); }) .doOnComplete(() -> responseObserver.onCompleted()) .doOnError(t -> { responseObserver.onError(t); counter.labels("downloadProducts", "failed"); }) .subscribe(); productDao.downloadProducts(request, productPublishSubject); }
@Override public void downloadProductImage(DownloadProductImageRequest request, StreamObserver<DataChunk> responseObserver) { try { BufferedInputStream imageStream = new BufferedInputStream( productImageSeeker.seekProductImage(request.getProductId()) ); int bufferSize = 256 * 1024;// 256k byte[] buffer = new byte[bufferSize]; int length; while ((length = imageStream.read(buffer, 0, bufferSize)) != -1) { responseObserver.onNext( DataChunk.newBuilder().setData(ByteString.copyFrom(buffer, 0, length)).build() ); } responseObserver.onCompleted(); imageStream.close(); counter.labels("downloadProductImage", "success"); } catch (Exception e) { counter.labels("downloadProductImage", "failed"); log.error("error on read product image", e); responseObserver.onError(e); } }
@Test public void testPublishSome() throws Exception { doNothing().when(backend).publish(acFactList.capture()); Builder b = MSG_Facts.newBuilder(); Test0Fact f1 = new Test0Fact(); Test0Fact f2 = new Test0Fact(); MSG_Fact msg1 = protoConverter.toProto(f1); MSG_Fact msg2 = protoConverter.toProto(f2); b.addAllFact(Arrays.asList(msg1, msg2)); MSG_Facts r = b.build(); uut.publish(r, mock(StreamObserver.class)); verify(backend).publish(anyList()); List<Fact> facts = acFactList.getValue(); assertFalse(facts.isEmpty()); assertEquals(2, facts.size()); assertEquals(f1.id(), facts.get(0).id()); assertEquals(f2.id(), facts.get(1).id()); }
@Override public void sayHelloStreaming(HelloRequest req, StreamObserver<HelloReply> responseObserver) { try { logger.info("Saying hi"); responseObserver.onNext( HelloReply.newBuilder().setMessage("Hello " + req.getName()).build()); Thread.sleep(100); logger.info("Saying hi"); responseObserver.onNext( HelloReply.newBuilder().setMessage("Hello " + req.getName() + "!").build()); Thread.sleep(100); logger.info("Saying hi"); responseObserver.onNext( HelloReply.newBuilder().setMessage("Hello " + req.getName() + "!!").build()); } catch (InterruptedException e) { responseObserver.onError(e); } finally { responseObserver.onCompleted(); } }
@Override public void put( Operation operation, StreamObserver<com.google.rpc.Status> responseObserver) { Instance instance; try { instance = instances.getFromOperationName(operation.getName()); } catch (InstanceNotFoundException ex) { responseObserver.onError(BuildFarmInstances.toStatusException(ex)); return; } boolean ok = instance.putOperation(operation); Code code = ok ? Code.OK : Code.UNAVAILABLE; responseObserver.onNext(com.google.rpc.Status.newBuilder() .setCode(code.getNumber()) .build()); responseObserver.onCompleted(); }
private void onLinksVanished(LinkVanishedMsg request, StreamObserver<Void> responseObserver) { String scheme = request.getProviderId(); switch (request.getSubjectCase()) { case CONNECT_POINT: ConnectPoint cp = translate(request.getConnectPoint()); getLinkProviderServiceFor(scheme).linksVanished(cp); break; case DEVICE_ID: DeviceId did = deviceId(request.getDeviceId()); getLinkProviderServiceFor(scheme).linksVanished(did); break; case LINK_DESCRIPTION: LinkDescription desc = translate(request.getLinkDescription()); getLinkProviderServiceFor(scheme).linkVanished(desc); break; case SUBJECT_NOT_SET: default: // do nothing break; } }
@Override public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) { boolean ok = txConsistentService.handle(new TxEvent( message.getServiceName(), message.getInstanceId(), new Date(message.getTimestamp()), message.getGlobalTxId(), message.getLocalTxId(), message.getParentTxId().isEmpty() ? null : message.getParentTxId(), message.getType(), message.getCompensationMethod(), message.getPayloads().toByteArray() )); responseObserver.onNext(ok ? ALLOW : REJECT); responseObserver.onCompleted(); }
@Override public void createUser(MetaProto.UserParam user, StreamObserver<StatusProto.ResponseStatus> responseStreamObserver) { TransactionController txController = null; try { txController = ConnectionPool.INSTANCE().getTxController(); ActionResponse input = new ActionResponse(); input.setParam(user); txController.setAutoCommit(false); txController.addAction(new CreateUserAction()); txController.commit(input); responseStreamObserver.onNext(MetaConstants.OKStatus); responseStreamObserver.onCompleted(); } catch (ParaFlowException e) { responseStreamObserver.onNext(e.getResponseStatus()); responseStreamObserver.onCompleted(); e.handle(); } finally { if (txController != null) { txController.close(); } } }
@Test public void testNullsOnConstructor() throws Exception { String id = "id"; StreamObserver so = mock(StreamObserver.class); Function p = mock(Function.class); expectNPE(() -> new GrpcObserverAdapter(null, so, p)); expectNPE(() -> new GrpcObserverAdapter(null, null, p)); expectNPE(() -> new GrpcObserverAdapter(null, null, null)); expectNPE(() -> new GrpcObserverAdapter(id, so, null)); expectNPE(() -> new GrpcObserverAdapter(id, null, p)); expectNPE(() -> new GrpcObserverAdapter(id, null, null)); }
@Override public void createStorageFormat(MetaProto.StorageFormatParam storageFormat, StreamObserver<StatusProto.ResponseStatus> responseStreamObserver) { TransactionController txController = null; try { txController = ConnectionPool.INSTANCE().getTxController(); ActionResponse input = new ActionResponse(); input.setParam(storageFormat); txController.setAutoCommit(false); txController.addAction(new CreateStorageFormatAction()); txController.commit(input); responseStreamObserver.onNext(MetaConstants.OKStatus); responseStreamObserver.onCompleted(); } catch (ParaFlowException e) { responseStreamObserver.onNext(e.getResponseStatus()); responseStreamObserver.onCompleted(); e.handle(); } finally { if (txController != null) { txController.close(); } } }
@Override public void createFunc(MetaProto.FuncParam func, StreamObserver<StatusProto.ResponseStatus> responseStreamObserver) { TransactionController txController = null; try { txController = ConnectionPool.INSTANCE().getTxController(); ActionResponse input = new ActionResponse(); input.setParam(func); txController.setAutoCommit(false); txController.addAction(new CreateFuncAction()); txController.commit(input); responseStreamObserver.onNext(MetaConstants.OKStatus); responseStreamObserver.onCompleted(); } catch (ParaFlowException e) { responseStreamObserver.onNext(e.getResponseStatus()); responseStreamObserver.onCompleted(); e.handle(); } finally { if (txController != null) { txController.close(); } } }
@Override public void execute(RemoteInvocationRequest request, StreamObserver<RemoteInvocationResponse> responseObserver) { try { ByteArrayInputStream in = new ByteArrayInputStream(request.getData().toByteArray()); ObjectInputStream is = new ObjectInputStream(in); RemoteInvocation remoteInvocation = (RemoteInvocation) is.readObject(); RemoteInvocationResult remoteInvocationResult = exporter.invokeForInvocation(remoteInvocation); ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(out); os.writeObject(remoteInvocationResult); responseObserver.onNext(RemoteInvocationResponse.newBuilder().setData(ByteString.copyFrom(out.toByteArray())).build()); responseObserver.onCompleted(); } catch (Exception e) { responseObserver.onError(e); } }
@Override public void invoke(byte[] msg, StreamObserver<byte[]> responseObserver) { final RequestBody reqBody = RequestBody.create(OCTET_STREAM, msg); final Request req = new Request.Builder().url(url).post(reqBody).build(); try { try (Response resp = client.newCall(req).execute()) { final ResponseBody respBody = resp.body(); if (respBody != null) { responseObserver.onNext(respBody.bytes()); } } responseObserver.onCompleted(); } catch (IOException e) { responseObserver.onError(e); } }
@Override public void subscribe(SubscribeRequest request, StreamObserver<Empty> responseObserver) { // TODO Auto-generated method stub String serviceName=request.getServiceName(); HostInfo hostInfo=request.getHostInfo(); Register.subscribe(serviceName, hostInfo); Empty result=Empty.newBuilder().build(); responseObserver.onNext(result); responseObserver.onCompleted(); }
@Override public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) { try{ HelloReply reply = HelloReply.newBuilder().setMessage(("Hello "+request.getName())).build(); responseObserver.onNext(reply); responseObserver.onCompleted(); }catch (Exception ex){ responseObserver.onError(Status.INTERNAL.withCause(ex).withDescription("err").asRuntimeException()); } }
@Override public void subscribe(@NonNull MSG_SubscriptionRequest request, @NonNull StreamObserver<MSG_Notification> responseObserver) { SubscriptionRequestTO req = converter.fromProto(request); resetDebugInfo(req); BlockingStreamObserver<MSG_Notification> resp = new BlockingStreamObserver<>(req.toString(), (ServerCallStreamObserver) responseObserver); final boolean idOnly = req.idOnly(); store.subscribe(req, new GrpcObserverAdapter(req.toString(), resp, f -> idOnly ? converter .createNotificationFor(f.id()) : converter.createNotificationFor(f))); }
@Override public StreamObserver<HelloRequest> sayHelloClientStream( StreamObserver<HelloReply> responseObserver) { System.out.println(RpcContext.getContext().get("123")); return new StreamObserver<HelloRequest>() { private StringBuilder sb = new StringBuilder(); @Override public void onNext(HelloRequest value) { sb.append(value.getName() + ", "); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onCompleted() { HelloReply reply = new HelloReply(); reply.setMessage(sb.toString()); responseObserver.onNext(reply); responseObserver.onCompleted(); } }; }
/** * Implements a unary -> stream call as {@link Single} -> {@link Flowable}, where the server responds with a * stream of messages. */ public static <TRequest, TResponse> void oneToMany( TRequest request, StreamObserver<TResponse> responseObserver, Function<Single<TRequest>, Flowable<TResponse>> delegate) { try { Single<TRequest> rxRequest = Single.just(request); Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest)); rxResponse.subscribe(new ReactivePublisherBackpressureOnReadyHandler<TResponse>( (ServerCallStreamObserver<TResponse>) responseObserver)); } catch (Throwable throwable) { responseObserver.onError(prepareError(throwable)); } }
@Override public void getConsumerHostInfos(GetConsumerRequest request, StreamObserver<GetConsumerResponse> responseObserver) { // TODO Auto-generated method stub String serviceName=request.getServiceName(); ArrayList<HostInfo> consumerHostInfos=Register.getConsumerHostInfos(serviceName); GetConsumerResponse result=GetConsumerResponse.newBuilder() .addAllHostInfos(consumerHostInfos) .build(); responseObserver.onNext(result); responseObserver.onCompleted(); }
/** * Implements a unary -> stream call as {@link Mono} -> {@link Flux}, where the server responds with a * stream of messages. */ public static <TRequest, TResponse> Flux<TResponse> oneToMany( Mono<TRequest> rxRequest, BiConsumer<TRequest, StreamObserver<TResponse>> delegate) { try { ReactorConsumerStreamObserver<TRequest, TResponse> consumerStreamObserver = new ReactorConsumerStreamObserver<>(); rxRequest.subscribe(request -> delegate.accept(request, consumerStreamObserver)); return ((Flux<TResponse>) consumerStreamObserver.getRxConsumer()) .transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>())); } catch (Throwable throwable) { return Flux.error(throwable); } }
@Override public StreamObserver<ClusterAPIProtos.ToRpcServerMessage> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { StreamObserver<ClusterAPIProtos.ToRpcServerMessage> result = this.queue.poll(timeout, unit); if (result == null) { throw new TimeoutException(); } else { return result; } }
@Test public void downloadProducts() throws Exception { doAnswer(invocation -> { PublishSubject<Product> publishSubject = (PublishSubject<Product>) invocation.getArguments()[1]; publishSubject.onNext(Product.getDefaultInstance()); publishSubject.onComplete(); return null; }).when(productDao).downloadProducts(any(), any()); List<Product> downloadedProducts = Lists.newArrayList(); AtomicBoolean onCompletedCalled = new AtomicBoolean(false); StreamObserver<Product> downloadObserver = new StreamObserver<Product>() { @Override public void onNext(Product value) { downloadedProducts.add(value); } @Override public void onError(Throwable t) { fail("should not fail"); } @Override public void onCompleted() { onCompletedCalled.compareAndSet(false, true); } }; productReadService.downloadProducts(DownloadProductsRequest.getDefaultInstance(), downloadObserver); verify(productDao, times(1)).downloadProducts(any(), any()); assertThat(downloadedProducts).containsOnly(Product.getDefaultInstance()); assertThat(onCompletedCalled).isTrue(); }
@Override public void addService(ServiceRequest request, StreamObserver<Empty> responseObserver) { // TODO Auto-generated method stub String serviceName=request.getServiceName(); HostInfo hostInfo=request.getHostInfo(); Register.addService(serviceName, hostInfo); Empty result=Empty.newBuilder().build(); responseObserver.onNext(result); responseObserver.onCompleted(); }
@Override public void executeQuery(CypherQueryString req, StreamObserver<CypherQueryResult> responseObserver) { try (Transaction tx = db.beginTx()) { Result result = db.execute(req.getQuery()); result.stream().forEach(stringObjectMap -> { CypherQueryResult r = CypherQueryResult.newBuilder().setResult(stringObjectMap.toString()).build(); responseObserver.onNext(r); }); tx.success(); } responseObserver.onCompleted(); }
@Override public synchronized void createShelf( CreateShelfRequest request, StreamObserver<Shelf> responseObserver) { String id = shelfIdCounter.getAndIncrement() + ""; shelfsById.put(id, request.getShelf().toBuilder().setId(id).build()); responseObserver.onNext(shelfsById.get(id)); responseObserver.onCompleted(); }
@Override public synchronized void listShelves( ListShelvesRequest request, StreamObserver<ListShelvesResponse> responseObserver) { NavigableMap<String, Shelf> cursor = shelfsById; // Resume iteration from the page token. if (!request.getPageToken().isEmpty()) { String pageToken = decodePageToken(request.getPageToken()); cursor = cursor.tailMap(pageToken, false); } ImmutableList<Shelf> shelves = cursor .values() .stream() .limit(request.getPageSize() > 0 ? request.getPageSize() : DEFAULT_PAGE_SIZE) .collect(ImmutableList.toImmutableList()); // Return one page of results. ListShelvesResponse.Builder responseBuilder = ListShelvesResponse.newBuilder().addAllShelves(shelves); // Set next page token to resume iteration in the next request. if (cursor.values().size() > shelves.size()) { String nextPageToken = encodePageToken(shelves.get(shelves.size() - 1).getId()); responseBuilder.setNextPageToken(nextPageToken); } responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); }
@Override public void sayHelloServerStream(HelloRequest hellorequest, StreamObserver<HelloReply> responseObserver) { System.out.println(RpcContext.getContext().get("123")); try { for (int i = 0; i < 10; i++) { HelloReply reply = new HelloReply(); reply.setMessage(hellorequest.getName()); responseObserver.onNext(reply); } } catch (Exception e) { responseObserver.onError(e); } responseObserver.onCompleted(); }
@Override public synchronized void listBooks( ListBooksRequest request, StreamObserver<ListBooksResponse> responseObserver) { NavigableMap<String, Book> cursor = booksById; // Resume iteration from the page token. if (!request.getPageToken().isEmpty()) { String pageToken = decodePageToken(request.getPageToken()); cursor = cursor.tailMap(pageToken, false); } ImmutableList<Book> books = cursor .values() .stream() .limit(request.getPageSize() > 0 ? request.getPageSize() : DEFAULT_PAGE_SIZE) .collect(ImmutableList.toImmutableList()); // Return one page of results. ListBooksResponse.Builder responseBuilder = ListBooksResponse.newBuilder().addAllBooks(books); // Set next page token to resume iteration in the next request. if (cursor.values().size() > books.size()) { String nextPageToken = encodePageToken(books.get(books.size() - 1).getId()); responseBuilder.setNextPageToken(nextPageToken); } responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); }
@Override public synchronized void deleteBook( DeleteBookRequest request, StreamObserver<Empty> responseObserver) { if (booksById.remove(request.getId()) == null) { throw new RuntimeException(String.format("Book with id=%s not found", request.getId())); } }
@Override public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> msg) { RpcSessionCreationFuture future = pendingSessionMap.remove(msgUid); if (future != null) { try { future.onMsg(msg); } catch (InterruptedException e) { log.warn("Failed to report created session!"); } } else { log.warn("Failed to lookup pending session!"); } }
@Override public void onTxEvent(GrpcTxEvent request, StreamObserver<GrpcAck> responseObserver) { events.offer(new TxEvent( EventType.valueOf(request.getType()), request.getGlobalTxId(), request.getLocalTxId(), request.getParentTxId(), request.getCompensationMethod(), new String(request.getPayloads().toByteArray()))); sleep(); if (EventType.TxAbortedEvent.name().equals(request.getType())) { this.responseObserver.onNext(GrpcCompensateCommand .newBuilder() .setGlobalTxId(request.getGlobalTxId()) .build()); } if ("TxStartedEvent".equals(request.getType()) && request.getCompensationMethod().equals("reject")) { responseObserver.onNext(GrpcAck.newBuilder().setAborted(true).build()); } else { responseObserver.onNext(GrpcAck.newBuilder().setAborted(false).build()); } responseObserver.onCompleted(); }
@Override public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { OmegaCallback callback = omegaCallbacks.getOrDefault(request.getServiceName(), emptyMap()) .remove(request.getInstanceId()); if (callback != null) { callback.disconnect(); } responseObserver.onNext(ALLOW); responseObserver.onCompleted(); }
@Test public void multipleContextTransfersOneHopSync() throws Exception { Metadata.Key<String> ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); Metadata.Key<String> l5dKey = Metadata.Key.of("l5d-context-key", Metadata.ASCII_STRING_MARSHALLER); String expectedCtxValue = "context-value"; AtomicReference<String> ctxValue = new AtomicReference<>(); AtomicReference<String> l5dValue = new AtomicReference<>(); // Service GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { ctxValue.set(AmbientContext.current().get(ctxKey)); l5dValue.set(AmbientContext.current().get(l5dKey)); responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); responseObserver.onCompleted(); } }; // Plumbing serverRule1.getServiceRegistry().addService(ServerInterceptors.intercept(svc, new AmbientContextServerInterceptor("ctx-"), new AmbientContextServerInterceptor("l5d-"))); GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc .newBlockingStub(serverRule1.getChannel()) .withInterceptors( new AmbientContextClientInterceptor("ctx-"), new AmbientContextClientInterceptor("l5d-")); // Test AmbientContext.initialize(Context.current()).run(() -> { AmbientContext.current().put(ctxKey, expectedCtxValue); AmbientContext.current().put(l5dKey, expectedCtxValue); stub.sayHello(HelloRequest.newBuilder().setName("world").build()); }); assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); assertThat(l5dValue.get()).isEqualTo(expectedCtxValue); }