func run()

in cmd/recorder/main.go [60:213]


// run parses the command-line flags, starts the optional pprof server and the
// Prometheus scrape endpoint, and then loops until ctx is cancelled: every
// metric-interval seconds it resets the appliedResources metric and
// repopulates it with per-namespace, per-kind, per-condition counts of the
// Config Connector resources currently observed in the cluster.
//
// It returns an error when the flags are invalid or the Kubernetes client
// cannot be built, and ctx.Err() once ctx is cancelled.
func run(ctx context.Context) error {
	var (
		prometheusScrapeEndpoint string
		metricInterval           int
		enablePprof              bool
		pprofPort                int
	)
	flag.StringVar(&prometheusScrapeEndpoint, "prometheus-scrape-endpoint", ":8888", "configure the Prometheus scrape endpoint; :8888 as default")
	flag.IntVar(&metricInterval, "metric-interval", 60, "the time interval of each recording in seconds")
	flag.BoolVar(&enablePprof, "enable-pprof", false, "Enable the pprof server.")
	flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof server binds to if enabled.")
	profiler.AddFlag(flag.CommandLine)

	// Support default klog verbosity (so that we can see client-go traffic)
	klogFlagSet := goflag.NewFlagSet("klog", goflag.ExitOnError)
	klog.InitFlags(klogFlagSet)
	flag.CommandLine.AddGoFlag(klogFlagSet.Lookup("v"))

	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
	flag.Parse()

	// A non-positive interval would turn the reporting loop below into a busy
	// loop that continuously resets the metrics, so reject it up front.
	if metricInterval <= 0 {
		return fmt.Errorf("invalid metric-interval %d: must be a positive number of seconds", metricInterval)
	}
	interval := time.Duration(metricInterval) * time.Second

	kccVersion := os.Getenv("CONFIG_CONNECTOR_VERSION")

	logger := klog.NewKlogr()
	ctx = klog.NewContext(ctx, logger)
	crlog.SetLogger(logger)

	logger.Info("Recording the stats of Config Connector resources")

	// Start pprof server if enabled
	if enablePprof {
		go func() {
			if err := http.ListenAndServe(fmt.Sprintf(":%d", pprofPort), nil); err != nil {
				logger.Error(err, "error while running pprof server")
			}
		}()
	}

	// Start Cloud Profiler agent if enabled
	if err := profiler.StartIfEnabled(); err != nil {
		logging.Fatal(err, "error starting Cloud Profiler agent")
	}

	// Register the Prometheus metrics
	prometheus.MustRegister(appliedResources)
	prometheus.MustRegister(metrics.NewBuildInfoCollector(kccVersion))

	// Expose the registered metrics via HTTP.
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		logging.Fatal(http.ListenAndServe(prometheusScrapeEndpoint, nil), "error registering the Prometheus HTTP handler")
	}()

	// Set up the HTTP server for the readiness probe
	logger.Info("Setting container as ready...")
	ready.SetContainerAsReady()
	logger.Info("Container is ready.")

	// Get a config to talk to the apiserver
	restConfig, err := config.GetConfig()
	if err != nil {
		return fmt.Errorf("error getting kubernetes configuration: %w", err)
	}

	restHTTPClient, err := rest.HTTPClientFor(restConfig)
	if err != nil {
		return fmt.Errorf("building kubernetes http client: %w", err)
	}

	kubeTarget, err := kube.NewTarget(restConfig, restHTTPClient)
	if err != nil {
		return fmt.Errorf("building kubernetes target: %w", err)
	}

	// Watch the CRDs themselves; each KCC CRD found gets its own resource
	// watch, created lazily inside the loop below.
	crdGVR := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}
	crdInfos := kube.WatchKube(ctx, kubeTarget, crdGVR, buildCRDInfo)

	// One resource watch per CRD, kept across iterations.
	statViews := make(map[CRDInfo]*kube.KubeView[ResourceStats])

	for {
		// Wait for the next reporting cycle, but stop promptly when the
		// context is cancelled instead of sleeping through it.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}

		// Reset all metrics before updating.
		appliedResources.Reset()

		// Skip reporting if CRDs aren't synced up.
		if !crdInfos.HasSyncedOnce() {
			logger.Info("CRDs have not yet synced, skipping metric reporting")
			continue
		}

		seenCRDs := make(map[CRDInfo]bool)
		for _, crdInfo := range crdInfos.Snapshot() {
			// Skip non-KCC resources.
			if !strings.HasSuffix(crdInfo.GVK.Group, ".cnrm.cloud.google.com") {
				continue
			}

			// Skip ignored CRDs.
			crdName := gvkToCRDName(crdInfo.GVK)
			if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
				logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
					fmt.Sprintf("please run `kubectl delete crd %s` to "+
						"delete the orphaned CRD", crdName),
					"crd", crdName)
				continue
			}

			// Record all KCC CRDs we see, so we can clean up unused watches.
			seenCRDs[crdInfo] = true

			// Register watch for this resource if we haven't already.
			statView, ok := statViews[crdInfo]
			if !ok {
				statView = kube.WatchKube(ctx, kubeTarget, crdInfo.GVR, gatherResourceStats)
				statViews[crdInfo] = statView
			}

			// Skip reporting for this resource if we aren't synced up.
			if !statView.HasSyncedOnce() {
				logger.Info("CRs have not yet synced, skipping metric reporting", "gvk", crdInfo.GVK)
				continue
			}

			// Aggregate stats for each namespace.
			nsAggStats := make(map[string]*AggregatedResourceStats)
			for i, s := range statView.Snapshot() {
				ns := i.Namespace
				nsStats, ok := nsAggStats[ns]
				if !ok {
					nsStats = NewAggregatedResourceStats()
					nsAggStats[ns] = nsStats
				}
				nsStats.lastConditionCounts[s.lastCondition]++
			}

			// Record stats.
			for ns, stats := range nsAggStats {
				for condition, count := range stats.lastConditionCounts {
					logger.V(2).Info("posting metrics", "namespace", ns, "gvk", crdInfo.GVK.String(), "status", condition, "count", count)
					appliedResources.WithLabelValues(ns, crdInfo.GVK.GroupKind().String(), condition).Set(float64(count))
				}
			}
		}

		// Cleanup stale watches. Deleting entries while ranging over a map is
		// safe in Go.
		for crdInfo, view := range statViews {
			if _, ok := seenCRDs[crdInfo]; !ok {
				logger.Info("removing stale watch for resource", "gvk", crdInfo.GVK.String())
				view.Close()
				delete(statViews, crdInfo)
			}
		}
	}
}