cmd/operator/main.go (140 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"crypto/fips140"
"errors"
"flag"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"cloud.google.com/go/compute/metadata"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap/zapcore"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
// Blank import required to register GCP auth handlers to talk to GKE clusters.
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator"
)
const (
defaultTLSDir = "/etc/tls/private"
)
func main() {
ctx := context.Background()
var (
defaultProjectID string
defaultCluster string
defaultLocation string
)
errList := []error{}
if metadata.OnGCE() {
var err error
defaultProjectID, err = metadata.ProjectIDWithContext(ctx)
errList = append(errList, err)
defaultCluster, err = metadata.InstanceAttributeValueWithContext(ctx, "cluster-name")
errList = append(errList, err)
defaultLocation, err = metadata.InstanceAttributeValueWithContext(ctx, "cluster-location")
errList = append(errList, err)
}
var (
logVerbosity = flag.Int("v", 0, "Logging verbosity")
projectID = flag.String("project-id", defaultProjectID, "Project ID of the cluster. May be left empty on GKE.")
location = flag.String("location", defaultLocation, "Google Cloud region or zone where your data will be stored. May be left empty on GKE.")
cluster = flag.String("cluster", defaultCluster, "Name of the cluster the operator acts on. May be left empty on GKE.")
operatorNamespace = flag.String("operator-namespace", operator.DefaultOperatorNamespace,
"Namespace in which the operator manages its resources.")
publicNamespace = flag.String("public-namespace", operator.DefaultPublicNamespace,
"Namespace in which the operator reads user-provided resources.")
tlsCert = flag.String("tls-cert-base64", "", "The base64-encoded TLS certificate.")
tlsKey = flag.String("tls-key-base64", "", "The base64-encoded TLS key.")
caCert = flag.String("ca-cert-base64", "", "The base64-encoded certificate authority.")
certDir = flag.String("cert-dir", defaultTLSDir, "The directory which contains TLS certificates for webhook server.")
webhookAddr = flag.String("webhook-addr", ":10250",
"Address to listen to for incoming kube admission webhook connections.")
probeAddr = flag.String("probe-addr", ":18081", "Address to outputs probe statuses (e.g. /readyz and /healthz)")
metricsAddr = flag.String("metrics-addr", ":18080", "Address to emit metrics on.")
// Permit the operator to cleanup previously-managed resources that
// are missing the provided annotation. An empty string disables this
// feature.
cleanupAnnotKey = flag.String("cleanup-unless-annotation-key", "",
"Clean up operator-managed workloads without the provided annotation key.")
)
flag.Parse()
logger := zap.New(zap.Level(zapcore.Level(-*logVerbosity)))
ctrl.SetLogger(logger)
if err := errors.Join(errList...); err != nil {
logger.Error(err, "unable to fetch Google Cloud metadata")
}
if !fips140.Enabled() {
logger.Error(errors.New("FIPS mode required"), "FIPS mode not enabled")
os.Exit(1)
}
cfg, err := ctrl.GetConfig()
if err != nil {
logger.Error(err, "loading kubeconfig failed")
os.Exit(1)
}
// controller-runtime creates a registry against which its metrics are registered globally.
// Using it as our non-global registry is the easiest way to combine metrics into a single
// /metrics endpoint.
// It already has the GoCollector and ProcessCollector metrics installed.
metrics := ctrlmetrics.Registry
op, err := operator.New(logger, cfg, operator.Options{
ProjectID: *projectID,
Location: *location,
Cluster: *cluster,
OperatorNamespace: *operatorNamespace,
PublicNamespace: *publicNamespace,
ProbeAddr: *probeAddr,
TLSCert: *tlsCert,
TLSKey: *tlsKey,
CACert: *caCert,
CertDir: *certDir,
ListenAddr: *webhookAddr,
CleanupAnnotKey: *cleanupAnnotKey,
})
if err != nil {
logger.Error(err, "instantiating operator failed")
os.Exit(1)
}
var g run.Group
// Termination handler.
{
term := make(chan os.Signal, 1)
cancel := make(chan struct{})
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
g.Add(
func() error {
select {
case <-term:
logger.Info("received SIGTERM, exiting gracefully...")
case <-cancel:
}
return nil
},
func(error) {
close(cancel)
},
)
}
// Operator monitoring.
{
server := &http.Server{Addr: *metricsAddr}
http.Handle("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{Registry: metrics}))
g.Add(func() error {
return server.ListenAndServe()
}, func(error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "Server failed to shut down gracefully.")
}
cancel()
})
}
// Main operator loop.
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return op.Run(ctx, metrics)
}, func(error) {
cancel()
})
}
if err := g.Run(); err != nil {
logger.Error(err, "exit with error")
os.Exit(1)
}
}