private ByteBufOrStream getCompressedBody() { if (decompressor == Codec.Identity.NONE) { throw Status.INTERNAL.withDescription( DEBUG_STRING + ": Can't decode compressed frame as compression not configured.") .asRuntimeException(); } try { // Enforce the maxMessageSizeBytes limit on the returned stream. InputStream unlimitedStream = decompressor.decompress(new ByteBufInputStream(nextFrame, true)); return new ByteBufOrStream( new SizeEnforcingInputStream(unlimitedStream, maxMessageSizeBytes, DEBUG_STRING)); } catch (IOException e) { throw new RuntimeException(e); } }
@VisibleForTesting static void prepareHeaders( Metadata headers, DecompressorRegistry decompressorRegistry, Compressor compressor, boolean fullStreamDecompression) { headers.discardAll(MESSAGE_ENCODING_KEY); if (compressor != Codec.Identity.NONE) { headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); } headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY); byte[] advertisedEncodings = InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry); if (advertisedEncodings.length != 0) { headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings); } headers.discardAll(CONTENT_ENCODING_KEY); headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY); if (fullStreamDecompression) { headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS); } }
private InputStream getCompressedBody() { if (decompressor == Codec.Identity.NONE) { throw Status.INTERNAL.withDescription( debugString + ": Can't decode compressed frame as compression not configured.") .asRuntimeException(); } try { // Enforce the maxMessageSize limit on the returned stream. InputStream unlimitedStream = decompressor.decompress(ReadableBuffers.openStream(nextFrame, true)); return new SizeEnforcingInputStream( unlimitedStream, maxInboundMessageSize, statsTraceCtx, debugString); } catch (IOException e) { throw new RuntimeException(e); } }
@Test public void prepareHeaders_removeReservedHeaders() { Metadata m = new Metadata(); m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip"); m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII)); m.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip"); m.put(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY, "gzip".getBytes(GrpcUtil.US_ASCII)); ClientCallImpl.prepareHeaders( m, DecompressorRegistry.emptyInstance(), Codec.Identity.NONE, false); assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY)); assertNull(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY)); assertNull(m.get(GrpcUtil.CONTENT_ENCODING_KEY)); assertNull(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY)); }
@Test public void inboundHeadersReceived_disallowsContentAndMessageEncoding() { AbstractClientStream stream = new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); stream.start(mockListener); Metadata headers = new Metadata(); headers.put(GrpcUtil.CONTENT_ENCODING_KEY, "gzip"); headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding()); stream.setFullStreamDecompression(true); stream.transportState().inboundHeadersReceived(headers); verifyNoMoreInteractions(mockListener); Throwable t = ((BaseTransportState) stream.transportState()).getDeframeFailedCause(); assertEquals(Status.INTERNAL.getCode(), Status.fromThrowable(t).getCode()); assertTrue( "unexpected deframe failed description", Status.fromThrowable(t) .getDescription() .equals("Full stream and gRPC message encoding cannot both be set")); }
@Test public void compressed() throws Exception { allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE); // setMessageCompression should default to true framer = new MessageFramer(sink, allocator, statsTraceCtx) .setCompressor(new Codec.Gzip()); writeKnownLength(framer, new byte[1000]); framer.flush(); // The GRPC header is written first as a separate frame. // The message count is only bumped when a message is completely written. verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0)); verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); // Check the header ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0); assertEquals(0x1, buffer.data[0]); ByteBuffer byteBuf = ByteBuffer.wrap(buffer.data, 1, 4); byteBuf.order(ByteOrder.BIG_ENDIAN); int length = byteBuf.getInt(); // compressed data should be smaller than uncompressed data. assertTrue(length < 1000); assertEquals(frameCaptor.getAllValues().get(1).size(), length); checkStats(length, 1000); }
@Test public void dontCompressIfNotRequested() throws Exception { allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE); framer = new MessageFramer(sink, allocator, statsTraceCtx) .setCompressor(new Codec.Gzip()) .setMessageCompression(false); writeKnownLength(framer, new byte[1000]); framer.flush(); // The GRPC header is written first as a separate frame verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); // Check the header ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0); // We purposefully don't check the last byte of length, since that depends on how exactly it // compressed. assertEquals(0x0, buffer.data[0]); ByteBuffer byteBuf = ByteBuffer.wrap(buffer.data, 1, 4); byteBuf.order(ByteOrder.BIG_ENDIAN); int length = byteBuf.getInt(); assertEquals(1000, length); assertEquals(buffer.data.length - 5 , length); checkStats(1000, 1000); }
@Override public void sendHeaders(Metadata unusedGrpcMetadata) { checkState(!sendHeadersCalled, "sendHeaders already called"); checkState(!closeCalled, "call is closed"); HttpHeaders headers = HttpHeaders.of(HttpStatus.OK); headers.contentType(serializationFormat.mediaType()); if (compressor == null || !messageCompression || clientAcceptEncoding == null) { compressor = Codec.Identity.NONE; } else { List<String> acceptedEncodingsList = ACCEPT_ENCODING_SPLITTER.splitToList(clientAcceptEncoding); if (!acceptedEncodingsList.contains(compressor.getMessageEncoding())) { // resort to using no compression. compressor = Codec.Identity.NONE; } } messageFramer.setCompressor(compressor); // Always put compressor, even if it's identity. headers.add(GrpcHeaderNames.GRPC_ENCODING, compressor.getMessageEncoding()); String advertisedEncodings = String.join(",", decompressorRegistry.getAdvertisedMessageEncodings()); if (!advertisedEncodings.isEmpty()) { headers.add(GrpcHeaderNames.GRPC_ACCEPT_ENCODING, advertisedEncodings); } sendHeadersCalled = true; res.write(headers); }
@Test public void uncompressedClient_compressedEndpoint() throws Exception { ManagedChannel nonDecompressingChannel = ManagedChannelBuilder.forAddress("127.0.0.1", server.httpPort()) .decompressorRegistry( DecompressorRegistry.emptyInstance() .with(Codec.Identity.NONE, false)) .usePlaintext(true) .build(); UnitTestServiceBlockingStub client = UnitTestServiceGrpc.newBlockingStub(nonDecompressingChannel); assertThat(client.staticUnaryCallSetsMessageCompression(REQUEST_MESSAGE)) .isEqualTo(RESPONSE_MESSAGE); nonDecompressingChannel.shutdownNow(); }
@Override public void sendHeaders(Metadata headers) { checkState(!sendHeadersCalled, "sendHeaders has already been called"); checkState(!closeCalled, "call is closed"); headers.discardAll(MESSAGE_ENCODING_KEY); if (compressor == null) { compressor = Codec.Identity.NONE; } else { if (messageAcceptEncoding != null) { // TODO(carl-mastrangelo): remove the string allocation. if (!GrpcUtil.iterableContains( ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)), compressor.getMessageEncoding())) { // resort to using no compression. compressor = Codec.Identity.NONE; } } else { compressor = Codec.Identity.NONE; } } // Always put compressor, even if it's identity. headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); stream.setCompressor(compressor); headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY); byte[] advertisedEncodings = InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry); if (advertisedEncodings.length != 0) { headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings); } // Don't check if sendMessage has been called, since it requires that sendHeaders was already // called. sendHeadersCalled = true; stream.writeHeaders(headers); }
protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) { this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.transportTracer = checkNotNull(transportTracer, "transportTracer"); deframer = new MessageDeframer( this, Codec.Identity.NONE, maxMessageSize, statsTraceCtx, transportTracer, getClass().getName()); }
@Override public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) { checkState(decompressor == Codec.Identity.NONE, "per-message decompressor already set"); checkState(this.fullStreamDecompressor == null, "full stream decompressor already set"); this.fullStreamDecompressor = checkNotNull(fullStreamDecompressor, "Can't pass a null full stream decompressor"); unprocessed = null; }
@Test public void setStream_sendsAllMessages() { stream.start(listener); stream.setCompressor(Codec.Identity.NONE); stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance()); stream.setMessageCompression(true); InputStream message = new ByteArrayInputStream(new byte[]{'a'}); stream.writeMessage(message); stream.setMessageCompression(false); stream.writeMessage(message); stream.setStream(realStream); verify(realStream).setCompressor(Codec.Identity.NONE); verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance()); verify(realStream).setMessageCompression(true); verify(realStream).setMessageCompression(false); verify(realStream, times(2)).writeMessage(message); verify(realStream).start(listenerCaptor.capture()); stream.writeMessage(message); verify(realStream, times(3)).writeMessage(message); verifyNoMoreInteractions(listener); listenerCaptor.getValue().onReady(); verify(listener).onReady(); }
@Test public void prepareHeaders_userAgentIgnored() { Metadata m = new Metadata(); m.put(GrpcUtil.USER_AGENT_KEY, "batmobile"); ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, false); // User Agent is removed and set by the transport assertThat(m.get(GrpcUtil.USER_AGENT_KEY)).isNotNull(); }
@Test public void prepareHeaders_ignoreIdentityEncoding() { Metadata m = new Metadata(); ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, false); assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY)); }
@Test public void prepareHeaders_noAcceptedContentEncodingsWithoutFullStreamDecompressionEnabled() { Metadata m = new Metadata(); ClientCallImpl.prepareHeaders(m, decompressorRegistry, Codec.Identity.NONE, false); assertNull(m.get(GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY)); }
@Test public void inboundHeadersReceived_acceptsGzipMessageEncoding() { AbstractClientStream stream = new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); stream.start(mockListener); Metadata headers = new Metadata(); headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding()); stream.transportState().inboundHeadersReceived(headers); verify(mockListener).headersRead(headers); }
@Test public void inboundHeadersReceived_acceptsIdentityMessageEncoding() { AbstractClientStream stream = new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); stream.start(mockListener); Metadata headers = new Metadata(); headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, Codec.Identity.NONE.getMessageEncoding()); stream.transportState().inboundHeadersReceived(headers); verify(mockListener).headersRead(headers); }
@Test public void endOfStreamWithInvalidGzipBlockShouldNotifyDeframerClosedWithPartialMessage() { assumeTrue("test only valid for full-stream compression", useGzipInflatingBuffer); // Create new deframer to allow writing bytes directly to the GzipInflatingBuffer MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer, "test"); deframer.setFullStreamDecompressor(new GzipInflatingBuffer()); deframer.request(1); deframer.deframe(buffer(new byte[1])); deframer.closeWhenComplete(); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); checkStats(tracer, transportTracer.getStats()); }
@Test public void compressed() { deframer = new MessageDeframer(listener, new Codec.Gzip(), DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer, "test"); deframer.request(1); byte[] payload = compress(new byte[1000]); assertTrue(payload.length < 100); byte[] header = new byte[]{1, 0, 0, 0, (byte) payload.length}; deframer.deframe(buffer(Bytes.concat(header, payload))); verify(listener).messagesAvailable(producer.capture()); assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); }
@Test public void zeroLengthCompressibleMessageIsNotCompressed() { framer.setCompressor(new Codec.Gzip()); framer.setMessageCompression(true); writeKnownLength(framer, new byte[]{}); framer.flush(); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1); checkStats(0, 0); }
@Before public void setUp() throws Exception { clientDecompressors = clientDecompressors.with(Codec.Identity.NONE, false); serverDecompressors = serverDecompressors.with(Codec.Identity.NONE, false); }
@Test public void compression() throws Exception { if (clientAcceptEncoding) { clientDecompressors = clientDecompressors.with(clientCodec, true); } if (clientEncoding) { clientCompressors.register(clientCodec); } if (serverAcceptEncoding) { serverDecompressors = serverDecompressors.with(serverCodec, true); } if (serverEncoding) { serverCompressors.register(serverCodec); } server = ServerBuilder.forPort(0) .addService( ServerInterceptors.intercept(new LocalServer(), new ServerCompressorInterceptor())) .compressorRegistry(serverCompressors) .decompressorRegistry(serverDecompressors) .build() .start(); channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()) .decompressorRegistry(clientDecompressors) .compressorRegistry(clientCompressors) .intercept(new ClientCompressorInterceptor()) .usePlaintext(true) .build(); stub = TestServiceGrpc.newBlockingStub(channel); stub.unaryCall(REQUEST); if (clientAcceptEncoding && serverEncoding) { assertEquals("fzip", clientResponseHeaders.get(MESSAGE_ENCODING_KEY)); if (enableServerMessageCompression) { assertTrue(clientCodec.anyRead); assertTrue(serverCodec.anyWritten); } else { assertFalse(clientCodec.anyRead); assertFalse(serverCodec.anyWritten); } } else { // Either identity or null is accepted. assertThat(clientResponseHeaders.get(MESSAGE_ENCODING_KEY)) .isAnyOf(Codec.Identity.NONE.getMessageEncoding(), null); assertFalse(clientCodec.anyRead); assertFalse(serverCodec.anyWritten); } if (serverAcceptEncoding) { assertEqualsString("fzip", clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } else { assertNull(clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } if (clientAcceptEncoding) { assertEqualsString("fzip", serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } else { assertNull(serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); } // Second call, once the client knows what the server supports. if (clientEncoding && serverAcceptEncoding) { assertEquals("fzip", serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); if (enableClientMessageCompression) { assertTrue(clientCodec.anyWritten); assertTrue(serverCodec.anyRead); } else { assertFalse(clientCodec.anyWritten); assertFalse(serverCodec.anyRead); } } else { assertNull(serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); assertFalse(clientCodec.anyWritten); assertFalse(serverCodec.anyRead); } }
@BeforeClass public static void registerCompressors() { compressors.register(FZIPPER); compressors.register(Codec.Identity.NONE); }
public Fzip(String actualName, Codec delegate) { this.actualName = actualName; this.delegate = delegate; }
/** * Called by transport implementations when they receive headers. * * @param headers the parsed headers */ protected void inboundHeadersReceived(Metadata headers) { Preconditions.checkState(!statusReported, "Received headers on closed stream"); statsTraceCtx.clientInboundHeaders(); boolean compressedStream = false; String streamEncoding = headers.get(CONTENT_ENCODING_KEY); if (fullStreamDecompression && streamEncoding != null) { if (streamEncoding.equalsIgnoreCase("gzip")) { setFullStreamDecompressor(new GzipInflatingBuffer()); compressedStream = true; } else if (!streamEncoding.equalsIgnoreCase("identity")) { deframeFailed( Status.INTERNAL .withDescription( String.format("Can't find full stream decompressor for %s", streamEncoding)) .asRuntimeException()); return; } } String messageEncoding = headers.get(MESSAGE_ENCODING_KEY); if (messageEncoding != null) { Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding); if (decompressor == null) { deframeFailed( Status.INTERNAL .withDescription(String.format("Can't find decompressor for %s", messageEncoding)) .asRuntimeException()); return; } else if (decompressor != Codec.Identity.NONE) { if (compressedStream) { deframeFailed( Status.INTERNAL .withDescription( String.format("Full stream and gRPC message encoding cannot both be set")) .asRuntimeException()); return; } setDecompressor(decompressor); } } listener().headersRead(headers); }