func New()

in pkg/controller/ingress/reconcile/store/store.go [154:466]


func New(
	namespace string,
	resyncPeriod time.Duration,
	client clientset.Interface,
	updateCh *channels.RingChannel,
	disableCatchAll bool) Storer {

	store := &k8sStore{
		informers:        &Informer{},
		listers:          &Lister{},
		updateCh:         updateCh,
		syncSecretMu:     &sync.Mutex{},
		backendConfigMu:  &sync.RWMutex{},
		secretIngressMap: NewObjectRefMap(),
	}

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
		Interface: client.CoreV1().Events(namespace),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
		Component: "alb-ingress-controller",
	})

	store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
	// create informers factory, enable and assign required informers
	infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
	)

	store.informers.Ingress = infFactory.Networking().V1().Ingresses().Informer()
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()

	store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
	store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

	store.informers.Service = infFactory.Core().V1().Services().Informer()
	store.listers.Service.Store = store.informers.Service.GetStore()

	store.informers.Node = infFactory.Core().V1().Nodes().Informer()
	store.listers.Node.Store = store.informers.Node.GetStore()

	store.informers.Pod = infFactory.Core().V1().Pods().Informer()
	store.listers.Pod.Store = store.informers.Pod.GetStore()

	ingDeleteHandler := func(obj interface{}) {
		ing, ok := toIngress(obj)
		if !ok {
			// If we reached here it means the ingress was deleted but its final state is unrecorded.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.ErrorS(nil, "Error obtaining object from tombstone", "key", obj)
				return
			}
			ing, ok = tombstone.Obj.(*networking.Ingress)
			if !ok {
				klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
				return
			}
		}

		if !IsValid(ing) {
			return
		}

		if isCatchAllIngress(ing.Spec) && disableCatchAll {
			klog.InfoS("Ignoring delete for catch-all because of --disable-catch-all", "ingress", klog.KObj(ing))
			return
		}

		//store.listers.IngressWithAnnotation.Delete(ing)

		key := MetaNamespaceKey(ing)
		store.secretIngressMap.Delete(key)

		updateCh.In() <- helper.Event{
			Type: helper.IngressDeleteEvent,
			Obj:  obj,
		}
	}

	ingEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ing, ok := toIngress(obj)
			if !ok {
				return
			}
			if !IsValid(ing) {
				ingressClass, _ := annotations.GetStringAnnotation(IngressKey, ing)
				klog.InfoS("Ignoring ingress", "ingress", klog.KObj(ing), "kubernetes.io/ingress.class", ingressClass, "ingressClassName", pointer.StringPtrDerefOr(ing.Spec.IngressClassName, ""))
				return
			}

			if isCatchAllIngress(ing.Spec) && disableCatchAll {
				klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
				return
			}

			recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

			store.syncIngress(ing)

			updateCh.In() <- helper.Event{
				Type: helper.CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: ingDeleteHandler,
		UpdateFunc: func(old, cur interface{}) {
			oldIng, ok := toIngress(old)
			if !ok {
				return
			}
			curIng, ok := toIngress(cur)
			if !ok {
				return
			}
			validOld := IsValid(oldIng)
			validCur := IsValid(curIng)
			if !validOld && validCur {
				if isCatchAllIngress(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
					return
				}

				klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "class", IngressKey)
				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else if validOld && !validCur {
				klog.InfoS("removing ingress", "ingress", klog.KObj(curIng), "class", IngressKey)
				ingDeleteHandler(old)
				return
			} else if validCur && !reflect.DeepEqual(old, cur) {
				if isCatchAllIngress(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
					ingDeleteHandler(old)
					return
				}

				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else {
				klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
				return
			}

			store.syncIngress(curIng)

			updateCh.In() <- helper.Event{
				Type: helper.UpdateEvent,
				Obj:  cur,
			}
		},
	}

	epEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ep1 := obj.(*corev1.Endpoints)
			key := MetaNamespaceKey(ep1)
			svc, exist, err := store.listers.Service.GetByKey(key)
			if err != nil {
				klog.Error(err, "get service GetByKey by endpoint failed", "endpoint", ep1)
				return
			}
			if !exist {
				klog.Warningf("epEventHandler %s", key)
				return
			}
			s := svc.(*corev1.Service)

			klog.Info("controller: endpoint add event",
				util.NamespacedName(ep1).String())
			updateCh.In() <- helper.Event{
				Type: helper.EndPointEvent,
				Obj:  s,
			}
		},
		DeleteFunc: func(obj interface{}) {
			ep1 := obj.(*corev1.Endpoints)
			key := MetaNamespaceKey(ep1)
			svc, exist, err := store.listers.Service.GetByKey(key)
			if err != nil {
				klog.Error(err, "DeleteFunc get service GetByKey by endpoint failed", "endpoint", ep1)
				return
			}
			if !exist {
				klog.Warningf("DeleteFunc epEventHandler %s", key)
				return
			}

			s := svc.(*corev1.Service)

			klog.Info("controller: endpoint delete event",
				util.NamespacedName(ep1).String())
			updateCh.In() <- helper.Event{
				Type: helper.EndPointEvent,
				Obj:  s,
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			ep1 := old.(*corev1.Endpoints)
			ep2 := cur.(*corev1.Endpoints)
			if !reflect.DeepEqual(ep1.Subsets, ep2.Subsets) {
				key := MetaNamespaceKey(ep1)
				svc, exist, err := store.listers.Service.GetByKey(key)
				if err != nil {
					klog.Error(err, "UpdateFunc get service GetByKey by endpoint failed", "endpoint", ep1)
					return
				}
				if !exist {
					klog.Warningf("UpdateFunc epEventHandler %s", key)
					return
				}
				s := svc.(*corev1.Service)

				klog.Info("controller: endpoint update event",
					util.NamespacedName(ep1).String())
				updateCh.In() <- helper.Event{
					Type: helper.EndPointEvent,
					Obj:  s,
				}
			}
		},
	}
	podEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			err := store.listers.Pod.Add(obj)
			if err != nil {
				klog.Error(err, "Pod Add failed")
				return
			}
		},
		DeleteFunc: func(obj interface{}) {
			store.listers.Pod.Delete(obj)
		},
		UpdateFunc: func(old, cur interface{}) {
		},
	}
	nodeEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			serviceList := store.listers.Service.List()
			for _, v := range serviceList {
				svc := v.(*corev1.Service)
				klog.Info("node change: enqueue service", util.Key(svc))
				updateCh.In() <- helper.Event{
					Type: helper.NodeEvent,
					Obj:  svc,
				}
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			nodeOld := old.(*corev1.Node)
			nodeNew := cur.(*corev1.Node)

			if !reflect.DeepEqual(nodeOld.Labels, nodeNew.Labels) {
				serviceList := store.listers.Service.List()
				for _, v := range serviceList {
					svc := v.(*corev1.Service)
					klog.Info("node change: enqueue service", util.Key(svc))
					updateCh.In() <- helper.Event{
						Type: helper.NodeEvent,
						Obj:  svc,
					}
				}
			}
		},

		DeleteFunc: func(obj interface{}) {
			serviceList := store.listers.Service.List()
			for _, v := range serviceList {
				svc := v.(*corev1.Service)
				klog.Info("node change: enqueue service", util.Key(svc))
				updateCh.In() <- helper.Event{
					Type: helper.NodeEvent,
					Obj:  svc,
				}
			}

		},
	}

	serviceHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			curSvc := obj.(*corev1.Service)
			store.enqueueImpactedIngresses(updateCh, curSvc)
		},
		UpdateFunc: func(old, cur interface{}) {
			// update the server group
			oldSvc := old.(*corev1.Service)
			curSvc := cur.(*corev1.Service)

			if reflect.DeepEqual(oldSvc, curSvc) {
				return
			}

			updateCh.In() <- helper.Event{
				Type: helper.ServiceEvent,
				Obj:  cur,
			}
		},
		DeleteFunc: func(obj interface{}) {
			// ingress refer service to delete
			curSvc := obj.(*corev1.Service)
			store.enqueueImpactedIngresses(updateCh, curSvc)
		},
	}

	store.informers.Ingress.AddEventHandler(ingEventHandler)
	store.informers.Endpoint.AddEventHandler(epEventHandler)
	store.informers.Node.AddEventHandler(podEventHandler)
	store.informers.Service.AddEventHandler(serviceHandler)
	store.informers.Node.AddEventHandler(nodeEventHandler)
	return store
}