@Override public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { ClientTracer tracer = new ClientTracer(); // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // one streams. We will need to update this file to support them. if (streamTracerUpdater != null) { checkState( streamTracerUpdater.compareAndSet(this, null, tracer), "Are you creating multiple streams per call? This class doesn't yet support this case"); } else { checkState( streamTracer == null, "Are you creating multiple streams per call? This class doesn't yet support this case"); streamTracer = tracer; } if (module.propagateTags) { headers.discardAll(module.statsHeader); if (!module.tagger.empty().equals(parentCtx)) { headers.put(module.statsHeader, parentCtx); } } return tracer; }
private Substream createSubstream(int previousAttempts) { Substream sub = new Substream(previousAttempts); // one tracer per substream final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub); ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() { @Override public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { return bufferSizeTracer; } }; Metadata newHeaders = updateHeaders(headers, previousAttempts); // NOTICE: This set _must_ be done before stream.start() and it actually is. sub.stream = newSubstream(tracerFactory, newHeaders); return sub; }
@Test public void clientStreamTracerTransfers() { ClientStreamTracer.Factory factory1 = new ClientStreamTracer.Factory() {}; ClientStreamTracer.Factory factory2 = new ClientStreamTracer.Factory() {}; CallOptions baseOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); CallOptions defaultOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory2); DefaultCallOptionsClientInterceptor interceptor = new DefaultCallOptionsClientInterceptor(defaultOptions); CallOptions patchedOptions = interceptor.patchOptions(baseOptions); assertThat(patchedOptions.getStreamTracerFactories()).containsExactly(factory1, factory2); }
@Override public <ReqT> RetriableStream<ReqT> newRetriableStream( final MethodDescriptor<ReqT, ?> method, final CallOptions callOptions, final Metadata headers, final Context context) { RetryPolicy retryPolicy = retryPolicies == null ? DEFAULT : retryPolicies.get(method); return new RetriableStream<ReqT>( method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(), retryPolicy) { @Override Status prestart() { return uncommittedRetriableStreamsRegistry.add(this); } @Override void postCommit() { uncommittedRetriableStreamsRegistry.remove(this); } @Override ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) { // TODO(zdapeng): only add tracer when retry is enabled. CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory); ClientTransport transport = get(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); Context origContext = context.attach(); try { return transport.newStream(method, newHeaders, newOptions); } finally { context.detach(origContext); } } }; }
/** * Factory method for the client-side. */ public static StatsTraceContext newClientContext(CallOptions callOptions, Metadata headers) { List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories(); if (factories.isEmpty()) { return NOOP; } // This array will be iterated multiple times per RPC. Use primitive array instead of Collection // so that for-each doesn't create an Iterator every time. StreamTracer[] tracers = new StreamTracer[factories.size()]; for (int i = 0; i < tracers.length; i++) { tracers[i] = factories.get(i).newClientStreamTracer(callOptions, headers); } return new StatsTraceContext(tracers); }
@Test public void pickerReturnsStreamTracer_noDelay() { ClientStream mockStream = mock(ClientStream.class); ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class); ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; when(mockTransport.newStream( any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) .thenReturn(mockStream); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel, factory2)); helper.updateBalancingState(READY, mockPicker); CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); ClientCall<String, Integer> call = channel.newCall(method, callOptions); call.start(mockCallListener, new Metadata()); verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture()); assertEquals( Arrays.asList(factory1, factory2), callOptionsCaptor.getValue().getStreamTracerFactories()); // The factories are safely not stubbed because we do not expect any usage of them. verifyZeroInteractions(factory1); verifyZeroInteractions(factory2); }
@Test public void pickerReturnsStreamTracer_delayed() { ClientStream mockStream = mock(ClientStream.class); ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class); ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class); createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1); ClientCall<String, Integer> call = channel.newCall(method, callOptions); call.start(mockCallListener, new Metadata()); Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); transportInfo.listener.transportReady(); ClientTransport mockTransport = transportInfo.transport; when(mockTransport.newStream( any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) .thenReturn(mockStream); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel, factory2)); helper.updateBalancingState(READY, mockPicker); assertEquals(1, executor.runDueTasks()); verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture()); assertEquals( Arrays.asList(factory1, factory2), callOptionsCaptor.getValue().getStreamTracerFactories()); // The factories are safely not stubbed because we do not expect any usage of them. verifyZeroInteractions(factory1); verifyZeroInteractions(factory2); }
@Override ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) { bufferSizeTracer = tracerFactory.newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null ? 0 : Integer.valueOf(metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS)); return retriableStreamRecorder.newSubstream(actualPreviousRpcAttemptsInHeader); }
@Override public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { callsStartedUpdater.getAndIncrement(this); return new StreamTracer(); }
@Override public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { TestClientStreamTracer tracer = new TestClientStreamTracer(); clientStreamTracers.add(tracer); return tracer; }
/** * See {@link ClientStreamTracer#outboundHeaders}. For client-side only. * * <p>Transport-specific, thus should be called by transport implementations. */ public void clientOutboundHeaders() { for (StreamTracer tracer : tracers) { ((ClientStreamTracer) tracer).outboundHeaders(); } }
/** * See {@link ClientStreamTracer#inboundHeaders}. For client-side only. * * <p>Called from abstract stream implementations. */ public void clientInboundHeaders() { for (StreamTracer tracer : tracers) { ((ClientStreamTracer) tracer).inboundHeaders(); } }
/** * Returns a transport out of a PickResult, or {@code null} if the result is "buffer". */ @Nullable static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) { final ClientTransport transport; Subchannel subchannel = result.getSubchannel(); if (subchannel != null) { transport = ((AbstractSubchannel) subchannel).obtainActiveTransport(); } else { transport = null; } if (transport != null) { final ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); if (streamTracerFactory == null) { return transport; } return new ClientTransport() { @Override public ClientStream newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) { return transport.newStream( method, headers, callOptions.withStreamTracerFactory(streamTracerFactory)); } @Override public void ping(PingCallback callback, Executor executor) { transport.ping(callback, executor); } @Override public LogId getLogId() { return transport.getLogId(); } @Override public ListenableFuture<TransportStats> getStats() { return transport.getStats(); } }; } if (!result.getStatus().isOk() && (result.isDrop() || !isWaitForReady)) { return new FailingClientTransport(result.getStatus()); } return null; }
@Override public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { headers.discardAll(tracingHeader); headers.put(tracingHeader, span.getContext()); return new ClientTracer(span); }
@Test public void clientBasicTracingDefaultSpan() { CensusTracingModule.ClientCallTracer callTracer = censusTracing.newClientCallTracer(null, method); Metadata headers = new Metadata(); ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), isNull(Span.class)); verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); clientStreamTracer.outboundMessage(0); clientStreamTracer.outboundMessageSent(0, 882, -1); clientStreamTracer.inboundMessage(0); clientStreamTracer.outboundMessage(1); clientStreamTracer.outboundMessageSent(1, -1, 27); clientStreamTracer.inboundMessageRead(0, 255, 90); clientStreamTracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); InOrder inOrder = inOrder(spyClientSpan); inOrder.verify(spyClientSpan, times(3)).addNetworkEvent(networkEventCaptor.capture()); List<NetworkEvent> events = networkEventCaptor.getAllValues(); assertEquals( NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0)); assertEquals( NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1)); assertEquals( NetworkEvent.builder(Type.RECV, 0) .setCompressedMessageSize(255) .setUncompressedMessageSize(90) .build(), events.get(2)); inOrder.verify(spyClientSpan).end( EndSpanOptions.builder() .setStatus(io.opencensus.trace.Status.OK) .setSampleToLocalSpanStore(false) .build()); verifyNoMoreInteractions(spyClientSpan); verifyNoMoreInteractions(tracer); }
/** * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned * Client stream is not yet started. */ abstract ClientStream newSubstream( ClientStreamTracer.Factory tracerFactory, Metadata headers);