kubectl-utils/cmd/kubectl-expect/main.go (110 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubectl-ai/kubectl-utils/pkg/kel"
"github.com/GoogleCloudPlatform/kubectl-ai/kubectl-utils/pkg/kube"
celtypes "github.com/google/cel-go/common/types"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
// main is the entry point for kubectl-expect. It invokes run and, if run
// reports an error, prints it to stderr and exits with a non-zero status so
// that shells, scripts and CI pipelines can detect the failure.
func main() {
	ctx := context.Background()
	if err := run(ctx); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		// Without an explicit exit code the process would report success (0)
		// even though the expectation was never met.
		os.Exit(1)
	}
}
// run parses the command line, resolves the requested object and polls it
// until the supplied CEL expression evaluates to true.
//
// Expected positional arguments: <kind>/<name> <cel-expression>.
//
// It returns nil once the expression evaluates to true. It returns an error
// when the arguments are malformed, when the client, resource lookup, CEL
// environment or expression cannot be set up, when fetching the object or
// evaluating the expression fails, when the expression yields a non-boolean
// value, or when ctx is done while waiting between polls.
func run(ctx context.Context) error {
	// log := klog.FromContext(ctx)

	// Delay between two consecutive polls of the object.
	const pollInterval = 1 * time.Second

	namespace := ""
	kubeconfig := ""
	pflag.StringVarP(&namespace, "namespace", "n", namespace, "If present, the namespace scope for this CLI request")
	pflag.StringVar(&kubeconfig, "kubeconfig", kubeconfig, "Path to the kubeconfig file to use for CLI requests.")

	// Register klog's flags on the standard flag set and expose them via pflag.
	klog.InitFlags(nil)
	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
	pflag.Parse()

	args := pflag.Args()
	if len(args) < 2 {
		return fmt.Errorf("expected [target] [cel-expression]")
	}
	target := args[0]
	celExpressionText := args[1]

	// Validate the target before building a client, so argument mistakes are
	// reported without touching the kubeconfig. Empty components ("Pod/",
	// "/name") are rejected as well: they can never identify a single object.
	tokens := strings.Split(target, "/")
	if len(tokens) != 2 || tokens[0] == "" || tokens[1] == "" {
		return fmt.Errorf("expected target like Pod/<name>")
	}

	kubeClient, err := kube.NewClient(kubeconfig)
	if err != nil {
		return err
	}

	// Find the resource (kind) the user is asking about
	resource, err := kubeClient.FindResource(ctx, tokens[0])
	if err != nil {
		return err
	}

	// Compute namespace: only namespaced resources need one, and an explicit
	// --namespace flag wins over the client's default namespace.
	if namespace == "" && resource.Namespaced {
		namespace, err = kubeClient.DefaultNamespace()
		if err != nil {
			return err
		}
	}

	// Compile the CEL expression
	env, err := kel.NewEnv()
	if err != nil {
		return fmt.Errorf("initalizing CEL: %w", err)
	}
	celExpression, err := kel.NewExpression(env, celExpressionText)
	if err != nil {
		return err
	}

	// Build a pretty-printer for outputting status while polling. It may be
	// nil, in which case only the expression text is printed.
	printer, err := celExpression.BuildStatusPrinter(ctx)
	if err != nil {
		return fmt.Errorf("building status printer: %w", err)
	}

	// Get ready to get the object
	id := types.NamespacedName{
		Namespace: namespace,
		Name:      tokens[1],
	}
	gv := schema.GroupVersion{
		Group:   resource.Group,
		Version: resource.Version,
	}
	gvr := gv.WithResource(resource.Name)
	gvk := gv.WithKind(resource.Kind)
	client := kubeClient.ForGVR(gvr, id.Namespace)

	// Poll the object until the CEL expression returns true.
	// We _could_ watch...
	for {
		// Check first and wait afterwards, so that a condition which already
		// holds is reported immediately instead of after a full interval.
		u, err := client.Get(ctx, id.Name, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("getting %s %s: %w", gvk.Kind, id.Name, err)
		}

		out, err := celExpression.Eval(ctx, u)
		if err != nil {
			return err
		}

		switch out.Type() {
		case celtypes.BoolType:
			if v, ok := out.Value().(bool); ok && v {
				return nil
			}
		default:
			return fmt.Errorf("unhandled type for CEL expression: %v", out.Type())
		}

		// Pretty print some intermediate values if we can
		if printer != nil {
			s := printer(ctx, u)
			fmt.Printf("waiting for %q (%s)\n", celExpression.CELText, s)
		} else {
			fmt.Printf("waiting for %q\n", celExpression.CELText)
		}

		// Wait for the next poll, but stop promptly if the context is done.
		timer := time.NewTimer(pollInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}