pkg/cli/preview/recorder.go (179 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package preview
import (
"context"
"strings"
"sync"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/structuredreporting"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// GKNN is the canonical identity for a kube object; it is short for Group-Kind-Namespaced-Name
// (Version is an encoding artifact, and does not change the identity of the object)
type GKNN struct {
Group string
Kind string
Namespace string
Name string
}
// Recorder holds the information from reconciling the objects
type Recorder struct {
mutex sync.Mutex
objects map[GKNN]*objectInfo
}
// NewRecorder creates a new Recorder.
func NewRecorder() *Recorder {
return &Recorder{
objects: make(map[GKNN]*objectInfo),
}
}
// objectInfo holds the activity from reconciling the objects
type objectInfo struct {
events []event
}
type event struct {
// eventType is the type of event
eventType EventType
// diff is the diff that was recorded
diff *structuredreporting.Diff
// kubeAction is the kube action that was recorded
kubeAction *kubeAction
// gcpAction is the gcp action that was recorded
gcpAction *gcpAction
// object is the object that was reconciled
object *unstructured.Unstructured
}
type EventType string
const (
EventTypeReconcileStart EventType = "reconcileStart"
EventTypeDiff EventType = "diff"
EventTypeKubeAction EventType = "kubeAction"
EventTypeGCPAction EventType = "gcpAction"
)
// kubeAction holds a kubernetes action that was recorded
type kubeAction struct {
method string
action Action
}
// Action indicates whether we blocked or ignored the requested action.
type Action string
const (
// ActionIgnored indicates that the action was ignored.
ActionIgnored Action = "ignored"
// ActionBlocked indicates that the action was blocked.
ActionBlocked Action = "blocked"
)
// gcpAction holds a GCP action that was recorded
type gcpAction struct {
method string
url string
body string
action Action
}
// NewStructuredReportingListener creates a new StructuredReportingListener.
func (r *Recorder) NewStructuredReportingListener() structuredreporting.Listener {
return &structuredReportingListener{recorder: r}
}
// structuredReportingListener is a listener for structured reporting events.
type structuredReportingListener struct {
recorder *Recorder
}
// OnError is called by the structured reporting subsystem when an error occurs.
func (l *structuredReportingListener) OnError(ctx context.Context, err error, args ...any) {
blockedGCPError, ok := ExtractBlockedGCPError(err)
if !ok {
return
}
l.recorder.recordGCPAction(ctx, blockedGCPError, args, ActionBlocked)
}
// OnReconcileStart is called by the structured reporting subsystem when a reconcile starts.
func (l *structuredReportingListener) OnReconcileStart(ctx context.Context, u *unstructured.Unstructured) {
l.recorder.recordReconcileStart(ctx, u)
}
// OnDiff is called by the structured reporting subsystem when a diff occurs.
func (l *structuredReportingListener) OnDiff(ctx context.Context, diff *structuredreporting.Diff) {
l.recorder.recordDiff(ctx, diff)
}
// RecordBlockedKubeMethod is called by the interceptingKubeClient when a write operation is blocked.
func (r *Recorder) RecordBlockedKubeMethod(ctx context.Context, method string, args ...any) {
r.recordKubeAction(ctx, method, args, ActionBlocked)
}
// RecordIgnoredKubeMethod is called by the interceptingKubeClient when a read operation is ignored.
func (r *Recorder) RecordIgnoredKubeMethod(ctx context.Context, method string, args ...any) {
r.recordKubeAction(ctx, method, args, ActionIgnored)
}
// recordDiff captures the diff into our recorder.
func (r *Recorder) recordDiff(ctx context.Context, diff *structuredreporting.Diff) {
log := klog.FromContext(ctx)
gknn := GKNN{}
if u := diff.Object; u != nil {
gknn = gknnFromUnstructured(u)
}
log.Info("recordDiffs", "gknn", gknn)
info := r.getObjectInfo(gknn)
info.events = append(info.events, event{
eventType: EventTypeDiff,
diff: diff,
})
}
// recordReconcileStart captures the reconcile start into our recorder.
func (r *Recorder) recordReconcileStart(ctx context.Context, u *unstructured.Unstructured) {
gknn := gknnFromUnstructured(u)
info := r.getObjectInfo(gknn)
info.events = append(info.events, event{
eventType: EventTypeReconcileStart,
object: u.DeepCopy(),
})
}
func gknnFromUnstructured(u *unstructured.Unstructured) GKNN {
return GKNN{
Group: u.GroupVersionKind().Group,
Kind: u.GroupVersionKind().Kind,
Namespace: u.GetNamespace(),
Name: u.GetName(),
}
}
// recordKubeAction captures the kube action into our recorder.
func (r *Recorder) recordKubeAction(ctx context.Context, method string, args []any, action Action) {
klog.Infof("recordKubeAction %v %v %v", method, args, action)
var gknn GKNN
kubeAction := &kubeAction{
method: method,
action: action,
}
for _, arg := range args {
switch arg := arg.(type) {
case *unstructured.Unstructured:
gvk := arg.GroupVersionKind()
gknn = GKNN{
Group: gvk.Group,
Kind: gvk.Kind,
Namespace: arg.GetNamespace(),
Name: arg.GetName(),
}
// We could capture the object here: kubeAction.object = arg.DeepCopy()
case []client.SubResourceUpdateOption:
// ignore
case []client.UpdateOption:
// ignore
default:
klog.Fatalf("unhandled arg type %T", arg)
}
}
info := r.getObjectInfo(gknn)
info.events = append(info.events, event{
eventType: EventTypeKubeAction,
kubeAction: kubeAction,
})
}
// recordGCPAction captures the GCP action into our recorder.
func (r *Recorder) recordGCPAction(ctx context.Context, err *BlockedGCPError, args []any, action Action) {
var gknn GKNN
gcpAction := &gcpAction{
method: err.Method,
body: err.Body,
url: err.URL,
action: action,
}
for _, arg := range args {
switch arg := arg.(type) {
case *k8s.Resource:
group := strings.Split(arg.APIVersion, "/")[0]
gknn = GKNN{
Group: group,
Kind: arg.Kind,
Namespace: arg.Namespace,
Name: arg.Name,
}
default:
klog.Fatalf("unhandled arg type %T", arg)
}
}
info := r.getObjectInfo(gknn)
info.events = append(info.events, event{
eventType: EventTypeGCPAction,
gcpAction: gcpAction,
})
}
// getObjectInfo returns the objectInfo for the given GKNN.
// If there is no objectInfo, it creates a new one and returns it.
func (r *Recorder) getObjectInfo(gknn GKNN) *objectInfo {
r.mutex.Lock()
defer r.mutex.Unlock()
info := r.objects[gknn]
if info == nil {
info = &objectInfo{}
r.objects[gknn] = info
}
return info
}