in internal/controllers/policy_controller.go [91:120]
func (r *policyReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return err
}
policyEventChan := make(chan event.GenericEvent)
policyEventHandler := eventhandlers.NewEnqueueRequestForPolicyEvent(r.policyTracker, r.podUpdateBatchPeriodDuration,
r.logger.WithName("eventHandler").WithName("policy"))
podEventHandler := eventhandlers.NewEnqueueRequestForPodEvent(policyEventChan, r.k8sClient, r.policyResolver,
r.logger.WithName("eventHandler").WithName("pod"))
nsEventHandler := eventhandlers.NewEnqueueRequestForNamespaceEvent(policyEventChan, r.k8sClient, r.policyResolver,
r.logger.WithName("eventHandler").WithName("namespace"))
svcEventHandler := eventhandlers.NewEnqueueRequestForServiceEvent(policyEventChan, r.k8sClient, r.policyResolver,
r.logger.WithName("eventHandler").WithName("service"))
if err := mgr.AddHealthzCheck("policy-controller", healthz.Ping); err != nil {
r.logger.Error(err, "Failed to setup the policy controller healthz check")
return err
}
return ctrl.NewControllerManagedBy(mgr).
Named(controllerName).
Watches(&networking.NetworkPolicy{}, policyEventHandler).
Watches(&corev1.Pod{}, podEventHandler).
Watches(&corev1.Namespace{}, nsEventHandler).
Watches(&corev1.Service{}, svcEventHandler).
WatchesRawSource(source.Channel(policyEventChan, policyEventHandler)).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
}).Complete(r)
}