in controllers/service/controller.go [104:198]
// New constructs a Controller wired to the given cloud provider, API client
// and informers.
//
// It performs the following steps, in order:
//   - starts an event broadcaster (structured logging, plus recording to the
//     API server when kubeClient is non-nil) and derives a recorder for the
//     "service-controller" component;
//   - registers rate-limiter usage metrics when the client exposes a rate
//     limiter, then calls registerMetrics;
//   - registers add/update handlers on the service informer and
//     add/update/delete handlers on the node informer;
//   - calls the controller's init method.
//
// featureGate is not referenced in this function; it is retained so the
// signature stays compatible with existing callers.
//
// On any error a nil Controller is returned and the resources started here
// (event broadcaster, work queue) are shut down so they are not leaked.
func New(
	cloud cloudprovider.Interface,
	kubeClient clientset.Interface,
	serviceInformer coreinformers.ServiceInformer,
	nodeInformer coreinformers.NodeInformer,
	clusterName string,
	featureGate featuregate.FeatureGate,
) (*Controller, error) {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(0)
	// Only record to the API server when a client is available. The nil check
	// further down shows a nil client is meant to be tolerated, so it must not
	// be dereferenced unconditionally here.
	if kubeClient != nil {
		broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	}
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

	if kubeClient != nil {
		if rateLimiter := kubeClient.CoreV1().RESTClient().GetRateLimiter(); rateLimiter != nil {
			if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage(subSystemName, rateLimiter); err != nil {
				// The broadcaster is already running; stop it because the
				// caller never receives a Controller to clean up.
				broadcaster.Shutdown()
				return nil, err
			}
		}
	}

	registerMetrics()

	s := &Controller{
		cloud:            cloud,
		knownHosts:       []*v1.Node{},
		kubeClient:       kubeClient,
		clusterName:      clusterName,
		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
		eventBroadcaster: broadcaster,
		eventRecorder:    recorder,
		nodeLister:       nodeInformer.Lister(),
		nodeListerSynced: nodeInformer.Informer().HasSynced,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
		nodeSyncCh: make(chan interface{}, 1),
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(cur interface{}) {
				svc, ok := cur.(*v1.Service)
				// Checking for cleanup here provides a remedy when the
				// controller failed to handle changes before exiting
				// (e.g. crash, restart, etc.).
				if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) {
					s.enqueueService(cur)
				}
			},
			UpdateFunc: func(old, cur interface{}) {
				oldSvc, ok1 := old.(*v1.Service)
				curSvc, ok2 := cur.(*v1.Service)
				if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) {
					s.enqueueService(cur)
				}
			},
			// No need to handle deletion events because deletion is handled by
			// the update path when the deletion timestamp is added.
		},
		serviceSyncPeriod,
	)
	s.serviceLister = serviceInformer.Lister()
	s.serviceListerSynced = serviceInformer.Informer().HasSynced

	nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(cur interface{}) {
				s.triggerNodeSync()
			},
			UpdateFunc: func(old, cur interface{}) {
				oldNode, ok := old.(*v1.Node)
				if !ok {
					return
				}
				curNode, ok := cur.(*v1.Node)
				if !ok {
					return
				}
				// Skip updates that shouldSyncNode reports as not requiring a sync.
				if !shouldSyncNode(oldNode, curNode) {
					return
				}
				s.triggerNodeSync()
			},
			DeleteFunc: func(old interface{}) {
				s.triggerNodeSync()
			},
		},
		time.Duration(0),
	)

	if err := s.init(); err != nil {
		// Release what was started above; the controller is discarded, so
		// nothing else would ever stop the queue or the broadcaster.
		s.queue.ShutDown()
		broadcaster.Shutdown()
		return nil, err
	}

	return s, nil
}