in internal/ingress/controller/store/store.go [239:794]
func New(
	namespace string,
	namespaceSelector labels.Selector,
	configmap, tcp, udp, defaultSSLCertificate string,
	resyncPeriod time.Duration,
	client clientset.Interface,
	updateCh *channels.RingChannel,
	disableCatchAll bool,
	icConfig *ingressclass.IngressClassConfiguration) Storer {
	// New builds a k8sStore: it creates the informers and listers for
	// Ingresses, IngressClasses, Endpoints, Secrets, ConfigMaps, Services and
	// (optionally) Namespaces, registers the event handlers that keep the
	// local caches in sync, and pushes an Event on updateCh whenever a
	// relevant object changes. Nothing in this function starts the informers.
	//
	// Parameters:
	//   - namespace: scope passed to the informer factories and the event sink.
	//   - namespaceSelector: when non-nil and non-empty, a Namespace informer
	//     is created and ingress/secret events are filtered by whether the
	//     object's namespace labels match the selector.
	//   - configmap, tcp, udp: keys of the configmaps whose add/update events
	//     trigger a ConfigurationEvent; configmap is additionally read once at
	//     construction time and applied through setConfig.
	//   - defaultSSLCertificate: key of the secret that is re-synced whenever
	//     a secret with the same key is added or updated.
	//   - resyncPeriod: resync interval handed to every informer factory.
	//   - client: Kubernetes client used for informers, events and the
	//     initial configmap read.
	//   - updateCh: channel that receives the generated Events.
	//   - disableCatchAll: when true, ingresses for which
	//     hasCatchAllIngressRule reports true are ignored.
	//   - icConfig: ingress class configuration used to decide which
	//     ingresses and ingress classes belong to this controller.
	store := &k8sStore{
		informers:             &Informer{},
		listers:               &Lister{},
		sslStore:              NewSSLCertTracker(),
		updateCh:              updateCh,
		backendConfig:         ngx_config.NewDefault(),
		syncSecretMu:          &sync.Mutex{},
		backendConfigMu:       &sync.RWMutex{},
		secretIngressMap:      NewObjectRefMap(),
		defaultSSLCertificate: defaultSSLCertificate,
	}

	// Events are both logged through klog and recorded against the API server.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
		Interface: client.CoreV1().Events(namespace),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
		Component: "nginx-ingress-controller",
	})

	// k8sStore fulfills resolver.Resolver interface
	store.annotations = annotations.NewAnnotationExtractor(store)

	store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

	// As we currently do not filter out kubernetes objects we list, we can
	// retrieve a huge amount of data from the API server.
	// In a cluster using HELM < v3 configmaps are used to store binary data.
	// If you happen to have a lot of HELM releases in the cluster it will make
	// the memory consumption of nginx-ingress-controller explode.
	// In order to avoid that we filter out labels OWNER=TILLER.
	labelsTweakListOptionsFunc := func(options *metav1.ListOptions) {
		if len(options.LabelSelector) > 0 {
			options.LabelSelector += ",OWNER!=TILLER"
		} else {
			options.LabelSelector = "OWNER!=TILLER"
		}
	}

	// As of HELM >= v3 helm releases are stored using Secrets instead of ConfigMaps.
	// In order to avoid listing those secrets we discard type "helm.sh/release.v1"
	secretsTweakListOptionsFunc := func(options *metav1.ListOptions) {
		helmAntiSelector := fields.OneTermNotEqualSelector("type", "helm.sh/release.v1")
		baseSelector, err := fields.ParseSelector(options.FieldSelector)
		if err != nil {
			// An unparsable pre-existing selector is replaced rather than combined.
			options.FieldSelector = helmAntiSelector.String()
		} else {
			options.FieldSelector = fields.AndSelectors(baseSelector, helmAntiSelector).String()
		}
	}

	// create informers factory, enable and assign required informers
	infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
	)

	// create informers factory for configmaps
	infFactoryConfigmaps := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(labelsTweakListOptionsFunc),
	)

	// create informers factory for secrets
	infFactorySecrets := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(secretsTweakListOptionsFunc),
	)

	store.informers.Ingress = infFactory.Networking().V1().Ingresses().Informer()
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()

	if !icConfig.IgnoreIngressClass {
		store.informers.IngressClass = infFactory.Networking().V1().IngressClasses().Informer()
		// The IngressClass lister is a standalone store populated by the
		// event handlers below, so it only holds the classes they accept.
		store.listers.IngressClass.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}

	store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
	store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

	store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
	store.listers.Secret.Store = store.informers.Secret.GetStore()

	store.informers.ConfigMap = infFactoryConfigmaps.Core().V1().ConfigMaps().Informer()
	store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

	store.informers.Service = infFactory.Core().V1().Services().Informer()
	store.listers.Service.Store = store.informers.Service.GetStore()

	// avoid caching namespaces at cluster scope when watching single namespace
	if namespaceSelector != nil && !namespaceSelector.Empty() {
		// cache informers factory for namespaces
		infFactoryNamespaces := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
			informers.WithTweakListOptions(labelsTweakListOptionsFunc),
		)

		store.informers.Namespace = infFactoryNamespaces.Core().V1().Namespaces().Informer()
		store.listers.Namespace.Store = store.informers.Namespace.GetStore()
	}

	// watchedNamespace reports whether objects in the given namespace should
	// be processed. Without a (non-empty) selector every namespace is watched;
	// otherwise the namespace must be in the local cache and its labels must
	// match the selector.
	watchedNamespace := func(namespace string) bool {
		if namespaceSelector == nil || namespaceSelector.Empty() {
			return true
		}

		item, ok, err := store.listers.Namespace.GetByKey(namespace)
		if !ok {
			klog.Errorf("Namespace %s not existed: %v.", namespace, err)
			return false
		}
		ns, ok := item.(*corev1.Namespace)
		if !ok {
			return false
		}

		return namespaceSelector.Matches(labels.Set(ns.Labels))
	}

	// ingDeleteHandler removes an ingress from the local caches and emits a
	// DeleteEvent. It is also invoked from the update handler when an ingress
	// stops being relevant for this controller.
	ingDeleteHandler := func(obj interface{}) {
		ing, ok := toIngress(obj)
		if !ok {
			// If we reached here it means the ingress was deleted but its final state is unrecorded.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.ErrorS(nil, "Error obtaining object from tombstone", "key", obj)
				return
			}
			ing, ok = tombstone.Obj.(*networkingv1.Ingress)
			if !ok {
				klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
				return
			}
		}

		if !watchedNamespace(ing.Namespace) {
			return
		}

		_, err := store.GetIngressClass(ing, icConfig)
		if err != nil {
			klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
			return
		}

		if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
			klog.InfoS("Ignoring delete for catch-all because of --disable-catch-all", "ingress", klog.KObj(ing))
			return
		}

		store.listers.IngressWithAnnotation.Delete(ing)

		key := k8s.MetaNamespaceKey(ing)
		store.secretIngressMap.Delete(key)

		updateCh.In() <- Event{
			Type: DeleteEvent,
			Obj:  obj,
		}
	}

	ingEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ing, ok := toIngress(obj)
			if !ok {
				// Guard against a failed conversion instead of dereferencing
				// the returned pointer below.
				klog.Errorf("Ignoring ingress add event with unexpected object type: %T", obj)
				return
			}

			if !watchedNamespace(ing.Namespace) {
				return
			}

			ic, err := store.GetIngressClass(ing, icConfig)
			if err != nil {
				klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
				return
			}

			klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)

			if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
				klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
				return
			}

			recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

			store.syncIngress(ing)
			store.updateSecretIngressMap(ing)
			store.syncSecrets(ing)

			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: ingDeleteHandler,
		UpdateFunc: func(old, cur interface{}) {
			oldIng, okOld := toIngress(old)
			curIng, okCur := toIngress(cur)
			if !okOld || !okCur {
				// Guard against a failed conversion instead of dereferencing
				// the returned pointers below.
				klog.Errorf("Ignoring ingress update event with unexpected object types: %T, %T", old, cur)
				return
			}

			if !watchedNamespace(oldIng.Namespace) {
				return
			}

			// When ingress classes are ignored both errors stay nil, so only
			// the "object changed" branch below can apply.
			var errOld, errCur error
			var classCur string
			if !icConfig.IgnoreIngressClass {
				_, errOld = store.GetIngressClass(oldIng, icConfig)
				classCur, errCur = store.GetIngressClass(curIng, icConfig)
			}

			if errOld != nil && errCur == nil {
				// The ingress was not ours before and is now: treat as a creation.
				if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
					return
				}

				klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else if errOld == nil && errCur != nil {
				// The ingress was ours and no longer is: treat as a deletion.
				klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
				ingDeleteHandler(old)
				return
			} else if errCur == nil && !reflect.DeepEqual(old, cur) {
				if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
					ingDeleteHandler(old)
					return
				}

				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else {
				klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
				return
			}

			store.syncIngress(curIng)
			store.updateSecretIngressMap(curIng)
			store.syncSecrets(curIng)

			updateCh.In() <- Event{
				Type: UpdateEvent,
				Obj:  cur,
			}
		},
	}

	ingressClassEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			class := obj.(*networkingv1.IngressClass)
			foundClassByName := false
			if icConfig.IngressClassByName && class.Name == icConfig.AnnotationValue {
				klog.InfoS("adding ingressclass as ingress-class-by-name is configured", "ingressclass", klog.KObj(class))
				foundClassByName = true
			}
			if !foundClassByName && class.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(class))
				return
			}
			err := store.listers.IngressClass.Add(class)
			if err != nil {
				klog.InfoS("error adding ingressclass to store", "ingressclass", klog.KObj(class), "error", err)
				return
			}

			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: func(obj interface{}) {
			class, ok := obj.(*networkingv1.IngressClass)
			if !ok {
				// If we reached here it means the ingressclass was deleted but its final state is unrecorded.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					klog.ErrorS(nil, "Error obtaining object from tombstone", "key", obj)
					return
				}
				class, ok = tombstone.Obj.(*networkingv1.IngressClass)
				if !ok {
					klog.Errorf("Tombstone contained object that is not an IngressClass: %#v", obj)
					return
				}
			}
			// NOTE(review): unlike AddFunc, this does not consider
			// IngressClassByName, so a class added by name with a different
			// controller is never removed here — confirm this is intended.
			if class.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(class))
				return
			}
			err := store.listers.IngressClass.Delete(class)
			if err != nil {
				klog.InfoS("error removing ingressclass from store", "ingressclass", klog.KObj(class), "error", err)
				return
			}
			updateCh.In() <- Event{
				Type: DeleteEvent,
				Obj:  obj,
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oic := old.(*networkingv1.IngressClass)
			cic := cur.(*networkingv1.IngressClass)
			if cic.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(cic))
				return
			}
			// TODO: In a future we might be interested in parse parameters and use as
			// current IngressClass for this case, crossing with configmap
			if !reflect.DeepEqual(cic.Spec.Parameters, oic.Spec.Parameters) {
				err := store.listers.IngressClass.Update(cic)
				if err != nil {
					klog.InfoS("error updating ingressclass in store", "ingressclass", klog.KObj(cic), "error", err)
					return
				}
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
	}

	secrEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// NOTE(review): unlike UpdateFunc and DeleteFunc, this handler does
			// not consult watchedNamespace — confirm whether that is intended.
			sec := obj.(*corev1.Secret)
			key := k8s.MetaNamespaceKey(sec)

			if store.defaultSSLCertificate == key {
				store.syncSecret(store.defaultSSLCertificate)
			}

			// find references in ingresses and update local ssl certs
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("Secret was added and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						klog.Errorf("could not find Ingress %v in local store", ingKey)
						continue
					}
					store.syncIngress(ing)
					store.syncSecrets(ing)
				}
				updateCh.In() <- Event{
					Type: CreateEvent,
					Obj:  obj,
				}
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			if reflect.DeepEqual(old, cur) {
				return
			}

			sec := cur.(*corev1.Secret)
			key := k8s.MetaNamespaceKey(sec)

			if !watchedNamespace(sec.Namespace) {
				return
			}

			if store.defaultSSLCertificate == key {
				store.syncSecret(store.defaultSSLCertificate)
			}

			// find references in ingresses and update local ssl certs
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("secret was updated and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						klog.ErrorS(err, "could not find Ingress in local store", "ingress", ingKey)
						continue
					}
					store.syncSecrets(ing)
					store.syncIngress(ing)
				}
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			sec, ok := obj.(*corev1.Secret)
			if !ok {
				// If we reached here it means the secret was deleted but its final state is unrecorded.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					return
				}

				sec, ok = tombstone.Obj.(*corev1.Secret)
				if !ok {
					return
				}
			}

			if !watchedNamespace(sec.Namespace) {
				return
			}

			key := k8s.MetaNamespaceKey(sec)
			store.sslStore.Delete(key)

			// find references in ingresses
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("secret was deleted and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						klog.Errorf("could not find Ingress %v in local store", ingKey)
						continue
					}
					store.syncIngress(ing)
				}

				updateCh.In() <- Event{
					Type: DeleteEvent,
					Obj:  obj,
				}
			}
		},
	}

	// Endpoint adds and deletes are always forwarded; updates only when the
	// subsets actually changed.
	epEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: func(obj interface{}) {
			updateCh.In() <- Event{
				Type: DeleteEvent,
				Obj:  obj,
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oep := old.(*corev1.Endpoints)
			cep := cur.(*corev1.Endpoints)
			if !reflect.DeepEqual(cep.Subsets, oep.Subsets) {
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
	}

	// TODO: add e2e test to verify that changes to one or more configmap trigger an update
	changeTriggerUpdate := func(name string) bool {
		return name == configmap || name == tcp || name == udp
	}

	// handleCfgMapEvent reacts to an added or updated configmap identified by
	// key. Configuration configmaps are recorded as events, applied (for the
	// main configmap) and cause every annotated ingress to be re-synced plus a
	// ConfigurationEvent; for any other configmap only the ingresses whose
	// annotations reference a configmap are re-synced.
	handleCfgMapEvent := func(key string, cfgMap *corev1.ConfigMap, eventName string) {
		// updates to configuration configmaps can trigger an update
		triggerUpdate := changeTriggerUpdate(key)
		if triggerUpdate {
			recorder.Eventf(cfgMap, corev1.EventTypeNormal, eventName, fmt.Sprintf("ConfigMap %v", key))
			if key == configmap {
				store.setConfig(cfgMap)
			}
		}

		for _, ingObj := range store.listers.IngressWithAnnotation.List() {
			ingKey := k8s.MetaNamespaceKey(ingObj)
			ing, err := store.getIngress(ingKey)
			if err != nil {
				klog.Errorf("could not find Ingress %v in local store: %v", ingKey, err)
				continue
			}

			if parser.AnnotationsReferencesConfigmap(ing) || triggerUpdate {
				store.syncIngress(ing)
			}
		}

		if triggerUpdate {
			updateCh.In() <- Event{
				Type: ConfigurationEvent,
				Obj:  cfgMap,
			}
		}
	}

	// Configmap deletions are intentionally not handled here: no DeleteFunc
	// is registered.
	cmEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cfgMap := obj.(*corev1.ConfigMap)
			key := k8s.MetaNamespaceKey(cfgMap)
			handleCfgMapEvent(key, cfgMap, "CREATE")
		},
		UpdateFunc: func(old, cur interface{}) {
			if reflect.DeepEqual(old, cur) {
				return
			}

			cfgMap := cur.(*corev1.ConfigMap)
			key := k8s.MetaNamespaceKey(cfgMap)
			handleCfgMapEvent(key, cfgMap, "UPDATE")
		},
	}

	// Service adds and deletes only matter for ExternalName services; any
	// effective update to a service is forwarded.
	serviceHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			svc := obj.(*corev1.Service)
			if svc.Spec.Type == corev1.ServiceTypeExternalName {
				updateCh.In() <- Event{
					Type: CreateEvent,
					Obj:  obj,
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			svc, ok := obj.(*corev1.Service)
			if !ok {
				// If we reached here it means the service was deleted but its final state is unrecorded.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					klog.Errorf("couldn't get object from tombstone %#v", obj)
					return
				}
				svc, ok = tombstone.Obj.(*corev1.Service)
				if !ok {
					klog.Errorf("Tombstone contained object that is not a Service: %#v", obj)
					return
				}
			}
			if svc.Spec.Type == corev1.ServiceTypeExternalName {
				updateCh.In() <- Event{
					Type: DeleteEvent,
					Obj:  obj,
				}
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oldSvc := old.(*corev1.Service)
			curSvc := cur.(*corev1.Service)

			if reflect.DeepEqual(oldSvc, curSvc) {
				return
			}

			updateCh.In() <- Event{
				Type: UpdateEvent,
				Obj:  cur,
			}
		},
	}

	store.informers.Ingress.AddEventHandler(ingEventHandler)
	if !icConfig.IgnoreIngressClass {
		store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
	}
	store.informers.Endpoint.AddEventHandler(epEventHandler)
	store.informers.Secret.AddEventHandler(secrEventHandler)
	store.informers.ConfigMap.AddEventHandler(cmEventHandler)
	store.informers.Service.AddEventHandler(serviceHandler)

	// do not wait for informers to read the configmap configuration
	// NOTE(review): the parse error is discarded; presumably an invalid key
	// surfaces as a Get error below — confirm.
	ns, name, _ := k8s.ParseNameNS(configmap)
	cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		klog.Warningf("Unexpected error reading configuration configmap: %v", err)
	}

	// NOTE(review): setConfig is still called with whatever Get returned when
	// the read failed — verify setConfig tolerates that value.
	store.setConfig(cm)
	return store
}