backend/main.go (227 lines of code) (raw):
// Copyright 2025 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync/atomic"
"syscall"
"time"
"github.com/go-logr/logr"
ocmsdk "github.com/openshift-online/ocm-sdk-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/ARO-HCP/internal/database"
"github.com/Azure/ARO-HCP/internal/version"
)
// Leader election settings handed to client-go when Run builds the lease lock
// and the leader elector.
const (
	// leaderElectionLockName names the lease object used as the lock; Run
	// also uses it as the leader elector's Name.
	leaderElectionLockName = "backend-leader"

	// Timing values for the LeaseDuration, RenewDeadline and RetryPeriod
	// fields of the leader election configuration. The renew deadline is
	// additionally passed to resourcelock.NewFromKubeconfig.
	leaderElectionLeaseDuration = 15 * time.Second
	leaderElectionRenewDeadline = 10 * time.Second
	leaderElectionRetryPeriod   = 2 * time.Second
)
// Values of the command-line flags. They are bound to rootCmd in init and
// read by Run.
var (
	argKubeconfig, argNamespace, argLocation      string
	argCosmosName, argCosmosURL                   string
	argClustersServiceURL                         string
	argMetricsListenAddress, argPortListenAddress string

	argInsecure bool
)

var (
	// processName is the base name of the running executable; it is used as
	// the command name and inside the usage example of the long help text.
	processName = filepath.Base(os.Args[0])

	// rootCmd is the one and only command of the backend binary.
	rootCmd = &cobra.Command{
		Use:   processName,
		Short: "ARO HCP Backend",
		Long: fmt.Sprintf(`ARO HCP Backend
The command runs the ARO HCP Backend. It executes background processing that
communicates with Clusters Service and CosmosDB.
# Run ARO HCP Backend locally to connect to a local Clusters Service at http://localhost:8000
%s --cosmos-name ${DB_NAME} --cosmos-url ${DB_URL} --location ${LOCATION} \
--clusters-service-url "http://localhost:8000"
`, processName),
		// Placeholder only: init replaces it with version.CommitSHA.
		Version: "unknown",
		// The command accepts flags but no positional arguments.
		Args: cobra.NoArgs,
		RunE: Run,
		// main prints the error itself once Execute has returned.
		SilenceErrors: true,
	}
)
// init registers the command-line flags on rootCmd, sets its error prefix and
// replaces the placeholder version with the commit SHA from the version
// package. Several flags take their default value from the environment
// (NAMESPACE, LOCATION, DB_NAME, DB_URL).
func init() {
	rootCmd.SetErrPrefix(rootCmd.Short + " error:")

	flags := rootCmd.Flags()

	// Kubernetes access and deployment location.
	flags.StringVar(&argKubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file")
	flags.StringVar(&argNamespace, "namespace", os.Getenv("NAMESPACE"), "Kubernetes namespace")
	flags.StringVar(&argLocation, "location", os.Getenv("LOCATION"), "Azure location")

	// Cosmos DB.
	flags.StringVar(&argCosmosName, "cosmos-name", os.Getenv("DB_NAME"), "Cosmos database name")
	flags.StringVar(&argCosmosURL, "cosmos-url", os.Getenv("DB_URL"), "Cosmos database URL")

	// Clusters Service.
	flags.StringVar(&argClustersServiceURL, "clusters-service-url", "https://api.openshift.com", "URL of the OCM API gateway")
	flags.BoolVar(&argInsecure, "insecure", false, "Skip validating TLS for clusters-service")

	// Listen addresses of the HTTP endpoints served by Run.
	flags.StringVar(&argMetricsListenAddress, "metrics-listen-address", ":8081", "Address on which to expose metrics")
	flags.StringVar(&argPortListenAddress, "healthz-listen-address", ":8083", "Address on which Healthz endpoint will be supported")

	// The two Cosmos flags must be given together or not at all.
	rootCmd.MarkFlagsRequiredTogether("cosmos-name", "cosmos-url")

	rootCmd.Version = version.CommitSHA
}
// newKubeconfig builds a Kubernetes REST configuration from client-go's
// default client-config loading rules. When kubeconfig is non-empty it is set
// as the explicit path on those rules; otherwise the rules are used unchanged.
func newKubeconfig(kubeconfig string) (*rest.Config, error) {
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	if len(kubeconfig) > 0 {
		rules.ExplicitPath = kubeconfig
	}

	// No overrides are applied on top of the loaded configuration.
	deferredConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, nil)
	return deferredConfig.ClientConfig()
}
// Run is the RunE handler of rootCmd and hosts the whole backend lifecycle.
//
// It performs one-time setup (logging, Kubernetes lease lock, Cosmos DB
// client, Clusters Service connection) and then runs up to three goroutines in
// an errgroup:
//
//   - the /healthz server, when --healthz-listen-address is non-empty;
//   - the /metrics server, when --metrics-listen-address is non-empty;
//   - the leader elector, which runs the operations scanner while this
//     process holds the lease.
//
// SIGINT, SIGTERM, or an error from any of those goroutines cancels the shared
// context, which closes every HTTP server and stops the leader elector.
//
// Setup failures are returned to the caller. A failure reported by the
// errgroup, including loss of the leader lease, is logged and terminates the
// process with exit status 1. The args parameter is unused; rootCmd rejects
// positional arguments.
func Run(cmd *cobra.Command, args []string) error {
	// Bound how long a client may take to send its request headers so that
	// stalled connections cannot occupy the servers indefinitely.
	const serverReadHeaderTimeout = 10 * time.Second

	handler := slog.NewJSONHandler(os.Stdout, nil)
	logger := slog.New(handler)
	// Send klog output through the same JSON handler as the slog logger.
	klog.SetLogger(logr.FromSlogHandler(handler))

	// Use pod name as the lock identity.
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("failed to determine hostname: %w", err)
	}

	kubeconfig, err := newKubeconfig(argKubeconfig)
	if err != nil {
		return fmt.Errorf("failed to create Kubernetes configuration: %w", err)
	}

	leaderElectionLock, err := resourcelock.NewFromKubeconfig(
		resourcelock.LeasesResourceLock,
		argNamespace,
		leaderElectionLockName,
		resourcelock.ResourceLockConfig{
			Identity: hostname,
		},
		kubeconfig,
		leaderElectionRenewDeadline)
	if err != nil {
		return fmt.Errorf("failed to create leader election lock: %w", err)
	}

	// Create the database client.
	cosmosDatabaseClient, err := database.NewCosmosDatabaseClient(
		argCosmosURL,
		argCosmosName,
		azcore.ClientOptions{
			// FIXME Cloud should be determined by other means.
			Cloud: cloud.AzurePublic,
		},
	)
	if err != nil {
		return fmt.Errorf("failed to create the CosmosDB client: %w", err)
	}

	dbClient, err := database.NewDBClient(context.Background(), cosmosDatabaseClient)
	if err != nil {
		return fmt.Errorf("failed to create the database client: %w", err)
	}

	// Create OCM connection
	ocmConnection, err := ocmsdk.NewUnauthenticatedConnectionBuilder().
		URL(argClustersServiceURL).
		Insecure(argInsecure).
		Build()
	if err != nil {
		return fmt.Errorf("failed to create OCM connection: %w", err)
	}

	logger.Info(fmt.Sprintf("%s (%s) started", cmd.Short, cmd.Version))

	// Create HealthzAdaptor for leader election. It is installed as the
	// elector's WatchDog below and queried by the /healthz handler.
	electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)

	group, ctx := errgroup.WithContext(context.Background())

	// Every server started below is recorded here so that the shutdown
	// goroutine can close all of them. Forgetting one would leave its
	// ListenAndServe call, and therefore group.Wait, blocked forever.
	var servers []*http.Server

	// NOTE: both handlers are registered on http.DefaultServeMux and both
	// servers use that mux, so each listen address serves /healthz and
	// /metrics when both are enabled.

	// Handle requests directly for /healthz endpoint
	if argPortListenAddress != "" {
		backendHealthGauge := promauto.With(prometheus.DefaultRegisterer).NewGauge(prometheus.GaugeOpts{Name: "backend_health", Help: "backend_health is 1 when healthy"})

		http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
			if err := electionChecker.Check(r); err != nil {
				http.Error(w, "lease not renewed", http.StatusServiceUnavailable)
				backendHealthGauge.Set(0.0)
				return
			}
			w.WriteHeader(http.StatusOK)
			backendHealthGauge.Set(1.0)
		})

		healthzServer := &http.Server{
			Addr:              argPortListenAddress,
			ReadHeaderTimeout: serverReadHeaderTimeout,
		}
		servers = append(servers, healthzServer)

		group.Go(func() error {
			logger.Info(fmt.Sprintf("Healthz server listening on %s", argPortListenAddress))
			err := healthzServer.ListenAndServe()
			if errors.Is(err, http.ErrServerClosed) {
				// Closed by the shutdown goroutine: not a failure.
				return nil
			}
			return err
		})
	}

	if argMetricsListenAddress != "" {
		http.Handle("/metrics", promhttp.InstrumentMetricHandler(
			prometheus.DefaultRegisterer,
			promhttp.HandlerFor(
				prometheus.DefaultGatherer,
				promhttp.HandlerOpts{},
			),
		))

		metricsServer := &http.Server{
			Addr:              argMetricsListenAddress,
			ReadHeaderTimeout: serverReadHeaderTimeout,
		}
		servers = append(servers, metricsServer)

		group.Go(func() error {
			logger.Info(fmt.Sprintf("metrics server listening on %s", argMetricsListenAddress))
			err := metricsServer.ListenAndServe()
			if errors.Is(err, http.ErrServerClosed) {
				// Closed by the shutdown goroutine: not a failure.
				return nil
			}
			return err
		})
	}

	// The signal context is derived from the errgroup context, so it is done
	// on SIGINT/SIGTERM and also as soon as any group goroutine fails.
	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	go func() {
		<-ctx.Done()
		logger.Info("Caught interrupt signal")
		for _, server := range servers {
			// Best effort: the process is shutting down and there is
			// nothing useful to do with a Close error.
			_ = server.Close()
		}
	}()

	group.Go(func() error {
		var (
			startedLeading    atomic.Bool
			operationsScanner = NewOperationsScanner(dbClient, ocmConnection)
		)

		le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
			Lock:          leaderElectionLock,
			LeaseDuration: leaderElectionLeaseDuration,
			RenewDeadline: leaderElectionRenewDeadline,
			RetryPeriod:   leaderElectionRetryPeriod,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					operationsScanner.leaderGauge.Set(1)
					startedLeading.Store(true)
					go operationsScanner.Run(ctx, logger)
				},
				OnStoppedLeading: func() {
					operationsScanner.leaderGauge.Set(0)
					// Only wait for the scanner if it was actually started.
					if startedLeading.Load() {
						operationsScanner.Join()
					}
				},
			},
			ReleaseOnCancel: true,
			WatchDog:        electionChecker,
			Name:            leaderElectionLockName,
		})
		if err != nil {
			return err
		}

		le.Run(ctx)

		// Run returns when ctx is cancelled or when the lease is lost. The
		// former is a normal shutdown. In the latter case nothing would ever
		// campaign for the lease again while the HTTP servers kept the
		// process alive, so report an error: that cancels the group context,
		// closes the servers and makes the process exit with a failure.
		if ctx.Err() != nil {
			return nil
		}
		return errors.New("leader election lost")
	})

	if err := group.Wait(); err != nil {
		logger.Error(err.Error())
		os.Exit(1)
	}

	logger.Info(fmt.Sprintf("%s (%s) stopped", cmd.Short, cmd.Version))

	return nil
}
// main executes rootCmd. Because rootCmd silences its own error output, any
// error returned by Execute is printed here with the command's error prefix
// before the process exits with status 1.
func main() {
	err := rootCmd.Execute()
	if err == nil {
		return
	}

	rootCmd.PrintErrln(rootCmd.ErrPrefix(), err.Error())
	os.Exit(1)
}