credential_provider/irsa_credential_provider.go (98 lines of code) (raw):
package credential_provider
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/service/sts/stsiface"
authv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2"
)
const (
arnAnno = "eks.amazonaws.com/role-arn"
docURL = "https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html"
irsaAudience = "sts.amazonaws.com"
ProviderName = "secrets-store-csi-driver-provider-aws"
)
// IRSACredentialProvider implements CredentialProvider using IAM Roles for Service Accounts
type IRSACredentialProvider struct {
stsClient stsiface.STSAPI
k8sClient k8sv1.CoreV1Interface
region, nameSpace, svcAcc string
fetcher authTokenFetcher
ctx context.Context
}
func NewIRSACredentialProvider(
stsClient stsiface.STSAPI,
region, nameSpace, svcAcc string,
k8sClient k8sv1.CoreV1Interface,
ctx context.Context,
) CredentialProvider {
return &IRSACredentialProvider{
stsClient: stsClient,
k8sClient: k8sClient,
region: region,
nameSpace: nameSpace,
svcAcc: svcAcc,
fetcher: newIRSATokenFetcher(nameSpace, svcAcc, k8sClient),
ctx: ctx,
}
}
func (p *IRSACredentialProvider) GetAWSConfig() (*aws.Config, error) {
roleArn, err := p.getRoleARN()
if err != nil {
return nil, err
}
ar := stscreds.NewWebIdentityRoleProviderWithToken(
p.stsClient,
*roleArn,
ProviderName,
p.fetcher,
)
return aws.NewConfig().
WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint).
WithRegion(p.region).
WithCredentials(credentials.NewCredentials(ar)), nil
}
type irsaTokenFetcher struct {
nameSpace, svcAcc string
k8sClient k8sv1.CoreV1Interface
}
func newIRSATokenFetcher(
nameSpace, svcAcct string,
k8sClient k8sv1.CoreV1Interface,
) authTokenFetcher {
return &irsaTokenFetcher{
nameSpace: nameSpace,
svcAcc: svcAcct,
k8sClient: k8sClient,
}
}
// Private helper to fetch a JWT token for a given namespace and service account.
//
// See also: https://pkg.go.dev/k8s.io/client-go/kubernetes/typed/core/v1
func (p *irsaTokenFetcher) FetchToken(ctx credentials.Context) ([]byte, error) {
tokenSpec := authv1.TokenRequestSpec{
Audiences: []string{irsaAudience},
}
// Use the K8s API to fetch the token from the OIDC provider.
tokRsp, err := p.k8sClient.ServiceAccounts(p.nameSpace).CreateToken(ctx, p.svcAcc, &authv1.TokenRequest{
Spec: tokenSpec,
}, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return []byte(tokRsp.Status.Token), nil
}
// Private helper to lookup the role ARN for a given pod.
//
// This method looks up the role ARN associated with the K8s service account by
// calling the K8s APIs to get the role annotation on the service account.
// See also: https://pkg.go.dev/k8s.io/client-go/kubernetes/typed/core/v1
func (p IRSACredentialProvider) getRoleARN() (arn *string, e error) {
// cli equivalent: kubectl -o yaml -n <namespace> get serviceaccount <acct>
rsp, err := p.k8sClient.ServiceAccounts(p.nameSpace).Get(p.ctx, p.svcAcc, metav1.GetOptions{})
if err != nil {
return nil, err
}
roleArn := rsp.Annotations[arnAnno]
if len(roleArn) <= 0 {
klog.Errorf("Need IAM role for service account %s (namespace: %s) - %s", p.svcAcc, p.nameSpace, docURL)
return nil, fmt.Errorf("An IAM role must be associated with service account %s (namespace: %s)", p.svcAcc, p.nameSpace)
}
klog.Infof("Role ARN for %s:%s is %s", p.nameSpace, p.svcAcc, roleArn)
return &roleArn, nil
}