Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
import com.google.common.primitives.UnsignedInts;
import com.google.errorprone.annotations.CheckReturnValue;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.internal.GrpcUtil;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
Expand Down Expand Up @@ -83,15 +86,17 @@ final class CdsLoadBalancer2 extends LoadBalancer {
private final Helper helper;
private final LoadBalancerRegistry lbRegistry;
private final ClusterState clusterState = new ClusterState();
private final CdsLbHelper cdsLbHelper = new CdsLbHelper();
private GracefulSwitchLoadBalancer delegate;
private boolean addBackendServicePickDetailsLabel;
// Following fields are effectively final.
private String clusterName;
private Subscription clusterSubscription;

CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.delegate = new GracefulSwitchLoadBalancer(helper);
this.delegate = new GracefulSwitchLoadBalancer(cdsLbHelper);
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
Expand Down Expand Up @@ -126,6 +131,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();

if (clusterConfig.getChildren() instanceof EndpointConfig) {
addBackendServicePickDetailsLabel = true;
StatusOr<EdsUpdate> edsUpdate = getEdsUpdate(xdsConfig, clusterName);
StatusOr<ClusterResolutionResult> statusOrResult = clusterState.edsUpdateToResult(
clusterName,
Expand Down Expand Up @@ -156,8 +162,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(gracefulConfig)
.setAddresses(Collections.unmodifiableList(addresses))
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, clusterName)
.build())
.build());
} else if (clusterConfig.getChildren() instanceof AggregateConfig) {
addBackendServicePickDetailsLabel = false;
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
for (String childCluster: leafClusters) {
Expand Down Expand Up @@ -196,7 +206,8 @@ public void handleNameResolutionError(Status error) {
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
delegate.shutdown();
delegate = new GracefulSwitchLoadBalancer(helper);
delegate = new GracefulSwitchLoadBalancer(cdsLbHelper);
addBackendServicePickDetailsLabel = false;
if (clusterSubscription != null) {
clusterSubscription.close();
clusterSubscription = null;
Expand All @@ -206,6 +217,7 @@ public void shutdown() {
@CheckReturnValue // don't forget to return up the stack after the fail call
private Status fail(Status error) {
delegate.shutdown();
addBackendServicePickDetailsLabel = false;
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
Expand All @@ -215,6 +227,38 @@ private String errorPrefix() {
return "CdsLb for " + clusterName + ": ";
}

private final class CdsLbHelper extends ForwardingLoadBalancerHelper {
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
if (addBackendServicePickDetailsLabel) {
newPicker = new BackendServiceMetricLabelSubchannelPicker(newPicker, clusterName);
}
delegate().updateBalancingState(newState, newPicker);
}

@Override
protected Helper delegate() {
return helper;
}
}

private static final class BackendServiceMetricLabelSubchannelPicker extends SubchannelPicker {
private final SubchannelPicker delegate;
private final String backendService;

private BackendServiceMetricLabelSubchannelPicker(
SubchannelPicker delegate, String backendService) {
this.delegate = checkNotNull(delegate, "delegate");
this.backendService = checkNotNull(backendService, "backendService");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", backendService);
return delegate.pickSubchannel(args);
}
}

/**
* The number of bits assigned to the fractional part of fixed-point values. We normalize weights
* to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 ==
Expand Down
5 changes: 0 additions & 5 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -154,9 +153,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

return childSwitchLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setAttributes(attributes.toBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
.build())
.setLoadBalancingPolicyConfig(config.childConfig)
.build());
}
Expand Down Expand Up @@ -409,7 +405,6 @@ private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
.accept(filterMetadata);
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
for (DropOverload dropOverload : dropPolicies) {
int rand = random.nextInt(1_000_000);
if (rand < dropOverload.dropsPerMillion()) {
Expand Down
87 changes: 82 additions & 5 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
Expand Down Expand Up @@ -296,6 +297,24 @@ public void nonAggregateCluster_resourceUpdate() {
assertThat(childLbConfig.maxConcurrentRequests).isEqualTo(200L);
}

@Test
public void nonAggregateCluster_addsBackendServiceAttributeAndPickDetailsLabel() {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER));
startXdsDepManager();

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);

verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
PickResult result =
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));

assertThat(result.getStatus().isOk()).isTrue();
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
}

@Test
public void nonAggregateCluster_resourceRevoked() {
lbRegistry.register(new PriorityLoadBalancerProvider());
Expand Down Expand Up @@ -429,6 +448,35 @@ public void discoverAggregateCluster_createsPriorityLbPolicy() {
.isEqualTo("cds_experimental");
}

@Test
public void aggregateCluster_doesNotAddBackendServiceAttributeOrPickDetailsLabelFromRoot() {
String cluster1 = "cluster-01.googleapis.com";
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(
// CLUSTER (aggr.) -> [cluster1 (EDS)]
CLUSTER, Cluster.newBuilder()
.setName(CLUSTER)
.setClusterType(Cluster.CustomClusterType.newBuilder()
.setName("envoy.clusters.aggregate")
.setTypedConfig(Any.pack(ClusterConfig.newBuilder()
.addClusters(cluster1)
.build())))
.build(),
cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build()));
startXdsDepManager();

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull();
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);

verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
PickResult result =
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));

assertThat(result.getStatus().isOk()).isTrue();
verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any());
}

@Test
// Both priorities will get tried using real priority LB policy.
public void discoverAggregateCluster_testChildCdsLbPolicyParsing() {
Expand Down Expand Up @@ -462,12 +510,16 @@ public void discoverAggregateCluster_testChildCdsLbPolicyParsing() {
.isEqualTo("cluster-01.googleapis.com");
assertThat(cluster1ImplConfig.edsServiceName)
.isEqualTo("backend-service-1.googleapis.com");
assertThat(childBalancers.get(0).attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
.isEqualTo(cluster1);
ClusterImplConfig cluster2ImplConfig =
(ClusterImplConfig) childBalancers.get(1).config;
assertThat(cluster2ImplConfig.cluster)
.isEqualTo("cluster-02.googleapis.com");
assertThat(cluster2ImplConfig.edsServiceName)
.isEqualTo("backend-service-1.googleapis.com");
assertThat(childBalancers.get(1).attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
.isEqualTo(cluster2);
}

@Test
Expand Down Expand Up @@ -577,7 +629,9 @@ public void unknownLbProvider() {
startXdsDepManager();
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result =
pickerCaptor.getValue().pickSubchannel(
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy");
Expand Down Expand Up @@ -605,7 +659,9 @@ public void invalidLbConfig() {
startXdsDepManager();
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result =
pickerCaptor.getValue().pickSubchannel(
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'");
Expand Down Expand Up @@ -639,12 +695,19 @@ private void startXdsDepManager(final CdsConfig cdsConfig) {
}

private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) {
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
PickResult result = picker.pickSubchannel(
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
}

private static PickSubchannelArgs newPickSubchannelArgs(PickDetailsConsumer pickDetailsConsumer) {
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getPickDetailsConsumer()).thenReturn(pickDetailsConsumer);
return args;
}

private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName;
private final LoadBalancerProvider configParsingDelegate;
Expand All @@ -660,7 +723,7 @@ private final class FakeLoadBalancerProvider extends LoadBalancerProvider {

@Override
public LoadBalancer newLoadBalancer(Helper helper) {
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName);
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper);
childBalancers.add(balancer);
return balancer;
}
Expand Down Expand Up @@ -692,17 +755,21 @@ public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig(

private final class FakeLoadBalancer extends LoadBalancer {
private final String name;
private final Helper helper;
private Object config;
private Attributes attributes;
private Status upstreamError;
private boolean shutdown;

FakeLoadBalancer(String name) {
FakeLoadBalancer(String name, Helper helper) {
this.name = name;
this.helper = helper;
}

@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();
attributes = resolvedAddresses.getAttributes();
return Status.OK;
}

Expand All @@ -716,5 +783,15 @@ public void shutdown() {
shutdown = true;
childBalancers.remove(this);
}

void deliverSubchannelState(final PickResult result, ConnectivityState state) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return result;
}
};
helper.updateBalancingState(state, picker);
}
}
}
11 changes: 7 additions & 4 deletions xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.xds.ClusterImplLoadBalancer.ATTR_SUBCHANNEL_ADDRESS_NAME;
import static io.grpc.xds.XdsNameResolver.AUTO_HOST_REWRITE_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -188,7 +191,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() {
assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig);
assertThat(childBalancer.attributes.get(io.grpc.xds.XdsAttributes.XDS_CLIENT))
.isSameInstanceAs(xdsClient);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull();
}

/**
Expand Down Expand Up @@ -289,11 +292,11 @@ public void pick_addsOptionalLabels() {
// The value will be determined by the parent policy, so can be different than the value used in
// makeAddress() for the test.
verify(detailsConsumer).addOptionalLabel("grpc.lb.locality", locality.toString());
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any());
}

@Test
public void pick_noResult_addsClusterLabel() {
public void pick_noResult_doesNotAddClusterLabel() {
LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider();
WeightedTargetConfig weightedTargetConfig =
buildWeightedTargetConfig(ImmutableMap.of(locality, 10));
Expand All @@ -313,7 +316,7 @@ public void pick_noResult_addsClusterLabel() {
TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, detailsConsumer);
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.grpc.InternalEquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
Expand Down Expand Up @@ -1064,7 +1065,9 @@ private FakeNameResolver assertResolverCreated(String uriPath) {

private static void assertPicker(SubchannelPicker picker, Status expectedStatus,
@Nullable Subchannel expectedSubchannel) {
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getPickDetailsConsumer()).thenReturn(mock(PickDetailsConsumer.class));
PickResult result = picker.pickSubchannel(args);
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
Expand Down
Loading