in cmd/recorder/main.go [60:213]
// run parses flags, starts the optional pprof and profiler agents, exposes
// Prometheus metrics over HTTP, and then loops forever recording per-namespace
// readiness stats for every KCC CRD it discovers via a CRD watch.
//
// It returns a non-nil error if the kubernetes client cannot be constructed,
// or ctx.Err() once ctx is cancelled.
func run(ctx context.Context) error {
	var (
		prometheusScrapeEndpoint string
		metricInterval           int
		enablePprof              bool
		pprofPort                int
	)
	flag.StringVar(&prometheusScrapeEndpoint, "prometheus-scrape-endpoint", ":8888", "configure the Prometheus scrape endpoint; :8888 as default")
	flag.IntVar(&metricInterval, "metric-interval", 60, "the time interval of each recording in seconds")
	flag.BoolVar(&enablePprof, "enable-pprof", false, "Enable the pprof server.")
	flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof server binds to if enabled.")
	profiler.AddFlag(flag.CommandLine)
	// Support default klog verbosity (so that we can see client-go traffic)
	klogFlagSet := goflag.NewFlagSet("klog", goflag.ExitOnError)
	klog.InitFlags(klogFlagSet)
	flag.CommandLine.AddGoFlag(klogFlagSet.Lookup("v"))
	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	flag.Parse()

	kccVersion := os.Getenv("CONFIG_CONNECTOR_VERSION")

	logger := klog.NewKlogr()
	ctx = klog.NewContext(ctx, logger)
	crlog.SetLogger(logger)
	logger.Info("Recording the stats of Config Connector resources")

	// Start pprof server if enabled.
	// NOTE(review): this serves the DefaultServeMux (nil handler), so it
	// relies on a blank import of net/http/pprof elsewhere and will also
	// expose /metrics once the handler below is registered — confirm this
	// sharing is intended.
	if enablePprof {
		go func() {
			if err := http.ListenAndServe(fmt.Sprintf(":%d", pprofPort), nil); err != nil {
				logger.Error(err, "error while running pprof server")
			}
		}()
	}
	// Start Cloud Profiler agent if enabled
	if err := profiler.StartIfEnabled(); err != nil {
		logging.Fatal(err, "error starting Cloud Profiler agent")
	}
	// Register the Prometheus metrics
	prometheus.MustRegister(appliedResources)
	prometheus.MustRegister(metrics.NewBuildInfoCollector(kccVersion))
	// Expose the registered metrics via HTTP.
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		logging.Fatal(http.ListenAndServe(prometheusScrapeEndpoint, nil), "error registering the Prometheus HTTP handler")
	}()
	// Set up the HTTP server for the readiness probe
	logger.Info("Setting container as ready...")
	ready.SetContainerAsReady()
	logger.Info("Container is ready.")

	// Get a config to talk to the apiserver
	restConfig, err := config.GetConfig()
	if err != nil {
		return fmt.Errorf("error getting kubernetes configuration: %w", err)
	}
	restHTTPClient, err := rest.HTTPClientFor(restConfig)
	if err != nil {
		return fmt.Errorf("building kubernetes http client: %w", err)
	}
	kubeTarget, err := kube.NewTarget(restConfig, restHTTPClient)
	if err != nil {
		return fmt.Errorf("building kubernetes target: %w", err)
	}

	// Watch CRDs so we can discover which KCC resource kinds exist.
	crdGVR := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}
	crdInfos := kube.WatchKube(ctx, kubeTarget, crdGVR, buildCRDInfo)

	// One per-resource watch per KCC CRD, created lazily below and closed
	// again when the CRD disappears.
	statViews := make(map[CRDInfo]*kube.KubeView[ResourceStats])

	ticker := time.NewTicker(time.Duration(metricInterval) * time.Second)
	defer ticker.Stop()
	for {
		// Honor cancellation so the recorder can shut down cleanly instead
		// of looping forever (the previous time.Sleep loop never returned).
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
		// Reset all metrics before updating.
		appliedResources.Reset()
		// Skip reporting if CRDs aren't synced up.
		if !crdInfos.HasSyncedOnce() {
			logger.Info("CRDs have not yet synced, skipping metric reporting")
			continue
		}
		seenCRDs := make(map[CRDInfo]bool)
		for _, crdInfo := range crdInfos.Snapshot() {
			// Skip non-KCC resources.
			if !strings.HasSuffix(crdInfo.GVK.Group, ".cnrm.cloud.google.com") {
				continue
			}
			// Skip ignored CRDs.
			crdName := gvkToCRDName(crdInfo.GVK)
			if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
				logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
					fmt.Sprintf("please run `kubectl delete crd %s` to "+
						"delete the orphaned CRD", crdName),
					"crd", crdName)
				continue
			}
			// Record all KCC CRDs we see, so we can clean up unused watches.
			seenCRDs[crdInfo] = true
			// Register watch for this resource if we haven't already.
			// Single map lookup; the view is reused for the rest of the loop body.
			statView, ok := statViews[crdInfo]
			if !ok {
				statView = kube.WatchKube(ctx, kubeTarget, crdInfo.GVR, gatherResourceStats)
				statViews[crdInfo] = statView
			}
			// Skip reporting for this resource if we aren't synced up.
			if !statView.HasSyncedOnce() {
				logger.Info("CRs have not yet synced, skipping metric reporting", "gvk", crdInfo.GVK)
				continue
			}
			// Aggregate stats for each namespace: count resources by their
			// last-observed condition.
			nsAggStats := make(map[string]*AggregatedResourceStats)
			for i, s := range statView.Snapshot() {
				ns := i.Namespace
				nsStats, ok := nsAggStats[ns]
				if !ok {
					nsStats = NewAggregatedResourceStats()
					nsAggStats[ns] = nsStats
				}
				nsStats.lastConditionCounts[s.lastCondition]++
			}
			// Record stats.
			for ns, stats := range nsAggStats {
				for condition, count := range stats.lastConditionCounts {
					logger.V(2).Info("posting metrics", "namespace", ns, "gvk", crdInfo.GVK.String(), "status", condition, "count", count)
					appliedResources.WithLabelValues(ns, crdInfo.GVK.GroupKind().String(), condition).Set(float64(count))
				}
			}
		}
		// Cleanup stale watches for CRDs that no longer exist.
		for crdInfo, view := range statViews {
			if _, ok := seenCRDs[crdInfo]; !ok {
				logger.Info("removing stale watch for resource", "gvk", crdInfo.GVK.String())
				view.Close()
				delete(statViews, crdInfo)
			}
		}
	}
}