pkg/controller/direct/notebooks/instance_controller.go (212 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package notebooks import ( "context" "fmt" krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/notebooks/v1alpha1" refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/common" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" gcp "cloud.google.com/go/notebooks/apiv1" notebookspb "cloud.google.com/go/notebooks/apiv1/notebookspb" "google.golang.org/api/option" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) func init() { registry.RegisterModel(krm.NotebookInstanceGVK, NewInstanceModel) } func NewInstanceModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) { return &modelInstance{config: *config}, nil } var _ directbase.Model = &modelInstance{} type modelInstance struct { config config.ControllerConfig } func (m *modelInstance) client(ctx context.Context) (*gcp.NotebookClient, error) { var opts []option.ClientOption opts, err := m.config.GRPCClientOptions() if err != nil { return nil, err } gcpClient, err := gcp.NewNotebookClient(ctx, opts...) if err != nil { return nil, fmt.Errorf("building Instance client: %w", err) } return gcpClient, err } func (m *modelInstance) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) { obj := &krm.NotebookInstance{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { return nil, fmt.Errorf("error converting to %T: %w", obj, err) } id, err := krm.NewInstanceIdentity(ctx, reader, obj) if err != nil { return nil, err } // Get notebooks GCP client gcpClient, err := m.client(ctx) if err != nil { return nil, err } return &InstanceAdapter{ id: id, gcpClient: gcpClient, desired: obj, }, nil } func (m *modelInstance) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) { // TODO: Support URLs return nil, nil } type InstanceAdapter struct { id *krm.InstanceIdentity gcpClient *gcp.NotebookClient desired *krm.NotebookInstance actual *notebookspb.Instance } var _ directbase.Adapter = &InstanceAdapter{} // Find retrieves the GCP resource. // Return true means the object is found. This triggers Adapter `Update` call. // Return false means the object is not found. This triggers Adapter `Create` call. // Return a non-nil error requeues the requests. func (a *InstanceAdapter) Find(ctx context.Context) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("getting Instance", "name", a.id) req := &notebookspb.GetInstanceRequest{Name: a.id.String()} instancepb, err := a.gcpClient.GetInstance(ctx, req) if err != nil { if direct.IsNotFound(err) { return false, nil } return false, fmt.Errorf("getting Instance %q: %w", a.id, err) } a.actual = instancepb return true, nil } // Create creates the resource in GCP based on `spec` and update the Config Connector object `status` based on the GCP response. func (a *InstanceAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("creating Instance", "name", a.id) mapCtx := &direct.MapContext{} desired := a.desired.DeepCopy() resource := NotebookInstanceSpec_ToProto(mapCtx, &desired.Spec) if mapCtx.Err() != nil { return mapCtx.Err() } req := &notebookspb.CreateInstanceRequest{ Parent: a.id.Parent().String(), InstanceId: a.id.ID(), Instance: resource, } op, err := a.gcpClient.CreateInstance(ctx, req) if err != nil { return fmt.Errorf("creating Instance %s: %w", a.id, err) } created, err := op.Wait(ctx) if err != nil { return fmt.Errorf("instance %s waiting creation: %w", a.id, err) } log.V(2).Info("successfully created Instance", "name", a.id) status := &krm.NotebookInstanceStatus{} status.ObservedState = NotebookInstanceObservedState_FromProto(mapCtx, created) if mapCtx.Err() != nil { return mapCtx.Err() } status.ExternalRef = direct.LazyPtr(a.id.String()) return createOp.UpdateStatus(ctx, status, nil) } // Update updates the resource in GCP based on `spec` and update the Config Connector object `status` based on the GCP response. func (a *InstanceAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error { log := klog.FromContext(ctx) log.V(2).Info("updating Instance", "name", a.id) mapCtx := &direct.MapContext{} desiredPb := NotebookInstanceSpec_ToProto(mapCtx, &a.desired.DeepCopy().Spec) if mapCtx.Err() != nil { return mapCtx.Err() } paths, err := common.CompareProtoMessage(desiredPb, a.actual, common.BasicDiff) if err != nil { return err } if len(paths) == 0 { log.V(2).Info("no field needs update", "name", a.id) } var updated *notebookspb.Instance if paths.Has("metadata") { req := &notebookspb.UpdateInstanceMetadataItemsRequest{ Name: a.id.String(), Items: desiredPb.Metadata, } _, err = a.gcpClient.UpdateInstanceMetadataItems(ctx, req) if err != nil { return fmt.Errorf("updating Instance %s: %w", a.id, err) } } if paths.Has("shieldedInstanceConfig") { req := &notebookspb.UpdateShieldedInstanceConfigRequest{ Name: a.id.String(), ShieldedInstanceConfig: desiredPb.ShieldedInstanceConfig, } op, err := a.gcpClient.UpdateShieldedInstanceConfig(ctx, req) if err != nil { return fmt.Errorf("updating Instance %s: %w", a.id, err) } updated, err = op.Wait(ctx) if err != nil { return fmt.Errorf("instance %s waiting creation: %w", a.id, err) } } log.V(2).Info("successfully updated Instance", "name", a.id) status := &krm.NotebookInstanceStatus{} if updated != nil { status.ObservedState = NotebookInstanceObservedState_FromProto(mapCtx, updated) if mapCtx.Err() != nil { return mapCtx.Err() } } else { status.ObservedState = NotebookInstanceObservedState_FromProto(mapCtx, a.actual) if mapCtx.Err() != nil { return mapCtx.Err() } } status.ExternalRef = direct.LazyPtr(a.id.String()) return updateOp.UpdateStatus(ctx, status, nil) } // Export maps the GCP object to a Config Connector resource `spec`. func (a *InstanceAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) { if a.actual == nil { return nil, fmt.Errorf("Find() not called") } u := &unstructured.Unstructured{} obj := &krm.NotebookInstance{} mapCtx := &direct.MapContext{} obj.Spec = direct.ValueOf(NotebookInstanceSpec_FromProto(mapCtx, a.actual)) if mapCtx.Err() != nil { return nil, mapCtx.Err() } obj.Spec.ProjectRef = &refs.ProjectRef{External: a.id.Parent().ProjectID} obj.Spec.Zone = a.id.Parent().Location uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return nil, err } u.SetName(a.id.ID()) u.SetGroupVersionKind(krm.NotebookInstanceGVK) u.Object = uObj return u, nil } // Delete the resource from GCP service when the corresponding Config Connector resource is deleted. func (a *InstanceAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) { log := klog.FromContext(ctx) log.V(2).Info("deleting Instance", "name", a.id) req := &notebookspb.DeleteInstanceRequest{Name: a.id.String()} op, err := a.gcpClient.DeleteInstance(ctx, req) if err != nil { if direct.IsNotFound(err) { // Return success if not found (assume it was already deleted). log.V(2).Info("skipping delete for non-existent Instance, assuming it was already deleted", "name", a.id) return true, nil } return false, fmt.Errorf("deleting Instance %s: %w", a.id, err) } log.V(2).Info("successfully deleted Instance", "name", a.id) err = op.Wait(ctx) if err != nil { return false, fmt.Errorf("waiting delete Instance %s: %w", a.id, err) } return true, nil }