in pkg/controller/controller.go [107:316]
// NewLoadBalancerController returns a controller that reconciles GCE L7
// load balancers for Ingress resources.
//
// It constructs the health checker and backend/L7 pools from the cloud
// handles on ctx, registers event handlers that enqueue Ingresses on
// Ingress/Service/BackendConfig (and, when enabled, FrontendConfig)
// changes, and registers a cloud-permission health check on ctx.
// stopCh is retained on the returned controller and signals its worker
// loops to stop.
func NewLoadBalancerController(
	ctx *context.ControllerContext,
	stopCh chan struct{}) *LoadBalancerController {
	// Send controller events both to the logs and to the API server.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
		Interface: ctx.KubeClient.CoreV1().Events(""),
	})
	healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
	backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

	lbc := LoadBalancerController{
		ctx:           ctx,
		nodeLister:    ctx.NodeInformer.GetIndexer(),
		Translator:    ctx.Translator,
		stopCh:        stopCh,
		hasSynced:     ctx.HasSynced,
		instancePool:  ctx.InstancePool,
		l7Pool:        loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer, ctx, namer.NewFrontendNamerFactory(ctx.ClusterNamer, ctx.KubeSystemUID)),
		backendSyncer: backends.NewBackendSyncer(backendPool, healthChecker, ctx.Cloud),
		negLinker:     backends.NewNEGLinker(backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud, ctx.SvcNegInformer.GetIndexer()),
		igLinker:      backends.NewInstanceGroupLinker(ctx.InstancePool, backendPool),
		metrics:       ctx.ControllerMetrics,
	}
	// IngressClass/params informers are optional; they are only present
	// when the IngressClass feature is configured on the context.
	if ctx.IngClassInformer != nil {
		lbc.ingClassLister = ctx.IngClassInformer.GetIndexer()
		lbc.ingParamsLister = ctx.IngParamsInformer.GetIndexer()
	}
	lbc.ingSyncer = ingsync.NewIngressSyncer(&lbc)
	lbc.ingQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("ingress", "ingresses", flags.F.NumIngressWorkers, lbc.sync)
	lbc.backendSyncer.Init(lbc.Translator)

	// Ingress event handlers.
	ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			addIng := obj.(*v1.Ingress)
			if !utils.IsGLBCIngress(addIng) {
				klog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", common.NamespacedName(addIng), annotations.IngressClassKey)
				return
			}
			klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
			lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
			lbc.ingQueue.Enqueue(obj)
		},
		DeleteFunc: func(obj interface{}) {
			// Use a comma-ok assertion: the informer may deliver a
			// cache.DeletedFinalStateUnknown tombstone if the watch was
			// closed and missed the delete event; a bare assertion would
			// panic on it.
			delIng, ok := obj.(*v1.Ingress)
			if !ok {
				tombstone, tsOk := obj.(cache.DeletedFinalStateUnknown)
				if !tsOk {
					klog.Errorf("Invalid object type: %T", obj)
					return
				}
				delIng, ok = tombstone.Obj.(*v1.Ingress)
				if !ok {
					klog.Errorf("Invalid object type: %T", tombstone.Obj)
					return
				}
			}
			if delIng.ObjectMeta.DeletionTimestamp != nil {
				// Cleanup of a finalized Ingress is driven by the update
				// that sets the deletion timestamp, not by the delete event.
				klog.V(2).Infof("Ignoring delete event for Ingress %v, deletion will be handled via the finalizer", common.NamespacedName(delIng))
				return
			}
			if !utils.IsGLBCIngress(delIng) {
				klog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", common.NamespacedName(delIng), annotations.IngressClassKey)
				return
			}
			klog.V(3).Infof("Ingress %v deleted, enqueueing", common.NamespacedName(delIng))
			lbc.ingQueue.Enqueue(obj)
		},
		UpdateFunc: func(old, cur interface{}) {
			curIng := cur.(*v1.Ingress)
			if !utils.IsGLBCIngress(curIng) {
				// Ingress needs to be enqueued if a ingress finalizer exists.
				// An existing finalizer means that
				// 1. Ingress update for class change.
				// 2. Ingress cleanup failed and re-queued.
				// 3. Finalizer remove failed and re-queued.
				if common.HasFinalizer(curIng.ObjectMeta) {
					klog.V(2).Infof("Ingress %s class was changed but has a glbc finalizer, enqueuing", common.NamespacedName(curIng))
					lbc.ingQueue.Enqueue(cur)
				}
				return
			}
			// An unchanged object means this is the informer's periodic
			// resync rather than a real update.
			if reflect.DeepEqual(old, cur) {
				klog.V(2).Infof("Periodic enqueueing of %s", common.NamespacedName(curIng))
			} else {
				klog.V(2).Infof("Ingress %s changed, enqueuing", common.NamespacedName(curIng))
			}
			lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
			lbc.ingQueue.Enqueue(cur)
		},
	})

	// Service event handlers: re-sync every Ingress that references the
	// added/updated Service.
	ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			svc := obj.(*apiv1.Service)
			ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesService(svc).AsList()
			lbc.ingQueue.Enqueue(convert(ings)...)
		},
		UpdateFunc: func(old, cur interface{}) {
			if !reflect.DeepEqual(old, cur) {
				svc := cur.(*apiv1.Service)
				ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesService(svc).AsList()
				lbc.ingQueue.Enqueue(convert(ings)...)
			}
		},
		// Ingress deletes matter, service deletes don't.
	})

	// BackendConfig event handlers: re-sync every Ingress that (through a
	// Service) references the changed BackendConfig.
	ctx.BackendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			klog.V(3).Infof("obj(type %T) added", obj)
			beConfig := obj.(*backendconfigv1.BackendConfig)
			ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesBackendConfig(beConfig, operator.Services(ctx.Services().List())).AsList()
			lbc.ingQueue.Enqueue(convert(ings)...)
		},
		UpdateFunc: func(old, cur interface{}) {
			if !reflect.DeepEqual(old, cur) {
				klog.V(3).Infof("obj(type %T) updated", cur)
				beConfig := cur.(*backendconfigv1.BackendConfig)
				ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesBackendConfig(beConfig, operator.Services(ctx.Services().List())).AsList()
				lbc.ingQueue.Enqueue(convert(ings)...)
			}
		},
		DeleteFunc: func(obj interface{}) {
			klog.V(3).Infof("obj(type %T) deleted", obj)
			var beConfig *backendconfigv1.BackendConfig
			var ok, beOk bool
			beConfig, ok = obj.(*backendconfigv1.BackendConfig)
			if !ok {
				// This can happen if the watch is closed and misses the delete event
				state, stateOk := obj.(cache.DeletedFinalStateUnknown)
				if !stateOk {
					klog.Errorf("Wanted cache.DeletedFinalStateUnknown of backendconfig obj, got: %+v", obj)
					return
				}
				beConfig, beOk = state.Obj.(*backendconfigv1.BackendConfig)
				if !beOk {
					klog.Errorf("Wanted backendconfig obj, got %+v", state.Obj)
					return
				}
			}
			ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesBackendConfig(beConfig, operator.Services(ctx.Services().List())).AsList()
			lbc.ingQueue.Enqueue(convert(ings)...)
		},
	})

	// FrontendConfig event handlers (only when the feature is enabled):
	// re-sync every Ingress that references the changed FrontendConfig.
	if ctx.FrontendConfigEnabled {
		ctx.FrontendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				feConfig := obj.(*frontendconfigv1beta1.FrontendConfig)
				ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesFrontendConfig(feConfig).AsList()
				lbc.ingQueue.Enqueue(convert(ings)...)
			},
			UpdateFunc: func(old, cur interface{}) {
				if !reflect.DeepEqual(old, cur) {
					feConfig := cur.(*frontendconfigv1beta1.FrontendConfig)
					ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesFrontendConfig(feConfig).AsList()
					lbc.ingQueue.Enqueue(convert(ings)...)
				}
			},
			DeleteFunc: func(obj interface{}) {
				var feConfig *frontendconfigv1beta1.FrontendConfig
				var ok, feOk bool
				feConfig, ok = obj.(*frontendconfigv1beta1.FrontendConfig)
				if !ok {
					// This can happen if the watch is closed and misses the delete event
					state, stateOk := obj.(cache.DeletedFinalStateUnknown)
					if !stateOk {
						klog.Errorf("Wanted cache.DeletedFinalStateUnknown of frontendconfig obj, got: %+v type: %T", obj, obj)
						return
					}
					feConfig, feOk = state.Obj.(*frontendconfigv1beta1.FrontendConfig)
					if !feOk {
						klog.Errorf("Wanted frontendconfig obj, got %+v, type %T", state.Obj, state.Obj)
						return
					}
				}
				ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesFrontendConfig(feConfig).AsList()
				lbc.ingQueue.Enqueue(convert(ings)...)
			},
		})
	}

	// Register health check on controller context. The probe only verifies
	// that the GCE backend-service API is reachable with our credentials.
	ctx.AddHealthCheck("ingress", func() error {
		_, err := backendPool.Get("k8s-ingress-svc-acct-permission-check-probe", meta.VersionGA, meta.Global)

		// If this container is scheduled on a node without compute/rw it is
		// effectively useless, but it is healthy. Reporting it as unhealthy
		// will lead to container crashlooping.
		if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
			klog.Infof("Reporting cluster as healthy, but unable to list backends: %v", err)
			return nil
		}
		// NotFound just means the probe resource doesn't exist; the API
		// itself is reachable, so the controller is healthy.
		return utils.IgnoreHTTPNotFound(err)
	})

	klog.V(3).Infof("Created new loadbalancer controller")
	return &lbc
}