func()

in pkg/cloud_provider/auth/token_sources.go [144:183]


func (ts *GCPTokenSource) fetchGCPSAToken(ctx context.Context, identityBindingToken *oauth2.Token) (*oauth2.Token, error) {
	gcpSAName, err := ts.k8sClients.GetGCPServiceAccountName(ctx, ts.k8sSANamespace, ts.k8sSAName)
	if err != nil {
		return nil, fmt.Errorf("failed to get GCP SA from Kubernetes SA [%s/%s] annotation: %w", ts.k8sSANamespace, ts.k8sSAName, err)
	}
	if gcpSAName == "" {
		klog.V(4).Infof("Kubernetes SA [%s/%s] is not bound with a GCP SA, proceed with the IdentityBindingToken", ts.k8sSANamespace, ts.k8sSAName)

		return identityBindingToken, nil
	}

	gcpSAClient, err := credentials.NewIamCredentialsClient(
		ctx,
		option.WithTokenSource(oauth2.StaticTokenSource(identityBindingToken)),
	)
	if err != nil {
		return nil, fmt.Errorf("create credentials client error: %w", err)
	}
	defer gcpSAClient.Close()

	resp, err := gcpSAClient.GenerateAccessToken(
		ctx,
		&credentialspb.GenerateAccessTokenRequest{
			Name: "projects/-/serviceAccounts/" + gcpSAName,
			Scope: []string{
				storage.ScopeFullControl,
			},
		},
	)
	if err != nil {
		return nil, fmt.Errorf("fetch GCP service account token error: %w", err)
	}

	token := &oauth2.Token{AccessToken: resp.GetAccessToken()}
	if t := resp.GetExpireTime(); t != nil {
		token.Expiry = time.Unix(t.GetSeconds(), int64(t.GetNanos())).UTC()
	}

	return token, nil
}