in backend/main.go [115:284]
// Run is the entry point of the backend command.
//
// It builds the Kubernetes leader-election lock, the Cosmos DB client and the
// Clusters Service (OCM) connection. It then optionally starts the /healthz and
// /metrics HTTP servers and finally takes part in leader election. Only the
// current leader runs the operations scanner.
//
// Run blocks until SIGINT/SIGTERM is received or one of its components fails.
// Setup failures are returned to the caller. A failure after startup, including
// losing the leader-election lease, is logged and terminates the process with
// exit status 1.
func Run(cmd *cobra.Command, args []string) error {
	handler := slog.NewJSONHandler(os.Stdout, nil)
	logger := slog.New(handler)
	// Route klog output through the same JSON handler as our own logs.
	klog.SetLogger(logr.FromSlogHandler(handler))

	// Use pod name as the lock identity.
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("failed to determine hostname: %w", err)
	}

	kubeconfig, err := newKubeconfig(argKubeconfig)
	if err != nil {
		return fmt.Errorf("failed to create Kubernetes configuration: %w", err)
	}

	leaderElectionLock, err := resourcelock.NewFromKubeconfig(
		resourcelock.LeasesResourceLock,
		argNamespace,
		leaderElectionLockName,
		resourcelock.ResourceLockConfig{
			Identity: hostname,
		},
		kubeconfig,
		leaderElectionRenewDeadline)
	if err != nil {
		return fmt.Errorf("failed to create leader election lock: %w", err)
	}

	// Create the database client.
	cosmosDatabaseClient, err := database.NewCosmosDatabaseClient(
		argCosmosURL,
		argCosmosName,
		azcore.ClientOptions{
			// FIXME Cloud should be determined by other means.
			Cloud: cloud.AzurePublic,
		},
	)
	if err != nil {
		return fmt.Errorf("failed to create the CosmosDB client: %w", err)
	}
	dbClient, err := database.NewDBClient(context.Background(), cosmosDatabaseClient)
	if err != nil {
		return fmt.Errorf("failed to create the database client: %w", err)
	}

	// Create OCM connection
	ocmConnection, err := ocmsdk.NewUnauthenticatedConnectionBuilder().
		URL(argClustersServiceURL).
		Insecure(argInsecure).
		Build()
	if err != nil {
		return fmt.Errorf("failed to create OCM connection: %w", err)
	}

	logger.Info(fmt.Sprintf("%s (%s) started", cmd.Short, cmd.Version))

	// The adaptor backs the /healthz endpoint and is also passed to the elector
	// as its WatchDog.
	// NOTE(review): per client-go docs the argument is the grace period allowed
	// past lease expiry before Check reports unhealthy — confirm 20s is intended.
	electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)

	group, ctx := errgroup.WithContext(context.Background())

	// Every HTTP server started below is recorded here so the shutdown
	// goroutine can close all of them. A server left open would keep its
	// ListenAndServe blocked, and group.Wait would never return.
	var servers []*http.Server

	// NOTE: both handlers are registered on http.DefaultServeMux, and both
	// servers use it (nil Handler). So /healthz and /metrics are reachable on
	// either listener.

	// Handle requests directly for /healthz endpoint
	if argPortListenAddress != "" {
		backendHealthGauge := promauto.With(prometheus.DefaultRegisterer).NewGauge(prometheus.GaugeOpts{
			Name: "backend_health",
			Help: "backend_health is 1 when healthy",
		})
		http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
			if err := electionChecker.Check(r); err != nil {
				http.Error(w, "lease not renewed", http.StatusServiceUnavailable)
				backendHealthGauge.Set(0.0)
				return
			}
			w.WriteHeader(http.StatusOK)
			backendHealthGauge.Set(1.0)
		})

		healthzServer := &http.Server{Addr: argPortListenAddress}
		servers = append(servers, healthzServer)
		group.Go(func() error {
			logger.Info(fmt.Sprintf("Healthz server listening on %s", argPortListenAddress))
			err := healthzServer.ListenAndServe()
			if errors.Is(err, http.ErrServerClosed) {
				// Closed by the shutdown goroutine: a clean exit.
				return nil
			}
			return err
		})
	}

	if argMetricsListenAddress != "" {
		http.Handle("/metrics", promhttp.InstrumentMetricHandler(
			prometheus.DefaultRegisterer,
			promhttp.HandlerFor(
				prometheus.DefaultGatherer,
				promhttp.HandlerOpts{},
			),
		))

		metricsServer := &http.Server{Addr: argMetricsListenAddress}
		servers = append(servers, metricsServer)
		group.Go(func() error {
			logger.Info(fmt.Sprintf("metrics server listening on %s", argMetricsListenAddress))
			err := metricsServer.ListenAndServe()
			if errors.Is(err, http.ErrServerClosed) {
				// Closed by the shutdown goroutine: a clean exit.
				return nil
			}
			return err
		})
	}

	// ctx is cancelled on SIGINT/SIGTERM, or when any group member returns an
	// error (errgroup cancels the parent context).
	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	go func() {
		<-ctx.Done()
		logger.Info("Caught interrupt signal")
		for _, server := range servers {
			// Best effort: the process is shutting down either way.
			_ = server.Close()
		}
	}()

	group.Go(func() error {
		var (
			startedLeading    atomic.Bool
			operationsScanner = NewOperationsScanner(dbClient, ocmConnection)
		)

		le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
			Lock:          leaderElectionLock,
			LeaseDuration: leaderElectionLeaseDuration,
			RenewDeadline: leaderElectionRenewDeadline,
			RetryPeriod:   leaderElectionRetryPeriod,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					operationsScanner.leaderGauge.Set(1)
					startedLeading.Store(true)
					go operationsScanner.Run(ctx, logger)
				},
				OnStoppedLeading: func() {
					operationsScanner.leaderGauge.Set(0)
					// This callback may run even when leadership was never
					// acquired (e.g. shutdown while still campaigning). Only
					// wait for a scanner that was actually started.
					if startedLeading.Load() {
						operationsScanner.Join()
					}
				},
			},
			ReleaseOnCancel: true,
			WatchDog:        electionChecker,
			Name:            leaderElectionLockName,
		})
		if err != nil {
			return fmt.Errorf("failed to create leader election: %w", err)
		}

		// Run returns when ctx is cancelled or when the elector stops holding
		// the lease.
		le.Run(ctx)

		if ctx.Err() == nil {
			// The lease was lost without a shutdown request, and this elector
			// will not campaign again. Fail the group so the servers are
			// closed and the process exits non-zero. It can then presumably be
			// restarted (e.g. by Kubernetes) as a fresh candidate.
			return errors.New("lost leader election lease")
		}
		return nil
	})

	if err := group.Wait(); err != nil {
		logger.Error(err.Error())
		// NOTE: os.Exit skips deferred calls such as stop(). That is harmless
		// here because the process is terminating.
		os.Exit(1)
	}

	logger.Info(fmt.Sprintf("%s (%s) stopped", cmd.Short, cmd.Version))
	return nil
}