/**
 * Creates a service from already-assembled collaborators.
 *
 * <p>{@code registry}, {@code pathMappings}, {@code decompressorRegistry} and
 * {@code compressorRegistry} are rejected with a {@link NullPointerException} when {@code null}.
 * {@code supportedSerializationFormats} is stored exactly as given, without a null check.
 *
 * @param registry the handler registry; also handed to {@code jsonMarshaller(registry)} to obtain
 *                 the JSON marshaller kept by this service
 * @param pathMappings the path mappings kept by this service
 * @param decompressorRegistry the decompressor registry kept by this service
 * @param compressorRegistry the compressor registry kept by this service
 * @param supportedSerializationFormats the serialization formats kept by this service
 * @param maxOutboundMessageSizeBytes the outbound message size limit, stored as given
 * @param maxInboundMessageSizeBytes the inbound message size limit, stored as given
 */
GrpcService(HandlerRegistry registry,
            Set<PathMapping> pathMappings,
            DecompressorRegistry decompressorRegistry,
            CompressorRegistry compressorRegistry,
            Set<SerializationFormat> supportedSerializationFormats,
            int maxOutboundMessageSizeBytes,
            int maxInboundMessageSizeBytes) {
    // Null checks run in parameter order, so the first null argument is the one reported.
    this.registry = requireNonNull(registry, "registry");
    this.pathMappings = requireNonNull(pathMappings, "pathMappings");
    this.decompressorRegistry = requireNonNull(decompressorRegistry, "decompressorRegistry");
    this.compressorRegistry = requireNonNull(compressorRegistry, "compressorRegistry");

    this.supportedSerializationFormats = supportedSerializationFormats;
    this.jsonMarshaller = jsonMarshaller(registry);

    this.maxOutboundMessageSizeBytes = maxOutboundMessageSizeBytes;
    this.maxInboundMessageSizeBytes = maxInboundMessageSizeBytes;
}
/**
 * Constructs a new {@link GrpcService} that can be bound to
 * {@link com.linecorp.armeria.server.ServerBuilder}. It is recommended to bind the service to a server
 * using {@link com.linecorp.armeria.server.ServerBuilder#service(ServiceWithPathMappings)} to mount all
 * service paths without interfering with other services.
 *
 * @return the new {@link GrpcService}, decorated with {@link UnframedGrpcService} when
 *         {@code enableUnframedRequests} is set
 */
public ServiceWithPathMappings<HttpRequest, HttpResponse> build() {
    final HandlerRegistry handlerRegistry = registryBuilder.build();

    // One exact-match mapping per registered method key, each prefixed with "/".
    final ImmutableSet<PathMapping> exactMappings =
            handlerRegistry.methods()
                           .keySet()
                           .stream()
                           .map(path -> PathMapping.ofExact("/" + path))
                           .collect(ImmutableSet.toImmutableSet());

    // Fall back to the library defaults for any registry the caller did not configure.
    final DecompressorRegistry decompressors =
            firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance());
    final CompressorRegistry compressors =
            firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance());

    final GrpcService grpcService = new GrpcService(
            handlerRegistry,
            exactMappings,
            decompressors,
            compressors,
            supportedSerializationFormats,
            maxOutboundMessageSizeBytes,
            maxInboundMessageSizeBytes);

    if (enableUnframedRequests) {
        return grpcService.decorate(UnframedGrpcService::new);
    }
    return grpcService;
}
/**
 * Builds the {@link ArmeriaServerCall} under test and wires it to the mocked listener,
 * subscription and request context.
 */
@Before
public void setUp() {
    // Stubbed once, before the call is constructed. The original re-stubbed the same value
    // again at the end of this method, which had no effect and has been removed.
    when(ctx.alloc()).thenReturn(ByteBufAllocator.DEFAULT);

    call = new ArmeriaServerCall<>(
            HttpHeaders.of(),
            TestServiceGrpc.METHOD_UNARY_CALL,
            CompressorRegistry.getDefaultInstance(),
            DecompressorRegistry.getDefaultInstance(),
            res,
            MAX_MESSAGE_BYTES,
            MAX_MESSAGE_BYTES,
            ctx,
            GrpcSerializationFormats.PROTO,
            MessageMarshaller.builder().build());
    call.setListener(listener);
    call.messageReader().onSubscribe(subscription);

    when(ctx.logBuilder()).thenReturn(new DefaultRequestLog(ctx));
}
/**
 * Clears the reserved encoding entries from {@code headers} and repopulates them from the
 * supplied compressor, decompressor registry and full-stream flag.
 *
 * @param headers the metadata to rewrite in place
 * @param decompressorRegistry source of the raw advertised message encodings
 * @param compressor the compressor whose encoding is written unless it is
 *                   {@code Codec.Identity.NONE}
 * @param fullStreamDecompression whether to advertise {@code FULL_STREAM_DECOMPRESSION_ENCODINGS}
 */
@VisibleForTesting
static void prepareHeaders(
        Metadata headers,
        DecompressorRegistry decompressorRegistry,
        Compressor compressor,
        boolean fullStreamDecompression) {
    // Message encoding: only written for a non-identity compressor.
    headers.discardAll(MESSAGE_ENCODING_KEY);
    if (compressor != Codec.Identity.NONE) {
        headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
    }

    // Accepted message encodings: only written when the registry advertises any.
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
    final byte[] acceptedEncodings =
            InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
    if (acceptedEncodings.length > 0) {
        headers.put(MESSAGE_ACCEPT_ENCODING_KEY, acceptedEncodings);
    }

    // Content (full-stream) encodings: always cleared, re-advertised only on request.
    headers.discardAll(CONTENT_ENCODING_KEY);
    headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
    if (fullStreamDecompression) {
        headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
    }
}
/**
 * With an empty decompressor registry, the identity compressor and full-stream decompression
 * off, {@code prepareHeaders} must leave none of the four reserved encoding headers behind.
 */
@Test
public void prepareHeaders_removeReservedHeaders() {
    Metadata headers = new Metadata();
    headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip");
    headers.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));
    headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip");
    headers.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII));

    ClientCallImpl.prepareHeaders(
            headers,
            DecompressorRegistry.emptyInstance(),
            Codec.Identity.NONE,
            false);

    assertNull(headers.get(GrpcUtil.MESSAGE_ENCODING_KEY));
    assertNull(headers.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
    assertNull(headers.get(GrpcUtil.CONTENT_ENCODING_KEY));
    assertNull(headers.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY));
}
private void sendMessage_serverSendsOne_closeOnSecondCall( MethodDescriptor<Long, Long> method) { ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>( stream, method, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance()); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); verify(stream, never()).close(any(Status.class), any(Metadata.class)); // trying to send a second message causes gRPC to close the underlying stream serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture()); assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription()); assertTrue(metadataCaptor.getValue().keys().isEmpty()); }
private void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion( MethodDescriptor<Long, Long> method) { ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>( stream, method, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance()); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); verify(stream, times(1)).close(any(Status.class), any(Metadata.class)); // App runs to completion but everything is ignored serverCall.sendMessage(1L); serverCall.close(Status.OK, new Metadata()); try { serverCall.close(Status.OK, new Metadata()); fail("calling a second time should still cause an error"); } catch (IllegalStateException expected) { // noop } }
/**
 * Shared scenario: closing with {@code Status.OK} before any message was sent must close the
 * stream with {@code INTERNAL} / {@code ServerCallImpl.MISSING_RESPONSE} and empty trailers.
 *
 * @param method the method descriptor to construct the call with
 */
private void serverSendsOne_okFailsOnMissingResponse(
        MethodDescriptor<Long, Long> method) {
    ServerCallImpl<Long, Long> serverCall = new ServerCallImpl<Long, Long>(
            stream,
            method,
            requestHeaders,
            context,
            DecompressorRegistry.getDefaultInstance(),
            CompressorRegistry.getDefaultInstance());

    serverCall.close(Status.OK, new Metadata());

    ArgumentCaptor<Status> closeStatus = ArgumentCaptor.forClass(Status.class);
    ArgumentCaptor<Metadata> closeTrailers = ArgumentCaptor.forClass(Metadata.class);
    verify(stream, times(1)).close(closeStatus.capture(), closeTrailers.capture());

    assertEquals(Status.Code.INTERNAL, closeStatus.getValue().getCode());
    assertEquals(ServerCallImpl.MISSING_RESPONSE, closeStatus.getValue().getDescription());
    assertTrue(closeTrailers.getValue().keys().isEmpty());
}
ArmeriaClientCall( ClientRequestContext ctx, Client<HttpRequest, HttpResponse> httpClient, HttpRequestWriter req, MethodDescriptor<I, O> method, int maxOutboundMessageSizeBytes, int maxInboundMessageSizeBytes, CallOptions callOptions, CompressorRegistry compressorRegistry, DecompressorRegistry decompressorRegistry, SerializationFormat serializationFormat, @Nullable MessageMarshaller jsonMarshaller) { this.ctx = ctx; this.httpClient = httpClient; this.req = req; this.callOptions = callOptions; this.compressorRegistry = compressorRegistry; this.decompressorRegistry = decompressorRegistry; this.messageFramer = new ArmeriaMessageFramer(ctx.alloc(), maxOutboundMessageSizeBytes); this.marshaller = new GrpcMessageMarshaller<>( ctx.alloc(), serializationFormat, method, jsonMarshaller); responseReader = new HttpStreamReader( decompressorRegistry, new ArmeriaMessageDeframer(this, maxInboundMessageSizeBytes, ctx.alloc()), this); executor = callOptions.getExecutor(); }
/**
 * Creates a new client call for {@code method}.
 *
 * <p>A streaming POST request is created for {@code uri().getPath() + method.getFullMethodName()}
 * with the content type of the configured serialization format, and the request log is primed
 * with the serialization format and RPC request before the call is constructed.
 *
 * <p>The inbound message size limit is {@code GrpcClientOptions.MAX_INBOUND_MESSAGE_SIZE_BYTES}
 * when set. Otherwise it is {@code ClientOption.DEFAULT_MAX_RESPONSE_LENGTH} (or
 * {@code DEFAULT_MAX_INBOUND_MESSAGE_SIZE} when that is unset too), capped at
 * {@link Integer#MAX_VALUE}.
 *
 * @param method the descriptor of the method to invoke
 * @param callOptions the per-call options passed through to the created call
 * @return the new client call
 */
@Override
public <I, O> ClientCall<I, O> newCall(
        MethodDescriptor<I, O> method, CallOptions callOptions) {
    HttpRequestWriter req = HttpRequest.streaming(
            HttpHeaders
                    .of(HttpMethod.POST, uri().getPath() + method.getFullMethodName())
                    .contentType(serializationFormat.mediaType()));
    ClientRequestContext ctx = newContext(HttpMethod.POST, req);
    ctx.logBuilder().serializationFormat(serializationFormat);
    ctx.logBuilder().requestContent(GrpcLogUtil.rpcRequest(method), null);
    ctx.logBuilder().deferResponseContent();

    int maxOutboundMessageSizeBytes = options().getOrElse(
            GrpcClientOptions.MAX_OUTBOUND_MESSAGE_SIZE_BYTES,
            ArmeriaMessageFramer.NO_MAX_OUTBOUND_MESSAGE_SIZE);

    // The response-length option is a long. Narrowing it with intValue() would silently wrap
    // values above Integer.MAX_VALUE into tiny or negative limits, so saturate instead.
    long defaultMaxResponseLength = options().getOrElse(
            ClientOption.DEFAULT_MAX_RESPONSE_LENGTH,
            (long) DEFAULT_MAX_INBOUND_MESSAGE_SIZE).longValue();
    int fallbackMaxInboundMessageSizeBytes =
            (int) Math.min(defaultMaxResponseLength, Integer.MAX_VALUE);

    int maxInboundMessageSizeBytes = options().getOrElse(
            GrpcClientOptions.MAX_INBOUND_MESSAGE_SIZE_BYTES,
            fallbackMaxInboundMessageSizeBytes);

    return new ArmeriaClientCall<>(
            ctx,
            httpClient,
            req,
            method,
            maxOutboundMessageSizeBytes,
            maxInboundMessageSizeBytes,
            callOptions,
            CompressorRegistry.getDefaultInstance(),
            DecompressorRegistry.getDefaultInstance(),
            serializationFormat,
            jsonMarshaller);
}
ArmeriaServerCall(HttpHeaders clientHeaders, MethodDescriptor<I, O> method, CompressorRegistry compressorRegistry, DecompressorRegistry decompressorRegistry, HttpResponseWriter res, int maxInboundMessageSizeBytes, int maxOutboundMessageSizeBytes, ServiceRequestContext ctx, SerializationFormat serializationFormat, MessageMarshaller jsonMarshaller) { requireNonNull(clientHeaders, "clientHeaders"); this.method = requireNonNull(method, "method"); this.ctx = requireNonNull(ctx, "ctx"); this.serializationFormat = requireNonNull(serializationFormat, "serializationFormat"); this.messageReader = new HttpStreamReader( requireNonNull(decompressorRegistry, "decompressorRegistry"), new ArmeriaMessageDeframer( this, maxInboundMessageSizeBytes, ctx.alloc()) .decompressor(clientDecompressor(clientHeaders, decompressorRegistry)), this); this.messageFramer = new ArmeriaMessageFramer(ctx.alloc(), maxOutboundMessageSizeBytes); this.res = requireNonNull(res, "res"); this.compressorRegistry = requireNonNull(compressorRegistry, "compressorRegistry"); this.clientAcceptEncoding = Strings.emptyToNull(clientHeaders.get(GrpcHeaderNames.GRPC_ACCEPT_ENCODING)); this.decompressorRegistry = requireNonNull(decompressorRegistry, "decompressorRegistry"); marshaller = new GrpcMessageMarshaller<>(ctx.alloc(), serializationFormat, method, jsonMarshaller); }
/**
 * Resolves the decompressor for the request's {@code GRPC_ENCODING} header.
 *
 * @param headers the request headers to read the encoding from
 * @param registry the registry used to look the encoding up
 * @return the registered decompressor, or {@code Identity.NONE} when the header is absent or
 *         the registry has no decompressor for it
 */
private static Decompressor clientDecompressor(HttpHeaders headers, DecompressorRegistry registry) {
    final String encoding = headers.get(GrpcHeaderNames.GRPC_ENCODING);
    if (encoding != null) {
        // An encoding the registry does not know falls back to identity as well.
        return firstNonNull(registry.lookupDecompressor(encoding), Identity.NONE);
    }
    return Identity.NONE;
}
/**
 * Creates a reader from its three collaborators, all of which must be non-null.
 *
 * @param decompressorRegistry the decompressor registry kept by this reader
 * @param deframer the message deframer kept by this reader
 * @param transportStatusListener the transport status listener kept by this reader
 * @throws NullPointerException if any argument is {@code null}
 */
public HttpStreamReader(DecompressorRegistry decompressorRegistry,
                        ArmeriaMessageDeframer deframer,
                        TransportStatusListener transportStatusListener) {
    // Checked in parameter order so the first null argument is the one reported.
    this.decompressorRegistry =
            requireNonNull(decompressorRegistry, "decompressorRegistry");
    this.deframer =
            requireNonNull(deframer, "deframer");
    this.transportStatusListener =
            requireNonNull(transportStatusListener, "transportStatusListener");
}
/**
 * A client whose decompressor registry contains only the (non-advertised) identity codec must
 * still receive the expected response from an endpoint that enables message compression.
 */
@Test
public void uncompressedClient_compressedEndpoint() throws Exception {
    ManagedChannel nonDecompressingChannel =
            ManagedChannelBuilder.forAddress("127.0.0.1", server.httpPort())
                                 .decompressorRegistry(
                                         DecompressorRegistry.emptyInstance()
                                                             .with(Codec.Identity.NONE, false))
                                 .usePlaintext(true)
                                 .build();
    try {
        UnitTestServiceBlockingStub client =
                UnitTestServiceGrpc.newBlockingStub(nonDecompressingChannel);
        assertThat(client.staticUnaryCallSetsMessageCompression(REQUEST_MESSAGE))
                .isEqualTo(RESPONSE_MESSAGE);
    } finally {
        // Shut the channel down even when the call or the assertion fails, so a failing test
        // does not leak the channel.
        nonDecompressingChannel.shutdownNow();
    }
}
/**
 * Sets the decompressor registry, reverting to {@code DEFAULT_DECOMPRESSOR_REGISTRY} when
 * {@code registry} is {@code null}.
 *
 * @param registry the registry to use, or {@code null} to restore the default
 * @return the value of {@code thisT()}
 */
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
    this.decompressorRegistry =
            registry != null ? registry : DEFAULT_DECOMPRESSOR_REGISTRY;
    return thisT();
}
/**
 * Creates a server call over {@code stream}.
 *
 * <p>{@code inboundHeaders} is only consulted for {@code MESSAGE_ACCEPT_ENCODING_KEY}; the
 * metadata object itself is not stored by this constructor.
 *
 * @param stream the server stream kept by this call
 * @param method the method descriptor kept by this call
 * @param inboundHeaders the request metadata to read the accept-encoding value from
 * @param context the cancellable context kept by this call
 * @param decompressorRegistry the decompressor registry kept by this call
 * @param compressorRegistry the compressor registry kept by this call
 */
ServerCallImpl(ServerStream stream,
               MethodDescriptor<ReqT, RespT> method,
               Metadata inboundHeaders,
               Context.CancellableContext context,
               DecompressorRegistry decompressorRegistry,
               CompressorRegistry compressorRegistry) {
    this.stream = stream;
    this.method = method;
    this.context = context;

    // Read the request's advertised encodings up front.
    this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);

    this.decompressorRegistry = decompressorRegistry;
    this.compressorRegistry = compressorRegistry;
}
/**
 * Validates the registry and hands a task that forwards it to {@code realStream} over to
 * {@code delayOrExecute}.
 *
 * @param decompressorRegistry the registry to forward; must not be {@code null}
 */
@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
    checkNotNull(decompressorRegistry, "decompressorRegistry");

    final Runnable forwardToRealStream = new Runnable() {
        @Override
        public void run() {
            realStream.setDecompressorRegistry(decompressorRegistry);
        }
    };
    delayOrExecute(forwardToRealStream);
}
/**
 * Hands {@code delayOrExecute} a buffer entry that applies the registry to a substream's
 * underlying stream.
 *
 * @param decompressorRegistry the registry each substream's stream is given
 */
@Override
public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
    /** Buffer entry that sets the captured registry on the stream of the substream it runs with. */
    class DecompressorRegistryEntry implements BufferEntry {
        @Override
        public void runWith(Substream substream) {
            substream.stream.setDecompressorRegistry(decompressorRegistry);
        }
    }

    final DecompressorRegistryEntry entry = new DecompressorRegistryEntry();
    delayOrExecute(entry);
}
/**
 * Sets the decompressor registry; a {@code null} argument selects
 * {@code DEFAULT_DECOMPRESSOR_REGISTRY} instead.
 *
 * @param registry the registry to use, or {@code null} for the default
 * @return the value of {@code thisT()}
 */
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
    DecompressorRegistry effective = registry;
    if (effective == null) {
        effective = DEFAULT_DECOMPRESSOR_REGISTRY;
    }
    decompressorRegistry = effective;
    return thisT();
}
/**
 * Everything done on the stream before {@code setStream} must be observed on the real stream
 * once it is set, and calls made afterwards must reach it as well.
 */
@Test
public void setStream_sendsAllMessages() {
    // Calls made before the real stream exists.
    stream.start(listener);
    stream.setCompressor(Codec.Identity.NONE);
    stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());

    stream.setMessageCompression(true);
    InputStream payload = new ByteArrayInputStream(new byte[]{'a'});
    stream.writeMessage(payload);
    stream.setMessageCompression(false);
    stream.writeMessage(payload);

    stream.setStream(realStream);

    // All of the earlier calls are seen by the real stream.
    verify(realStream).setCompressor(Codec.Identity.NONE);
    verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
    verify(realStream).setMessageCompression(true);
    verify(realStream).setMessageCompression(false);
    verify(realStream, times(2)).writeMessage(payload);
    verify(realStream).start(listenerCaptor.capture());

    // A write after setStream is seen too.
    stream.writeMessage(payload);
    verify(realStream, times(3)).writeMessage(payload);

    // The listener only hears about readiness once the captured listener is told.
    verifyNoMoreInteractions(listener);
    listenerCaptor.getValue().onReady();
    verify(listener).onReady();
}
/**
 * Initializes the mocks, creates a fresh cancellable context and builds the unary
 * {@link ServerCallImpl} used by the tests with the default compression registries.
 */
@Before
public void setUp() {
    MockitoAnnotations.initMocks(this);
    context = Context.ROOT.withCancellation();

    call = new ServerCallImpl<Long, Long>(
            stream,
            UNARY_METHOD,
            requestHeaders,
            context,
            DecompressorRegistry.getDefaultInstance(),
            CompressorRegistry.getDefaultInstance());
}
/**
 * Setting a non-null registry must store exactly that registry and return the builder itself.
 */
@Test
public void decompressorRegistry_normal() {
    DecompressorRegistry customRegistry = DecompressorRegistry.emptyInstance();

    // Precondition: the builder does not already hold the custom registry.
    assertNotEquals(customRegistry, builder.decompressorRegistry);

    assertEquals(builder, builder.decompressorRegistry(customRegistry));
    assertEquals(customRegistry, builder.decompressorRegistry);
}
/**
 * Passing {@code null} after a custom registry was set must restore the builder's initial
 * registry.
 */
@Test
public void decompressorRegistry_null() {
    DecompressorRegistry initialRegistry = builder.decompressorRegistry;

    // Move away from the initial value first so the reset is observable.
    assertEquals(builder, builder.decompressorRegistry(DecompressorRegistry.emptyInstance()));
    assertNotEquals(initialRegistry, builder.decompressorRegistry);

    builder.decompressorRegistry(null);
    assertEquals(initialRegistry, builder.decompressorRegistry);
}
/**
 * Passes the registry on to the wrapped {@code origin} builder.
 *
 * @param registry the registry to forward; may be {@code null}
 * @return this builder (not the value returned by {@code origin})
 */
@Override
public DropwizardServerBuilder decompressorRegistry(
        @Nullable final DecompressorRegistry registry) {
    // The delegate's return value is dropped on purpose so chaining continues on this wrapper.
    origin.decompressorRegistry(registry);
    return this;
}
/**
 * Sets the {@link DecompressorRegistry} to use when decompressing messages. If not set, will use
 * the default, which supports gzip only.
 *
 * @param registry the registry to use; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if {@code registry} is {@code null}
 */
public GrpcServiceBuilder decompressorRegistry(DecompressorRegistry registry) {
    this.decompressorRegistry = requireNonNull(registry, "registry");
    return this;
}
/**
 * Creates the {@link HttpStreamReader} under test from the default decompressor registry and
 * the test's {@code deframer} and {@code transportStatusListener} fields.
 */
@Before
public void setUp() {
    final DecompressorRegistry defaultRegistry = DecompressorRegistry.getDefaultInstance();
    reader = new HttpStreamReader(defaultRegistry, deframer, transportStatusListener);
}
/**
 * Does nothing: the supplied registry is ignored by this implementation.
 *
 * @param decompressorRegistry ignored
 */
@Override
public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
    // No-op.
}
/**
 * Replaces the decompressor registry of this call. The argument is stored as given, without a
 * null check.
 *
 * @param decompressorRegistry the registry to store
 * @return this call, to allow chaining
 */
ClientCallImpl<ReqT, RespT> setDecompressorRegistry(
        DecompressorRegistry decompressorRegistry) {
    this.decompressorRegistry = decompressorRegistry;
    return this;
}
/**
 * Forwards the registry to the object returned by {@code transportState()}.
 *
 * @param decompressorRegistry the registry to forward
 */
@Override
public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
    // This stream keeps no copy of its own; the transport state receives the registry.
    transportState().setDecompressorRegistry(decompressorRegistry);
}
/**
 * Stores the decompressor registry. Only allowed while no listener has been set yet.
 *
 * @param decompressorRegistry the registry to store; must not be {@code null}
 * @throws IllegalStateException if a listener is already set ("Already called start")
 * @throws NullPointerException if {@code decompressorRegistry} is {@code null}
 */
private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
    // The state check runs before the null check, so a late call reports the state error.
    final boolean notStarted = this.listener == null;
    Preconditions.checkState(notStarted, "Already called start");

    this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
}
/**
 * Sets the registry to find a decompressor for the framer. May only be called before
 * {@link #start}. If the transport does not support compression, this may do nothing.
 *
 * @param decompressorRegistry the registry of decompressors for decoding responses
 */
void setDecompressorRegistry(DecompressorRegistry decompressorRegistry);