in pkg/ingress/controller/controller.go [259:380]
// NewController builds a Controller from conf.
//
// It creates the Kubernetes and OpenStack clients, a shared informer factory
// with a 30 second resync period, listers for Services, Nodes and Ingresses,
// an event recorder, and a rate-limited work queue. Failure to create either
// client is logged with Fatal. Ingress add/update/delete notifications are
// translated into Event items on the queue; Ingresses for which IsValid
// returns false are ignored. The informer factory is stored on the returned
// Controller; this function does not start it.
func NewController(conf config.Config) *Controller {
	// Annotation written by `kubectl apply`; it is ignored when deciding
	// whether an Ingress update is meaningful.
	const lastAppliedAnnotation = "kubectl.kubernetes.io/last-applied-configuration"

	// withoutLastApplied returns a copy of in without lastAppliedAnnotation.
	// Objects handed to informer event handlers are shared with the informer
	// cache and must not be modified, so the filtering is done on a copy.
	// A nil map stays nil so that reflect.DeepEqual sees the same nil/empty
	// distinction as it would on the source maps.
	withoutLastApplied := func(in map[string]string) map[string]string {
		if in == nil {
			return nil
		}
		out := make(map[string]string, len(in))
		for k, v := range in {
			if k == lastAppliedAnnotation {
				continue
			}
			out[k] = v
		}
		return out
	}

	// initialize k8s client
	kubeClient, err := createApiserverClient(conf.Kubernetes.ApiserverHost, conf.Kubernetes.KubeConfig)
	if err != nil {
		log.WithFields(log.Fields{
			"api_server":  conf.Kubernetes.ApiserverHost,
			"kuberconfig": conf.Kubernetes.KubeConfig,
			"error":       err,
		}).Fatal("failed to initialize kubernetes client")
	}

	// initialize openstack client
	var osClient *openstack.OpenStack
	osClient, err = openstack.NewOpenStack(conf)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
		}).Fatal("failed to initialize openstack client")
	}

	kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)
	serviceInformer := kubeInformerFactory.Core().V1().Services()
	nodeInformer := kubeInformerFactory.Core().V1().Nodes()
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Events are both logged through klog and recorded to the API server.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
		Interface: kubeClient.CoreV1().Events(""),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "openstack-ingress-controller"})

	controller := &Controller{
		config:              conf,
		queue:               queue,
		stopCh:              make(chan struct{}),
		informer:            kubeInformerFactory,
		recorder:            recorder,
		serviceLister:       serviceInformer.Lister(),
		serviceListerSynced: serviceInformer.Informer().HasSynced,
		nodeLister:          nodeInformer.Lister(),
		nodeListerSynced:    nodeInformer.Informer().HasSynced,
		knownNodes:          []*apiv1.Node{},
		osClient:            osClient,
		kubeClient:          kubeClient,
	}

	ingInformer := kubeInformerFactory.Networking().V1().Ingresses()
	ingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			addIng := obj.(*nwv1.Ingress)
			key := fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)
			if !IsValid(addIng) {
				log.Infof("ignore ingress %s", key)
				return
			}
			recorder.Event(addIng, apiv1.EventTypeNormal, "Creating", fmt.Sprintf("Ingress %s", key))
			controller.queue.AddRateLimited(Event{Obj: addIng, Type: CreateEvent})
		},
		UpdateFunc: func(old, cur interface{}) {
			newIng := cur.(*nwv1.Ingress)
			oldIng := old.(*nwv1.Ingress)
			if newIng.ResourceVersion == oldIng.ResourceVersion {
				// Periodic resync will send update events for all known Ingresses.
				// Two different versions of the same Ingress will always have different RVs.
				return
			}

			// Compare annotations on filtered copies; the Ingress objects
			// themselves are left untouched.
			newAnnotations := withoutLastApplied(newIng.ObjectMeta.Annotations)
			oldAnnotations := withoutLastApplied(oldIng.ObjectMeta.Annotations)

			key := fmt.Sprintf("%s/%s", newIng.Namespace, newIng.Name)
			validOld := IsValid(oldIng)
			validCur := IsValid(newIng)

			switch {
			case !validOld && validCur:
				// The Ingress became valid: handle it as a creation.
				recorder.Event(newIng, apiv1.EventTypeNormal, "Creating", fmt.Sprintf("Ingress %s", key))
				controller.queue.AddRateLimited(Event{Obj: newIng, Type: CreateEvent})
			case validOld && !validCur:
				// The Ingress stopped being valid: handle it as a deletion.
				recorder.Event(newIng, apiv1.EventTypeNormal, "Deleting", fmt.Sprintf("Ingress %s", key))
				controller.queue.AddRateLimited(Event{Obj: newIng, Type: DeleteEvent})
			case validCur && (!reflect.DeepEqual(newIng.Spec, oldIng.Spec) || !reflect.DeepEqual(newAnnotations, oldAnnotations)):
				// Still valid, and the spec or the remaining annotations changed.
				recorder.Event(newIng, apiv1.EventTypeNormal, "Updating", fmt.Sprintf("Ingress %s", key))
				controller.queue.AddRateLimited(Event{Obj: newIng, Type: UpdateEvent})
			}
		},
		DeleteFunc: func(obj interface{}) {
			delIng, ok := obj.(*nwv1.Ingress)
			if !ok {
				// If we reached here it means the ingress was deleted but its final state is unrecorded.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					log.Errorf("couldn't get object from tombstone %#v", obj)
					return
				}
				delIng, ok = tombstone.Obj.(*nwv1.Ingress)
				if !ok {
					log.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
					return
				}
			}
			key := fmt.Sprintf("%s/%s", delIng.Namespace, delIng.Name)
			if !IsValid(delIng) {
				log.Infof("ignore ingress %s", key)
				return
			}
			recorder.Event(delIng, apiv1.EventTypeNormal, "Deleting", fmt.Sprintf("Ingress %s", key))
			controller.queue.AddRateLimited(Event{Obj: delIng, Type: DeleteEvent})
		},
	})

	controller.ingressLister = ingInformer.Lister()
	controller.ingressListerSynced = ingInformer.Informer().HasSynced

	return controller
}