in server/server.go [106:236]
// handleMountEvent fetches every secret listed in cfg.Secrets and assembles
// the MountResponse for one mount request.
//
// For each secret the location is derived from its resource name with
// locationFromSecretResource. An empty location uses client; any other
// location uses a regional client that talks to
// secretmanager.<loc>.rep.googleapis.com:443. Regional clients are created on
// demand from smOpts and stored in regionalClients for reuse. Every access is
// authenticated with creds as per-RPC credentials.
//
// The accesses run in parallel and the function waits for all of them. If any
// secret could not be resolved or fetched, the error produced by buildErr from
// the per-secret errors is returned and no response is built. Otherwise one
// File and one ObjectVersion are emitted per secret, in cfg.Secrets order. When
// a secret sets ExtractJSONKey, its payload must be a JSON object whose value
// under that key is a string; that string becomes the file contents.
//
// NOTE(review): regionalClients is written here without synchronization;
// presumably the caller serializes mount requests or guards the map — confirm.
func handleMountEvent(ctx context.Context, client *secretmanager.Client, creds credentials.PerRPCCredentials, cfg *config.MountConfig, regionalClients map[string]*secretmanager.Client, smOpts []option.ClientOption) (*v1alpha1.MountResponse, error) {
	results := make([]*secretmanagerpb.AccessSecretVersionResponse, len(cfg.Secrets))
	errs := make([]error, len(cfg.Secrets))

	// need to build a per-rpc call option based of the tokensource
	callAuth := gax.WithGRPCOptions(grpc.PerRPCCredentials(creds))

	// In parallel fetch all secrets needed for the mount
	wg := sync.WaitGroup{}
	for i, secret := range cfg.Secrets {
		loc, err := locationFromSecretResource(secret.ResourceName)
		if err != nil {
			errs[i] = err
			continue
		}
		if len(loc) > locationLengthLimit {
			errs[i] = fmt.Errorf("invalid location string, please check the location")
			continue
		}

		// Default to the global client; switch to a regional one only when the
		// resource name carries a location.
		secretClient := client
		if loc != "" {
			regionalClient, ok := regionalClients[loc]
			if !ok {
				ep := option.WithEndpoint(fmt.Sprintf("secretmanager.%s.rep.googleapis.com:443", loc))
				// Cap the slice so append always copies: appending straight to
				// smOpts could write ep into the caller's backing array when it
				// has spare capacity.
				opts := append(smOpts[:len(smOpts):len(smOpts)], ep)
				regionalClient, err = secretmanager.NewClient(ctx, opts...)
				if err != nil {
					errs[i] = err
					continue
				}
				regionalClients[loc] = regionalClient
			}
			secretClient = regionalClient
		}

		wg.Add(1)
		// Per-iteration copies so each goroutine sees its own index and secret.
		i, secret := i, secret
		go func() {
			defer wg.Done()
			req := &secretmanagerpb.AccessSecretVersionRequest{
				Name: secret.ResourceName,
			}
			smMetricRecorder := csrmetrics.OutboundRPCStartRecorder("secretmanager_access_secret_version_requests")

			resp, err := secretClient.AccessSecretVersion(ctx, req, callAuth)
			if err != nil {
				if e, ok := status.FromError(err); ok {
					smMetricRecorder(csrmetrics.OutboundRPCStatus(e.Code().String()))
				}
			} else {
				smMetricRecorder(csrmetrics.OutboundRPCStatusOK)
			}

			// Each goroutine writes only its own slot, so no locking is needed.
			results[i] = resp
			errs[i] = err
		}()
	}
	wg.Wait()

	// If any access failed, return a grpc status error that includes each
	// individual status error in the Details field.
	//
	// If there are any failures then there will be no changes to the
	// filesystem. Initial mount events will fail (preventing pod start) and
	// the secrets-store-csi-driver will emit pod events on rotation failures.
	// By erroring out on any failures we prevent partial rotations (i.e. the
	// username file was updated to a new value but the corresponding password
	// field was not).
	if err := buildErr(errs); err != nil {
		return nil, err
	}

	out := &v1alpha1.MountResponse{}

	// Add secrets to response.
	ovs := make([]*v1alpha1.ObjectVersion, len(cfg.Secrets))
	for i, secret := range cfg.Secrets {
		if cfg.Permissions > math.MaxInt32 {
			return nil, fmt.Errorf("invalid file permission %d", cfg.Permissions)
		}
		// #nosec G115 Checking limit
		mode := int32(cfg.Permissions)
		// A per-secret mode overrides the mount-wide permissions.
		if secret.Mode != nil {
			mode = *secret.Mode
		}

		result := results[i]
		// Use the nil-safe generated getters: a response without a payload
		// yields empty data instead of a nil-pointer panic.
		payload := result.GetPayload().GetData()

		extractJSONKey := secret.ExtractJSONKey
		var content []byte
		// If extractJSONKey is null, then set the entire data
		if extractJSONKey == "" {
			content = payload
		} else {
			var data map[string]interface{}
			if err := json.Unmarshal(payload, &data); err != nil {
				return nil, fmt.Errorf("secret data not in JSON format")
			}

			value, ok := data[extractJSONKey]
			// If the key is not present, an error will be raised
			if !ok {
				return nil, fmt.Errorf("key %v does not exist at the secret path", extractJSONKey)
			}

			dataContent, ok := value.(string)
			// If there is a type conversion error
			if !ok {
				return nil, fmt.Errorf("wrong type for content, expected string")
			}
			content = []byte(dataContent)
		}

		out.Files = append(out.Files, &v1alpha1.File{
			Path:     secret.PathString(),
			Mode:     mode,
			Contents: content,
		})
		klog.V(5).InfoS("added secret to response", "resource_name", secret.ResourceName, "file_name", secret.FileName, "pod", klog.ObjectRef{Namespace: cfg.PodInfo.Namespace, Name: cfg.PodInfo.Name})

		ovs[i] = &v1alpha1.ObjectVersion{
			Id:      secret.ResourceName,
			Version: result.GetName(),
		}
	}
	out.ObjectVersion = ovs

	return out, nil
}