diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 29b18fb6aa7..fff22dad960 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -25,6 +25,7 @@ import com.google.common.primitives.UnsignedInts; import com.google.errorprone.annotations.CheckReturnValue; import io.grpc.Attributes; +import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.InternalEquivalentAddressGroup; @@ -32,9 +33,11 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.internal.GrpcUtil; +import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; @@ -83,7 +86,9 @@ final class CdsLoadBalancer2 extends LoadBalancer { private final Helper helper; private final LoadBalancerRegistry lbRegistry; private final ClusterState clusterState = new ClusterState(); + private final CdsLbHelper cdsLbHelper = new CdsLbHelper(); private GracefulSwitchLoadBalancer delegate; + private boolean addBackendServicePickDetailsLabel; // Following fields are effectively final. private String clusterName; private Subscription clusterSubscription; @@ -91,7 +96,7 @@ final class CdsLoadBalancer2 extends LoadBalancer { CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) { this.helper = checkNotNull(helper, "helper"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); - this.delegate = new GracefulSwitchLoadBalancer(helper); + this.delegate = new GracefulSwitchLoadBalancer(cdsLbHelper); logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); } @@ -126,6 +131,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { XdsClusterConfig clusterConfig = clusterConfigOr.getValue(); if (clusterConfig.getChildren() instanceof EndpointConfig) { + addBackendServicePickDetailsLabel = true; StatusOr edsUpdate = getEdsUpdate(xdsConfig, clusterName); StatusOr statusOrResult = clusterState.edsUpdateToResult( clusterName, @@ -156,8 +162,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { resolvedAddresses.toBuilder() .setLoadBalancingPolicyConfig(gracefulConfig) .setAddresses(Collections.unmodifiableList(addresses)) + .setAttributes(resolvedAddresses.getAttributes().toBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, clusterName) + .build()) .build()); } else if (clusterConfig.getChildren() instanceof AggregateConfig) { + addBackendServicePickDetailsLabel = false; Map priorityChildConfigs = new HashMap<>(); List leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames(); for (String childCluster: leafClusters) { @@ -196,7 +206,8 @@ public void handleNameResolutionError(Status error) { public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); delegate.shutdown(); - delegate = new GracefulSwitchLoadBalancer(helper); + delegate = new GracefulSwitchLoadBalancer(cdsLbHelper); + addBackendServicePickDetailsLabel = false; if (clusterSubscription != null) { clusterSubscription.close(); clusterSubscription = null; @@ -206,6 +217,7 @@ public void shutdown() { @CheckReturnValue // don't forget to return up the stack after the fail call private Status fail(Status error) { delegate.shutdown(); + addBackendServicePickDetailsLabel = false; helper.updateBalancingState( TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter @@ -215,6 +227,38 @@ private String errorPrefix() { return "CdsLb for " + clusterName + ": "; } + private final class CdsLbHelper extends ForwardingLoadBalancerHelper { + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + if (addBackendServicePickDetailsLabel) { + newPicker = new BackendServiceMetricLabelSubchannelPicker(newPicker, clusterName); + } + delegate().updateBalancingState(newState, newPicker); + } + + @Override + protected Helper delegate() { + return helper; + } + } + + private static final class BackendServiceMetricLabelSubchannelPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String backendService; + + private BackendServiceMetricLabelSubchannelPicker( + SubchannelPicker delegate, String backendService) { + this.delegate = checkNotNull(delegate, "delegate"); + this.backendService = checkNotNull(backendService, "backendService"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", backendService); + return delegate.pickSubchannel(args); + } + } + /** * The number of bits assigned to the fractional part of fixed-point values. We normalize weights * to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 == diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 64105144240..81d28168737 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -33,7 +33,6 @@ import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.Metadata; -import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.internal.ForwardingClientStreamTracer; import io.grpc.internal.GrpcUtil; @@ -154,9 +153,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { return childSwitchLb.acceptResolvedAddresses( resolvedAddresses.toBuilder() - .setAttributes(attributes.toBuilder() - .set(NameResolver.ATTR_BACKEND_SERVICE, cluster) - .build()) .setLoadBalancingPolicyConfig(config.childConfig) .build()); } @@ -409,7 +405,6 @@ private RequestLimitingSubchannelPicker(SubchannelPicker delegate, public PickResult pickSubchannel(PickSubchannelArgs args) { args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER) .accept(filterMetadata); - args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster); for (DropOverload dropOverload : dropPolicies) { int rand = random.nextInt(1_000_000); if (rand < dropOverload.dropsPerMillion()) { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..96fb6b1cf44 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -61,6 +61,7 @@ import io.grpc.ConnectivityState; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -296,6 +297,24 @@ public void nonAggregateCluster_resourceUpdate() { assertThat(childLbConfig.maxConcurrentRequests).isEqualTo(200L); } + @Test + public void nonAggregateCluster_addsBackendServiceAttributeAndPickDetailsLabel() { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER)); + startXdsDepManager(); + + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER); + childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY); + + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class); + PickResult result = + pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer)); + + assertThat(result.getStatus().isOk()).isTrue(); + verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER); + } + @Test public void nonAggregateCluster_resourceRevoked() { lbRegistry.register(new PriorityLoadBalancerProvider()); @@ -429,6 +448,35 @@ public void discoverAggregateCluster_createsPriorityLbPolicy() { .isEqualTo("cds_experimental"); } + @Test + public void aggregateCluster_doesNotAddBackendServiceAttributeOrPickDetailsLabelFromRoot() { + String cluster1 = "cluster-01.googleapis.com"; + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [cluster1 (EDS)] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster1) + .build()))) + .build(), + cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build())); + startXdsDepManager(); + + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull(); + childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY); + + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class); + PickResult result = + pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer)); + + assertThat(result.getStatus().isOk()).isTrue(); + verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any()); + } + @Test // Both priorities will get tried using real priority LB policy. public void discoverAggregateCluster_testChildCdsLbPolicyParsing() { @@ -462,12 +510,16 @@ public void discoverAggregateCluster_testChildCdsLbPolicyParsing() { .isEqualTo("cluster-01.googleapis.com"); assertThat(cluster1ImplConfig.edsServiceName) .isEqualTo("backend-service-1.googleapis.com"); + assertThat(childBalancers.get(0).attributes.get(NameResolver.ATTR_BACKEND_SERVICE)) + .isEqualTo(cluster1); ClusterImplConfig cluster2ImplConfig = (ClusterImplConfig) childBalancers.get(1).config; assertThat(cluster2ImplConfig.cluster) .isEqualTo("cluster-02.googleapis.com"); assertThat(cluster2ImplConfig.edsServiceName) .isEqualTo("backend-service-1.googleapis.com"); + assertThat(childBalancers.get(1).attributes.get(NameResolver.ATTR_BACKEND_SERVICE)) + .isEqualTo(cluster2); } @Test @@ -577,7 +629,9 @@ public void unknownLbProvider() { startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = + pickerCaptor.getValue().pickSubchannel( + newPickSubchannelArgs(mock(PickDetailsConsumer.class))); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy"); @@ -605,7 +659,9 @@ public void invalidLbConfig() { startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = + pickerCaptor.getValue().pickSubchannel( + newPickSubchannelArgs(mock(PickDetailsConsumer.class))); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'"); @@ -639,12 +695,19 @@ private void startXdsDepManager(final CdsConfig cdsConfig) { } private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) { - PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = picker.pickSubchannel( + newPickSubchannelArgs(mock(PickDetailsConsumer.class))); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription()); } + private static PickSubchannelArgs newPickSubchannelArgs(PickDetailsConsumer pickDetailsConsumer) { + PickSubchannelArgs args = mock(PickSubchannelArgs.class); + when(args.getPickDetailsConsumer()).thenReturn(pickDetailsConsumer); + return args; + } + private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final String policyName; private final LoadBalancerProvider configParsingDelegate; @@ -660,7 +723,7 @@ private final class FakeLoadBalancerProvider extends LoadBalancerProvider { @Override public LoadBalancer newLoadBalancer(Helper helper) { - FakeLoadBalancer balancer = new FakeLoadBalancer(policyName); + FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper); childBalancers.add(balancer); return balancer; } @@ -692,17 +755,21 @@ public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig( private final class FakeLoadBalancer extends LoadBalancer { private final String name; + private final Helper helper; private Object config; + private Attributes attributes; private Status upstreamError; private boolean shutdown; - FakeLoadBalancer(String name) { + FakeLoadBalancer(String name, Helper helper) { this.name = name; + this.helper = helper; } @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { config = resolvedAddresses.getLoadBalancingPolicyConfig(); + attributes = resolvedAddresses.getAttributes(); return Status.OK; } @@ -716,5 +783,15 @@ public void shutdown() { shutdown = true; childBalancers.remove(this); } + + void deliverSubchannelState(final PickResult result, ConnectivityState state) { + SubchannelPicker picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return result; + } + }; + helper.updateBalancingState(state, picker); + } } } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 6cf6fa22a8a..e89c0c3b8d5 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -21,8 +21,11 @@ import static com.google.common.truth.Truth.assertWithMessage; import static io.grpc.xds.ClusterImplLoadBalancer.ATTR_SUBCHANNEL_ADDRESS_NAME; import static io.grpc.xds.XdsNameResolver.AUTO_HOST_REWRITE_KEY; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -188,7 +191,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() { assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig); assertThat(childBalancer.attributes.get(io.grpc.xds.XdsAttributes.XDS_CLIENT)) .isSameInstanceAs(xdsClient); - assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull(); } /** @@ -289,11 +292,11 @@ public void pick_addsOptionalLabels() { // The value will be determined by the parent policy, so can be different than the value used in // makeAddress() for the test. verify(detailsConsumer).addOptionalLabel("grpc.lb.locality", locality.toString()); - verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER); + verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any()); } @Test - public void pick_noResult_addsClusterLabel() { + public void pick_noResult_doesNotAddClusterLabel() { LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider(); WeightedTargetConfig weightedTargetConfig = buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); @@ -313,7 +316,7 @@ public void pick_noResult_addsClusterLabel() { TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, detailsConsumer); PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); - verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER); + verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index a508da34f88..388af1a643e 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -63,6 +63,7 @@ import io.grpc.InternalEquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -1064,7 +1065,9 @@ private FakeNameResolver assertResolverCreated(String uriPath) { private static void assertPicker(SubchannelPicker picker, Status expectedStatus, @Nullable Subchannel expectedSubchannel) { - PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickSubchannelArgs args = mock(PickSubchannelArgs.class); + when(args.getPickDetailsConsumer()).thenReturn(mock(PickDetailsConsumer.class)); + PickResult result = picker.pickSubchannel(args); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());