in pkg/neg/controller.go [111:324]
func NewController(
kubeClient kubernetes.Interface,
svcNegClient svcnegclient.Interface,
destinationRuleClient dynamic.NamespaceableResourceInterface,
kubeSystemUID types.UID,
ingressInformer cache.SharedIndexInformer,
serviceInformer cache.SharedIndexInformer,
podInformer cache.SharedIndexInformer,
nodeInformer cache.SharedIndexInformer,
endpointInformer cache.SharedIndexInformer,
endpointSliceInformer cache.SharedIndexInformer,
destinationRuleInformer cache.SharedIndexInformer,
svcNegInformer cache.SharedIndexInformer,
hasSynced func() bool,
controllerMetrics *usage.ControllerMetrics,
l4Namer namer2.L4ResourcesNamer,
defaultBackendService utils.ServicePort,
cloud negtypes.NetworkEndpointGroupCloud,
zoneGetter negtypes.ZoneGetter,
namer negtypes.NetworkEndpointGroupNamer,
resyncPeriod time.Duration,
gcPeriod time.Duration,
enableReadinessReflector bool,
runIngress bool,
runL4Controller bool,
enableNonGcpMode bool,
enableAsm bool,
asmServiceNEGSkipNamespaces []string,
enableEndpointSlices bool,
) *Controller {
// init event recorder
// TODO: move event recorder initializer to main. Reuse it among controllers.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: kubeClient.CoreV1().Events(""),
})
negScheme := runtime.NewScheme()
err := scheme.AddToScheme(negScheme)
if err != nil {
klog.Errorf("Errored adding default scheme to event recorder: %q", err)
}
err = svcnegv1beta1.AddToScheme(negScheme)
if err != nil {
klog.Errorf("Errored adding NEG CRD scheme to event recorder: %q", err)
}
recorder := eventBroadcaster.NewRecorder(negScheme,
apiv1.EventSource{Component: "neg-controller"})
var endpointIndexer, endpointSliceIndexer cache.Indexer
if enableEndpointSlices {
endpointSliceIndexer = endpointSliceInformer.GetIndexer()
} else {
endpointIndexer = endpointInformer.GetIndexer()
}
manager := newSyncerManager(
namer,
recorder,
cloud,
zoneGetter,
svcNegClient,
kubeSystemUID,
podInformer.GetIndexer(),
serviceInformer.GetIndexer(),
endpointIndexer,
endpointSliceIndexer,
nodeInformer.GetIndexer(),
svcNegInformer.GetIndexer(),
enableNonGcpMode,
enableEndpointSlices)
var reflector readiness.Reflector
if enableReadinessReflector {
reflector = readiness.NewReadinessReflector(
kubeClient,
podInformer.GetIndexer(),
cloud, manager)
} else {
reflector = &readiness.NoopReflector{}
}
manager.reflector = reflector
negController := &Controller{
client: kubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
collector: controllerMetrics,
runL4: runL4Controller,
}
if runIngress {
ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*v1.Ingress)
if !utils.IsGLBCIngress(addIng) {
klog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", common.NamespacedName(addIng), annotations.IngressClassKey)
return
}
negController.enqueueIngressServices(addIng)
},
DeleteFunc: func(obj interface{}) {
delIng := obj.(*v1.Ingress)
if !utils.IsGLBCIngress(delIng) {
klog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", common.NamespacedName(delIng), annotations.IngressClassKey)
return
}
negController.enqueueIngressServices(delIng)
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*v1.Ingress)
curIng := cur.(*v1.Ingress)
// Check if ingress class changed and previous class was a GCE ingress
// Ingress class change may require cleanup so enqueue related services
if !utils.IsGLBCIngress(curIng) && !utils.IsGLBCIngress(oldIng) {
klog.V(4).Infof("Ignoring update for ingress %v based on annotation %v", common.NamespacedName(curIng), annotations.IngressClassKey)
return
}
keys := gatherIngressServiceKeys(oldIng)
keys = keys.Union(gatherIngressServiceKeys(curIng))
for _, key := range keys.List() {
negController.enqueueService(cache.ExplicitKey(key))
}
},
})
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*apiv1.Pod)
negController.reflector.SyncPod(pod)
},
UpdateFunc: func(old, cur interface{}) {
pod := cur.(*apiv1.Pod)
negController.reflector.SyncPod(pod)
},
})
}
serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueService,
DeleteFunc: negController.enqueueService,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueService(cur)
},
})
if enableEndpointSlices {
endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueEndpointSlice,
DeleteFunc: negController.enqueueEndpointSlice,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueEndpointSlice(cur)
},
})
} else {
endpointInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueEndpoint,
DeleteFunc: negController.enqueueEndpoint,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueEndpoint(cur)
},
})
}
nodeEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
},
}
if negController.runL4 {
nodeEventHandler.UpdateFunc = func(old, cur interface{}) {
oldNode := old.(*apiv1.Node)
currentNode := cur.(*apiv1.Node)
candidateNodeCheck := utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
if candidateNodeCheck(oldNode) != candidateNodeCheck(currentNode) {
klog.Infof("Node %q has changed, enqueueing", currentNode.Name)
negController.enqueueNode(currentNode)
}
}
}
nodeInformer.AddEventHandler(nodeEventHandler)
if enableAsm {
negController.enableASM = enableAsm
negController.asmServiceNEGSkipNamespaces = asmServiceNEGSkipNamespaces
negController.destinationRuleLister = destinationRuleInformer.GetIndexer()
destinationRuleInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueDestinationRule,
DeleteFunc: negController.enqueueDestinationRule,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueDestinationRule(cur)
},
})
negController.destinationRuleClient = destinationRuleClient
}
return negController
}