/**
 * Creates a resolver for targets shaped like
 * <code>kubernetes:///{namespace}/{service}/{port}</code>.
 *
 * @param targetUri the target to resolve
 * @param params attributes handed unchanged to the created resolver
 * @return a new {@link KubernetesNameResolver}, or {@code null} when the scheme of
 *     {@code targetUri} is not {@code SCHEME}
 * @throws IllegalArgumentException if the path does not start with '/', does not split into
 *     exactly four '/'-separated parts, or its last part is not a valid integer
 */
@Nullable
@Override
public NameResolver newNameResolver(URI targetUri, Attributes params) {
  if (!SCHEME.equals(targetUri.getScheme())) {
    return null;
  }
  String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
  Preconditions.checkArgument(targetPath.startsWith("/"),
      "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
  // A leading '/' yields an empty first element, hence four parts for three path segments.
  String[] parts = targetPath.split("/");
  if (parts.length != 4) {
    throw new IllegalArgumentException(
        "Must be formatted like kubernetes:///{namespace}/{service}/{port}");
  }
  // Parse straight to a primitive (no boxing) and keep the try block limited to the parse, so
  // that only a malformed port is reported as "Unable to parse port number".
  final int port;
  try {
    port = Integer.parseInt(parts[3]);
  } catch (NumberFormatException e) {
    throw new IllegalArgumentException("Unable to parse port number", e);
  }
  return new KubernetesNameResolver(
      parts[1], parts[2], port, params, GrpcUtil.TIMER_SERVICE, GrpcUtil.SHARED_CHANNEL_EXECUTOR);
}
/**
 * Creates a client transport backed by Cronet.
 *
 * <p>{@code address}, {@code executor}, {@code streamFactory} and {@code transportTracer} must be
 * non-null. The stored user agent is whatever {@code GrpcUtil.getGrpcUserAgent("cronet", ...)}
 * returns for the optional {@code userAgent}.
 */
CronetClientTransport(
    StreamBuilderFactory streamFactory,
    InetSocketAddress address,
    String authority,
    @Nullable String userAgent,
    Executor executor,
    int maxMessageSize,
    boolean alwaysUsePut,
    TransportTracer transportTracer) {
  // Values stored as given.
  this.authority = authority;
  this.maxMessageSize = maxMessageSize;
  this.alwaysUsePut = alwaysUsePut;

  // Validated or derived values; null checks run as address, executor, streamFactory, tracer.
  this.address = Preconditions.checkNotNull(address, "address");
  this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent);
  this.executor = Preconditions.checkNotNull(executor, "executor");
  this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
}
private void setGrpcHeaders(BidirectionalStream.Builder builder) { // Psuedo-headers are set by cronet. // All non-pseudo headers must come after pseudo headers. // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed. builder.addHeader(USER_AGENT_KEY.name(), userAgent); builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC); builder.addHeader("te", GrpcUtil.TE_TRAILERS); // Now add any application-provided headers. // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between // String and byte array. byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers); for (int i = 0; i < serializedHeaders.length; i += 2) { String key = new String(serializedHeaders[i], Charset.forName("UTF-8")); // TODO(ericgribkoff): log an error or throw an exception if (isApplicationHeader(key)) { String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8")); builder.addHeader(key, value); } } }
@Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) { String data = debugData.utf8(); log.log(Level.WARNING, String.format( "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data)); if ("too_many_pings".equals(data)) { tooManyPingsRunnable.run(); } } Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode) .augmentDescription("Received Goaway"); if (debugData.size() > 0) { // If a debug message was provided, use it. status = status.augmentDescription(debugData.utf8()); } startGoAway(lastGoodStreamId, null, status); }
/**
 * Builds the name-resolver parameters, setting {@code PARAMS_DEFAULT_PORT} to the plaintext or
 * SSL default port according to {@code negotiationType}.
 *
 * @throws AssertionError if {@code negotiationType} is neither PLAINTEXT nor TLS
 */
@Override
protected Attributes getNameResolverParams() {
  final int port;
  switch (negotiationType) {
    case TLS:
      port = GrpcUtil.DEFAULT_PORT_SSL;
      break;
    case PLAINTEXT:
      port = GrpcUtil.DEFAULT_PORT_PLAINTEXT;
      break;
    default:
      throw new AssertionError(negotiationType + " not handled");
  }
  return Attributes.newBuilder()
      .set(NameResolver.Factory.PARAMS_DEFAULT_PORT, port)
      .build();
}
/**
 * Starts a stream without a custom user agent and verifies that the SYN_STREAM headers carry
 * the value of {@code GrpcUtil.getGrpcUserAgent("okhttp", null)}.
 */
@Test
public void addDefaultUserAgent() throws Exception {
  initTransport();
  MockStreamListener streamListener = new MockStreamListener();
  OkHttpClientStream stream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  stream.start(streamListener);

  Header authorityHeader = new Header(Header.TARGET_AUTHORITY, "notarealauthority:80");
  Header pathHeader = new Header(Header.TARGET_PATH, "/" + method.getFullMethodName());
  Header userAgentHeader =
      new Header(GrpcUtil.USER_AGENT_KEY.name(), GrpcUtil.getGrpcUserAgent("okhttp", null));
  List<Header> expectedHeaders = Arrays.asList(
      SCHEME_HEADER,
      METHOD_HEADER,
      authorityHeader,
      pathHeader,
      userAgentHeader,
      CONTENT_TYPE_HEADER,
      TE_HEADER);

  verify(frameWriter, timeout(TIME_OUT_MS))
      .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
  getStream(3).cancel(Status.CANCELLED);
  shutdownAndVerify();
}
/**
 * Starts the transport with the user agent "fakeUserAgent" and verifies that the SYN_STREAM
 * headers carry the value of {@code GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent")}.
 */
@Test
public void overrideDefaultUserAgent() throws Exception {
  startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, "fakeUserAgent");
  MockStreamListener streamListener = new MockStreamListener();
  OkHttpClientStream stream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  stream.start(streamListener);

  Header authorityHeader = new Header(Header.TARGET_AUTHORITY, "notarealauthority:80");
  Header pathHeader = new Header(Header.TARGET_PATH, "/" + method.getFullMethodName());
  Header userAgentHeader = new Header(
      GrpcUtil.USER_AGENT_KEY.name(), GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent"));
  List<Header> expectedHeaders = Arrays.asList(
      SCHEME_HEADER,
      METHOD_HEADER,
      authorityHeader,
      pathHeader,
      userAgentHeader,
      CONTENT_TYPE_HEADER,
      TE_HEADER);

  verify(frameWriter, timeout(TIME_OUT_MS))
      .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
  getStream(3).cancel(Status.CANCELLED);
  shutdownAndVerify();
}
/**
 * Verifies the exact content and order of the headers written on stream start, including that
 * the user agent sent is "good-application" rather than the application-supplied one.
 */
@Test
public void start_headerFieldOrder() {
  Metadata appHeaders = new Metadata();
  appHeaders.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
  stream = new OkHttpClientStream(
      methodDescriptor,
      appHeaders,
      frameWriter,
      transport,
      flowController,
      lock,
      MAX_MESSAGE_SIZE,
      "localhost",
      "good-application",
      StatsTraceContext.NOOP,
      transportTracer);
  stream.start(new BaseClientStreamListener());
  stream.transportState().start(3);

  verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());

  Header authorityHeader = new Header(Header.TARGET_AUTHORITY, "localhost");
  Header pathHeader =
      new Header(Header.TARGET_PATH, "/" + methodDescriptor.getFullMethodName());
  Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(), "good-application");
  assertThat(headersCaptor.getValue())
      .containsExactly(
          Headers.SCHEME_HEADER,
          Headers.METHOD_HEADER,
          authorityHeader,
          pathHeader,
          userAgentHeader,
          Headers.CONTENT_TYPE_HEADER,
          Headers.TE_HEADER)
      .inOrder();
}
/**
 * Creates a channel builder for "localhost" on {@code getPort()} with the test cipher suites,
 * the authority overridden to the test server host, the client census stats module installed,
 * and an SSL socket factory built from "ca.pem".
 *
 * @throws RuntimeException wrapping any exception raised while creating the SSL socket factory
 */
private OkHttpChannelBuilder createChannelBuilder() {
  ConnectionSpec tlsSpec =
      new ConnectionSpec.Builder(OkHttpChannelBuilder.DEFAULT_CONNECTION_SPEC)
          .cipherSuites(TestUtils.preferredTestCiphers().toArray(new String[0]))
          .tlsVersions(ConnectionSpec.MODERN_TLS.tlsVersions().toArray(new TlsVersion[0]))
          .build();
  OkHttpChannelBuilder channelBuilder = OkHttpChannelBuilder.forAddress("localhost", getPort())
      .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
      .connectionSpec(tlsSpec)
      .overrideAuthority(
          GrpcUtil.authorityFromHostAndPort(TestUtils.TEST_SERVER_HOST, getPort()));
  io.grpc.internal.TestingAccessor.setStatsImplementation(
      channelBuilder, createClientCensusStatsModule());
  try {
    channelBuilder.sslSocketFactory(
        TestUtils.newSslSocketFactoryForCa(
            Platform.get().getProvider(), TestUtils.loadCert("ca.pem")));
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
  return channelBuilder;
}
/**
 * Overrides the authority with {@code BAD_HOSTNAME} and expects the RPC to fail with a root
 * cause of {@link SSLPeerUnverifiedException}.
 */
@Test
public void wrongHostNameFailHostnameVerification() throws Exception {
  ManagedChannel channel = createChannelBuilder()
      .overrideAuthority(GrpcUtil.authorityFromHostAndPort(BAD_HOSTNAME, getPort()))
      .build();
  try {
    TestServiceGrpc.TestServiceBlockingStub blockingStub =
        TestServiceGrpc.newBlockingStub(channel);

    Throwable actualThrown = null;
    try {
      blockingStub.emptyCall(Empty.getDefaultInstance());
    } catch (Throwable t) {
      actualThrown = t;
    }
    assertNotNull("The rpc should have been failed due to hostname verification", actualThrown);
    Throwable cause = Throwables.getRootCause(actualThrown);
    assertTrue(
        "Failed by unexpected exception: " + cause, cause instanceof SSLPeerUnverifiedException);
  } finally {
    // Shut the channel down even when an assertion above fails, so it is not leaked.
    channel.shutdown();
  }
}
/**
 * Overrides the authority with {@code BAD_HOSTNAME} but installs a {@link HostnameVerifier}
 * that accepts every hostname; the RPC is expected to complete without throwing.
 */
@Test
public void hostnameVerifierWithBadHostname() throws Exception {
  ManagedChannel channel = createChannelBuilder()
      .overrideAuthority(GrpcUtil.authorityFromHostAndPort(BAD_HOSTNAME, getPort()))
      .hostnameVerifier(new HostnameVerifier() {
        @Override
        public boolean verify(String hostname, SSLSession session) {
          return true;
        }
      })
      .build();
  try {
    TestServiceGrpc.TestServiceBlockingStub blockingStub =
        TestServiceGrpc.newBlockingStub(channel);
    blockingStub.emptyCall(Empty.getDefaultInstance());
  } finally {
    // Shut the channel down even when the RPC throws, so it is not leaked.
    channel.shutdown();
  }
}
/**
 * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
 * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
 * may happen immediately, even before the TLS Handshake is complete.
 *
 * @param sslContext the SSL context to negotiate with; must not be null
 * @param authority the authority to derive the peer host and port from; must not be null
 */
public static ProtocolNegotiator tls(SslContext sslContext, String authority) {
  Preconditions.checkNotNull(sslContext, "sslContext");
  URI authorityUri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority"));
  String parsedHost = authorityUri.getHost();
  if (parsedHost != null) {
    return new TlsNegotiator(sslContext, parsedHost, authorityUri.getPort());
  }
  /*
   * Implementation note: We pick -1 as the port here rather than deriving it from the original
   * socket address. The SSL engine doesn't use this port number when contacting the remote
   * server, but rather it is used for other things like SSL Session caching. When an invalid
   * authority is provided (like "bad_cert"), picking the original port and passing it in would
   * mean that the port might be used under the assumption that it was correct. By using -1
   * here, it forces the SSL implementation to treat it as invalid.
   */
  return new TlsNegotiator(sslContext, authority, -1);
}
/**
 * Creates a Netty client transport.
 *
 * <p>{@code negotiator}, {@code address}, {@code group}, {@code channelType},
 * {@code channelOptions}, {@code tooManyPingsRunnable} and {@code transportTracer} are
 * null-checked. {@code authority} and the user agent derived through
 * {@code GrpcUtil.getGrpcUserAgent("netty", ...)} are stored as {@link AsciiString}s.
 */
NettyClientTransport(
    SocketAddress address,
    Class<? extends Channel> channelType,
    Map<ChannelOption<?>, ?> channelOptions,
    EventLoopGroup group,
    ProtocolNegotiator negotiator,
    int flowControlWindow,
    int maxMessageSize,
    int maxHeaderListSize,
    long keepAliveTimeNanos,
    long keepAliveTimeoutNanos,
    boolean keepAliveWithoutCalls,
    String authority,
    @Nullable String userAgent,
    Runnable tooManyPingsRunnable,
    TransportTracer transportTracer) {
  // Primitive settings, stored as given.
  this.flowControlWindow = flowControlWindow;
  this.maxMessageSize = maxMessageSize;
  this.maxHeaderListSize = maxHeaderListSize;
  this.keepAliveTimeNanos = keepAliveTimeNanos;
  this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
  this.keepAliveWithoutCalls = keepAliveWithoutCalls;

  // Required collaborators.
  this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
  this.address = Preconditions.checkNotNull(address, "address");
  this.group = Preconditions.checkNotNull(group, "group");
  this.channelType = Preconditions.checkNotNull(channelType, "channelType");
  this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");

  // Header values are kept in AsciiString form.
  this.authority = new AsciiString(authority);
  this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));

  this.tooManyPingsRunnable =
      Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
}
public static Http2Headers convertClientHeaders(Metadata headers, AsciiString scheme, AsciiString defaultPath, AsciiString authority, AsciiString method, AsciiString userAgent) { Preconditions.checkNotNull(defaultPath, "defaultPath"); Preconditions.checkNotNull(authority, "authority"); Preconditions.checkNotNull(method, "method"); // Discard any application supplied duplicates of the reserved headers headers.discardAll(CONTENT_TYPE_KEY); headers.discardAll(GrpcUtil.TE_HEADER); headers.discardAll(GrpcUtil.USER_AGENT_KEY); return GrpcHttp2OutboundHeaders.clientRequestHeaders( toHttp2Headers(headers), authority, defaultPath, method, scheme, userAgent); }
/**
 * Builds the name-resolver parameters, setting {@code PARAMS_DEFAULT_PORT} to the plaintext
 * default port for PLAINTEXT and PLAINTEXT_UPGRADE, and to the SSL default port for TLS.
 *
 * @throws AssertionError if {@code negotiationType} is any other value
 */
@Override
@CheckReturnValue
protected Attributes getNameResolverParams() {
  final int port;
  switch (negotiationType) {
    case TLS:
      port = GrpcUtil.DEFAULT_PORT_SSL;
      break;
    case PLAINTEXT_UPGRADE:
    case PLAINTEXT:
      port = GrpcUtil.DEFAULT_PORT_PLAINTEXT;
      break;
    default:
      throw new AssertionError(negotiationType + " not handled");
  }
  return Attributes.newBuilder()
      .set(NameResolver.Factory.PARAMS_DEFAULT_PORT, port)
      .build();
}
@Override protected void manualSetUp() throws Exception { assertNull("manualSetUp should not run more than once", handler()); initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE)); // replace the keepAliveManager with spyKeepAliveManager spyKeepAliveManager = mock(KeepAliveManager.class, delegatesTo(handler().getKeepAliveManagerForTest())); handler().setKeepAliveManagerForTest(spyKeepAliveManager); // Simulate receipt of the connection preface handler().handleProtocolNegotiationCompleted(Attributes.EMPTY); channelRead(Http2CodecUtil.connectionPrefaceBuf()); // Simulate receipt of initial remote settings. ByteBuf serializedSettings = serializeSettings(new Http2Settings()); channelRead(serializedSettings); }
@Test public void setSoLingerChannelOption() throws IOException { startServer(); Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); // set SO_LINGER option int soLinger = 123; channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, NioSocketChannel.class, channelOptions, group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); // verify SO_LINGER has been set ChannelConfig config = transport.channel().config(); assertTrue(config instanceof SocketChannelConfig); assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger()); }
@Test public void maxMessageSizeShouldBeEnforced() throws Throwable { startServer(); // Allow the response payloads of up to 1 byte. NettyClientTransport transport = newTransport(newNegotiator(), 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true); callMeMaybe(transport.start(clientTransportListener)); try { // Send a single RPC and wait for the response. new Rpc(transport).halfClose().waitForResponse(); fail("Expected the stream to fail."); } catch (ExecutionException e) { Status status = Status.fromThrowable(e); assertEquals(Code.RESOURCE_EXHAUSTED, status.getCode()); assertTrue("Missing exceeds maximum from: " + status.getDescription(), status.getDescription().contains("exceeds maximum")); } }
/**
 * Verifies that a transport whose negotiator returns a null handler reports
 * {@code Attributes.EMPTY}.
 */
@Test
public void getAttributes_negotiatorHandler() throws Exception {
  address = TestUtils.testServerAddress(12345);
  authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());

  ProtocolNegotiator nullHandlerNegotiator = new ProtocolNegotiator() {
    @Override
    public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
      return null;
    }
  };
  NettyClientTransport transport = newTransport(nullHandlerNegotiator);

  assertEquals(Attributes.EMPTY, transport.getAttributes());

  transports.clear();
}
/**
 * Creates and starts a {@link NettyServer} on port 0 of the test server address, then records
 * the server's actual address and the matching authority in {@code address} and
 * {@code authority}.
 *
 * @param maxStreamsPerConnection forwarded to the server constructor
 * @param maxHeaderListSize forwarded to the server constructor
 */
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
  HashMap<ChannelOption<?>, Object> noChannelOptions = new HashMap<ChannelOption<?>, Object>();
  server = new NettyServer(
      TestUtils.testServerAddress(0),
      NioServerSocketChannel.class,
      noChannelOptions,
      group,
      group,
      negotiator,
      Collections.<ServerStreamTracer.Factory>emptyList(),
      TransportTracer.getDefaultFactory(),
      maxStreamsPerConnection,
      DEFAULT_WINDOW_SIZE,
      DEFAULT_MAX_MESSAGE_SIZE,
      maxHeaderListSize,
      DEFAULT_SERVER_KEEPALIVE_TIME_NANOS,
      DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
      MAX_CONNECTION_IDLE_NANOS_DISABLED,
      MAX_CONNECTION_AGE_NANOS_DISABLED,
      MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE,
      true,
      0);
  server.start(serverListener);

  address = TestUtils.testServerAddress(server.getPort());
  authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
}
@Test public void convertServerHeaders_sanitizes() { Metadata metaData = new Metadata(); // Intentionally being explicit here rather than relying on any pre-defined lists of headers, // since the goal of this test is to validate the correctness of such lists in the first place. metaData.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed"); metaData.put(GrpcUtil.TE_HEADER, "to-be-removed"); metaData.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed"); metaData.put(userKey, userValue); Http2Headers output = Utils.convertServerHeaders(metaData); DefaultHttp2Headers headers = new DefaultHttp2Headers(); for (Map.Entry<CharSequence, CharSequence> entry : output) { headers.add(entry.getKey(), entry.getValue()); } // 2 reserved headers, 1 user header assertEquals(2 + 1, headers.size()); assertEquals(Utils.CONTENT_TYPE_GRPC, headers.get(GrpcUtil.CONTENT_TYPE_KEY.name())); }
/**
 * Starts a stream whose application metadata carries the user agent "bad agent" and verifies
 * that the enqueued {@code CreateStreamCommand} carries the transport's "good agent" value.
 */
@Test
public void removeUserAgentFromApplicationHeaders() {
  Metadata metadata = new Metadata();
  metadata.put(GrpcUtil.USER_AGENT_KEY, "bad agent");
  listener = mock(ClientStreamListener.class);
  Mockito.reset(writeQueue);
  when(writeQueue.enqueue(any(QueuedCommand.class), any(boolean.class))).thenReturn(future);

  // Pass the metadata that actually contains the application-supplied user agent; using an
  // empty Metadata here would leave the behavior under test unexercised.
  stream = new NettyClientStream(
      new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
      methodDescriptor,
      metadata,
      channel,
      AsciiString.of("localhost"),
      AsciiString.of("http"),
      AsciiString.of("good agent"),
      StatsTraceContext.NOOP,
      transportTracer);
  stream.start(listener);

  ArgumentCaptor<CreateStreamCommand> cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class);
  verify(writeQueue).enqueue(cmdCap.capture(), eq(false));
  assertThat(ImmutableListMultimap.copyOf(cmdCap.getValue().headers()))
      .containsEntry(Utils.USER_AGENT, AsciiString.of("good agent"));
}
/**
 * Creates a client stream for {@code method}.
 *
 * <p>While no shutdown status is set, the transport's user agent is put into {@code headers}
 * and an in-process stream is returned. Otherwise the returned stream, once started, reports
 * outbound headers and closure to the stats context and closes the listener with the shutdown
 * status and empty trailers.
 */
@Override
public synchronized ClientStream newStream(
    final MethodDescriptor<?, ?> method,
    final Metadata headers,
    final CallOptions callOptions) {
  if (shutdownStatus == null) {
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
    return new InProcessStream(method, headers, callOptions, authority).clientStream;
  }

  // Capture the status now; the anonymous stream may be started later.
  final Status capturedStatus = shutdownStatus;
  final StatsTraceContext statsTraceCtx =
      StatsTraceContext.newClientContext(callOptions, headers);
  return new NoopClientStream() {
    @Override
    public void start(ClientStreamListener listener) {
      statsTraceCtx.clientOutboundHeaders();
      statsTraceCtx.streamClosed(capturedStatus);
      listener.closed(capturedStatus, new Metadata());
    }
  };
}
/**
 * Creates a Consul-backed resolver for {@code targetUri}.
 *
 * <p>The service name is the URI path without its leading '/'. A missing host or port falls
 * back to {@code DEFAULT_HOST} / {@code DEFAULT_PORT}, and a non-empty URI fragment is passed
 * on as the optional tag.
 *
 * @return the resolver, or {@code null} when the URI scheme is not {@code SCHEME}
 * @throws IllegalArgumentException if the path does not start with '/' or names no service
 */
@Nullable
@Override
public ConsulNameResolver newNameResolver(final URI targetUri, final Attributes params) {
  if (!SCHEME.equals(targetUri.getScheme())) {
    return null;
  }

  final String targetPath = checkNotNull(targetUri.getPath(), "targetPath");
  checkArgument(targetPath.startsWith("/"));
  final String serviceName = targetPath.substring(1);
  checkArgument(!serviceName.isEmpty(), "serviceName");

  final String uriHost = targetUri.getHost();
  final String consulHost = Strings.isNullOrEmpty(uriHost) ? DEFAULT_HOST : uriHost;

  final int uriPort = targetUri.getPort();
  final int consulPort = (uriPort == -1) ? DEFAULT_PORT : uriPort;

  final String tag = Strings.emptyToNull(targetUri.getFragment());

  final ConsulClient consulClient = ConsulClientManager.getInstance(consulHost, consulPort);
  return new ConsulNameResolver(
      consulClient /* CatalogClient */,
      consulClient /* KeyValueClient */,
      serviceName,
      Optional.ofNullable(tag),
      GrpcUtil.TIMER_SERVICE,
      GrpcUtil.SHARED_CHANNEL_EXECUTOR
  );
}
/**
 * Creates a fresh {@link ConsulNameResolver} for {@code SERVICE_NAME} with no tag before each
 * test, wired to the test's catalog and key-value clients.
 */
@Before
public void setup() {
  ConsulNameResolver freshResolver = new ConsulNameResolver(
      catalogClient,
      keyValueClient,
      SERVICE_NAME,
      Optional.empty(),
      GrpcUtil.TIMER_SERVICE,
      GrpcUtil.SHARED_CHANNEL_EXECUTOR,
      1,
      TimeUnit.SECONDS
  );
  resolver = freshResolver;
}
/**
 * Say hello to server.
 *
 * <p>Sends one compressed {@code HelloRequest} for {@code name} and waits up to 100 seconds for
 * a reply or for the call to close. The listener throws if the response headers carry no
 * message encoding or if the call closes with a non-OK status.
 */
public void greet(final String name) {
  final ClientCall<HelloRequest, HelloReply> call =
      channel.newCall(GreeterGrpc.METHOD_SAY_HELLO, CallOptions.DEFAULT);

  // Released by the first reply or by call closure, whichever comes first.
  final CountDownLatch done = new CountDownLatch(1);

  Listener<HelloReply> replyListener = new Listener<HelloReply>() {
    @Override
    public void onHeaders(Metadata headers) {
      super.onHeaders(headers);
      String encoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY);
      if (encoding == null) {
        throw new RuntimeException("No compression selected!");
      }
    }

    @Override
    public void onMessage(HelloReply message) {
      super.onMessage(message);
      logger.info("Greeting: " + message.getMessage());
      done.countDown();
    }

    @Override
    public void onClose(Status status, Metadata trailers) {
      done.countDown();
      if (!status.isOk()) {
        throw status.asRuntimeException();
      }
    }
  };
  call.start(replyListener, new Metadata());

  call.setMessageCompression(true);
  HelloRequest request = HelloRequest.newBuilder().setName(name).build();
  call.sendMessage(request);
  call.request(1);
  call.halfClose();

  Uninterruptibles.awaitUninterruptibly(done, 100, TimeUnit.SECONDS);
}
/**
 * Starts resolution: acquires the shared channel executor, records {@code listener} and kicks
 * off {@code resolve()}, all under {@code lock}.
 *
 * @param listener receives resolution results; must not be null
 * @throws IllegalStateException if a listener has already been set
 * @throws NullPointerException if {@code listener} is null
 */
@Override
public void start(Listener listener) {
  synchronized (lock) {
    Preconditions.checkState(this.listener == null, "already started");
    // Validate the argument before taking a reference on the shared executor; otherwise a null
    // listener would leave an acquired executor behind while the resolver still looks unstarted.
    Preconditions.checkNotNull(listener, "listener");
    this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
    this.listener = listener;
    resolve();
  }
}
/**
 * Marks the resolver as shut down and, under {@code lock}, releases the shared channel
 * executor if one is held. Calls after the first return immediately.
 */
@Override
public void shutdown() {
  if (!shutdown) {
    shutdown = true;
    synchronized (lock) {
      if (executor == null) {
        return;
      }
      executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
    }
  }
}
@Override public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { return new GrpclbLoadBalancer( helper, PickFirstBalancerFactory.getInstance(), RoundRobinLoadBalancerFactory.getInstance(), // TODO(zhangkun83): balancer sends load reporting RPCs from it, which also involves // channelExecutor thus may also run other tasks queued in the channelExecutor. If such // load should not be on the shared scheduled executor, we should use a combination of the // scheduled executor and the default app executor. SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), TIME_PROVIDER); }
/**
 * Creates an OkHttp client transport for {@code address}.
 *
 * <p>{@code address}, {@code executor}, {@code connectionSpec}, {@code tooManyPingsRunnable}
 * and {@code transportTracer} must be non-null; the SSL, hostname-verifier and proxy arguments
 * are optional. The stored user agent is derived through
 * {@code GrpcUtil.getGrpcUserAgent("okhttp", ...)}.
 */
OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent,
    Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
    @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
    int maxMessageSize, @Nullable InetSocketAddress proxyAddress, @Nullable String proxyUsername,
    @Nullable String proxyPassword, Runnable tooManyPingsRunnable,
    TransportTracer transportTracer) {
  this.address = Preconditions.checkNotNull(address, "address");
  this.defaultAuthority = authority;
  this.maxMessageSize = maxMessageSize;
  this.executor = Preconditions.checkNotNull(executor, "executor");
  serializingExecutor = new SerializingExecutor(executor);
  // Client initiated streams are odd, server initiated ones are even. Server should not need to
  // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
  nextStreamId = 3;
  this.sslSocketFactory = sslSocketFactory;
  this.hostnameVerifier = hostnameVerifier;
  this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec");
  this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER;
  this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
  this.proxyAddress = proxyAddress;
  this.proxyUsername = proxyUsername;
  this.proxyPassword = proxyPassword;
  this.tooManyPingsRunnable =
      Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
  // Name the argument in the failure message, like every other null check in this constructor.
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
  initTransportTracer();
}
/**
 * Create a transport connected to a fake peer for test.
 *
 * <p>The address, connection spec and proxy settings are left null and the default authority
 * is fixed to "notarealauthority:80"; the frame reader, frame writer and socket are supplied
 * by the test.
 */
@VisibleForTesting
OkHttpClientTransport(
    String userAgent,
    Executor executor,
    FrameReader frameReader,
    FrameWriter testFrameWriter,
    int nextStreamId,
    Socket socket,
    Supplier<Stopwatch> stopwatchFactory,
    @Nullable Runnable connectingCallback,
    SettableFuture<Void> connectedFuture,
    int maxMessageSize,
    Runnable tooManyPingsRunnable,
    TransportTracer transportTracer) {
  // Not used when talking to a fake peer.
  address = null;
  this.connectionSpec = null;
  this.proxyAddress = null;
  this.proxyUsername = null;
  this.proxyPassword = null;
  defaultAuthority = "notarealauthority:80";

  // Values stored as given.
  this.maxMessageSize = maxMessageSize;
  this.nextStreamId = nextStreamId;
  this.stopwatchFactory = stopwatchFactory;
  this.connectingCallback = connectingCallback;

  // Derived and required values.
  this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
  this.executor = Preconditions.checkNotNull(executor, "executor");
  serializingExecutor = new SerializingExecutor(executor);
  this.testFrameReader = Preconditions.checkNotNull(frameReader, "frameReader");
  this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter");
  this.socket = Preconditions.checkNotNull(socket, "socket");
  this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
  this.tooManyPingsRunnable =
      Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
  initTransportTracer();
}
/**
 * Returns the port named in {@code defaultAuthority}, or the port of {@code address} when the
 * authority carries none (i.e. its parsed URI reports -1).
 */
@VisibleForTesting
int getOverridenPort() {
  int authorityPort = GrpcUtil.authorityToUri(defaultAuthority).getPort();
  return authorityPort != -1 ? authorityPort : address.getPort();
}
@Override public void run() { String threadName = Thread.currentThread().getName(); if (!GrpcUtil.IS_RESTRICTED_APPENGINE) { Thread.currentThread().setName("OkHttpClientTransport"); } try { // Read until the underlying socket closes. while (frameReader.nextFrame(this)) { if (keepAliveManager != null) { keepAliveManager.onDataReceived(); } } // frameReader.nextFrame() returns false when the underlying read encounters an IOException, // it may be triggered by the socket closing, in such case, the startGoAway() will do // nothing, otherwise, we finish all streams since it's a real IO issue. startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE.withDescription("End of stream or IOException")); } catch (Throwable t) { // TODO(madongfly): Send the exception message to the server. startGoAway( 0, ErrorCode.PROTOCOL_ERROR, Status.UNAVAILABLE.withDescription("error in frame handler").withCause(t)); } finally { try { frameReader.close(); } catch (IOException ex) { log.log(Level.INFO, "Exception closing frame reader", ex); } listener.transportTerminated(); if (!GrpcUtil.IS_RESTRICTED_APPENGINE) { // Restore the original thread name. Thread.currentThread().setName(threadName); } } }
@VisibleForTesting @Nullable SSLSocketFactory createSocketFactory() { switch (negotiationType) { case TLS: try { if (sslSocketFactory == null) { SSLContext sslContext; if (GrpcUtil.IS_RESTRICTED_APPENGINE) { // The following auth code circumvents the following AccessControlException: // access denied ("java.util.PropertyPermission" "javax.net.ssl.keyStore" "read") // Conscrypt will attempt to load the default KeyStore if a trust manager is not // provided, which is forbidden on AppEngine sslContext = SSLContext.getInstance("TLS", Platform.get().getProvider()); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init((KeyStore) null); sslContext.init( null, trustManagerFactory.getTrustManagers(), // Use an algorithm that doesn't need /dev/urandom SecureRandom.getInstance("SHA1PRNG", Platform.get().getProvider())); } else { sslContext = SSLContext.getInstance("Default", Platform.get().getProvider()); } sslSocketFactory = sslContext.getSocketFactory(); } return sslSocketFactory; } catch (GeneralSecurityException gse) { throw new RuntimeException("TLS Provider failure", gse); } case PLAINTEXT: return null; default: throw new RuntimeException("Unknown negotiation type: " + negotiationType); } }
/**
 * Releases the shared timer service and, when the shared executor is in use, that executor
 * too. Only the first call has any effect.
 */
@Override
public void close() {
  if (!closed) {
    closed = true;
    SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
    if (usingSharedExecutor) {
      SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor);
    }
  }
}
@Test public void createRequestHeaders_sanitizes() { Metadata metaData = new Metadata(); // Intentionally being explicit here rather than relying on any pre-defined lists of headers, // since the goal of this test is to validate the correctness of such lists in the first place. metaData.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed"); metaData.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed"); metaData.put(GrpcUtil.TE_HEADER, "to-be-removed"); Metadata.Key<String> userKey = Metadata.Key.of("user-key", Metadata.ASCII_STRING_MARSHALLER); String userValue = "user-value"; metaData.put(userKey, userValue); String path = "//testServerice/test"; String authority = "localhost"; String userAgent = "useragent"; List<Header> headers = Headers.createRequestHeaders( metaData, path, authority, userAgent, false); // 7 reserved headers, 1 user header assertEquals(7 + 1, headers.size()); // Check the 3 reserved headers that are non pseudo // Users can not create pseudo headers keys so no need to check for them here assertThat(headers).contains(Headers.CONTENT_TYPE_HEADER); assertThat(headers).contains(new Header(GrpcUtil.USER_AGENT_KEY.name(), userAgent)); assertThat(headers).contains(new Header(GrpcUtil.TE_HEADER.name(), GrpcUtil.TE_TRAILERS)); // Check the user header is in tact assertThat(headers).contains(new Header(userKey.name(), userValue)); }
/**
 * Starts a stream whose application metadata carries the user agent "misbehaving-application"
 * and verifies that the written headers contain the transport's "good-application" value.
 */
@Test
public void start_userAgentRemoved() {
  Metadata appHeaders = new Metadata();
  appHeaders.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
  stream = new OkHttpClientStream(
      methodDescriptor,
      appHeaders,
      frameWriter,
      transport,
      flowController,
      lock,
      MAX_MESSAGE_SIZE,
      "localhost",
      "good-application",
      StatsTraceContext.NOOP,
      transportTracer);
  stream.start(new BaseClientStreamListener());
  stream.transportState().start(3);

  verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());

  Header expectedUserAgent = new Header(GrpcUtil.USER_AGENT_KEY.name(), "good-application");
  assertThat(headersCaptor.getValue()).contains(expectedUserAgent);
}
/**
 * Builds and starts the test server from {@code getServerBuilder()}, or sets {@code server} to
 * null when no builder is provided.
 *
 * <p>The test service is wrapped with the recording interceptors followed by the service's own
 * interceptors, and a census stats module is installed on the builder.
 *
 * @throws RuntimeException wrapping an {@link IOException} raised while starting the server
 */
private void startServer() {
  AbstractServerImplBuilder<?> serverBuilder = getServerBuilder();
  if (serverBuilder == null) {
    server = null;
    return;
  }

  testServiceExecutor = Executors.newScheduledThreadPool(2);

  List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder()
      .add(recordServerCallInterceptor(serverCallCapture))
      .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture))
      .add(recordContextInterceptor(contextCapture))
      .addAll(TestServiceImpl.interceptors())
      .build();

  serverBuilder
      .addService(
          ServerInterceptors.intercept(new TestServiceImpl(testServiceExecutor), allInterceptors))
      .addStreamTracerFactory(serverStreamTracerFactory);

  CensusStatsModule statsModule = new CensusStatsModule(
      tagger,
      tagContextBinarySerializer,
      serverStatsRecorder,
      GrpcUtil.STOPWATCH_SUPPLIER,
      true);
  io.grpc.internal.TestingAccessor.setStatsImplementation(serverBuilder, statsModule);

  try {
    server = serverBuilder.build().start();
  } catch (IOException ex) {
    throw new RuntimeException(ex);
  }
}
/**
 * Issues a call with a 100-minute deadline and verifies that the timeout captured from the
 * request headers, converted from nanoseconds to minutes, is at most one minute below the
 * configured value and never above it. Skipped when there is no server.
 */
@Test
public void sendsTimeoutHeader() {
  Assume.assumeTrue("can not capture request headers on server side", server != null);

  long configuredTimeoutMinutes = 100;
  TestServiceGrpc.TestServiceBlockingStub stub =
      blockingStub.withDeadlineAfter(configuredTimeoutMinutes, TimeUnit.MINUTES);
  stub.emptyCall(EMPTY);

  long transferredTimeoutNanos = requestHeadersCapture.get().get(GrpcUtil.TIMEOUT_KEY);
  long transferredTimeoutMinutes = TimeUnit.NANOSECONDS.toMinutes(transferredTimeoutNanos);

  String failureMessage = "configuredTimeoutMinutes=" + configuredTimeoutMinutes
      + ", transferredTimeoutMinutes=" + transferredTimeoutMinutes;
  long shortfallMinutes = configuredTimeoutMinutes - transferredTimeoutMinutes;
  Assert.assertTrue(failureMessage, shortfallMinutes >= 0 && shortfallMinutes <= 1);
}
/**
 * Uses the test server host as authority but installs a {@link HostnameVerifier} that rejects
 * every hostname; the RPC is expected to fail with a root cause of
 * {@link SSLPeerUnverifiedException}.
 */
@Test
public void hostnameVerifierWithCorrectHostname() throws Exception {
  ManagedChannel channel = createChannelBuilder()
      .overrideAuthority(
          GrpcUtil.authorityFromHostAndPort(TestUtils.TEST_SERVER_HOST, getPort()))
      .hostnameVerifier(new HostnameVerifier() {
        @Override
        public boolean verify(String hostname, SSLSession session) {
          return false;
        }
      })
      .build();
  try {
    TestServiceGrpc.TestServiceBlockingStub blockingStub =
        TestServiceGrpc.newBlockingStub(channel);

    Throwable actualThrown = null;
    try {
      blockingStub.emptyCall(Empty.getDefaultInstance());
    } catch (Throwable t) {
      actualThrown = t;
    }
    assertNotNull("The rpc should have been failed due to hostname verification", actualThrown);
    Throwable cause = Throwables.getRootCause(actualThrown);
    assertTrue(
        "Failed by unexpected exception: " + cause, cause instanceof SSLPeerUnverifiedException);
  } finally {
    // Shut the channel down even when an assertion above fails, so it is not leaked.
    channel.shutdown();
  }
}