main.go (190 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Binary secrets-store-csi-driver-provider-gcp is a plugin for the // secrets-store-csi-driver for fetching secrets from Google Cloud's Secret // Manager API. package main import ( "context" "flag" "fmt" "net" "net/http" "net/http/pprof" "os" "os/signal" "path/filepath" "syscall" "time" "cloud.google.com/go/compute/metadata" iam "cloud.google.com/go/iam/credentials/apiv1" secretmanager "cloud.google.com/go/secretmanager/apiv1" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/auth" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/infra" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/server" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/vars" "github.com/prometheus/client_golang/prometheus/promhttp" otelprom "go.opentelemetry.io/otel/exporters/prometheus" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" logsapi "k8s.io/component-base/logs/api/v1" jlogs "k8s.io/component-base/logs/json" "k8s.io/klog/v2" "sigs.k8s.io/secrets-store-csi-driver/provider/v1alpha1" ) var ( kubeconfig = flag.String("kubeconfig", "", "absolute path to kubeconfig file") logFormatJSON = flag.Bool("log-format-json", true, "set log formatter to json") metricsAddr = flag.String("metrics_addr", ":8095", "configure http listener for reporting metrics") enableProfile = flag.Bool("enable-pprof", false, "enable pprof profiling") debugAddr = flag.String("debug_addr", "localhost:6060", "port for pprof profiling") _ = flag.Bool("write_secrets", false, "[unused]") smConnectionPoolSize = flag.Int("sm_connection_pool_size", 5, "size of the connection pool for the secret manager API client") iamConnectionPoolSize = flag.Int("iam_connection_pool_size", 5, "size of the connection pool for the IAM API client") version = "dev" ) func main() { klog.InitFlags(nil) defer klog.Flush() flag.Parse() if *logFormatJSON { jsonFactory := jlogs.Factory{} logger, _ := jsonFactory.Create(logsapi.LoggingConfiguration{Format: "json"}, logsapi.LoggingOptions{ErrorStream: os.Stderr, InfoStream: os.Stdout}) klog.SetLogger(logger) } ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() var err error uai, err := vars.UserAgentIdentifier.GetValue() if err != nil { klog.ErrorS(err, "failed to get user agent identifier") klog.Fatal("failed to get user agent identifier") } ua := fmt.Sprintf("%s/%s", uai, version) klog.InfoS(fmt.Sprintf("starting %s", ua)) // Kubernetes Client var rc *rest.Config if *kubeconfig != "" { klog.V(5).InfoS("using kubeconfig", "path", *kubeconfig) rc, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) } else { klog.V(5).InfoS("using in-cluster kubeconfig") rc, err = rest.InClusterConfig() } if err != nil { klog.ErrorS(err, "failed to read kubeconfig") klog.Fatal("failed to read kubeconfig") } rc.ContentType = runtime.ContentTypeProtobuf clientset, err := kubernetes.NewForConfig(rc) if err != nil { klog.ErrorS(err, "failed to configure k8s client") klog.Fatal("failed to configure k8s client") } // Secret Manager client // // build without auth so that authentication can be re-added on a per-RPC // basis for each mount smOpts := []option.ClientOption{ option.WithUserAgent(ua), // tell the secretmanager library to not add transport-level ADC since // we need to override on a per call basis option.WithoutAuthentication(), // grpc oauth TokenSource credentials require transport security, so // this must be set explicitly even though TLS is used option.WithGRPCDialOption(grpc.WithTransportCredentials(credentials.NewTLS(nil))), // establish a pool of underlying connections to the Secret Manager API // to decrease blocking since same client will be used across concurrent // requests. Note that this is implemented in // google.golang.org/api/option and not grpc itself. option.WithGRPCConnectionPool(*smConnectionPoolSize), } sc, err := secretmanager.NewClient(ctx, smOpts...) if err != nil { klog.ErrorS(err, "failed to create secretmanager client") klog.Fatal("failed to create secretmanager client") } // To cache the clients for regional endpoints. m := make(map[string]*secretmanager.Client) // IAM client // // build without auth so that authentication can be re-added on a per-RPC // basis for each mount iamOpts := []option.ClientOption{ option.WithUserAgent(ua), // tell the secretmanager library to not add transport-level ADC since // we need to override on a per call basis option.WithoutAuthentication(), // grpc oauth TokenSource credentials require transport security, so // this must be set explicitly even though TLS is used option.WithGRPCDialOption(grpc.WithTransportCredentials(credentials.NewTLS(nil))), // establish a pool of underlying connections to the Secret Manager API // to decrease blocking since same client will be used across concurrent // requests. Note that this is implemented in // google.golang.org/api/option and not grpc itself. option.WithGRPCConnectionPool(*iamConnectionPoolSize), } iamc, err := iam.NewIamCredentialsClient(ctx, iamOpts...) if err != nil { klog.ErrorS(err, "failed to create iam client") klog.Fatal("failed to create iam client") } // HTTP client hc := &http.Client{ Transport: &http.Transport{ Dial: (&net.Dialer{ Timeout: 2 * time.Second, KeepAlive: 30 * time.Second, }).Dial, }, Timeout: 60 * time.Second, } c := &auth.Client{ KubeClient: clientset, IAMClient: iamc, MetadataClient: metadata.NewClient(hc), HTTPClient: hc, } // setup provider grpc server s := &server.Server{ SecretClient: sc, AuthClient: c, RegionalSecretClients: m, SmOpts: smOpts, } p, err := vars.ProviderName.GetValue() if err != nil { klog.ErrorS(err, "failed to get provider name") klog.Fatal("failed to get provider name") } socketPath := filepath.Join(os.Getenv("TARGET_DIR"), fmt.Sprintf("%s.sock", p)) // Attempt to remove the UDS to handle cases where a previous execution was // killed before fully closing the socket listener and unlinking. _ = os.Remove(socketPath) l, err := net.Listen("unix", socketPath) if err != nil { klog.ErrorS(err, "unable to listen to unix socket", "path", socketPath) klog.Fatalln("unable to start") } defer l.Close() g := grpc.NewServer( grpc.UnaryInterceptor(infra.LogInterceptor()), ) v1alpha1.RegisterCSIDriverProviderServer(g, s) go g.Serve(l) // initialize metrics and health http server mux := http.NewServeMux() ms := http.Server{ Addr: *metricsAddr, Handler: mux, ReadTimeout: 10 * time.Second, } defer ms.Shutdown(ctx) _, err = otelprom.New() if err != nil { klog.ErrorS(err, "unable to initialize prometheus registry") klog.Fatalln("unable to initialize prometheus registry") } mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/live", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) go func() { if err := ms.ListenAndServe(); err != nil && err != http.ErrServerClosed { klog.ErrorS(err, "metrics http server error") } }() klog.InfoS("health server listening", "addr", *metricsAddr) if *enableProfile { dmux := http.NewServeMux() dmux.HandleFunc("/debug/pprof/", pprof.Index) dmux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) dmux.HandleFunc("/debug/pprof/profile", pprof.Profile) dmux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) dmux.HandleFunc("/debug/pprof/trace", pprof.Trace) ds := http.Server{ Addr: *debugAddr, Handler: dmux, ReadTimeout: 10 * time.Second, } defer ds.Shutdown(ctx) go func() { if err := ds.ListenAndServe(); err != nil && err != http.ErrServerClosed { klog.ErrorS(err, "debug http server error") } }() klog.InfoS("debug server listening", "addr", *debugAddr) } <-ctx.Done() klog.InfoS("terminating") g.GracefulStop() }