in pkg/metricshandler/metrics_handler.go [92:175]
// Run configures sharding for the handler and then blocks until ctx is done.
//
// Autosharding is enabled only when both m.opts.Pod and m.opts.Namespace are
// set. When it is disabled, the static m.opts.Shard and m.opts.TotalShards
// values are passed to ConfigureSharding once. When it is enabled, the
// StatefulSet returned by detectStatefulSet is watched through an informer and
// ConfigureSharding is called again whenever the settings derived from that
// StatefulSet differ from m.curShard / m.curTotalShards.
//
// Run returns ctx.Err() once ctx is done. It returns earlier with an error if
// the StatefulSet cannot be detected or the informer cache fails to sync.
func (m *MetricsHandler) Run(ctx context.Context) error {
	autoSharding := len(m.opts.Pod) > 0 && len(m.opts.Namespace) > 0

	if !autoSharding {
		klog.Info("Autosharding disabled")
		m.ConfigureSharding(ctx, m.opts.Shard, m.opts.TotalShards)
		// Nothing left to watch; stay alive until the caller cancels.
		<-ctx.Done()
		return ctx.Err()
	}

	klog.Infof("Autosharding enabled with pod=%v pod_namespace=%v", m.opts.Pod, m.opts.Namespace)
	klog.Infof("Auto detecting sharding settings.")
	ss, err := detectStatefulSet(m.kubeClient, m.opts.Pod, m.opts.Namespace)
	if err != nil {
		return errors.Wrap(err, "detect StatefulSet")
	}
	statefulSetName := ss.Name

	// The detected StatefulSet's labels never change within this call, so the
	// selector string is built once instead of on every List/Watch request.
	labelSelector := fields.SelectorFromSet(ss.Labels).String()
	labelSelectorOptions := func(o *metav1.ListOptions) {
		o.LabelSelector = labelSelector
	}

	// reconcile derives the sharding settings from cur and applies them when
	// they differ from the current ones. It is shared by the add and update
	// handlers so both react to a StatefulSet in exactly the same way.
	reconcile := func(cur *appsv1.StatefulSet) {
		// The label selector may match other StatefulSets in the namespace;
		// only the one detected above is relevant.
		if cur.Name != statefulSetName {
			return
		}

		shard, totalShards, err := shardingSettingsFromStatefulSet(cur, m.opts.Pod)
		if err != nil {
			klog.Errorf("detect sharding settings from StatefulSet: %v", err)
			return
		}

		// Release the read lock before calling ConfigureSharding, which is
		// invoked without the lock held.
		m.mtx.RLock()
		shardingUnchanged := m.curShard == shard && m.curTotalShards == totalShards
		m.mtx.RUnlock()
		if shardingUnchanged {
			return
		}

		m.ConfigureSharding(ctx, shard, totalShards)
	}

	i := cache.NewSharedIndexInformer(
		cache.NewFilteredListWatchFromClient(m.kubeClient.AppsV1().RESTClient(), "statefulsets", m.opts.Namespace, labelSelectorOptions),
		&appsv1.StatefulSet{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	i.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(o interface{}) {
			reconcile(o.(*appsv1.StatefulSet))
		},
		UpdateFunc: func(oldo, curo interface{}) {
			old := oldo.(*appsv1.StatefulSet)
			cur := curo.(*appsv1.StatefulSet)
			// Same ResourceVersion on both sides: skip the update.
			if old.ResourceVersion == cur.ResourceVersion {
				return
			}
			reconcile(cur)
		},
	})
	go i.Run(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), i.HasSynced) {
		return errors.New("waiting for informer cache to sync failed")
	}

	// The informer handlers keep sharding up to date from here on; block
	// until the caller cancels.
	<-ctx.Done()
	return ctx.Err()
}