server/server.go (199 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package server implements a grpc server to receive mount events package server import ( "context" "encoding/json" "fmt" "math" "os" "regexp" "strconv" "strings" "sync" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/auth" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/config" "github.com/GoogleCloudPlatform/secrets-store-csi-driver-provider-gcp/csrmetrics" "github.com/googleapis/gax-go/v2" secretmanager "cloud.google.com/go/secretmanager/apiv1" "cloud.google.com/go/secretmanager/apiv1/secretmanagerpb" "google.golang.org/api/option" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/oauth" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" "k8s.io/klog/v2" "sigs.k8s.io/secrets-store-csi-driver/provider/v1alpha1" ) type Server struct { RuntimeVersion string AuthClient *auth.Client SecretClient *secretmanager.Client RegionalSecretClients map[string]*secretmanager.Client SmOpts []option.ClientOption } var _ v1alpha1.CSIDriverProviderServer = &Server{} // Mount implements provider csi-provider method func (s *Server) Mount(ctx context.Context, req *v1alpha1.MountRequest) (*v1alpha1.MountResponse, error) { p, err := strconv.ParseUint(req.GetPermission(), 10, 32) if err != nil { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("Unable to parse permissions: %s", req.GetPermission())) } params := &config.MountParams{ Attributes: req.GetAttributes(), KubeSecrets: req.GetSecrets(), TargetPath: req.GetTargetPath(), Permissions: os.FileMode(p), } cfg, err := config.Parse(params) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } ts, err := s.AuthClient.TokenSource(ctx, cfg) if err != nil { klog.ErrorS(err, "unable to obtain auth for mount", "pod", klog.ObjectRef{Namespace: cfg.PodInfo.Namespace, Name: cfg.PodInfo.Name}) return nil, status.Error(codes.PermissionDenied, fmt.Sprintf("unable to obtain auth for mount: %v", err)) } // Build a grpc credentials.PerRPCCredentials using // the grpc google.golang.org/grpc/credentials/oauth package, not to be // confused with the oauth2.TokenSource that it wraps. gts := oauth.TokenSource{TokenSource: ts} // Fetch the secrets from the secretmanager API based on the // SecretProviderClass configuration. return handleMountEvent(ctx, s.SecretClient, gts, cfg, s.RegionalSecretClients, s.SmOpts) } // Version implements provider csi-provider method func (s *Server) Version(ctx context.Context, req *v1alpha1.VersionRequest) (*v1alpha1.VersionResponse, error) { return &v1alpha1.VersionResponse{ Version: "v1alpha1", RuntimeName: "secrets-store-csi-driver-provider-gcp", RuntimeVersion: s.RuntimeVersion, }, nil } // handleMountEvent fetches the secrets from the secretmanager API and // include them in the MountResponse based on the SecretProviderClass // configuration. func handleMountEvent(ctx context.Context, client *secretmanager.Client, creds credentials.PerRPCCredentials, cfg *config.MountConfig, regionalClients map[string]*secretmanager.Client, smOpts []option.ClientOption) (*v1alpha1.MountResponse, error) { results := make([]*secretmanagerpb.AccessSecretVersionResponse, len(cfg.Secrets)) errs := make([]error, len(cfg.Secrets)) // need to build a per-rpc call option based of the tokensource callAuth := gax.WithGRPCOptions(grpc.PerRPCCredentials(creds)) // In parallel fetch all secrets needed for the mount wg := sync.WaitGroup{} for i, secret := range cfg.Secrets { loc, err := locationFromSecretResource(secret.ResourceName) if err != nil { errs[i] = err continue } if len(loc) > locationLengthLimit { errs[i] = fmt.Errorf("invalid location string, please check the location") continue } var secretClient *secretmanager.Client if loc == "" { secretClient = client } else { if _, ok := regionalClients[loc]; !ok { ep := option.WithEndpoint(fmt.Sprintf("secretmanager.%s.rep.googleapis.com:443", loc)) regionalClient, err := secretmanager.NewClient(ctx, append(smOpts, ep)...) if err != nil { errs[i] = err continue } regionalClients[loc] = regionalClient } secretClient = regionalClients[loc] } wg.Add(1) i, secret := i, secret go func() { defer wg.Done() req := &secretmanagerpb.AccessSecretVersionRequest{ Name: secret.ResourceName, } smMetricRecorder := csrmetrics.OutboundRPCStartRecorder("secretmanager_access_secret_version_requests") resp, err := secretClient.AccessSecretVersion(ctx, req, callAuth) if err != nil { if e, ok := status.FromError(err); ok { smMetricRecorder(csrmetrics.OutboundRPCStatus(e.Code().String())) } } else { smMetricRecorder(csrmetrics.OutboundRPCStatusOK) } results[i] = resp errs[i] = err }() } wg.Wait() // If any access failed, return a grpc status error that includes each // individual status error in the Details field. // // If there are any failures then there will be no changes to the // filesystem. Initial mount events will fail (preventing pod start) and // the secrets-store-csi-driver will emit pod events on rotation failures. // By erroring out on any failures we prevent partial rotations (i.e. the // username file was updated to a new value but the corresponding password // field was not). if err := buildErr(errs); err != nil { return nil, err } out := &v1alpha1.MountResponse{} // Add secrets to response. ovs := make([]*v1alpha1.ObjectVersion, len(cfg.Secrets)) for i, secret := range cfg.Secrets { if cfg.Permissions > math.MaxInt32 { return nil, fmt.Errorf("invalid file permission %d", cfg.Permissions) } // #nosec G115 Checking limit mode := int32(cfg.Permissions) if secret.Mode != nil { mode = *secret.Mode } result := results[i] extractJSONKey := secret.ExtractJSONKey var content []byte // If extractJSONKey is null, then set the entire data if extractJSONKey == "" { content = result.Payload.Data } else { var data map[string]interface{} err := json.Unmarshal(result.Payload.Data, &data) if err != nil { return nil, fmt.Errorf("secret data not in JSON format") } value, ok := data[extractJSONKey] // If the key is not present, an error will be raised if !ok { return nil, fmt.Errorf("key %v does not exist at the secret path", extractJSONKey) } else { dataContent, ok := value.(string) // If there is a type conversion error if !ok { return nil, fmt.Errorf("wrong type for content, expected string") } content = []byte(dataContent) } } out.Files = append(out.Files, &v1alpha1.File{ Path: secret.PathString(), Mode: mode, Contents: content, }) klog.V(5).InfoS("added secret to response", "resource_name", secret.ResourceName, "file_name", secret.FileName, "pod", klog.ObjectRef{Namespace: cfg.PodInfo.Namespace, Name: cfg.PodInfo.Name}) ovs[i] = &v1alpha1.ObjectVersion{ Id: secret.ResourceName, Version: result.GetName(), } } out.ObjectVersion = ovs return out, nil } // buildErr consolidates many errors into a single Status protobuf error message // with each individual error included into the status Details any proto. The // consolidated proto is converted to a general error. func buildErr(errs []error) error { msgs := make([]string, 0, len(errs)) hasErr := false s := &spb.Status{ Code: int32(codes.Internal), Details: make([]*anypb.Any, 0), } for i := range errs { if errs[i] == nil { continue } hasErr = true msgs = append(msgs, errs[i].Error()) any, _ := anypb.New(status.Convert(errs[i]).Proto()) s.Details = append(s.Details, any) } if !hasErr { return nil } s.Message = strings.Join(msgs, ",") return status.FromProto(s).Err() } // locationFromSecretResource returns location from the secret resource if the resource is in format "projects/<project_id>/locations/<location_id>/..." // returns "" for global secret resource. func locationFromSecretResource(resource string) (string, error) { globalSecretRegexp := regexp.MustCompile(globalSecretRegex) if m := globalSecretRegexp.FindStringSubmatch(resource); m != nil { return "", nil } regionalSecretRegexp := regexp.MustCompile(regionalSecretRegex) if m := regionalSecretRegexp.FindStringSubmatch(resource); m != nil { return m[2], nil } return "", status.Errorf(codes.InvalidArgument, "Invalid secret resource name: %s", resource) }