From ebd1eadf6c2b2cc3e2550d710071c1828498fccf Mon Sep 17 00:00:00 2001 From: becomeStar Date: Sat, 27 Jun 2026 15:20:45 +0900 Subject: [PATCH] xds: Move backend_service plumbing to CDS A75 changed aggregate cluster handling so aggregate clusters are represented as priority children whose children are CDS policies. In that structure, A89 expects backend service metric plumbing to go through CDS instead of xds_cluster_impl. Set NameResolver.ATTR_BACKEND_SERVICE from CdsLoadBalancer2 for non-aggregate leaf clusters so WRR metrics can read the backend service. Also add the grpc.lb.backend_service pick details label from the leaf CDS picker wrapper. Do not add the aggregate root cluster as backend_service. Remove the old backend_service attribute and pick details label handling from ClusterImplLoadBalancer. --- .../java/io/grpc/xds/CdsLoadBalancer2.java | 48 +++++++++- .../io/grpc/xds/ClusterImplLoadBalancer.java | 5 -- .../io/grpc/xds/CdsLoadBalancer2Test.java | 87 +++++++++++++++++-- .../grpc/xds/ClusterImplLoadBalancerTest.java | 11 ++- .../xds/ClusterResolverLoadBalancerTest.java | 5 +- 5 files changed, 139 insertions(+), 17 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 29b18fb6aa7..fff22dad960 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -25,6 +25,7 @@ import com.google.common.primitives.UnsignedInts; import com.google.errorprone.annotations.CheckReturnValue; import io.grpc.Attributes; +import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.InternalEquivalentAddressGroup; @@ -32,9 +33,11 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.internal.GrpcUtil; +import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; @@ -83,7 +86,9 @@ final class CdsLoadBalancer2 extends LoadBalancer { private final Helper helper; private final LoadBalancerRegistry lbRegistry; private final ClusterState clusterState = new ClusterState(); + private final CdsLbHelper cdsLbHelper = new CdsLbHelper(); private GracefulSwitchLoadBalancer delegate; + private boolean addBackendServicePickDetailsLabel; // Following fields are effectively final. private String clusterName; private Subscription clusterSubscription; @@ -91,7 +96,7 @@ final class CdsLoadBalancer2 extends LoadBalancer { CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) { this.helper = checkNotNull(helper, "helper"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); - this.delegate = new GracefulSwitchLoadBalancer(helper); + this.delegate = new GracefulSwitchLoadBalancer(cdsLbHelper); logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); } @@ -126,6 +131,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { XdsClusterConfig clusterConfig = clusterConfigOr.getValue(); if (clusterConfig.getChildren() instanceof EndpointConfig) { + addBackendServicePickDetailsLabel = true; StatusOr edsUpdate = getEdsUpdate(xdsConfig, clusterName); StatusOr statusOrResult = clusterState.edsUpdateToResult( clusterName, @@ -156,8 +162,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { resolvedAddresses.toBuilder() .setLoadBalancingPolicyConfig(gracefulConfig) .setAddresses(Collections.unmodifiableList(addresses)) + .setAttributes(resolvedAddresses.getAttributes().toBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, clusterName) + .build()) .build()); } else if (clusterConfig.getChildren() instanceof AggregateConfig) { + addBackendServicePickDetailsLabel = false; Map priorityChildConfigs = new HashMap<>(); List leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames(); for (String childCluster: leafClusters) { @@ -196,7 +206,8 @@ public void handleNameResolutionError(Status error) { public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); delegate.shutdown(); - delegate = new GracefulSwitchLoadBalancer(helper); + delegate = new GracefulSwitchLoadBalancer(cdsLbHelper); + addBackendServicePickDetailsLabel = false; if (clusterSubscription != null) { clusterSubscription.close(); clusterSubscription = null; @@ -206,6 +217,7 @@ public void shutdown() { @CheckReturnValue // don't forget to return up the stack after the fail call private Status fail(Status error) { delegate.shutdown(); + addBackendServicePickDetailsLabel = false; helper.updateBalancingState( TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter @@ -215,6 +227,38 @@ private String errorPrefix() { return "CdsLb for " + clusterName + ": "; } + private final class CdsLbHelper extends ForwardingLoadBalancerHelper { + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + if (addBackendServicePickDetailsLabel) { + newPicker = new BackendServiceMetricLabelSubchannelPicker(newPicker, clusterName); + } + delegate().updateBalancingState(newState, newPicker); + } + + @Override + protected Helper delegate() { + return helper; + } + } + + private static final class BackendServiceMetricLabelSubchannelPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String backendService; + + private BackendServiceMetricLabelSubchannelPicker( + SubchannelPicker delegate, String backendService) { + this.delegate = checkNotNull(delegate, "delegate"); + this.backendService = checkNotNull(backendService, "backendService"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", backendService); + return delegate.pickSubchannel(args); + } + } + /** * The number of bits assigned to the fractional part of fixed-point values. We normalize weights * to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 == diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 64105144240..81d28168737 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -33,7 +33,6 @@ import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.Metadata; -import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.internal.ForwardingClientStreamTracer; import io.grpc.internal.GrpcUtil; @@ -154,9 +153,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { return childSwitchLb.acceptResolvedAddresses( resolvedAddresses.toBuilder() - .setAttributes(attributes.toBuilder() - .set(NameResolver.ATTR_BACKEND_SERVICE, cluster) - .build()) .setLoadBalancingPolicyConfig(config.childConfig) .build()); } @@ -409,7 +405,6 @@ private RequestLimitingSubchannelPicker(SubchannelPicker delegate, public PickResult pickSubchannel(PickSubchannelArgs args) { args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER) .accept(filterMetadata); - args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster); for (DropOverload dropOverload : dropPolicies) { int rand = random.nextInt(1_000_000); if (rand < dropOverload.dropsPerMillion()) { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..96fb6b1cf44 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -61,6 +61,7 @@ import io.grpc.ConnectivityState; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -296,6 +297,24 @@ public void nonAggregateCluster_resourceUpdate() { assertThat(childLbConfig.maxConcurrentRequests).isEqualTo(200L); } + @Test + public void nonAggregateCluster_addsBackendServiceAttributeAndPickDetailsLabel() { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER)); + startXdsDepManager(); + + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER); + childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY); + + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class); + PickResult result = + pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer)); + + assertThat(result.getStatus().isOk()).isTrue(); + verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER); + } + @Test public void nonAggregateCluster_resourceRevoked() { lbRegistry.register(new PriorityLoadBalancerProvider()); @@ -429,6 +448,35 @@ public void discoverAggregateCluster_createsPriorityLbPolicy() { .isEqualTo("cds_experimental"); } + @Test + public void aggregateCluster_doesNotAddBackendServiceAttributeOrPickDetailsLabelFromRoot() { + String cluster1 = "cluster-01.googleapis.com"; + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + // CLUSTER (aggr.) -> [cluster1 (EDS)] + CLUSTER, Cluster.newBuilder() + .setName(CLUSTER) + .setClusterType(Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig(Any.pack(ClusterConfig.newBuilder() + .addClusters(cluster1) + .build()))) + .build(), + cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build())); + startXdsDepManager(); + + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull(); + childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY); + + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class); + PickResult result = + pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer)); + + assertThat(result.getStatus().isOk()).isTrue(); + verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any()); + } + @Test // Both priorities will get tried using real priority LB policy. public void discoverAggregateCluster_testChildCdsLbPolicyParsing() { @@ -462,12 +510,16 @@ public void discoverAggregateCluster_testChildCdsLbPolicyParsing() { .isEqualTo("cluster-01.googleapis.com"); assertThat(cluster1ImplConfig.edsServiceName) .isEqualTo("backend-service-1.googleapis.com"); + assertThat(childBalancers.get(0).attributes.get(NameResolver.ATTR_BACKEND_SERVICE)) + .isEqualTo(cluster1); ClusterImplConfig cluster2ImplConfig = (ClusterImplConfig) childBalancers.get(1).config; assertThat(cluster2ImplConfig.cluster) .isEqualTo("cluster-02.googleapis.com"); assertThat(cluster2ImplConfig.edsServiceName) .isEqualTo("backend-service-1.googleapis.com"); + assertThat(childBalancers.get(1).attributes.get(NameResolver.ATTR_BACKEND_SERVICE)) + .isEqualTo(cluster2); } @Test @@ -577,7 +629,9 @@ public void unknownLbProvider() { startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = + pickerCaptor.getValue().pickSubchannel( + newPickSubchannelArgs(mock(PickDetailsConsumer.class))); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy"); @@ -605,7 +659,9 @@ public void invalidLbConfig() { startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = + pickerCaptor.getValue().pickSubchannel( + newPickSubchannelArgs(mock(PickDetailsConsumer.class))); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'"); @@ -639,12 +695,19 @@ private void startXdsDepManager(final CdsConfig cdsConfig) { } private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) { - PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = picker.pickSubchannel( + newPickSubchannelArgs(mock(PickDetailsConsumer.class))); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription()); } + private static PickSubchannelArgs newPickSubchannelArgs(PickDetailsConsumer pickDetailsConsumer) { + PickSubchannelArgs args = mock(PickSubchannelArgs.class); + when(args.getPickDetailsConsumer()).thenReturn(pickDetailsConsumer); + return args; + } + private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final String policyName; private final LoadBalancerProvider configParsingDelegate; @@ -660,7 +723,7 @@ private final class FakeLoadBalancerProvider extends LoadBalancerProvider { @Override public LoadBalancer newLoadBalancer(Helper helper) { - FakeLoadBalancer balancer = new FakeLoadBalancer(policyName); + FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper); childBalancers.add(balancer); return balancer; } @@ -692,17 +755,21 @@ public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig( private final class FakeLoadBalancer extends LoadBalancer { private final String name; + private final Helper helper; private Object config; + private Attributes attributes; private Status upstreamError; private boolean shutdown; - FakeLoadBalancer(String name) { + FakeLoadBalancer(String name, Helper helper) { this.name = name; + this.helper = helper; } @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { config = resolvedAddresses.getLoadBalancingPolicyConfig(); + attributes = resolvedAddresses.getAttributes(); return Status.OK; } @@ -716,5 +783,15 @@ public void shutdown() { shutdown = true; childBalancers.remove(this); } + + void deliverSubchannelState(final PickResult result, ConnectivityState state) { + SubchannelPicker picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return result; + } + }; + helper.updateBalancingState(state, picker); + } } } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 6cf6fa22a8a..e89c0c3b8d5 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -21,8 +21,11 @@ import static com.google.common.truth.Truth.assertWithMessage; import static io.grpc.xds.ClusterImplLoadBalancer.ATTR_SUBCHANNEL_ADDRESS_NAME; import static io.grpc.xds.XdsNameResolver.AUTO_HOST_REWRITE_KEY; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -188,7 +191,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() { assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig); assertThat(childBalancer.attributes.get(io.grpc.xds.XdsAttributes.XDS_CLIENT)) .isSameInstanceAs(xdsClient); - assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull(); } /** @@ -289,11 +292,11 @@ public void pick_addsOptionalLabels() { // The value will be determined by the parent policy, so can be different than the value used in // makeAddress() for the test. verify(detailsConsumer).addOptionalLabel("grpc.lb.locality", locality.toString()); - verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER); + verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any()); } @Test - public void pick_noResult_addsClusterLabel() { + public void pick_noResult_doesNotAddClusterLabel() { LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider(); WeightedTargetConfig weightedTargetConfig = buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); @@ -313,7 +316,7 @@ public void pick_noResult_addsClusterLabel() { TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, detailsConsumer); PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); - verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER); + verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index a508da34f88..388af1a643e 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -63,6 +63,7 @@ import io.grpc.InternalEquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -1064,7 +1065,9 @@ private FakeNameResolver assertResolverCreated(String uriPath) { private static void assertPicker(SubchannelPicker picker, Status expectedStatus, @Nullable Subchannel expectedSubchannel) { - PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickSubchannelArgs args = mock(PickSubchannelArgs.class); + when(args.getPickDetailsConsumer()).thenReturn(mock(PickDetailsConsumer.class)); + PickResult result = picker.pickSubchannel(args); Status actualStatus = result.getStatus(); assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());