pkg/k8sapi/k8sutils.go (208 lines of code) (raw):
package k8sapi
import (
"context"
"fmt"
"os"
"time"
eniconfigscheme "github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/aws/amazon-vpc-cni-k8s/utils"
rcscheme "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
const (
awsNode = "aws-node"
envEnablePodENI = "ENABLE_POD_ENI"
restCfgTimeout = 5 * time.Second
)
var log = logger.Get()
// Get cache filters for IPAMD
func getIPAMDCacheFilters() map[client.Object]cache.ByObject {
if nodeName := os.Getenv("MY_NODE_NAME"); nodeName != "" {
filter := map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(),
},
&corev1.Node{}: {
Field: fields.Set{"metadata.name": nodeName}.AsSelector(),
},
}
// only cache CNINode when SGP is in use
enabledPodENI := utils.GetBoolAsStringEnvVar(envEnablePodENI, false)
if enabledPodENI {
log.Infof("SGP is in use, adding CNINode to cache.")
filter[&rcscheme.CNINode{}] = cache.ByObject{
Field: fields.Set{"metadata.name": nodeName}.AsSelector(),
}
}
return filter
}
return nil
}
// Get cache filters for CNI Metrics Helper
func getMetricsHelperCacheFilters() map[client.Object]cache.ByObject {
return map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Label: labels.Set(map[string]string{
"k8s-app": awsNode}).AsSelector(),
}}
}
// Create cache reader for Kubernetes client
func CreateKubeClientCache(restCfg *rest.Config, scheme *runtime.Scheme, filterMap map[client.Object]cache.ByObject) (cache.Cache, error) {
// Get HTTP client and REST mapper for cache
httpClient, err := rest.HTTPClientFor(restCfg)
if err != nil {
return nil, err
}
mapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient)
if err != nil {
return nil, err
}
// Create a cache for the client to read from in order to decrease the number of API server calls.
cacheOptions := cache.Options{
ByObject: filterMap,
Mapper: mapper,
Scheme: scheme,
}
cache, err := cache.New(restCfg, cacheOptions)
if err != nil {
return nil, err
}
return cache, nil
}
func StartKubeClientCache(cache cache.Cache) {
stopChan := ctrl.SetupSignalHandler()
go func() {
cache.Start(stopChan)
}()
cache.WaitForCacheSync(stopChan)
}
// CreateKubeClient creates a k8s client
func CreateKubeClient(appName string) (client.Client, error) {
restCfg, err := GetRestConfig()
if err != nil {
return nil, err
}
// The scheme should only contain GVKs that the client will access.
vpcCniScheme := runtime.NewScheme()
corev1.AddToScheme(vpcCniScheme)
eniconfigscheme.AddToScheme(vpcCniScheme)
rcscheme.AddToScheme(vpcCniScheme)
var filterMap map[client.Object]cache.ByObject
if appName == awsNode {
filterMap = getIPAMDCacheFilters()
} else {
filterMap = getMetricsHelperCacheFilters()
}
cacheReader, err := CreateKubeClientCache(restCfg, vpcCniScheme, filterMap)
if err != nil {
log.Warnf("Skipping cache-based Kubernetes client: %s", err)
cacheReader = nil
}
clientOpts := client.Options{Scheme: vpcCniScheme}
if cacheReader != nil {
log.Info("Cache-based Kubernetes client successfully created.")
StartKubeClientCache(cacheReader)
clientOpts.Cache = &client.CacheOptions{Reader: cacheReader}
} else {
log.Warn("Running Kubernetes client in direct mode (no cache)")
}
k8sClient, err := client.New(restCfg, clientOpts)
if err != nil {
return nil, err
}
log.Info("k8sClient created successfully")
return k8sClient, nil
}
func GetKubeClientSet() (kubernetes.Interface, error) {
// creates the in-cluster config
config, err := GetRestConfig()
if err != nil {
return nil, err
}
// creates the clientset
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
return clientSet, nil
}
func CheckAPIServerConnectivity() error {
restCfg, err := GetRestConfig()
if err != nil {
return err
}
restCfg.Timeout = restCfgTimeout
clientSet, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return fmt.Errorf("creating kube config, %w", err)
}
log.Infof("Testing communication with server")
// Reconcile the API server query after waiting for a second, as the request
// times out in one second if it fails to connect to the server
return wait.PollUntilContextCancel(context.Background(), 2*time.Second, true, func(ctx context.Context) (bool, error) {
version, err := clientSet.Discovery().ServerVersion()
if err != nil {
// When times out return no error, so the PollInfinite will retry with the given interval
if os.IsTimeout(err) {
log.Errorf("Unable to reach API Server, %v", err)
return false, nil
}
return false, fmt.Errorf("error communicating with apiserver: %v", err)
}
log.Infof("Successful communication with the Cluster! Cluster Version is: v%s.%s. git version: %s. git tree state: %s. commit: %s. platform: %s",
version.Major, version.Minor, version.GitVersion, version.GitTreeState, version.GitCommit, version.Platform)
return true, nil
})
}
func CheckAPIServerConnectivityWithTimeout(pollInterval time.Duration, pollTimeout time.Duration) error {
restCfg, err := GetRestConfig()
if err != nil {
return err
}
// timeout for each connect try
restCfg.Timeout = restCfgTimeout
clientSet, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return fmt.Errorf("creating kube config, %w", err)
}
log.Info("Testing communication with server ...")
return wait.PollUntilContextTimeout(context.Background(), pollInterval, pollTimeout, true, func(ctx context.Context) (bool, error) {
version, err := clientSet.Discovery().ServerVersion()
if err != nil {
log.Errorf("Unable to reach API Server: %v", err)
return false, nil // Retry
}
log.Infof("Successful communication with the Cluster! Cluster Version is: %s", version.GitVersion)
return true, nil
})
}
// GetRestConfig returns a Kubernetes REST config for API interactions
func GetRestConfig() (*rest.Config, error) {
restCfg, err := ctrl.GetConfig()
if err != nil {
return nil, err
}
restCfg.UserAgent = os.Args[0] + "-" + utils.GetEnv("VPC_CNI_VERSION", "")
if endpoint, ok := os.LookupEnv("CLUSTER_ENDPOINT"); ok {
restCfg.Host = endpoint
}
return restCfg, nil
}
func GetNode(ctx context.Context, k8sClient client.Client) (corev1.Node, error) {
nodeName := os.Getenv("MY_NODE_NAME")
log.Infof("Get Node Info for: %s", nodeName)
node := corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: nodeName},
}
// If API server is unavailable, return immediately
if k8sClient == nil {
log.Warnf("Skipping GetNode() as Kubernetes API client is unavailable.")
return node, fmt.Errorf("Kubernetes API client is not available")
}
// Create a context with timeout to avoid hanging indefinitely
apiCtx, cancel := context.WithTimeout(ctx, 3*time.Second) // Set 3-second timeout
defer cancel()
err := k8sClient.Get(apiCtx, types.NamespacedName{Name: nodeName}, &node)
if err != nil {
klog.Errorf("Failed to get node %s: %v", nodeName, err)
return node, err
}
return node, nil
}