in xds/balancer/cdsbalancer/cdsbalancer.go [292:378]
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
if err := update.err; err != nil {
b.logger.Warnf("Watch error from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}
b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg))
// Process the security config from the received update before building the
// child policy or forwarding the update to it. We do this because the child
// policy may try to create a new subConn inline. Processing the security
// configuration here and setting up the handshakeInfo will make sure that
// such attempts are handled properly.
if err := b.handleSecurityConfig(update.securityCfg); err != nil {
// If the security config is invalid, for example, if the provider
// instance is not found in the bootstrap config, we need to put the
// channel in transient failure.
b.logger.Warnf("Invalid security config update from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}
// The first good update from the watch API leads to the instantiation of an
// cluster_resolver balancer. Further updates/errors are propagated to the existing
// cluster_resolver balancer.
if b.childLB == nil {
childLB, err := newChildBalancer(b.ccw, b.bOpts)
if err != nil {
b.logger.Errorf("Failed to create child policy of type %s, %v", clusterresolver.Name, err)
return
}
b.childLB = childLB
b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
}
dms := make([]clusterresolver.DiscoveryMechanism, len(update.updates))
for i, cu := range update.updates {
switch cu.ClusterType {
case resource.ClusterTypeEDS:
dms[i] = clusterresolver.DiscoveryMechanism{
Type: clusterresolver.DiscoveryMechanismTypeEDS,
Cluster: cu.ClusterName,
EDSServiceName: cu.EDSServiceName,
MaxConcurrentRequests: cu.MaxRequests,
}
if cu.EnableLRS {
// An empty string here indicates that the cluster_resolver balancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
dms[i].LoadReportingServerName = new(string)
}
case resource.ClusterTypeLogicalDNS:
dms[i] = clusterresolver.DiscoveryMechanism{
Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: cu.DNSHostName,
}
default:
b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
}
// lbPolicy is set only when the policy is ringhash. The default (when it's
// not set) is roundrobin. And similarly, we only need to set XDSLBPolicy
// for ringhash (it also defaults to roundrobin).
if lbp := update.lbPolicy; lbp != nil {
lbCfg.XDSLBPolicy = &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
Config: &ringhash.LBConfig{
MinRingSize: lbp.MinimumRingSize,
MaxRingSize: lbp.MaximumRingSize,
},
}
}
ccState := balancer.ClientConnState{
ResolverState: client.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
}
if err := b.childLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: cluster_resolver balancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
}
}