internal/node/validate.go (157 lines of code) (raw):
package node
import (
"context"
"errors"
"fmt"
"net/url"
"slices"
"strings"
"golang.org/x/mod/semver"
authenticationv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/aws/eks-hybrid/internal/api"
"github.com/aws/eks-hybrid/internal/network"
"github.com/aws/eks-hybrid/internal/validation"
)
// Kubelet is the kubernetes node agent.
type Kubelet interface {
// BuildClient creates a new Kubernetes client
BuildClient() (kubernetes.Interface, error)
// KubeconfigPath returns the path to the kubeconfig file
KubeconfigPath() string
// Version returns the current kubelet version
Version() (string, error)
}
type APIServerValidator struct {
kubelet Kubelet
}
func NewAPIServerValidator(kubelet Kubelet) APIServerValidator {
return APIServerValidator{
kubelet: kubelet,
}
}
const badPermissionsRemediation = "Verify the Kubernetes identity and permissions assigned to the IAM roles on this node, it should belong to the group 'system:nodes'. Check your Access Entries or aws-auth ConfigMap."
func (a APIServerValidator) MakeAuthenticatedRequest(ctx context.Context, informer validation.Informer, node *api.NodeConfig) error {
name := "kubernetes-authenticated-request"
var err error
informer.Starting(ctx, name, "Validating authenticated request to Kubernetes API endpoint")
defer func() {
informer.Done(ctx, name, err)
}()
client, err := a.client()
if err != nil {
return err
}
_, err = client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
err = validation.WithRemediation(err, badPermissionsRemediation)
return err
}
return nil
}
func (a APIServerValidator) CheckIdentity(ctx context.Context, informer validation.Informer, node *api.NodeConfig) error {
var err error
kubeletVersion, err := a.kubelet.Version()
if err != nil {
return err
}
// 1.27 and below don't allow SelfSubjectReview requests from nodes
if semver.Compare(kubeletVersion, "v1.28.0") < 0 {
return nil
}
name := "kubernetes-node-identity"
informer.Starting(ctx, name, "Validating Kubernetes identity matches a Node identity")
defer func() {
informer.Done(ctx, name, err)
}()
client, err := a.client()
if err != nil {
return err
}
self := &authenticationv1.SelfSubjectReview{}
self, err = client.AuthenticationV1().SelfSubjectReviews().Create(ctx, self, metav1.CreateOptions{})
if err != nil {
err = validation.WithRemediation(err, badPermissionsRemediation)
return err
}
if !slices.Contains(self.Status.UserInfo.Groups, "system:nodes") {
err = validation.WithRemediation(
fmt.Errorf(
"node identity %s for principal %s does not belong to the group 'system:nodes'",
self.Status.UserInfo.Username, principalARN(self),
),
badPermissionsRemediation,
)
return err
}
if !strings.HasPrefix(self.Status.UserInfo.Username, "system:node:") {
err = validation.WithRemediation(
fmt.Errorf("node identity %s for principal %s does not match a node identity, username should start with 'system:node:'",
self.Status.UserInfo.Username, principalARN(self),
),
badPermissionsRemediation,
)
return err
}
return nil
}
func principalARN(self *authenticationv1.SelfSubjectReview) string {
var principal string
principals := self.Status.UserInfo.Extra["arn"]
if len(principals) > 0 {
principal = principals[0]
}
return principal
}
func (a APIServerValidator) CheckVPCEndpointAccess(ctx context.Context, informer validation.Informer, node *api.NodeConfig) error {
name := "kubernetes-vpc-api-server-access"
var err error
informer.Starting(ctx, name, "Validating access to Kube-API server through VPC IPs")
defer func() {
informer.Done(ctx, name, err)
}()
client, err := a.client()
if err != nil {
return err
}
kubeEndpoint, err := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil {
err = validation.WithRemediation(err, badPermissionsRemediation)
return err
}
if len(kubeEndpoint.Subsets) == 0 {
err = errors.New("no subsets found in the Kubernetes endpoint, can't validate VPC API server access")
return err
}
for _, subset := range kubeEndpoint.Subsets {
var port int32
for _, p := range subset.Ports {
if p.Name == "https" {
port = p.Port
break
}
}
if port == 0 {
continue
}
for _, address := range subset.Addresses {
if address.IP == "" {
continue
}
u := url.URL{
Scheme: "https",
Host: fmt.Sprintf("%s:%d", address.IP, port),
}
if err = network.CheckConnectionToHost(ctx, u); err != nil {
err = validation.WithRemediation(err,
fmt.Sprintf("Ensure the node has access to the Kube-API server endpoint %s in the VPC", address.IP),
)
return err
}
}
}
return nil
}
func (a APIServerValidator) client() (kubernetes.Interface, error) {
client, err := a.kubelet.BuildClient()
if err != nil {
return nil, validation.WithRemediation(err, fmt.Sprintf("Ensure the kubeconfig at %s has been created and is valid.", a.kubelet.KubeconfigPath()))
}
return client, nil
}