func NewLoadBalancerController()

in pkg/controller/controller.go [107:316]


// NewLoadBalancerController builds a LoadBalancerController from the shared
// controller context and wires up the informer event handlers that feed its
// ingress work queue.
//
// Parameters:
//   - ctx: shared controller context supplying the clients, informers, pools,
//     namers and metrics used by the controller.
//   - stopCh: stored on the controller as its stop channel.
//
// Besides constructing the controller, this function has side effects on ctx:
// it registers event handlers on the Ingress, Service and BackendConfig
// informers (and on the FrontendConfig informer when
// ctx.FrontendConfigEnabled is set), and registers an "ingress" health check.
func NewLoadBalancerController(
	ctx *context.ControllerContext,
	stopCh chan struct{}) *LoadBalancerController {

	// Events are both written to the log and recorded through the kube client.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(klog.Infof)
	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
		Interface: ctx.KubeClient.CoreV1().Events(""),
	})

	healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
	backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

	lbc := LoadBalancerController{
		ctx:           ctx,
		nodeLister:    ctx.NodeInformer.GetIndexer(),
		Translator:    ctx.Translator,
		stopCh:        stopCh,
		hasSynced:     ctx.HasSynced,
		instancePool:  ctx.InstancePool,
		l7Pool:        loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer, ctx, namer.NewFrontendNamerFactory(ctx.ClusterNamer, ctx.KubeSystemUID)),
		backendSyncer: backends.NewBackendSyncer(backendPool, healthChecker, ctx.Cloud),
		negLinker:     backends.NewNEGLinker(backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud, ctx.SvcNegInformer.GetIndexer()),
		igLinker:      backends.NewInstanceGroupLinker(ctx.InstancePool, backendPool),
		metrics:       ctx.ControllerMetrics,
	}

	// The IngressClass listers are optional and stay nil when the informer is
	// absent.
	// NOTE(review): only IngClassInformer is nil-checked; this presumes
	// IngParamsInformer is always set together with it — confirm against the
	// code that populates the controller context.
	if ctx.IngClassInformer != nil {
		lbc.ingClassLister = ctx.IngClassInformer.GetIndexer()
		lbc.ingParamsLister = ctx.IngParamsInformer.GetIndexer()
	}

	lbc.ingSyncer = ingsync.NewIngressSyncer(&lbc)
	lbc.ingQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("ingress", "ingresses", flags.F.NumIngressWorkers, lbc.sync)
	lbc.backendSyncer.Init(lbc.Translator)

	// enqueueForService enqueues every ingress that references svc.
	enqueueForService := func(svc *apiv1.Service) {
		ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesService(svc).AsList()
		lbc.ingQueue.Enqueue(convert(ings)...)
	}

	// enqueueForBackendConfig enqueues every ingress that references beConfig.
	enqueueForBackendConfig := func(beConfig *backendconfigv1.BackendConfig) {
		ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesBackendConfig(beConfig, operator.Services(ctx.Services().List())).AsList()
		lbc.ingQueue.Enqueue(convert(ings)...)
	}

	// Ingress event handlers.
	ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			addIng := obj.(*v1.Ingress)
			if !utils.IsGLBCIngress(addIng) {
				klog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", common.NamespacedName(addIng), annotations.IngressClassKey)
				return
			}

			klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
			lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
			lbc.ingQueue.Enqueue(obj)
		},
		DeleteFunc: func(obj interface{}) {
			delIng, ok := obj.(*v1.Ingress)
			if !ok {
				// This can happen if the watch is closed and misses the delete
				// event: the informer then hands over a tombstone wrapping the
				// last known state instead of the object itself.
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					klog.Errorf("Invalid object type: %T", obj)
					return
				}
				if delIng, ok = tombstone.Obj.(*v1.Ingress); !ok {
					klog.Errorf("Invalid object type: %T", tombstone.Obj)
					return
				}
			}
			if delIng == nil {
				klog.Errorf("Invalid object type: %T", obj)
				return
			}
			if delIng.ObjectMeta.DeletionTimestamp != nil {
				klog.V(2).Infof("Ignoring delete event for Ingress %v, deletion will be handled via the finalizer", common.NamespacedName(delIng))
				return
			}

			if !utils.IsGLBCIngress(delIng) {
				klog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", common.NamespacedName(delIng), annotations.IngressClassKey)
				return
			}

			klog.V(3).Infof("Ingress %v deleted, enqueueing", common.NamespacedName(delIng))
			// Enqueue the unwrapped ingress rather than obj so the queue never
			// has to derive a key from a tombstone.
			lbc.ingQueue.Enqueue(delIng)
		},
		UpdateFunc: func(old, cur interface{}) {
			curIng := cur.(*v1.Ingress)
			if !utils.IsGLBCIngress(curIng) {
				// Ingress needs to be enqueued if a ingress finalizer exists.
				// An existing finalizer means that
				// 1. Ingress update for class change.
				// 2. Ingress cleanup failed and re-queued.
				// 3. Finalizer remove failed and re-queued.
				if common.HasFinalizer(curIng.ObjectMeta) {
					klog.V(2).Infof("Ingress %s class was changed but has a glbc finalizer, enqueuing", common.NamespacedName(curIng))
					lbc.ingQueue.Enqueue(cur)
				}
				return
			}
			// Identical old and new objects are still enqueued; only the log
			// line differs.
			if reflect.DeepEqual(old, cur) {
				klog.V(2).Infof("Periodic enqueueing of %s", common.NamespacedName(curIng))
			} else {
				klog.V(2).Infof("Ingress %s changed, enqueuing", common.NamespacedName(curIng))
			}
			lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
			lbc.ingQueue.Enqueue(cur)
		},
	})

	// Service event handlers.
	ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			enqueueForService(obj.(*apiv1.Service))
		},
		UpdateFunc: func(old, cur interface{}) {
			if reflect.DeepEqual(old, cur) {
				return
			}
			enqueueForService(cur.(*apiv1.Service))
		},
		// Ingress deletes matter, service deletes don't.
	})

	// BackendConfig event handlers.
	ctx.BackendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			klog.V(3).Infof("obj(type %T) added", obj)
			enqueueForBackendConfig(obj.(*backendconfigv1.BackendConfig))
		},
		UpdateFunc: func(old, cur interface{}) {
			if reflect.DeepEqual(old, cur) {
				return
			}
			klog.V(3).Infof("obj(type %T) updated", cur)
			enqueueForBackendConfig(cur.(*backendconfigv1.BackendConfig))
		},
		DeleteFunc: func(obj interface{}) {
			klog.V(3).Infof("obj(type %T) deleted", obj)
			beConfig, ok := obj.(*backendconfigv1.BackendConfig)
			if !ok {
				// This can happen if the watch is closed and misses the delete event
				state, stateOk := obj.(cache.DeletedFinalStateUnknown)
				if !stateOk {
					klog.Errorf("Wanted cache.DeleteFinalStateUnknown of backendconfig obj, got: %+v", obj)
					return
				}

				var beOk bool
				beConfig, beOk = state.Obj.(*backendconfigv1.BackendConfig)
				if !beOk {
					klog.Errorf("Wanted backendconfig obj, got %+v", state.Obj)
					return
				}
			}

			enqueueForBackendConfig(beConfig)
		},
	})

	// FrontendConfig event handlers.
	if ctx.FrontendConfigEnabled {
		// enqueueForFrontendConfig enqueues every ingress that references
		// feConfig.
		enqueueForFrontendConfig := func(feConfig *frontendconfigv1beta1.FrontendConfig) {
			ings := operator.Ingresses(ctx.Ingresses().List()).ReferencesFrontendConfig(feConfig).AsList()
			lbc.ingQueue.Enqueue(convert(ings)...)
		}

		ctx.FrontendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				enqueueForFrontendConfig(obj.(*frontendconfigv1beta1.FrontendConfig))
			},
			UpdateFunc: func(old, cur interface{}) {
				if reflect.DeepEqual(old, cur) {
					return
				}
				enqueueForFrontendConfig(cur.(*frontendconfigv1beta1.FrontendConfig))
			},
			DeleteFunc: func(obj interface{}) {
				feConfig, ok := obj.(*frontendconfigv1beta1.FrontendConfig)
				if !ok {
					// This can happen if the watch is closed and misses the delete event
					state, stateOk := obj.(cache.DeletedFinalStateUnknown)
					if !stateOk {
						klog.Errorf("Wanted cache.DeleteFinalStateUnknown of frontendconfig obj, got: %+v type: %T", obj, obj)
						return
					}

					var feOk bool
					feConfig, feOk = state.Obj.(*frontendconfigv1beta1.FrontendConfig)
					if !feOk {
						klog.Errorf("Wanted frontendconfig obj, got %+v, type %T", state.Obj, state.Obj)
						return
					}
				}

				enqueueForFrontendConfig(feConfig)
			},
		})
	}

	// Register health check on controller context.
	ctx.AddHealthCheck("ingress", func() error {
		// Probe the backend pool with a fixed name; only the error matters.
		_, err := backendPool.Get("k8s-ingress-svc-acct-permission-check-probe", meta.VersionGA, meta.Global)

		// If this container is scheduled on a node without compute/rw it is
		// effectively useless, but it is healthy. Reporting it as unhealthy
		// will lead to container crashlooping.
		if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
			klog.Infof("Reporting cluster as healthy, but unable to list backends: %v", err)
			return nil
		}
		return utils.IgnoreHTTPNotFound(err)
	})

	klog.V(3).Infof("Created new loadbalancer controller")

	return &lbc
}