contrib/utils/kubectl_cmd.go (140 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package utils
import (
"context"
"fmt"
"net/url"
"strings"
"time"
)
// KubectlRunner builds and runs kubectl command lines.
//
// kubeCfgPath, when non-empty, is passed to kubectl as --kubeconfig.
// namespace, when non-empty, is passed as -n by the methods that operate on
// namespaced resources.
type KubectlRunner struct {
	kubeCfgPath, namespace string
}
// NewKubectlRunner returns a KubectlRunner that stores the given kubeconfig
// path and namespace. Either value may be empty, in which case the matching
// kubectl flag is left out of the commands the runner builds.
func NewKubectlRunner(kubeCfgPath string, namespace string) *KubectlRunner {
	runner := new(KubectlRunner)
	runner.kubeCfgPath = kubeCfgPath
	runner.namespace = namespace
	return runner
}
// FQDN returns the lower-cased host name of the cluster endpoint, taken from
// the first line printed by `kubectl cluster-info`.
//
// The last whitespace-separated field of that line is treated as the endpoint
// URL. The escape sequences "\x1b[0;33m" (prefix) and "\x1b[0m" (suffix) are
// removed from it if present, and the port is dropped from the result.
//
// ctx and timeout are handed to runCommand unchanged. An error is returned
// when the command fails, when the first line of its output contains no
// fields, or when the endpoint cannot be parsed as a URL.
func (kr *KubectlRunner) FQDN(ctx context.Context, timeout time.Duration) (string, error) {
	args := []string{}
	if kr.kubeCfgPath != "" {
		args = append(args, "--kubeconfig", kr.kubeCfgPath)
	}
	args = append(args, "cluster-info")

	data, err := runCommand(ctx, timeout, "kubectl", args)
	if err != nil {
		return "", err
	}

	firstLine := strings.SplitN(string(data), "\n", 2)[0]
	fields := strings.Fields(firstLine)
	if len(fields) == 0 {
		// Without this guard, taking the last field of an empty line panics
		// with an index-out-of-range error.
		return "", fmt.Errorf("kubectl cluster-info: first line of output is empty")
	}

	rawURL := fields[len(fields)-1]
	rawURL = strings.TrimPrefix(rawURL, "\x1b[0;33m")
	rawURL = strings.TrimSuffix(rawURL, "\x1b[0m")

	endpoint, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}

	// Hostname drops the port and also unwraps bracketed IPv6 literals, which
	// a plain split on ":" would truncate to "[".
	return strings.ToLower(endpoint.Hostname()), nil
}
// Wait runs `kubectl wait --for=<condition> [--timeout=<waitTimeout>] <target>`.
//
// condition and target must be non-empty; otherwise an error is returned
// before kubectl is invoked. waitTimeout is forwarded to kubectl's --timeout
// flag and is omitted when empty. timeout, by contrast, is handed to
// runCommand together with ctx. The runner's kubeconfig and namespace flags
// are added when they are set.
func (kr *KubectlRunner) Wait(ctx context.Context, timeout time.Duration, condition, waitTimeout, target string) error {
	switch {
	case condition == "":
		return fmt.Errorf("condition is required")
	case target == "":
		return fmt.Errorf("target is required")
	}

	var argv []string
	if cfg := kr.kubeCfgPath; cfg != "" {
		argv = append(argv, "--kubeconfig", cfg)
	}
	if ns := kr.namespace; ns != "" {
		argv = append(argv, "-n", ns)
	}

	argv = append(argv, "wait", "--for="+condition)
	if waitTimeout != "" {
		argv = append(argv, "--timeout="+waitTimeout)
	}
	argv = append(argv, target)

	_, err := runCommand(ctx, timeout, "kubectl", argv)
	return err
}
// CreateNamespace runs `kubectl create namespace <name>`.
//
// Only the runner's kubeconfig flag is added (when set); the runner's own
// namespace is not used. ctx and timeout are handed to runCommand, and its
// error is returned as is.
func (kr *KubectlRunner) CreateNamespace(ctx context.Context, timeout time.Duration, name string) error {
	subcommand := []string{"create", "namespace", name}

	cmdArgs := make([]string, 0, 2+len(subcommand))
	if kr.kubeCfgPath != "" {
		cmdArgs = append(cmdArgs, "--kubeconfig", kr.kubeCfgPath)
	}
	cmdArgs = append(cmdArgs, subcommand...)

	if _, err := runCommand(ctx, timeout, "kubectl", cmdArgs); err != nil {
		return err
	}
	return nil
}
// DeleteNamespace runs `kubectl delete namespace <name>`.
//
// Only the runner's kubeconfig flag is added (when set); the runner's own
// namespace is not used. ctx and timeout are handed to runCommand, and its
// error is returned as is.
func (kr *KubectlRunner) DeleteNamespace(ctx context.Context, timeout time.Duration, name string) error {
	argv := []string{"delete", "namespace", name}
	if kr.kubeCfgPath != "" {
		// Global flags go in front of the subcommand.
		argv = append([]string{"--kubeconfig", kr.kubeCfgPath}, argv...)
	}

	_, err := runCommand(ctx, timeout, "kubectl", argv)
	return err
}
// Apply runs `kubectl apply -f <filePath>`.
//
// The runner's kubeconfig and namespace flags are placed before the
// subcommand when they are set. ctx and timeout are handed to runCommand,
// and its error is returned as is.
func (kr *KubectlRunner) Apply(ctx context.Context, timeout time.Duration, filePath string) error {
	var kubectlArgs []string
	if cfg := kr.kubeCfgPath; cfg != "" {
		kubectlArgs = append(kubectlArgs, "--kubeconfig", cfg)
	}
	if ns := kr.namespace; ns != "" {
		kubectlArgs = append(kubectlArgs, "-n", ns)
	}
	kubectlArgs = append(kubectlArgs, "apply", "-f", filePath)

	_, runErr := runCommand(ctx, timeout, "kubectl", kubectlArgs)
	return runErr
}
// ServerSideApplyWithData runs
// `kubectl apply --server-side=true --validate=ignore -f -`, handing data to
// runCommandWithInput as the command's input (the "-f -" argument tells
// kubectl to read the manifest from stdin).
//
// The runner's kubeconfig and namespace flags are placed before the
// subcommand when they are set. The error from runCommandWithInput is
// returned as is.
func (kr *KubectlRunner) ServerSideApplyWithData(ctx context.Context, timeout time.Duration, data string) error {
	applyArgs := []string{"apply", "--server-side=true", "--validate=ignore", "-f", "-"}

	// At most four global flag tokens precede the subcommand.
	argv := make([]string, 0, 4+len(applyArgs))
	if kr.kubeCfgPath != "" {
		argv = append(argv, "--kubeconfig", kr.kubeCfgPath)
	}
	if kr.namespace != "" {
		argv = append(argv, "-n", kr.namespace)
	}
	argv = append(argv, applyArgs...)

	if _, err := runCommandWithInput(ctx, timeout, "kubectl", argv, data); err != nil {
		return err
	}
	return nil
}
// Delete runs `kubectl delete -f <filePath>`.
//
// The runner's kubeconfig and namespace flags are placed before the
// subcommand when they are set. ctx and timeout are handed to runCommand,
// and its error is returned as is.
func (kr *KubectlRunner) Delete(ctx context.Context, timeout time.Duration, filePath string) error {
	var globalFlags []string
	if kr.kubeCfgPath != "" {
		globalFlags = append(globalFlags, "--kubeconfig", kr.kubeCfgPath)
	}
	if kr.namespace != "" {
		globalFlags = append(globalFlags, "-n", kr.namespace)
	}

	cmdArgs := append(globalFlags, "delete", "-f", filePath)
	_, err := runCommand(ctx, timeout, "kubectl", cmdArgs)
	return err
}
// DeploymentRestart runs `kubectl rollout restart deployment <name>`.
//
// The runner's kubeconfig and namespace flags are placed before the
// subcommand when they are set. ctx and timeout are handed to runCommand,
// and its error is returned as is.
func (kr *KubectlRunner) DeploymentRestart(ctx context.Context, timeout time.Duration, name string) error {
	restart := []string{"rollout", "restart", "deployment", name}

	var argv []string
	if cfg := kr.kubeCfgPath; cfg != "" {
		argv = append(argv, "--kubeconfig", cfg)
	}
	if ns := kr.namespace; ns != "" {
		argv = append(argv, "-n", ns)
	}
	argv = append(argv, restart...)

	if _, err := runCommand(ctx, timeout, "kubectl", argv); err != nil {
		return err
	}
	return nil
}
// DeploymentRolloutStatus runs `kubectl rollout status deployment/<name>`.
//
// The runner's kubeconfig and namespace flags are placed before the
// subcommand when they are set. ctx and timeout are handed to runCommand,
// and its error is returned as is.
func (kr *KubectlRunner) DeploymentRolloutStatus(ctx context.Context, timeout time.Duration, name string) error {
	resource := "deployment/" + name

	cmdArgs := make([]string, 0, 7)
	if kr.kubeCfgPath != "" {
		cmdArgs = append(cmdArgs, "--kubeconfig", kr.kubeCfgPath)
	}
	if kr.namespace != "" {
		cmdArgs = append(cmdArgs, "-n", kr.namespace)
	}
	cmdArgs = append(cmdArgs, "rollout", "status", resource)

	_, statusErr := runCommand(ctx, timeout, "kubectl", cmdArgs)
	return statusErr
}