in pkg/proxy/winkernel/proxier.go [1088:1597]
// syncProxyRules reconciles HNS (Host Networking Service) state with the
// proxier's current view of Services and Endpoints. It holds proxier.mu for
// its whole duration and:
//
//  1. returns early until Services and Endpoints have been received;
//  2. drops all policies and returns if the HNS network is gone or its ID
//     changed since the last sync;
//  3. applies the pending service/endpoint change trackers;
//  4. queries HNS for existing endpoints and load balancers (on overlay
//     networks it also makes sure the source VIP endpoint exists);
//  5. for every service whose policy is not yet applied, resolves or creates
//     the HNS endpoints of its backends and (re)creates the load balancer
//     policies for the cluster IP, node port, external IPs, load balancer
//     ingress IPs and, optionally, the health-check VIP;
//  6. updates health servers and metrics, deletes remote endpoints whose
//     reference count dropped to zero and cleans up stale load balancers.
func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	// don't sync rules till we've received services and endpoints
	if !proxier.isInitialized() {
		klog.V(2).InfoS("Not syncing hns until Services and Endpoints have been received from master")
		return
	}

	// Keep track of how long syncs take.
	start := time.Now()
	defer func() {
		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
		klog.V(4).InfoS("Syncing proxy rules complete", "elapsed", time.Since(start))
	}()

	hnsNetworkName := proxier.network.name
	hns := proxier.hns

	var gatewayHnsendpoint *endpointInfo
	if proxier.forwardHealthCheckVip {
		// The lookup error is deliberately ignored: health-check load balancers
		// further down are only created when a (non-nil) gateway endpoint exists.
		gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
	}

	prevNetworkID := proxier.network.id
	updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
	if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
		klog.InfoS("The HNS network is not present or has changed since the last sync, please check the CNI deployment", "hnsNetworkName", hnsNetworkName)
		// Policies programmed so far belong to the previous network; drop them
		// all and stop this sync.
		proxier.cleanupAllPolicies()
		if updatedNetwork != nil {
			proxier.network = *updatedNetwork
		}
		return
	}

	// We assume that if this was called, we really want to sync them,
	// even if nothing changed in the meantime. In other words, callers are
	// responsible for detecting no-op changes and not calling this function.
	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

	deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
	// merge stale services gathered from EndpointsMap.Update
	for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
		if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
			klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
			deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
		}
	}

	// Query HNS for endpoints and load balancers
	queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
	if err != nil {
		klog.ErrorS(err, "Querying HNS for endpoints failed")
		return
	}
	if queriedEndpoints == nil {
		klog.V(4).InfoS("No existing endpoints found in HNS")
		queriedEndpoints = make(map[string]*(endpointInfo))
	}

	queriedLoadBalancers, err := hns.getAllLoadBalancers()
	// Check the error first so a failed query is not also reported as
	// "no existing load balancers".
	if err != nil {
		klog.ErrorS(err, "Querying HNS for load balancers failed")
		return
	}
	if queriedLoadBalancers == nil {
		klog.V(4).InfoS("No existing load balancers found in HNS")
		queriedLoadBalancers = make(map[loadBalancerIdentifier]*(loadBalancerInfo))
	}

	if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
		if _, ok := queriedEndpoints[proxier.sourceVip]; !ok {
			_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
			if err != nil {
				klog.ErrorS(err, "Source Vip endpoint creation failed")
				return
			}
		}
	}

	klog.V(3).InfoS("Syncing Policies")

	// Program HNS by adding corresponding policies for each service.
	for svcName, svc := range proxier.svcPortMap {
		svcInfo, ok := svc.(*serviceInfo)
		if !ok {
			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
			continue
		}

		if svcInfo.policyApplied {
			klog.V(4).InfoS("Policy already applied", "serviceInfo", svcInfo)
			continue
		}

		if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
			// On overlay networks the cluster IP itself needs a remote endpoint.
			serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
			if serviceVipEndpoint == nil {
				klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
				hnsEndpoint := &endpointInfo{
					ip:              svcInfo.ClusterIP().String(),
					isLocal:         false,
					macAddress:      proxier.hostMac,
					providerAddress: proxier.nodeIP.String(),
				}

				newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
				if err != nil {
					klog.ErrorS(err, "Remote endpoint creation failed for service VIP")
					continue
				}

				newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
				*newHnsEndpoint.refCount++
				svcInfo.remoteEndpoint = newHnsEndpoint
				updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
			}
		}

		var hnsEndpoints []endpointInfo
		var hnsLocalEndpoints []endpointInfo
		klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName)
		// Create Remote endpoints for every endpoint, corresponding to the service
		containsPublicIP := false
		containsNodeIP := false
		var allEndpointsTerminating, allEndpointsNonServing bool
		someEndpointsServing := true

		if len(svcInfo.loadBalancerIngressIPs) > 0 {
			// Check should be done only if comes under the feature gate or enabled
			// The check should be done only if Spec.Type == Loadbalancer.
			allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR)
			allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR)
			someEndpointsServing = !allEndpointsNonServing
			klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR)
		} else {
			klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs))
		}

		for _, epInfo := range proxier.endpointsMap[svcName] {
			ep, ok := epInfo.(*endpointInfo)
			if !ok {
				klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName)
				continue
			}

			if svcInfo.internalTrafficLocal && svcInfo.localTrafficDSR && !ep.IsLocal() {
				// No need to use or create a remote endpoint when both the internal and
				// external traffic policies are Local.
				klog.V(3).InfoS("Skipping the endpoint. Both internalTraffic and external traffic policies are local", "EpIP", ep.ip, " EpPort", ep.port)
				continue
			}

			if someEndpointsServing {

				if !allEndpointsTerminating && !ep.IsReady() {
					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is either not ready or all not all endpoints are terminating", "EpIP", ep.ip, " EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady())
					continue
				}
				if !ep.IsServing() {
					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, " EpPort", ep.port, "IsEpServing", ep.IsServing())
					continue
				}

			}

			var newHnsEndpoint *endpointInfo
			hnsNetworkName := proxier.network.name
			var err error

			// targetPort is zero if it is specified as a name in port.TargetPort, so the real port should be got from endpoints.
			// Note that hcsshim.AddLoadBalancer() doesn't support endpoints with different ports, so only port from first endpoint is used.
			// TODO(feiskyer): add support of different endpoint ports after hcsshim.AddLoadBalancer() add that.
			if svcInfo.targetPort == 0 {
				svcInfo.targetPort = int(ep.port)
			}
			// There is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address, so we need to check using endpoint ID first.
			// TODO: Remove lookup by endpoint ID, and use the IP address only, so we don't need to maintain multiple keys for lookup.
			if len(ep.hnsID) > 0 {
				newHnsEndpoint = queriedEndpoints[ep.hnsID]
			}

			if newHnsEndpoint == nil {
				// First check if an endpoint resource exists for this IP, on the current host
				// A Local endpoint could exist here already
				// A remote endpoint was already created and proxy was restarted
				newHnsEndpoint = queriedEndpoints[ep.IP()]
			}

			if newHnsEndpoint == nil {
				if ep.IsLocal() {
					// No HNS call failed here, so there is no error value to report.
					klog.ErrorS(nil, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
					continue
				}

				if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
					klog.InfoS("Updating network to check for new remote subnet policies", "networkName", proxier.network.name)
					networkName := proxier.network.name
					updatedNetwork, err := hns.getNetworkByName(networkName)
					if err != nil {
						klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
						proxier.cleanupAllPolicies()
						return
					}
					proxier.network = *updatedNetwork
					providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
					if len(providerAddress) == 0 {
						klog.InfoS("Could not find provider address, assuming it is a public IP", "IP", ep.IP())
						providerAddress = proxier.nodeIP.String()
					}

					hnsEndpoint := &endpointInfo{
						ip:              ep.ip,
						isLocal:         false,
						macAddress:      conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)),
						providerAddress: providerAddress,
					}

					newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
					if err != nil {
						klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint)
						continue
					}
					updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
				} else {

					hnsEndpoint := &endpointInfo{
						ip:         ep.ip,
						isLocal:    false,
						macAddress: ep.macAddress,
					}

					newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
					if err != nil {
						klog.ErrorS(err, "Remote endpoint creation failed")
						continue
					}
					updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
				}
			}
			// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
			// a) Source VIP configured on kube-proxy (or)
			// b) Node IP of the current node
			//
			// For L2Bridge network the Source VIP is always the NodeIP of the current node and the same
			// would be configured on kube-proxy as SourceVIP
			//
			// The logic for choosing the SourceVIP in Overlay networks is based on the backend endpoints:
			// a) Endpoints are any IP's outside the cluster ==> Choose NodeIP as the SourceVIP
			// b) Endpoints are IP addresses of a remote node => Choose NodeIP as the SourceVIP
			// c) Everything else (Local POD's, Remote POD's, Node IP of current node) ==> Choose the configured SourceVIP
			if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) && !ep.IsLocal() {
				providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())

				isNodeIP := (ep.IP() == providerAddress)
				isPublicIP := (len(providerAddress) == 0)
				klog.InfoS("Endpoint on overlay network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName, "isNodeIP", isNodeIP, "isPublicIP", isPublicIP)

				containsNodeIP = containsNodeIP || isNodeIP
				containsPublicIP = containsPublicIP || isPublicIP
			}

			// Save the hnsId for reference
			klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint)

			hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
			if newHnsEndpoint.IsLocal() {
				hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
			} else {
				// We only share the refCounts for remote endpoints
				ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
				*ep.refCount++
			}

			ep.hnsID = newHnsEndpoint.hnsID

			klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep)
		}

		klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName)

		if len(svcInfo.hnsID) > 0 {
			// This should not happen
			klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
		}

		// In ETP:Cluster, if all endpoints are under termination,
		// it will have serving and terminating, else only ready and serving
		if len(hnsEndpoints) == 0 {
			if svcInfo.winProxyOptimization {
				// Deleting loadbalancers when there are no endpoints to serve.
				klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName)
				svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
			}
			klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
			continue
		}

		klog.V(4).InfoS("Trying to apply Policies for service", "serviceInfo", svcInfo)

		var sourceVip = proxier.sourceVip
		if containsPublicIP || containsNodeIP {
			sourceVip = proxier.nodeIP.String()
		}

		sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
		if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
			klog.InfoS("Session Affinity is not supported on this version of Windows")
		}

		endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing

		// clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer.
		clusterIPEndpoints := hnsEndpoints
		if svcInfo.internalTrafficLocal {
			// Take local endpoints for clusterip loadbalancer when internal traffic policy is local.
			clusterIPEndpoints = hnsLocalEndpoints
		}

		// Every deleteExistingLoadBalancer call below is given the same frontend
		// VIP, ports and endpoint list as the getLoadBalancer call that follows it,
		// so that an unchanged policy already present in queriedLoadBalancers is
		// reused rather than deleted and recreated. (Previously sourceVip and, for
		// node port / health check, different ports were passed here.)
		proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, svcInfo.ClusterIP().String(), Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), clusterIPEndpoints, queriedLoadBalancers)

		if len(clusterIPEndpoints) > 0 {

			// Cluster IP LoadBalancer creation. Unlike the other VIPs this does not
			// check endpointsAvailableForLB; it is only skipped when no endpoint
			// qualifies (e.g. internal traffic policy Local without local endpoints).
			hnsLoadBalancer, err := hns.getLoadBalancer(
				clusterIPEndpoints,
				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
				sourceVip,
				svcInfo.ClusterIP().String(),
				Enum(svcInfo.Protocol()),
				uint16(svcInfo.targetPort),
				uint16(svcInfo.Port()),
				queriedLoadBalancers,
			)
			if err != nil {
				klog.ErrorS(err, "Policy creation failed")
				continue
			}

			svcInfo.hnsID = hnsLoadBalancer.hnsID
			klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)

		} else {
			klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
		}

		// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
		if svcInfo.NodePort() > 0 {
			// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
			// This means that health services can use Node Port without falsely getting results from a different node.
			nodePortEndpoints := hnsEndpoints
			if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
				nodePortEndpoints = hnsLocalEndpoints
			}

			// The node port policy has no frontend VIP and listens on NodePort().
			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, "", Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.NodePort()), nodePortEndpoints, queriedLoadBalancers)

			if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
				// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
				hnsLoadBalancer, err := hns.getLoadBalancer(
					nodePortEndpoints,
					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
					sourceVip,
					"",
					Enum(svcInfo.Protocol()),
					uint16(svcInfo.targetPort),
					uint16(svcInfo.NodePort()),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}

				svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
				klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
			} else {
				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
			}
		}

		// Create a Load Balancer Policy for each external IP
		for _, externalIP := range svcInfo.externalIPs {
			// Disable routing mesh if ExternalTrafficPolicy is set to local
			externalIPEndpoints := hnsEndpoints
			if svcInfo.localTrafficDSR {
				externalIPEndpoints = hnsLocalEndpoints
			}

			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, externalIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)

			if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
				// If all endpoints are in terminating stage, then no need to External IP LoadBalancer
				// Try loading existing policies, if already available
				hnsLoadBalancer, err := hns.getLoadBalancer(
					externalIPEndpoints,
					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
					sourceVip,
					externalIP.ip,
					Enum(svcInfo.Protocol()),
					uint16(svcInfo.targetPort),
					uint16(svcInfo.Port()),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}
				externalIP.hnsID = hnsLoadBalancer.hnsID
				klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
			} else {
				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
			}
		}
		// Create a Load Balancer Policy for each loadbalancer ingress
		for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
			// Try loading existing policies, if already available
			lbIngressEndpoints := hnsEndpoints
			if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
				lbIngressEndpoints = hnsLocalEndpoints
			}

			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)

			if len(lbIngressEndpoints) > 0 {
				hnsLoadBalancer, err := hns.getLoadBalancer(
					lbIngressEndpoints,
					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
					sourceVip,
					lbIngressIP.ip,
					Enum(svcInfo.Protocol()),
					uint16(svcInfo.targetPort),
					uint16(svcInfo.Port()),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}
				lbIngressIP.hnsID = hnsLoadBalancer.hnsID
				klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
			} else {
				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
			}

			if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
				// Avoid creating health check loadbalancer if all the endpoints are terminating
				nodeport := proxier.healthzPort
				if svcInfo.HealthCheckNodePort() != 0 {
					nodeport = svcInfo.HealthCheckNodePort()
				}

				// The health-check policy maps nodeport -> nodeport on the ingress IP.
				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(nodeport), uint16(nodeport), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)

				hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
					[]endpointInfo{*gatewayHnsendpoint},
					loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
					sourceVip,
					lbIngressIP.ip,
					Enum(svcInfo.Protocol()),
					uint16(nodeport),
					uint16(nodeport),
					queriedLoadBalancers,
				)
				if err != nil {
					klog.ErrorS(err, "Policy creation failed")
					continue
				}
				lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
				klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
			} else {
				klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
			}
		}
		svcInfo.policyApplied = true
		klog.V(2).InfoS("Policy successfully applied for service", "serviceInfo", svcInfo)
	}

	if proxier.healthzServer != nil {
		proxier.healthzServer.Updated(proxier.ipFamily)
	}
	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

	// Update service healthchecks. The endpoints list might include services that are
	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
	// will just drop those endpoints.
	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck services")
	}
	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
		klog.ErrorS(err, "Error syncing healthcheck endpoints")
	}

	// Finish housekeeping.
	// TODO: these could be made more consistent.
	for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
		// TODO : Check if this is required to cleanup stale services here
		klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
	}

	// remove stale endpoint refcount entries
	for hnsID, referenceCount := range proxier.endPointsRefCount {
		if *referenceCount <= 0 {
			klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
			proxier.hns.deleteEndpoint(hnsID)
			delete(proxier.endPointsRefCount, hnsID)
		}
	}
	// This will cleanup stale load balancers which are pending delete
	// in last iteration
	proxier.cleanupStaleLoadbalancers()
}