@Override public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) { final Context timerContext = timer.time(); final AtomicBoolean decremented = new AtomicBoolean(false); return new CheckedForwardingClientCall<ReqT, RespT>(delegate.newCall(methodDescriptor, callOptions)) { @Override protected void checkedStart(ClientCall.Listener<RespT> responseListener, Metadata headers) throws Exception { ClientCall.Listener<RespT> timingListener = wrap(responseListener, timerContext, decremented); getStats().ACTIVE_RPC_COUNTER.inc(); getStats().RPC_METER.mark(); delegate().start(timingListener, headers); } @Override public void cancel(String message, Throwable cause) { if (!decremented.getAndSet(true)) { getStats().ACTIVE_RPC_COUNTER.dec(); } super.cancel(message, cause); } }; }
@Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { return new CheckedForwardingClientCall<ReqT, RespT>( next.newCall(method.toBuilder().setSafe(true).build(), callOptions)) { @Override public void checkedStart(Listener<RespT> responseListener, Metadata headers) { delegate().start(responseListener, headers); } }; }
@Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, final Channel next) { // TODO(ejona86): If the call fails for Auth reasons, this does not properly propagate info that // would be in WWW-Authenticate, because it does not yet have access to the header. return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { @Override protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws StatusException { Metadata cachedSaved; URI uri = serviceUri(next, method); synchronized (ClientAuthInterceptor.this) { // TODO(louiscryan): This is icky but the current auth library stores the same // metadata map until the next refresh cycle. This will be fixed once // https://github.com/google/google-auth-library-java/issues/3 // is resolved. // getRequestMetadata() may return a different map based on the provided URI, i.e., for // JWT. However, today it does not cache JWT and so we won't bother tring to cache its // return value based on the URI. Map<String, List<String>> latestMetadata = getRequestMetadata(uri); if (lastMetadata == null || lastMetadata != latestMetadata) { lastMetadata = latestMetadata; cached = toHeaders(lastMetadata); } cachedSaved = cached; } headers.merge(cachedSaved); delegate().start(responseListener, headers); } }; }
@Test public void exceptionInStart() { final Exception error = new Exception("emulated error"); ClientInterceptor interceptor = new ClientInterceptor() { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { ClientCall<ReqT, RespT> call = next.newCall(method, callOptions); return new CheckedForwardingClientCall<ReqT, RespT>(call) { @Override protected void checkedStart(ClientCall.Listener<RespT> responseListener, Metadata headers) throws Exception { throw error; // delegate().start will not be called } }; } }; Channel intercepted = ClientInterceptors.intercept(channel, interceptor); @SuppressWarnings("unchecked") ClientCall.Listener<Void> listener = mock(ClientCall.Listener.class); ClientCall<Void, Void> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT); assertNotSame(call, interceptedCall); interceptedCall.start(listener, new Metadata()); interceptedCall.sendMessage(null /*request*/); interceptedCall.halfClose(); interceptedCall.request(1); call.done = true; ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); verify(listener).onClose(captor.capture(), any(Metadata.class)); assertSame(error, captor.getValue().getCause()); // Make sure nothing bad happens after the exception. ClientCall<?, ?> noop = ((CheckedForwardingClientCall<?, ?>)interceptedCall).delegate(); // Should not throw, even on bad input noop.cancel("Cancel for test", null); noop.start(null, null); noop.request(-1); noop.halfClose(); noop.sendMessage(null); assertFalse(noop.isReady()); }