benchmarks/benchmark/tools/model-load-benchmark/k8sclient/k8sclient.go (193 lines of code) (raw):
package k8sclient
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// Polling/backoff tuning for pod status monitoring.
const (
	// maxRetryInterval caps the exponential backoff between pod status polls.
	maxRetryInterval = 60 * time.Second
	// failureThreshold is how many not-ready polls are tolerated before
	// monitoring gives up.
	failureThreshold = 10
)
// Client holds the k8s clientset
type Client struct {
	// client is the underlying Kubernetes API clientset.
	client *kubernetes.Clientset
	// ClusterName is the cluster name taken from the current kubeconfig
	// context. Populated best-effort: may be empty if the kubeconfig
	// could not be read (see InitClient).
	ClusterName string
}
// InitClient builds a Kubernetes clientset using the default kubeconfig
// loading rules (KUBECONFIG env var, falling back to ~/.kube/config) and
// returns a Client wrapping it. The cluster name is resolved best-effort:
// a failure there leaves ClusterName empty instead of failing init.
func InitClient() (*Client, error) {
	kubeconfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(),
		&clientcmd.ConfigOverrides{},
	)
	config, err := kubeconfig.ClientConfig()
	if err != nil {
		// %w preserves the error chain for errors.Is/As at call sites.
		return nil, fmt.Errorf("failed to load Kubernetes config: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kubernetes clientset: %w", err)
	}
	// Best-effort: an unreadable kubeconfig yields an empty ClusterName
	// but must not prevent client construction.
	clusterName, _ := getClusterName()
	return &Client{
		client:      clientset,
		ClusterName: clusterName,
	}, nil
}
// getClusterName returns the cluster name referenced by the current
// context of the user's kubeconfig. It honors KUBECONFIG (using the
// first non-empty entry when the variable holds an OS path list) and
// falls back to ~/.kube/config.
func getClusterName() (string, error) {
	kubeconfigPath := filepath.Join(homedir.HomeDir(), ".kube", "config")
	if envPath := os.Getenv("KUBECONFIG"); envPath != "" {
		// KUBECONFIG may list several files separated by the OS list
		// separator; LoadFromFile accepts exactly one path, so take the
		// first non-empty entry rather than passing the raw value.
		for _, p := range filepath.SplitList(envPath) {
			if p != "" {
				kubeconfigPath = p
				break
			}
		}
	}
	kubeconfig, err := clientcmd.LoadFromFile(kubeconfigPath)
	if err != nil {
		return "", fmt.Errorf("failed to load kubeconfig: %w", err)
	}
	currentContext := kubeconfig.CurrentContext
	if currentContext == "" {
		return "", fmt.Errorf("no current context is set in kubeconfig")
	}
	contextConfig, exists := kubeconfig.Contexts[currentContext]
	if !exists {
		return "", fmt.Errorf("context %s not found in kubeconfig", currentContext)
	}
	return contextConfig.Cluster, nil
}
// GetNodes lists every node in the cluster.
func (k *Client) GetNodes() (*v1.NodeList, error) {
	nodeList, err := k.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err == nil {
		return nodeList, nil
	}
	return nil, fmt.Errorf("failed to get nodes: %v", err)
}
// PodExists reports whether the named pod currently exists in the given
// namespace. A NotFound response is not an error: it simply yields false.
func (k *Client) PodExists(podName, namespace string) (bool, error) {
	_, err := k.client.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
	switch {
	case err == nil:
		return true, nil
	case errors.IsNotFound(err):
		return false, nil
	default:
		return false, fmt.Errorf("failed to get pod: %v", err)
	}
}
// DeletePod deletes the given pod and blocks until it is fully removed.
// If a deletion is already in progress it only waits for completion; if
// the pod does not exist it returns nil immediately.
func (k *Client) DeletePod(pod *v1.Pod) error {
	// One Get both checks existence and fetches the deletion state,
	// avoiding the racy exists-then-get double round trip.
	currentPod, err := k.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			return nil // Pod doesn't exist; nothing to delete.
		}
		return fmt.Errorf("failed to get pod: %v", err)
	}
	// Issue the delete only if one is not already in progress.
	if currentPod.DeletionTimestamp == nil {
		err = k.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
		if err != nil {
			if errors.IsNotFound(err) {
				return nil // Pod vanished between the Get and the Delete.
			}
			return fmt.Errorf("failed to delete pod %s in namespace %s: %v", pod.Name, pod.Namespace, err)
		}
	}
	// Poll until the pod is gone, bounded so a stuck finalizer cannot
	// hang the benchmark forever.
	deadline := time.Now().Add(10 * time.Minute)
	for {
		exists, err := k.PodExists(pod.Name, pod.Namespace)
		if err != nil {
			return fmt.Errorf("failed to check if pod exists: %v", err)
		}
		if !exists {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for pod %s in namespace %s to be deleted", pod.Name, pod.Namespace)
		}
		time.Sleep(5 * time.Second)
	}
}
// GetPodNode waits until the pod is scheduled to a node, polling every
// 5s up to a 90s timeout, and returns nil once a node is assigned.
// NOTE(review): despite the name it does not return the node; callers
// read pod.Spec.NodeName themselves afterwards.
func (k *Client) GetPodNode(pod *v1.Pod) error {
	const (
		timeout  = 90 * time.Second
		interval = 5 * time.Second
	)
	startTime := time.Now()
	for {
		updatedPod, err := k.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to get pod status: %v", err)
		}
		if updatedPod.Spec.NodeName != "" {
			return nil
		}
		if time.Since(startTime) >= timeout {
			// Derive the message from the constant so the two can't drift.
			return fmt.Errorf("%v timeout reached: pod %s in namespace %s is still not scheduled to any node", timeout, pod.Name, pod.Namespace)
		}
		time.Sleep(interval)
	}
}
// DeployAndMonitorPod creates the pod (deleting any existing pod of the
// same name first), waits for it to be scheduled, then polls until the
// pod and all of its containers are ready. It returns the elapsed time
// from scheduling (after the readiness-probe initial delay) to readiness.
// The pod is deleted (best-effort) before the function returns.
func (k *Client) DeployAndMonitorPod(pod *v1.Pod) (time.Duration, error) {
	// Derive polling parameters from the pod's readiness probes: never
	// poll faster than the slowest probe period, and wait out the longest
	// initial delay before the first check. The two maxima are tracked
	// independently — a container may have a small period but a large
	// initial delay.
	maxPeriodSeconds := int32(5)
	maxInitialDelay := int32(0)
	for _, container := range pod.Spec.Containers {
		if probe := container.ReadinessProbe; probe != nil {
			maxPeriodSeconds = max(maxPeriodSeconds, probe.PeriodSeconds)
			maxInitialDelay = max(maxInitialDelay, probe.InitialDelaySeconds)
		}
	}
	// Default the namespace when the pod spec does not set one.
	namespace := pod.GetNamespace()
	if namespace == "" {
		namespace = "default"
		pod.SetNamespace(namespace)
	}
	// Remove any leftover pod from a previous run before creating.
	if err := k.DeletePod(pod); err != nil {
		return -1, fmt.Errorf("failed to delete existing pod: %v", err)
	}
	pod, err := k.client.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return -1, fmt.Errorf("failed to create pod: %v", err)
	}
	// Wait for the pod to be placed on a node before starting the clock.
	if err := k.GetPodNode(pod); err != nil {
		return -1, fmt.Errorf("failed to deploy pod: %v", err)
	}
	startTime := time.Now()
	// Best-effort cleanup; a deletion failure does not affect the result.
	defer k.DeletePod(pod)
	time.Sleep(time.Duration(maxInitialDelay) * time.Second)
	// Monitor the pod status with exponential backoff.
	periodInterval := time.Duration(maxPeriodSeconds) * time.Second
	retryInterval := periodInterval
	for failureCount := 0; failureCount <= failureThreshold; failureCount++ {
		// Poll in the pod's actual namespace (not hardcoded "default").
		current, err := k.client.CoreV1().Pods(namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return -1, fmt.Errorf("failed to get pod status: %v", err)
		}
		allContainersReady := true
		for _, containerStatus := range current.Status.ContainerStatuses {
			if !containerStatus.Ready {
				allContainersReady = false
				break
			}
		}
		if current.Status.Phase == v1.PodRunning && allContainersReady {
			return time.Since(startTime), nil
		}
		switch current.Status.Phase {
		case v1.PodSucceeded:
			return time.Since(startTime), nil
		case v1.PodFailed:
			return -1, fmt.Errorf("pod %s failed: %s", current.Name, current.Status.Reason)
		}
		// Exponential backoff, capped so the poll interval never exceeds
		// the larger of maxRetryInterval and the slowest probe period.
		time.Sleep(retryInterval)
		retryInterval = min(retryInterval*2, max(maxRetryInterval, periodInterval))
	}
	return -1, fmt.Errorf("pod monitoring timeout, not all containers ready")
}