@Test public void delegatingPickFirstThenNameResolutionFails() { List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false); Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); deliverResolvedAddresses(resolvedServers, resolutionAttrs); verify(pickFirstBalancerFactory).newLoadBalancer(helper); verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs)); // Then let name resolution fail. The error will be passed directly to the delegate. Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); deliverNameResolutionError(error); verify(pickFirstBalancer).handleNameResolutionError(error); verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); verifyNoMoreInteractions(roundRobinBalancerFactory); verifyNoMoreInteractions(roundRobinBalancer); }
@Test public void delegatingRoundRobinThenNameResolutionFails() { List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false, false); Attributes resolutionAttrs = Attributes.newBuilder() .set(RESOLUTION_ATTR, "yeah") .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN) .build(); deliverResolvedAddresses(resolvedServers, resolutionAttrs); verify(roundRobinBalancerFactory).newLoadBalancer(helper); verify(roundRobinBalancer).handleResolvedAddressGroups(resolvedServers, resolutionAttrs); // Then let name resolution fail. The error will be passed directly to the delegate. Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); deliverNameResolutionError(error); verify(roundRobinBalancer).handleNameResolutionError(error); verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); verifyNoMoreInteractions(pickFirstBalancerFactory); verifyNoMoreInteractions(pickFirstBalancer); }
@Override public void updateBalancingState( final ConnectivityState newState, final SubchannelPicker newPicker) { checkNotNull(newState, "newState"); checkNotNull(newPicker, "newPicker"); runSerialized( new Runnable() { @Override public void run() { if (LbHelperImpl.this != lbHelper) { return; } subchannelPicker = newPicker; delayedTransport.reprocess(newPicker); // It's not appropriate to report SHUTDOWN state from lb. // Ignore the case of newState == SHUTDOWN for now. if (newState != SHUTDOWN) { channelStateManager.gotoState(newState); } } }); }
void handleSubchannelStateChange(final ConnectivityStateInfo newState) { switch (newState.getState()) { case READY: case IDLE: delayedTransport.reprocess(subchannelPicker); break; case TRANSIENT_FAILURE: delayedTransport.reprocess(new SubchannelPicker() { final PickResult errorResult = PickResult.withError(newState.getStatus()); @Override public PickResult pickSubchannel(PickSubchannelArgs args) { return errorResult; } }); break; default: // Do nothing } }
@Test public void reprocess_NoPendingStream() { SubchannelPicker picker = mock(SubchannelPicker.class); AbstractSubchannel subchannel = mock(AbstractSubchannel.class); when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( PickResult.withSubchannel(subchannel)); when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))).thenReturn(mockRealStream); delayedTransport.reprocess(picker); verifyNoMoreInteractions(picker); verifyNoMoreInteractions(transportListener); // Though picker was not originally used, it will be saved and serve future streams. ClientStream stream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT); verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)); verify(subchannel).obtainActiveTransport(); assertSame(mockRealStream, stream); }
@Override public ClientTransport get(PickSubchannelArgs args) { SubchannelPicker pickerCopy = subchannelPicker; if (shutdown.get()) { // If channel is shut down, delayedTransport is also shut down which will fail the stream // properly. return delayedTransport; } if (pickerCopy == null) { channelExecutor.executeLater(new Runnable() { @Override public void run() { exitIdleMode(); } }).drain(); return delayedTransport; } // There is no need to reschedule the idle timer here. // // pickerCopy != null, which means idle timer has not expired when this method starts. // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it. // // In most cases the idle timer is scheduled to fire after the transport has created the // stream, which would have reported in-use state to the channel that would have cancelled // the idle timer. PickResult pickResult = pickerCopy.pickSubchannel(args); ClientTransport transport = GrpcUtil.getTransportFromPickResult( pickResult, args.getCallOptions().isWaitForReady()); if (transport != null) { return transport; } return delayedTransport; }
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) { createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); // This call will be buffered by the channel, thus involve delayed transport CallOptions callOptions = CallOptions.DEFAULT; if (waitForReady) { callOptions = callOptions.withWaitForReady(); } else { callOptions = callOptions.withoutWaitForReady(); } ClientCall<String, Integer> call1 = channel.newCall(method, callOptions); call1.start(mockCallListener, new Metadata()); SubchannelPicker picker = mock(SubchannelPicker.class); Status status = Status.UNAVAILABLE.withDescription("for test"); when(picker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status)); helper.updateBalancingState(READY, picker); executor.runDueTasks(); if (shouldFail) { verify(mockCallListener).onClose(same(status), any(Metadata.class)); } else { verifyZeroInteractions(mockCallListener); } // This call doesn't involve delayed transport ClientCall<String, Integer> call2 = channel.newCall(method, callOptions); call2.start(mockCallListener2, new Metadata()); executor.runDueTasks(); if (shouldFail) { verify(mockCallListener2).onClose(same(status), any(Metadata.class)); } else { verifyZeroInteractions(mockCallListener2); } }
@Test public void resetGrpclbWhenSwitchingAwayFromGrpclb() { InOrder inOrder = inOrder(helper); List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); Attributes grpclbResolutionAttrs = Attributes.newBuilder() .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); assertNull(balancer.getDelegate()); verify(helper).createOobChannel(addrsEq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); verify(lbRequestObserver).onNext( eq(LoadBalanceRequest.newBuilder().setInitialRequest( InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); // Simulate receiving LB response List<ServerEntry> backends = Arrays.asList(new ServerEntry("127.0.0.1", 2000, "token0001")); inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends)); inOrder.verify(helper).createSubchannel( eq(new EquivalentAddressGroup(backends.get(0).addr)), any(Attributes.class)); assertEquals(1, mockSubchannels.size()); Subchannel subchannel = mockSubchannels.poll(); verify(subchannel).requestConnection(); // Switch to round-robin. GRPCLB streams and connections should be closed. List<EquivalentAddressGroup> roundRobinResolutionList = createResolvedServerAddresses(false, false, false); Attributes roundRobinResolutionAttrs = Attributes.newBuilder() .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build(); verify(lbRequestObserver, never()).onCompleted(); verify(subchannel, never()).shutdown(); assertFalse(oobChannel.isShutdown()); deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs); verify(lbRequestObserver).onCompleted(); verify(subchannel).shutdown(); assertTrue(oobChannel.isShutdown()); assertTrue(oobChannel.isTerminated()); assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy()); assertSame(roundRobinBalancer, balancer.getDelegate()); assertNull(balancer.getGrpclbState()); }
/** * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last * picker will be consulted. * * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned. */ @Override public final ClientStream newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) { try { SubchannelPicker picker; PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); long pickerVersion = -1; synchronized (lock) { if (shutdownStatus == null) { if (lastPicker == null) { return createPendingStream(args); } picker = lastPicker; pickerVersion = lastPickerVersion; } else { return new FailingClientStream(shutdownStatus); } } while (true) { PickResult pickResult = picker.pickSubchannel(args); ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { return transport.newStream( args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions()); } // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible // race with reprocess()), we will buffer it. Otherwise, will try with the new picker. synchronized (lock) { if (shutdownStatus != null) { return new FailingClientStream(shutdownStatus); } if (pickerVersion == lastPickerVersion) { return createPendingStream(args); } picker = lastPicker; pickerVersion = lastPickerVersion; } } } finally { channelExecutor.drain(); } }
/** * Use the picker to try picking a transport for every pending stream, proceed the stream if the * pick is successful, otherwise keep it pending. * * <p>This method may be called concurrently with {@code newStream()}, and it's safe. All pending * streams will be served by the latest picker (if a same picker is given more than once, they are * considered different pickers) as soon as possible. * * <p>This method <strong>must not</strong> be called concurrently with itself. */ final void reprocess(SubchannelPicker picker) { ArrayList<PendingStream> toProcess; ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>(); synchronized (lock) { lastPicker = picker; lastPickerVersion++; if (!hasPendingStreams()) { return; } toProcess = new ArrayList<PendingStream>(pendingStreams); } for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); CallOptions callOptions = stream.args.getCallOptions(); final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since // we are now on transport thread, we need to offload the work to an executor. if (callOptions.getExecutor() != null) { executor = callOptions.getExecutor(); } executor.execute(new Runnable() { @Override public void run() { stream.createRealStream(transport); } }); toRemove.add(stream); } // else: stay pending } synchronized (lock) { // Between this synchronized and the previous one: // - Streams may have been cancelled, which may turn pendingStreams into emptiness. // - shutdown() may be called, which may turn pendingStreams into null. if (!hasPendingStreams()) { return; } pendingStreams.removeAll(toRemove); // Because delayed transport is long-lived, we take this opportunity to down-size the // hashmap. if (pendingStreams.isEmpty()) { pendingStreams = new LinkedHashSet<PendingStream>(); } if (!hasPendingStreams()) { // There may be a brief gap between delayed transport clearing in-use state, and first real // transport starting streams and setting in-use state. During the gap the whole channel's // in-use state may be false. However, it shouldn't cause spurious switching to idleness // (which would shutdown the transports and LoadBalancer) because the gap should be shorter // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). channelExecutor.executeLater(reportTransportNotInUse); if (shutdownStatus != null && reportTransportTerminated != null) { channelExecutor.executeLater(reportTransportTerminated); reportTransportTerminated = null; } } } channelExecutor.drain(); }
void setSubchannel(final InternalSubchannel subchannel) { log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel}); this.subchannel = subchannel; subchannelImpl = new AbstractSubchannel() { @Override public void shutdown() { subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown")); } @Override ClientTransport obtainActiveTransport() { return subchannel.obtainActiveTransport(); } @Override public void requestConnection() { subchannel.obtainActiveTransport(); } @Override public EquivalentAddressGroup getAddresses() { return subchannel.getAddressGroup(); } @Override public Attributes getAttributes() { return Attributes.EMPTY; } @Override public ListenableFuture<ChannelStats> getStats() { SettableFuture<ChannelStats> ret = SettableFuture.create(); ChannelStats.Builder builder = new ChannelStats.Builder(); subchannelCallsTracer.updateBuilder(builder); builder.setTarget(authority).setState(subchannel.getState()); ret.set(builder.build()); return ret; } }; subchannelPicker = new SubchannelPicker() { final PickResult result = PickResult.withSubchannel(subchannelImpl); @Override public PickResult pickSubchannel(PickSubchannelArgs args) { return result; } }; delayedTransport.reprocess(subchannelPicker); }
@Test public void realTransportsHoldsOffIdleness() throws Exception { final EquivalentAddressGroup addressGroup = servers.get(1); // Start a call, which goes to delayed transport ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); call.start(mockCallListener, new Metadata()); // Verify that we have exited the idle mode ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); Helper helper = helperCaptor.getValue(); assertTrue(channel.inUseStateAggregator.isInUse()); // Assume LoadBalancer has received an address, then create a subchannel. Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); subchannel.requestConnection(); MockClientTransportInfo t0 = newTransports.poll(); t0.listener.transportReady(); SubchannelPicker mockPicker = mock(SubchannelPicker.class); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(subchannel)); helper.updateBalancingState(READY, mockPicker); // Delayed transport creates real streams in the app executor executor.runDueTasks(); // Delayed transport exits in-use, while real transport has not entered in-use yet. assertFalse(channel.inUseStateAggregator.isInUse()); // Now it's in-use t0.listener.transportInUse(true); assertTrue(channel.inUseStateAggregator.isInUse()); // As long as the transport is in-use, the channel won't go idle. timer.forwardTime(IDLE_TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); assertTrue(channel.inUseStateAggregator.isInUse()); t0.listener.transportInUse(false); assertFalse(channel.inUseStateAggregator.isInUse()); // And allow the channel to go idle. timer.forwardTime(IDLE_TIMEOUT_SECONDS - 1, TimeUnit.SECONDS); verify(mockLoadBalancer, never()).shutdown(); timer.forwardTime(1, TimeUnit.SECONDS); verify(mockLoadBalancer).shutdown(); }
@Test public void oobTransportDoesNotAffectIdleness() { // Start a call, which goes to delayed transport ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); call.start(mockCallListener, new Metadata()); // Verify that we have exited the idle mode ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null); verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); Helper helper = helperCaptor.getValue(); // Fail the RPC SubchannelPicker failingPicker = mock(SubchannelPicker.class); when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withError(Status.UNAVAILABLE)); helper.updateBalancingState(TRANSIENT_FAILURE, failingPicker); executor.runDueTasks(); verify(mockCallListener).onClose(same(Status.UNAVAILABLE), any(Metadata.class)); // ... so that the channel resets its in-use state assertFalse(channel.inUseStateAggregator.isInUse()); // Now make an RPC on an OOB channel ManagedChannel oob = helper.createOobChannel(servers.get(0), "oobauthority"); verify(mockTransportFactory, never()) .newClientTransport(any(SocketAddress.class), same("oobauthority"), same(USER_AGENT), same(NO_PROXY)); ClientCall<String, Integer> oobCall = oob.newCall(method, CallOptions.DEFAULT); oobCall.start(mockCallListener2, new Metadata()); verify(mockTransportFactory) .newClientTransport(any(SocketAddress.class), same("oobauthority"), same(USER_AGENT), same(NO_PROXY)); MockClientTransportInfo oobTransportInfo = newTransports.poll(); assertEquals(0, newTransports.size()); // The OOB transport reports in-use state oobTransportInfo.listener.transportInUse(true); // But it won't stop the channel from going idle verify(mockLoadBalancer, never()).shutdown(); timer.forwardTime(IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS); verify(mockLoadBalancer).shutdown(); }