func()

in xds/balancer/cdsbalancer/cdsbalancer.go [292:378]


func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
	if err := update.err; err != nil {
		b.logger.Warnf("Watch error from xds-client %p: %v", b.xdsClient, err)
		b.handleErrorFromUpdate(err, false)
		return
	}

	b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg))

	// Process the security config from the received update before building the
	// child policy or forwarding the update to it. We do this because the child
	// policy may try to create a new subConn inline. Processing the security
	// configuration here and setting up the handshakeInfo will make sure that
	// such attempts are handled properly.
	if err := b.handleSecurityConfig(update.securityCfg); err != nil {
		// If the security config is invalid, for example, if the provider
		// instance is not found in the bootstrap config, we need to put the
		// channel in transient failure.
		b.logger.Warnf("Invalid security config update from xds-client %p: %v", b.xdsClient, err)
		b.handleErrorFromUpdate(err, false)
		return
	}

	// The first good update from the watch API leads to the instantiation of an
	// cluster_resolver balancer. Further updates/errors are propagated to the existing
	// cluster_resolver balancer.
	if b.childLB == nil {
		childLB, err := newChildBalancer(b.ccw, b.bOpts)
		if err != nil {
			b.logger.Errorf("Failed to create child policy of type %s, %v", clusterresolver.Name, err)
			return
		}
		b.childLB = childLB
		b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
	}

	dms := make([]clusterresolver.DiscoveryMechanism, len(update.updates))
	for i, cu := range update.updates {
		switch cu.ClusterType {
		case resource.ClusterTypeEDS:
			dms[i] = clusterresolver.DiscoveryMechanism{
				Type:                  clusterresolver.DiscoveryMechanismTypeEDS,
				Cluster:               cu.ClusterName,
				EDSServiceName:        cu.EDSServiceName,
				MaxConcurrentRequests: cu.MaxRequests,
			}
			if cu.EnableLRS {
				// An empty string here indicates that the cluster_resolver balancer should use the
				// same xDS server for load reporting as it does for EDS
				// requests/responses.
				dms[i].LoadReportingServerName = new(string)

			}
		case resource.ClusterTypeLogicalDNS:
			dms[i] = clusterresolver.DiscoveryMechanism{
				Type:        clusterresolver.DiscoveryMechanismTypeLogicalDNS,
				DNSHostname: cu.DNSHostName,
			}
		default:
			b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
		}
	}
	lbCfg := &clusterresolver.LBConfig{
		DiscoveryMechanisms: dms,
	}

	// lbPolicy is set only when the policy is ringhash. The default (when it's
	// not set) is roundrobin. And similarly, we only need to set XDSLBPolicy
	// for ringhash (it also defaults to roundrobin).
	if lbp := update.lbPolicy; lbp != nil {
		lbCfg.XDSLBPolicy = &internalserviceconfig.BalancerConfig{
			Name: ringhash.Name,
			Config: &ringhash.LBConfig{
				MinRingSize: lbp.MinimumRingSize,
				MaxRingSize: lbp.MaximumRingSize,
			},
		}
	}

	ccState := balancer.ClientConnState{
		ResolverState:  client.SetClient(resolver.State{}, b.xdsClient),
		BalancerConfig: lbCfg,
	}
	if err := b.childLB.UpdateClientConnState(ccState); err != nil {
		b.logger.Errorf("xds: cluster_resolver balancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
	}
}