func()

in kubernetes/controllers/elasticjob_controller.go [105:163]


func (r *ElasticJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// setup FieldIndexer to inform the manager that this controller owns pods and services,
	// so that it will automatically call Reconcile on the underlying ElasticJob when a Pod or Service changes, is deleted, etc.
	if err := mgr.GetFieldIndexer().IndexField(&corev1.Pod{}, jobOwnerKey, func(rawObj runtime.Object) []string {
		pod := rawObj.(*corev1.Pod)
		owner := metav1.GetControllerOf(pod)
		if owner == nil {
			return nil
		}

		// Make sure owner is ElasticJob Controller.
		if owner.APIVersion != r.GetAPIGroupVersion().Version || owner.Kind != r.GetAPIGroupVersionKind().Kind {
			return nil
		}

		return []string{owner.Name}
	}); err != nil {
		return err
	}

	if err := mgr.GetFieldIndexer().IndexField(&corev1.Service{}, jobOwnerKey, func(rawObj runtime.Object) []string {
		svc := rawObj.(*corev1.Service)
		owner := metav1.GetControllerOf(svc)
		if owner == nil {
			return nil
		}

		if owner.APIVersion != r.GetAPIGroupVersion().Version || owner.Kind != r.GetAPIGroupVersionKind().Kind {
			return nil
		}

		return []string{owner.Name}
	}); err != nil {
		return err
	}

	// Setup ElasticJobReconciler
	r.Client = mgr.GetClient()
	r.Scheme = mgr.GetScheme()

	// Create k8s clients to list pods and service objects
	kubeClientSet := kubernetes.NewForConfigOrDie(mgr.GetConfig())

	r.jobController = common.JobController{
		Controller:    r,
		Config:        common.JobControllerConfiguration{EnableGangScheduling: false},
		Expectations:  k8scontroller.NewControllerExpectations(),
		WorkQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), r.ControllerName()),
		Recorder:      mgr.GetEventRecorderFor(r.ControllerName()),
		KubeClientSet: kubeClientSet,
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.ElasticJob{}).
		Owns(&corev1.Pod{}).
		Owns(&corev1.Service{}).
		WithEventFilter(predicate.Funcs{CreateFunc: onDependentCreateFunc(r), DeleteFunc: onDependentDeleteFunc(r)}).
		Complete(r)
}