pkg/authenticator/target_cluster_client.go (208 lines of code) (raw):
package authenticator
import (
"context"
"fmt"
"os"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/controller-runtime/pkg/client"
api "github.com/aws/eks-anywhere-packages/api/v1alpha1"
)
const (
// clusterNameEnvVar is the name of the environment variable whose value is
// compared with the requested cluster name; when they match, the target is
// treated as the current cluster and no kubeconfig secret is read.
clusterNameEnvVar = "CLUSTER_NAME"
// eksaSystemNamespace is the namespace in which "<cluster>-kubeconfig"
// secrets are looked up.
eksaSystemNamespace = "eksa-system"
)
// TargetClusterClient performs operations against a target cluster, which is
// either the current cluster or another cluster reached through a kubeconfig.
// It also exposes the RESTClientGetter method set.
//
//go:generate mockgen -source target_cluster_client.go -destination=mocks/target_cluster_client.go -package=mocks TargetClusterClient
type TargetClusterClient interface {
// Initialize prepares the client for the cluster named clusterName.
Initialize(ctx context.Context, clusterName string) error
// GetServerVersion returns the version information reported by the target
// API server.
GetServerVersion(ctx context.Context, clusterName string) (info *version.Info, err error)
// CreateClusterNamespace creates the package namespace on the target
// cluster.
CreateClusterNamespace(ctx context.Context, clusterName string) (err error)
// CheckNamespace tests for the existence of a namespace.
CheckNamespace(ctx context.Context, namespace string) bool
// ApplySecret creates or updates the given secret on the target cluster.
ApplySecret(ctx context.Context, secret *corev1.Secret) (err error)
// The remaining methods implement RESTClientGetter.
ToRESTConfig() (*rest.Config, error)
ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
ToRESTMapper() (meta.RESTMapper, error)
ToRawKubeConfigLoader() clientcmd.ClientConfig
}
// targetClusterClient is the TargetClusterClient implementation in this file.
type targetClusterClient struct {
// Config is the REST config returned by ToRESTConfig when the target is the
// current cluster.
Config *rest.Config
// Client is used to read the target cluster's kubeconfig secret.
Client client.Client
// targetSelf is set by Initialize: true when no kubeconfig is needed because
// the target is the current cluster.
targetSelf bool
// clientConfig is set by Initialize and is nil until then.
clientConfig clientcmd.ClientConfig
// logger receives diagnostics from CheckNamespace and ApplySecret.
logger logr.Logger
}
// Compile-time check that *targetClusterClient satisfies TargetClusterClient.
var _ TargetClusterClient = (*targetClusterClient)(nil)
// NewTargetClusterClient assembles a target cluster client from the supplied
// logger, REST config and Kubernetes client.
//
// CheckNamespace and ApplySecret refuse to run until Initialize has been
// called on the returned value.
func NewTargetClusterClient(logger logr.Logger, config *rest.Config, client client.Client) *targetClusterClient {
	tcc := &targetClusterClient{
		Config: config,
		Client: client,
		logger: logger,
	}
	return tcc
}
// Initialize points the client at the cluster named clusterName.
//
// When getKubeconfig yields no kubeconfig (nil), the client is flagged as
// targeting the current cluster and receives an empty client config.
// Otherwise the kubeconfig bytes are parsed and a client config is built from
// them. Any lookup or parse error is returned unchanged.
func (tcc *targetClusterClient) Initialize(ctx context.Context, clusterName string) error {
	raw, err := tcc.getKubeconfig(ctx, clusterName)
	if err != nil {
		return err
	}

	// A nil kubeconfig is the signal that the target is this cluster.
	tcc.targetSelf = raw == nil
	if tcc.targetSelf {
		tcc.clientConfig = clientcmd.NewDefaultClientConfig(clientcmdapi.Config{}, &clientcmd.ConfigOverrides{})
		return nil
	}

	parsed, err := clientcmd.NewClientConfigFromBytes(raw)
	if err != nil {
		return err
	}

	apiConfig, err := parsed.RawConfig()
	if err != nil {
		return err
	}

	tcc.clientConfig = clientcmd.NewDefaultClientConfig(apiConfig, &clientcmd.ConfigOverrides{})
	return nil
}
// ToRESTConfig returns the REST config for the target cluster.
//
// When the client targets the current cluster, the Config supplied at
// construction is returned as-is. Otherwise the config is derived from the
// client config built by Initialize. An error is returned if Initialize has
// not been called yet, instead of panicking on the nil client config.
func (tcc *targetClusterClient) ToRESTConfig() (*rest.Config, error) {
	if tcc.targetSelf {
		return tcc.Config, nil
	}
	// clientConfig is only populated by Initialize; calling a method on the
	// nil interface value would panic.
	if tcc.clientConfig == nil {
		return nil, fmt.Errorf("client is not initialized")
	}
	return tcc.clientConfig.ClientConfig()
}
// ToDiscoveryClient builds a discovery client for the target cluster from
// ToRESTConfig and wraps it in an in-memory cache. A new client is built on
// every call.
func (tcc *targetClusterClient) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
	cfg, err := tcc.ToRESTConfig()
	if err != nil {
		return nil, err
	}

	uncached, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		return nil, err
	}

	cached := memory.NewMemCacheClient(uncached)
	return cached, nil
}
// ToRESTMapper returns a deferred-discovery REST mapper backed by a discovery
// client obtained from ToDiscoveryClient.
func (tcc *targetClusterClient) ToRESTMapper() (meta.RESTMapper, error) {
	cachedDiscovery, err := tcc.ToDiscoveryClient()
	if err != nil {
		return nil, err
	}

	mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
	return mapper, nil
}
// ToRawKubeConfigLoader exposes the client config stored by Initialize. The
// result is nil when Initialize has not been called.
func (tcc *targetClusterClient) ToRawKubeConfigLoader() clientcmd.ClientConfig {
	loader := tcc.clientConfig
	return loader
}
// getKubeconfig returns the kubeconfig bytes for the cluster named
// clusterName.
//
// It returns (nil, nil) when clusterName is empty or equals the value of the
// CLUSTER_NAME environment variable; the nil kubeconfig tells the caller to
// target the current cluster. Otherwise it reads the "value" key of the
// "<clusterName>-kubeconfig" secret in the eksa-system namespace.
//
// An error is returned when the secret cannot be fetched, or when it holds no
// data under "value". The latter must not be reported as a nil kubeconfig,
// because the caller would then mistake it for the "current cluster" signal.
func (tcc *targetClusterClient) getKubeconfig(ctx context.Context, clusterName string) (kubeconfig []byte, err error) {
	// Avoid using kubeconfig for ourselves
	if clusterName == "" || os.Getenv(clusterNameEnvVar) == clusterName {
		// A nil kubeconfig will cause helm to use the current cluster
		return nil, nil
	}

	nn := types.NamespacedName{
		Namespace: eksaSystemNamespace,
		Name:      clusterName + "-kubeconfig",
	}
	var kubeconfigSecret corev1.Secret
	if err = tcc.Client.Get(ctx, nn, &kubeconfigSecret); err != nil {
		return nil, fmt.Errorf("getting kubeconfig for cluster %q: %w", clusterName, err)
	}

	kubeconfig = kubeconfigSecret.Data["value"]
	if len(kubeconfig) == 0 {
		return nil, fmt.Errorf("getting kubeconfig for cluster %q: secret %s/%s has no data under key %q",
			clusterName, nn.Namespace, nn.Name, "value")
	}
	return kubeconfig, nil
}
// GetServerVersion initializes the client for clusterName and returns the
// version information reported by that cluster's API server.
//
// Every failure is wrapped with %w so callers can inspect the underlying
// error with errors.Is / errors.As.
func (tcc *targetClusterClient) GetServerVersion(ctx context.Context, clusterName string) (info *version.Info, err error) {
	if err = tcc.Initialize(ctx, clusterName); err != nil {
		return nil, fmt.Errorf("initializing target client: %w", err)
	}

	discoveryClient, err := tcc.ToDiscoveryClient()
	if err != nil {
		return nil, fmt.Errorf("creating discoveryClient client: %w", err)
	}

	info, err = discoveryClient.ServerVersion()
	if err != nil {
		return nil, fmt.Errorf("getting server version: %w", err)
	}
	return info, nil
}
// CreateClusterNamespace initializes the client for clusterName and makes
// sure the package namespace (api.PackageNamespace) exists on that cluster.
//
// It returns nil when the namespace already exists, including the case where
// the create call itself reports AlreadyExists. Other failures are wrapped
// with %w so callers can inspect the underlying error.
func (tcc *targetClusterClient) CreateClusterNamespace(ctx context.Context, clusterName string) (err error) {
	if err = tcc.Initialize(ctx, clusterName); err != nil {
		return fmt.Errorf("initializing target client: %w", err)
	}

	restConfig, err := tcc.ToRESTConfig()
	if err != nil {
		return fmt.Errorf("create rest config for %s: %w", clusterName, err)
	}

	k8sClient, err := client.New(restConfig, client.Options{})
	if err != nil {
		return fmt.Errorf("creating client for %s: %w", clusterName, err)
	}

	if tcc.CheckNamespace(ctx, api.PackageNamespace) {
		return nil
	}

	ns := &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: api.PackageNamespace},
	}
	// CheckNamespace reports false for any lookup failure, not only NotFound,
	// and the namespace may be created by someone else between the check and
	// this call. AlreadyExists therefore means the desired state is reached.
	if err = k8sClient.Create(ctx, ns); err != nil && !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("create namespace for %q: %w", clusterName, err)
	}
	return nil
}
// CheckNamespace tests for the existence of a namespace.
//
// It must only be called with an initialized target cluster client.
//
// It returns true only when the namespace was fetched successfully. It
// returns false when the client is not initialized, when a client for the
// target cluster cannot be built, or when the lookup fails for any reason,
// including NotFound. Failures other than NotFound are logged.
func (tcc *targetClusterClient) CheckNamespace(ctx context.Context, namespace string) bool {
	if tcc.clientConfig == nil {
		// Log the requested namespace itself, not the literal key name.
		tcc.logger.Error(fmt.Errorf("client is not initialized"), "checking namespace", "namespace", namespace)
		return false
	}

	restConfig, err := tcc.ToRESTConfig()
	if err != nil {
		tcc.logger.V(6).Error(err, "creating rest config", "namespace", namespace)
		return false
	}

	k8sClient, err := client.New(restConfig, client.Options{})
	if err != nil {
		tcc.logger.V(6).Error(err, "creating k8s client", "namespace", namespace)
		return false
	}

	// Namespaces are cluster-scoped, so only the name is set.
	nn := types.NamespacedName{Name: namespace}
	ns := &corev1.Namespace{}
	if err := k8sClient.Get(ctx, nn, ns); err != nil {
		// NotFound is the expected negative answer and is not logged.
		if !apierrors.IsNotFound(err) {
			tcc.logger.V(6).Error(err, "getting namespace", "namespace", namespace)
		}
		return false
	}
	return true
}
// ApplySecret creates the given secret on the workload cluster, or updates it
// if a secret with the same name and namespace already exists.
//
// It must only be called with an initialized target cluster client.
//
// Only the name, namespace and data of secret are carried over to the object
// that is written. An error is returned when the client is not initialized,
// when secret is nil, when a client for the target cluster cannot be built,
// or when the create/update call fails. Create and update failures are
// wrapped with %w so callers can inspect the underlying API error.
func (tcc *targetClusterClient) ApplySecret(ctx context.Context, secret *corev1.Secret) error {
	if tcc.clientConfig == nil {
		tcc.logger.Error(fmt.Errorf("client is not initialized"), "creating secret")
		return fmt.Errorf("client is not initialized")
	}
	// Guard against a nil pointer dereference below.
	if secret == nil {
		return fmt.Errorf("secret is nil")
	}

	restConfig, err := tcc.ToRESTConfig()
	if err != nil {
		tcc.logger.V(6).Error(err, "creating rest config")
		return err
	}

	k8sClient, err := client.New(restConfig, client.Options{})
	if err != nil {
		tcc.logger.V(6).Error(err, "creating k8s client")
		return err
	}

	newSecret := corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      secret.ObjectMeta.Name,
			Namespace: secret.ObjectMeta.Namespace,
		},
		Data: secret.Data,
	}

	err = k8sClient.Create(ctx, &newSecret)
	if err == nil {
		return nil
	}
	if !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("create secret for workload cluster %w", err)
	}

	// The secret already exists: overwrite it with the new contents.
	if err = k8sClient.Update(ctx, &newSecret); err != nil {
		return fmt.Errorf("update secret for workload cluster %w", err)
	}
	return nil
}