in pkg/controller/ingress/reconcile/store/store.go [154:466]
func New(
namespace string,
resyncPeriod time.Duration,
client clientset.Interface,
updateCh *channels.RingChannel,
disableCatchAll bool) Storer {
store := &k8sStore{
informers: &Informer{},
listers: &Lister{},
updateCh: updateCh,
syncSecretMu: &sync.Mutex{},
backendConfigMu: &sync.RWMutex{},
secretIngressMap: NewObjectRefMap(),
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
Interface: client.CoreV1().Events(namespace),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
Component: "alb-ingress-controller",
})
store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
// create informers factory, enable and assign required informers
infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
informers.WithNamespace(namespace),
)
store.informers.Ingress = infFactory.Networking().V1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
store.informers.Node = infFactory.Core().V1().Nodes().Informer()
store.listers.Node.Store = store.informers.Node.GetStore()
store.informers.Pod = infFactory.Core().V1().Pods().Informer()
store.listers.Pod.Store = store.informers.Pod.GetStore()
ingDeleteHandler := func(obj interface{}) {
ing, ok := toIngress(obj)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.ErrorS(nil, "Error obtaining object from tombstone", "key", obj)
return
}
ing, ok = tombstone.Obj.(*networking.Ingress)
if !ok {
klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !IsValid(ing) {
return
}
if isCatchAllIngress(ing.Spec) && disableCatchAll {
klog.InfoS("Ignoring delete for catch-all because of --disable-catch-all", "ingress", klog.KObj(ing))
return
}
//store.listers.IngressWithAnnotation.Delete(ing)
key := MetaNamespaceKey(ing)
store.secretIngressMap.Delete(key)
updateCh.In() <- helper.Event{
Type: helper.IngressDeleteEvent,
Obj: obj,
}
}
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing, ok := toIngress(obj)
if !ok {
return
}
if !IsValid(ing) {
ingressClass, _ := annotations.GetStringAnnotation(IngressKey, ing)
klog.InfoS("Ignoring ingress", "ingress", klog.KObj(ing), "kubernetes.io/ingress.class", ingressClass, "ingressClassName", pointer.StringPtrDerefOr(ing.Spec.IngressClassName, ""))
return
}
if isCatchAllIngress(ing.Spec) && disableCatchAll {
klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
store.syncIngress(ing)
updateCh.In() <- helper.Event{
Type: helper.CreateEvent,
Obj: obj,
}
},
DeleteFunc: ingDeleteHandler,
UpdateFunc: func(old, cur interface{}) {
oldIng, ok := toIngress(old)
if !ok {
return
}
curIng, ok := toIngress(cur)
if !ok {
return
}
validOld := IsValid(oldIng)
validCur := IsValid(curIng)
if !validOld && validCur {
if isCatchAllIngress(curIng.Spec) && disableCatchAll {
klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
return
}
klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "class", IngressKey)
recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
} else if validOld && !validCur {
klog.InfoS("removing ingress", "ingress", klog.KObj(curIng), "class", IngressKey)
ingDeleteHandler(old)
return
} else if validCur && !reflect.DeepEqual(old, cur) {
if isCatchAllIngress(curIng.Spec) && disableCatchAll {
klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
ingDeleteHandler(old)
return
}
recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
} else {
klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
return
}
store.syncIngress(curIng)
updateCh.In() <- helper.Event{
Type: helper.UpdateEvent,
Obj: cur,
}
},
}
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ep1 := obj.(*corev1.Endpoints)
key := MetaNamespaceKey(ep1)
svc, exist, err := store.listers.Service.GetByKey(key)
if err != nil {
klog.Error(err, "get service GetByKey by endpoint failed", "endpoint", ep1)
return
}
if !exist {
klog.Warningf("epEventHandler %s", key)
return
}
s := svc.(*corev1.Service)
klog.Info("controller: endpoint add event",
util.NamespacedName(ep1).String())
updateCh.In() <- helper.Event{
Type: helper.EndPointEvent,
Obj: s,
}
},
DeleteFunc: func(obj interface{}) {
ep1 := obj.(*corev1.Endpoints)
key := MetaNamespaceKey(ep1)
svc, exist, err := store.listers.Service.GetByKey(key)
if err != nil {
klog.Error(err, "DeleteFunc get service GetByKey by endpoint failed", "endpoint", ep1)
return
}
if !exist {
klog.Warningf("DeleteFunc epEventHandler %s", key)
return
}
s := svc.(*corev1.Service)
klog.Info("controller: endpoint delete event",
util.NamespacedName(ep1).String())
updateCh.In() <- helper.Event{
Type: helper.EndPointEvent,
Obj: s,
}
},
UpdateFunc: func(old, cur interface{}) {
ep1 := old.(*corev1.Endpoints)
ep2 := cur.(*corev1.Endpoints)
if !reflect.DeepEqual(ep1.Subsets, ep2.Subsets) {
key := MetaNamespaceKey(ep1)
svc, exist, err := store.listers.Service.GetByKey(key)
if err != nil {
klog.Error(err, "UpdateFunc get service GetByKey by endpoint failed", "endpoint", ep1)
return
}
if !exist {
klog.Warningf("UpdateFunc epEventHandler %s", key)
return
}
s := svc.(*corev1.Service)
klog.Info("controller: endpoint update event",
util.NamespacedName(ep1).String())
updateCh.In() <- helper.Event{
Type: helper.EndPointEvent,
Obj: s,
}
}
},
}
podEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
err := store.listers.Pod.Add(obj)
if err != nil {
klog.Error(err, "Pod Add failed")
return
}
},
DeleteFunc: func(obj interface{}) {
store.listers.Pod.Delete(obj)
},
UpdateFunc: func(old, cur interface{}) {
},
}
nodeEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
serviceList := store.listers.Service.List()
for _, v := range serviceList {
svc := v.(*corev1.Service)
klog.Info("node change: enqueue service", util.Key(svc))
updateCh.In() <- helper.Event{
Type: helper.NodeEvent,
Obj: svc,
}
}
},
UpdateFunc: func(old, cur interface{}) {
nodeOld := old.(*corev1.Node)
nodeNew := cur.(*corev1.Node)
if !reflect.DeepEqual(nodeOld.Labels, nodeNew.Labels) {
serviceList := store.listers.Service.List()
for _, v := range serviceList {
svc := v.(*corev1.Service)
klog.Info("node change: enqueue service", util.Key(svc))
updateCh.In() <- helper.Event{
Type: helper.NodeEvent,
Obj: svc,
}
}
}
},
DeleteFunc: func(obj interface{}) {
serviceList := store.listers.Service.List()
for _, v := range serviceList {
svc := v.(*corev1.Service)
klog.Info("node change: enqueue service", util.Key(svc))
updateCh.In() <- helper.Event{
Type: helper.NodeEvent,
Obj: svc,
}
}
},
}
serviceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
curSvc := obj.(*corev1.Service)
store.enqueueImpactedIngresses(updateCh, curSvc)
},
UpdateFunc: func(old, cur interface{}) {
// update the server group
oldSvc := old.(*corev1.Service)
curSvc := cur.(*corev1.Service)
if reflect.DeepEqual(oldSvc, curSvc) {
return
}
updateCh.In() <- helper.Event{
Type: helper.ServiceEvent,
Obj: cur,
}
},
DeleteFunc: func(obj interface{}) {
// ingress refer service to delete
curSvc := obj.(*corev1.Service)
store.enqueueImpactedIngresses(updateCh, curSvc)
},
}
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Node.AddEventHandler(podEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)
store.informers.Node.AddEventHandler(nodeEventHandler)
return store
}