in kubernetes/controllers/elasticjob_controller.go [105:163]
func (r *ElasticJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Set up a field index keyed by jobOwnerKey so the manager's cache can look up
	// the Pods (and, below, Services) owned by a given ElasticJob; Reconcile can then
	// list only the objects that belong to the job instead of filtering every object.
	if err := mgr.GetFieldIndexer().IndexField(&corev1.Pod{}, jobOwnerKey, func(rawObj runtime.Object) []string {
		pod := rawObj.(*corev1.Pod)
		owner := metav1.GetControllerOf(pod)
		if owner == nil {
			return nil
		}
		// Make sure the controller owner is an ElasticJob.
		if owner.APIVersion != r.GetAPIGroupVersion().Version || owner.Kind != r.GetAPIGroupVersionKind().Kind {
			return nil
		}
		return []string{owner.Name}
	}); err != nil {
		return err
	}
	if err := mgr.GetFieldIndexer().IndexField(&corev1.Service{}, jobOwnerKey, func(rawObj runtime.Object) []string {
		svc := rawObj.(*corev1.Service)
		owner := metav1.GetControllerOf(svc)
		if owner == nil {
			return nil
		}
		if owner.APIVersion != r.GetAPIGroupVersion().Version || owner.Kind != r.GetAPIGroupVersionKind().Kind {
			return nil
		}
		return []string{owner.Name}
	}); err != nil {
		return err
	}
	// Set up the ElasticJobReconciler's clients and its shared job controller.
	r.Client = mgr.GetClient()
	r.Scheme = mgr.GetScheme()
	// Create a Kubernetes clientset used to list Pod and Service objects.
	kubeClientSet := kubernetes.NewForConfigOrDie(mgr.GetConfig())
	r.jobController = common.JobController{
		Controller:    r,
		Config:        common.JobControllerConfiguration{EnableGangScheduling: false},
		Expectations:  k8scontroller.NewControllerExpectations(),
		WorkQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), r.ControllerName()),
		Recorder:      mgr.GetEventRecorderFor(r.ControllerName()),
		KubeClientSet: kubeClientSet,
	}
	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.ElasticJob{}).
		Owns(&corev1.Pod{}).
		Owns(&corev1.Service{}).
		WithEventFilter(predicate.Funcs{CreateFunc: onDependentCreateFunc(r), DeleteFunc: onDependentDeleteFunc(r)}).
		Complete(r)
}
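
The jobOwnerKey index registered above is what lets the reconcile loop fetch a job's children without scanning every Pod in the cluster. A minimal sketch of how Reconcile could consume it, assuming a job variable of type *v1alpha1.ElasticJob already fetched, a ctx in scope, and the controller-runtime client package imported (illustrative, not taken from this file):

	// Sketch: assumes ctx, job, and sigs.k8s.io/controller-runtime/pkg/client are in scope.
	// List only the Pods whose controller owner is this ElasticJob, served from the
	// manager's cache via the jobOwnerKey field index set up in SetupWithManager.
	var pods corev1.PodList
	if err := r.List(ctx, &pods,
		client.InNamespace(job.Namespace),
		client.MatchingFields{jobOwnerKey: job.Name},
	); err != nil {
		return ctrl.Result{}, err
	}
	// pods.Items now holds just this job's Pods; Services can be listed the same way.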