in pkg/resolvers/endpoints.go [393:450]
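// getMatchingServiceClusterIPs lists the Services in namespace whose
// spec.selector is matched by the policy pod selector ls and returns one
// EndpointInfo per matching Service, addressed by its ClusterIP.
//
// Headless Services are never included. For every entry in ports the
// Service port that backs the policy port is resolved with
// getMatchingServicePort; when ports is non-empty and none of its entries
// could be resolved for a Service, that Service is skipped. An empty ports
// slice yields entries with a nil Ports list.
//
// A nil ls is treated as an empty selector, which matches every Service in
// the namespace — including Services that declare no selector at all, since
// labels.Set(nil) is matched by the empty selector.
//
// Errors from selector conversion or from the List call are logged and
// result in a nil return; individual port-lookup failures are logged and
// only drop that port.
func (r *defaultEndpointsResolver) getMatchingServiceClusterIPs(ctx context.Context, ls *metav1.LabelSelector, namespace string,
	ports []networking.NetworkPolicyPort) []policyinfo.EndpointInfo {
	if ls == nil {
		ls = &metav1.LabelSelector{}
	}
	svcSelector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		r.logger.Info("Unable to get pod selector", "err", err)
		return nil
	}
	svcList := &corev1.ServiceList{}
	if err := r.k8sClient.List(ctx, svcList, &client.ListOptions{
		Namespace: namespace,
	}); err != nil {
		r.logger.Info("Unable to list services", "err", err)
		return nil
	}

	var networkPeers []policyinfo.EndpointInfo
	// Index into the list instead of ranging by value: corev1.Service is a
	// large struct and its address is handed to the helpers below anyway.
	for i := range svcList.Items {
		svc := &svcList.Items[i]
		// do not add headless services to policy endpoints
		if k8s.IsServiceHeadless(svc) {
			r.logger.Info("skipping headless service when populating EndpointInfo", "serviceName", svc.Name, "serviceNamespace", svc.Namespace)
			continue
		}
		// do not add services if their pod selector is not matching with the pod selector defined in the network policy
		if !svcSelector.Matches(labels.Set(svc.Spec.Selector)) {
			r.logger.Info("skipping pod selector mismatched service when populating EndpointInfo", "serviceName", svc.Name, "serviceNamespace", svc.Namespace, "expectedPS", svcSelector)
			continue
		}

		var portList []policyinfo.Port
		for _, port := range ports {
			// NetworkPolicyPort.Protocol is optional in the API and defaults to
			// TCP; objects that did not pass through API-server defaulting may
			// carry nil here, so guard the dereference instead of panicking.
			proto := port.Protocol
			if proto == nil {
				tcp := corev1.ProtocolTCP
				proto = &tcp
			}
			var portPtr *int32
			if port.Port != nil {
				portVal, err := r.getMatchingServicePort(ctx, svc, port.Port, *proto)
				if err != nil {
					r.logger.Info("Unable to lookup service port", "err", err)
					continue
				}
				portPtr = &portVal
			}
			portList = append(portList, policyinfo.Port{
				Protocol: proto,
				Port:     portPtr,
				EndPort:  port.EndPort,
			})
		}
		// The policy names ports but this Service exposes none of them:
		// emitting it with an empty port list would not express that intent.
		if len(portList) == 0 && len(ports) > 0 {
			r.logger.Info("Couldn't find matching port for the service", "service", k8s.NamespacedName(svc))
			continue
		}

		// NOTE(review): svc.Spec.ClusterIP is used verbatim. A Service that has
		// no cluster IP (e.g. type ExternalName) would produce an empty CIDR
		// here unless IsServiceHeadless already excludes it — confirm.
		networkPeers = append(networkPeers, policyinfo.EndpointInfo{
			CIDR:  policyinfo.NetworkAddress(svc.Spec.ClusterIP),
			Ports: portList,
		})
	}
	return networkPeers
}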