in pkg/policyendpoints/manager.go [51:103]
// Reconcile brings the PolicyEndpoint objects that reference the given
// NetworkPolicy in line with the policy's resolved endpoints.
//
// It resolves the policy into ingress rules, egress rules and pod selector
// endpoints, lists the PolicyEndpoint objects in the policy's namespace that
// are indexed under the policy's name, asks computePolicyEndpoints which
// objects must be created, updated or deleted, and then applies those three
// lists in that order.
//
// Processing stops at the first failure and that error is returned unchanged;
// objects handled before the failure are not rolled back. A PolicyEndpoint
// that is already gone when its deletion is attempted is not treated as an
// error.
func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networking.NetworkPolicy) error {
	ingressRules, egressRules, podSelectorEndpoints, err := m.endpointsResolver.Resolve(ctx, policy)
	if err != nil {
		return err
	}

	policyEndpointList := &policyinfo.PolicyEndpointList{}
	if err := m.k8sClient.List(ctx, policyEndpointList,
		client.InNamespace(policy.Namespace),
		client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil {
		return err
	}

	// policyEndpointList is not used again in this function, so its Items
	// slice is passed as-is instead of being copied element by element.
	createList, updateList, deleteList, err := m.computePolicyEndpoints(policy, policyEndpointList.Items, ingressRules, egressRules, podSelectorEndpoints)
	if err != nil {
		return err
	}
	m.logger.Info("Got policy endpoints lists", "create", len(createList), "update", len(updateList), "delete", len(deleteList))

	// The loops below index into the slices rather than ranging by value, so
	// the client receives a pointer to the slice element itself: no per-item
	// struct copy and no address of a range variable.
	for i := range createList {
		policyEndpoint := &createList[i]
		if err := m.k8sClient.Create(ctx, policyEndpoint); err != nil {
			return err
		}
		// The identifier is read after Create, exactly as before, so the log
		// reflects whatever the client wrote back into the object.
		m.logger.Info("Created policy endpoint", "id", k8s.NamespacedName(policyEndpoint))
	}

	for i := range updateList {
		policyEndpoint := &updateList[i]
		id := k8s.NamespacedName(policyEndpoint)

		// Fetch the current object: it is both the comparison baseline and
		// the base of the merge patch.
		oldRes := &policyinfo.PolicyEndpoint{}
		if err := m.k8sClient.Get(ctx, id, oldRes); err != nil {
			return err
		}
		if equality.Semantic.DeepEqual(oldRes.Spec, policyEndpoint.Spec) {
			m.logger.V(1).Info("Policy endpoint already up to date", "id", id)
			continue
		}
		if err := m.k8sClient.Patch(ctx, policyEndpoint, client.MergeFrom(oldRes)); err != nil {
			return err
		}
		m.logger.Info("Updated policy endpoint", "id", k8s.NamespacedName(policyEndpoint))
	}

	for i := range deleteList {
		policyEndpoint := &deleteList[i]
		if err := m.k8sClient.Delete(ctx, policyEndpoint); err != nil {
			// An object that no longer exists is already in the desired
			// state; only other failures abort the reconcile.
			if client.IgnoreNotFound(err) != nil {
				return err
			}
			m.logger.V(1).Info("Policy endpoint already deleted", "id", k8s.NamespacedName(policyEndpoint))
			continue
		}
		m.logger.Info("Deleted policy endpoint", "id", k8s.NamespacedName(policyEndpoint))
	}
	return nil
}