/** * Echo the request headers from a client into response headers and trailers. Useful for * testing end-to-end metadata propagation. */ private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) { final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys)); return new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { @Override public void sendHeaders(Metadata responseHeaders) { responseHeaders.merge(requestHeaders, keySet); super.sendHeaders(responseHeaders); } @Override public void close(Status status, Metadata trailers) { trailers.merge(requestHeaders, keySet); super.close(status, trailers); } }, requestHeaders); } }; }
public Server startServer() throws IOException { ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor(); NettyServerBuilder b = NettyServerBuilder.forPort(workerOptions.listenPort) .addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor)) .addService(ServerInterceptors.intercept(bsServer, headersInterceptor)) .addService(ServerInterceptors.intercept(casServer, headersInterceptor)); if (execServer != null) { b.addService(ServerInterceptors.intercept(execServer, headersInterceptor)); b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor)); } else { logger.info("Execution disabled, only serving cache requests."); } Server server = b.build(); logger.log(INFO, "Starting gRPC server on port {0,number,#}.", workerOptions.listenPort); server.start(); return server; }
/** * Echoes request headers with the specified key(s) from a client into response headers only. */ private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) { final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys)); return new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { @Override public void sendHeaders(Metadata responseHeaders) { responseHeaders.merge(requestHeaders, keySet); super.sendHeaders(responseHeaders); } @Override public void close(Status status, Metadata trailers) { super.close(status, trailers); } }, requestHeaders); } }; }
/** * Echoes request headers with the specified key(s) from a client into response trailers only. */ private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) { final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys)); return new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, final Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { @Override public void sendHeaders(Metadata responseHeaders) { super.sendHeaders(responseHeaders); } @Override public void close(Status status, Metadata trailers) { trailers.merge(requestHeaders, keySet); super.close(status, trailers); } }, requestHeaders); } }; }
@Override protected AbstractServerImplBuilder<?> getServerBuilder() { return NettyServerBuilder.forPort(0) .maxMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .compressorRegistry(compressors) .decompressorRegistry(decompressors) .intercept(new ServerInterceptor() { @Override public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { Listener<ReqT> listener = next.startCall(call, headers); // TODO(carl-mastrangelo): check that encoding was set. call.setMessageCompression(true); return listener; } }); }
/** * Construct a server. * * @param builder builder with configuration for server * @param transportServer transport server that will create new incoming transports * @param rootContext context that callbacks for new RPCs should be derived from */ ServerImpl( AbstractServerImplBuilder<?> builder, InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); // Fork from the passed in context so that it does not propagate cancellation, it only // inherits values. this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork(); this.decompressorRegistry = builder.decompressorRegistry; this.compressorRegistry = builder.compressorRegistry; this.transportFilters = Collections.unmodifiableList( new ArrayList<ServerTransportFilter>(builder.transportFilters)); this.interceptors = builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]); this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis; }
/** Never returns {@code null}. */ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName, ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers, Context.CancellableContext context, StatsTraceContext statsTraceCtx) { // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>( stream, methodDef.getMethodDescriptor(), headers, context, decompressorRegistry, compressorRegistry); ServerCallHandler<ReqT, RespT> callHandler = methodDef.getServerCallHandler(); statsTraceCtx.serverCallStarted( new ServerCallInfoImpl<ReqT, RespT>( methodDef.getMethodDescriptor(), call.getAttributes(), call.getAuthority())); for (ServerInterceptor interceptor : interceptors) { callHandler = InternalServerInterceptors.interceptCallHandler(interceptor, callHandler); } ServerCall.Listener<ReqT> listener = callHandler.startCall(call, headers); if (listener == null) { throw new NullPointerException( "startCall() returned a null listener for method " + fullMethodName); } return call.newServerStreamListener(listener); }
private Iterable<ServerServiceDefinition> buildServerServiceDefinitionsWithInterceptors() { List<ServerInterceptor> serverInterceptors = ImmutableList.copyOf(serverInterceptors()); return ImmutableList.copyOf(Iterables.stream(serverServiceDefinitions()) .map(s -> applyServiceInterceptor(s, serverInterceptors)) .iterator()); }
private Iterable<ServerInterceptor> buildServerInterceptors() { ImmutableList.Builder<ServerInterceptor> builder = ImmutableList.builder(); builder.addAll(enabledServerInterceptors()); for (GrpcPlugin<? super ConfigT> plugin : grpcPlugins()) { builder.addAll(plugin.serverInterceptors()); } return builder.build(); }
@Test public void enabledServerInterceptorsDefaultsToEmpty() { MockGrpcApplication application = new MockGrpcApplication(configContext, serverBuilder); Iterable<ServerInterceptor> serverInterceptors = application.enabledServerInterceptors(); assertThat(serverInterceptors) .isNotNull() .isEmpty(); }
@Test public void serverInterceptorsIncludesApplicationAndPluginInterceptors() { Iterable<ServerInterceptor> applicationInterceptors = ImmutableList.of( mock(ServerInterceptor.class), mock(ServerInterceptor.class)); Iterable<ServerInterceptor> pluginInterceptors = ImmutableList.of( mock(ServerInterceptor.class), mock(ServerInterceptor.class)); GrpcPlugin<GrpcConfigSection> plugin = new GrpcPluginBase<GrpcConfigSection>( applicationResolver, configContext) { @Override public Iterable<ServerInterceptor> serverInterceptors() { return pluginInterceptors; } }; MockGrpcApplication application = new MockGrpcApplication(configContext, serverBuilder) { @Override protected Iterable<Plugin<? super GrpcConfigSection>> enabledPlugins() { return ImmutableList.of(plugin); } @Override protected Iterable<ServerInterceptor> enabledServerInterceptors() { return applicationInterceptors; } }; assertThat(application.serverInterceptors()) .containsAll(applicationInterceptors) .containsAll(pluginInterceptors); }
private void startServer() { AbstractServerImplBuilder<?> builder = getServerBuilder(); if (builder == null) { server = null; return; } testServiceExecutor = Executors.newScheduledThreadPool(2); List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder() .add(recordServerCallInterceptor(serverCallCapture)) .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture)) .add(recordContextInterceptor(contextCapture)) .addAll(TestServiceImpl.interceptors()) .build(); builder .addService( ServerInterceptors.intercept( new TestServiceImpl(testServiceExecutor), allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); io.grpc.internal.TestingAccessor.setStatsImplementation( builder, new CensusStatsModule( tagger, tagContextBinarySerializer, serverStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true)); try { server = builder.build().start(); } catch (IOException ex) { throw new RuntimeException(ex); } }
/** * Captures the request attributes. Useful for testing ServerCalls. * {@link ServerCall#getAttributes()} */ private static ServerInterceptor recordServerCallInterceptor( final AtomicReference<ServerCall<?, ?>> serverCallCapture) { return new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { serverCallCapture.set(call); return next.startCall(call, requestHeaders); } }; }
private static ServerInterceptor recordContextInterceptor( final AtomicReference<Context> contextCapture) { return new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { contextCapture.set(Context.current()); return next.startCall(call, requestHeaders); } }; }
/** Returns interceptors necessary for full service implementation. */ public static List<ServerInterceptor> interceptors() { return Arrays.asList( echoRequestHeadersInterceptor(Util.METADATA_KEY), echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY), echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY)); }
private void startServer(int serverFlowControlWindow) { ServerBuilder<?> builder = NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0)) .flowControlWindow(serverFlowControlWindow); builder.addService(ServerInterceptors.intercept( new TestServiceImpl(Executors.newScheduledThreadPool(2)), ImmutableList.<ServerInterceptor>of())); try { server = builder.build().start(); } catch (IOException e) { throw new RuntimeException(e); } }
/** * Capture the request headers from a client. Useful for testing metadata propagation. */ public static ServerInterceptor recordRequestHeadersInterceptor( final AtomicReference<Metadata> headersCapture) { return new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { headersCapture.set(requestHeaders); return next.startCall(call, requestHeaders); } }; }
public static ServerInterceptor instance() { return new HeaderServerInterceptor(); }
public GrpcInterceptorRegistration addInterceptor(ServerInterceptor serverInterceptor) { GrpcInterceptorRegistration grpcInterceptorRegistration = new GrpcInterceptorRegistration(serverInterceptor); registrations.add(grpcInterceptorRegistration); return grpcInterceptorRegistration; }
public GrpcInterceptorRegistration(ServerInterceptor serverInterceptor) { this.serverInterceptor = serverInterceptor; }
public ServerInterceptor getServerInterceptor() { return serverInterceptor; }
public void setServerInterceptor(ServerInterceptor serverInterceptor) { this.serverInterceptor = serverInterceptor; }
/** * {@inheritDoc} */ @Override public final Iterable<ServerInterceptor> serverInterceptors() { return serverInterceptors.get(); }
@VisibleForTesting ServerServiceDefinition applyServiceInterceptor( ServerServiceDefinition serverServiceDefinition, List<ServerInterceptor> serverInterceptors) { return ServerInterceptors.intercept(serverServiceDefinition, serverInterceptors); }
/** * {@inheritDoc} */ @Override public Iterable<ServerInterceptor> serverInterceptors() { return ImmutableList.of(); }
@Override public DropwizardServerBuilder intercept(final ServerInterceptor interceptor) { origin.intercept(interceptor); return this; }
/** * Create a tree of client to server calls where each received call on the server * fans out to two downstream calls. Uses SimpleRequest.response_size to limit the nodeCount * of the tree. One of the leaves will ABORT to trigger cancellation back up to tree. */ private void startCallTreeServer(int depthThreshold) throws IOException { final AtomicInteger nodeCount = new AtomicInteger((2 << depthThreshold) - 1); server = InProcessServerBuilder.forName("channel").executor(otherWork).addService( ServerInterceptors.intercept(service, new ServerInterceptor() { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { // Respond with the headers but nothing else. call.sendHeaders(new Metadata()); call.request(1); return new ServerCall.Listener<ReqT>() { @Override public void onMessage(final ReqT message) { Messages.SimpleRequest req = (Messages.SimpleRequest) message; if (nodeCount.decrementAndGet() == 0) { // we are in the final leaf node so trigger an ABORT upwards Context.currentContextExecutor(otherWork).execute(new Runnable() { @Override public void run() { call.close(Status.ABORTED, new Metadata()); } }); } else if (req.getResponseSize() != 0) { // We are in a non leaf node so fire off two requests req = req.toBuilder().setResponseSize(req.getResponseSize() - 1).build(); for (int i = 0; i < 2; i++) { asyncStub.unaryCall(req, new StreamObserver<Messages.SimpleResponse>() { @Override public void onNext(Messages.SimpleResponse value) { } @Override public void onError(Throwable t) { Status status = Status.fromThrowable(t); if (status.getCode() == Status.Code.CANCELLED) { observedCancellations.countDown(); } // Propagate closure upwards. try { call.close(status, new Metadata()); } catch (IllegalStateException t2) { // Ignore error if already closed. } } @Override public void onCompleted() { } }); } } } @Override public void onCancel() { receivedCancellations.countDown(); } }; } }) ).build(); server.start(); }
@Nullable @Override public ServerInterceptor getServerInterceptor(String fullMethodName) { return null; }
@Override public final T intercept(ServerInterceptor interceptor) { interceptors.add(interceptor); return thisT(); }
public static ServerInterceptor instance() { return new TransmitStatusRuntimeExceptionInterceptor(); }
@Test public void binaryLogTest() throws Exception { final List<Object> capturedReqs = new ArrayList<Object>(); final class TracingClientInterceptor implements ClientInterceptor { private final List<MethodDescriptor<?, ?>> interceptedMethods = new ArrayList<MethodDescriptor<?, ?>>(); @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { interceptedMethods.add(method); return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { @Override public void sendMessage(ReqT message) { capturedReqs.add(message); super.sendMessage(message); } }; } } TracingClientInterceptor userInterceptor = new TracingClientInterceptor(); binlogProvider = new BinaryLogProvider() { @Nullable @Override public ServerInterceptor getServerInterceptor(String fullMethodName) { return null; } @Override public ClientInterceptor getClientInterceptor(String fullMethodName) { return new TracingClientInterceptor(); } @Override protected int priority() { return 0; } }; createChannel( new FakeNameResolverFactory(true), Collections.<ClientInterceptor>singletonList(userInterceptor)); ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); ClientCall.Listener<Integer> listener = new NoopClientCallListener<Integer>(); call.start(listener, new Metadata()); assertEquals(1, executor.runDueTasks()); String actualRequest = "hello world"; call.sendMessage(actualRequest); // The user supplied interceptor must still operate on the original message types assertThat(userInterceptor.interceptedMethods).hasSize(1); assertSame( method.getRequestMarshaller(), userInterceptor.interceptedMethods.get(0).getRequestMarshaller()); assertSame( method.getResponseMarshaller(), userInterceptor.interceptedMethods.get(0).getResponseMarshaller()); // The binlog interceptor must be closest to the transport assertThat(capturedReqs).hasSize(2); // The InputStream is already spent, so just check its type rather than contents assertEquals(actualRequest, capturedReqs.get(0)); assertThat(capturedReqs.get(1)).isInstanceOf(InputStream.class); }
@Override public ServerInterceptor getServerInterceptor(String fullMethodName) { return null; }
@Nullable @Override public ServerInterceptor getServerInterceptor(String fullMethodName) { throw new UnsupportedOperationException(); }
private void adapteGrpcServerInterceptor(){ List<RpcServerInterceptor> interceptors = getInterceptors(); for(RpcServerInterceptor interceptor : interceptors){ if(ServerInterceptor.class.isAssignableFrom(interceptor.getClass())){ adaptedInterceptors.add((ServerInterceptor) interceptor); logger.debug("Found rpc server interceptor '{}'", interceptor.getClass().getName()); } } }
/** * Returns the {@link ServerInterceptor}s provided by the application itself. Similar to * {@link ApplicationBase#enabledPlugins()}, implementors should also call {@code super.enabledServerInterceptors()} * and merge its result with theirs in order to support default service interceptors. */ protected Iterable<ServerInterceptor> enabledServerInterceptors() { return ImmutableList.of(); }
/** * Returns the {@link ServerInterceptor}s that are provided by this plugin. The {@link GrpcApplication} applies them * to all {@link ServerServiceDefinition}s (aggregated from the app and all its plugins) during the application * start phase. */ Iterable<ServerInterceptor> serverInterceptors();
/** * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor, * so the interceptor must be reusable across calls. At runtime, the request and response * marshallers are always {@code Marshaller<InputStream>}. * Returns {@code null} if this method is not binary logged. */ // TODO(zpencer): ensure the interceptor properly handles retries and hedging @Nullable public abstract ServerInterceptor getServerInterceptor(String fullMethodName);
Iterable<ServerInterceptor> serverInterceptors();