pkg/controller/direct/datacatalog/entry_controller.go (258 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +tool:controller
// proto.service: google.cloud.datacatalog.v1.DataCatalog
// proto.message: google.cloud.datacatalog.v1.Entry
// crd.type: DataCatalogEntry
// crd.version: v1alpha1
package datacatalog
import (
"context"
"fmt"
"slices"
api "cloud.google.com/go/datacatalog/apiv1"
pb "cloud.google.com/go/datacatalog/apiv1/datacatalogpb"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/datacatalog/v1alpha1"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/common"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry"
)
func init() {
registry.RegisterModel(krm.DataCatalogEntryGVK, NewEntryModel)
}
// NewEntryModel builds the directbase.Model for DataCatalogEntry resources.
// The controller configuration is copied by value into the returned model.
func NewEntryModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) {
	model := &entryModel{}
	model.config = *config
	return model, nil
}
// Compile-time check that *entryModel satisfies directbase.Model.
var _ directbase.Model = (*entryModel)(nil)

// entryModel is the directbase.Model implementation for DataCatalogEntry.
type entryModel struct {
	// config is the model's own copy of the controller configuration; it is
	// used to derive the GCP client options.
	config config.ControllerConfig
}
// client constructs a Data Catalog REST client.
//
// The service requires that a quota project be set, so when the model's
// configuration does not already carry both a user-project override and a
// billing project, projectID is used as the billing project. The adjustment
// is made on a local copy; m.config itself is not modified.
func (m *entryModel) client(ctx context.Context, projectID string) (*api.Client, error) {
	cfg := m.config
	if hasQuotaProject := cfg.UserProjectOverride && cfg.BillingProject != ""; !hasQuotaProject {
		cfg.UserProjectOverride = true
		cfg.BillingProject = projectID
	}

	var (
		opts []option.ClientOption
		err  error
	)
	if opts, err = cfg.RESTClientOptions(); err != nil {
		return nil, err
	}

	restClient, err := api.NewRESTClient(ctx, opts...)
	if err != nil {
		return nil, fmt.Errorf("building datacatalog entry client: %w", err)
	}
	return restClient, nil
}
// AdapterForObject converts the unstructured object into a typed
// DataCatalogEntry, resolves its identity, and returns an adapter bound to a
// GCP client created for the identity's parent project.
func (m *entryModel) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) {
	obj := &krm.DataCatalogEntry{}
	convErr := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj)
	if convErr != nil {
		return nil, fmt.Errorf("error converting to %T: %w", obj, convErr)
	}

	id, err := krm.NewEntryIdentity(ctx, reader, obj)
	if err != nil {
		return nil, err
	}

	projectID := id.Parent().ProjectID
	gcpClient, err := m.client(ctx, projectID)
	if err != nil {
		return nil, err
	}

	adapter := &entryAdapter{
		gcpClient: gcpClient,
		id:        id,
		desired:   obj,
		reader:    reader,
	}
	return adapter, nil
}
// AdapterForURL is not implemented yet: it always returns a nil adapter and a
// nil error, regardless of the URL.
func (m *entryModel) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) {
	// TODO: Support URLs
	var none directbase.Adapter
	return none, nil
}
// entryAdapter reconciles a single DataCatalogEntry against the Data Catalog API.
type entryAdapter struct {
	// gcpClient is the Data Catalog client used for all API calls.
	gcpClient *api.Client
	// id is the resolved identity (parent and entry ID) of the entry.
	id *krm.EntryIdentity
	// desired is the KRM object the adapter was built from.
	desired *krm.DataCatalogEntry
	// actual is the entry most recently fetched by Find; nil until Find succeeds.
	actual *pb.Entry
	// reader is the Kubernetes reader handed to AdapterForObject.
	reader client.Reader
}

// Compile-time check that *entryAdapter satisfies directbase.Adapter.
var _ directbase.Adapter = (*entryAdapter)(nil)
// Find fetches the entry from GCP and caches it on the adapter.
//
// It returns (true, nil) when the entry exists, and (false, nil) when the API
// answers NotFound or PermissionDenied (PermissionDenied is observed for
// non-existent entries). Any other error is wrapped and returned.
func (a *entryAdapter) Find(ctx context.Context) (bool, error) {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("getting datacatalog entry", "name", a.id)

	entry, err := a.gcpClient.GetEntry(ctx, &pb.GetEntryRequest{Name: a.id.String()})
	switch {
	case err == nil:
		a.actual = entry
		return true, nil
	case direct.IsNotFound(err), direct.IsPermissionDenied(err):
		// Both codes are treated as "the entry does not exist".
		return false, nil
	default:
		return false, fmt.Errorf("getting datacatalog entry %q from gcp: %w", a.id.String(), err)
	}
}
// Create maps the desired spec to a proto Entry, creates it under the
// identity's parent, and records the observed state and external reference in
// the KRM status.
func (a *entryAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("creating datacatalog entry", "name", a.id)

	mapCtx := &direct.MapContext{}
	entry := DataCatalogEntrySpec_ToProto(mapCtx, &a.desired.Spec)
	if err := mapCtx.Err(); err != nil {
		return err
	}

	created, err := a.gcpClient.CreateEntry(ctx, &pb.CreateEntryRequest{
		Parent:  a.id.Parent().String(),
		EntryId: a.id.ID(),
		Entry:   entry,
	})
	if err != nil {
		return fmt.Errorf("creating datacatalog entry %s: %w", a.id.String(), err)
	}
	logger.V(2).Info("successfully created datacatalog entry in gcp", "name", a.id)

	observed := DataCatalogEntryObservedState_FromProto(mapCtx, created)
	if err := mapCtx.Err(); err != nil {
		return err
	}

	status := &krm.DataCatalogEntryStatus{}
	status.ObservedState = observed
	status.ExternalRef = direct.LazyPtr(a.id.String())
	return createOp.UpdateStatus(ctx, status, nil)
}
// Update diffs the desired spec against the entry cached by Find and sends an
// UpdateEntry request whose field mask contains only the changed paths from a
// fixed list of updatable fields. When nothing changed, no API call is made
// and the KRM status is refreshed from the cached entry.
func (a *entryAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("updating datacatalog entry", "name", a.id)

	mapCtx := &direct.MapContext{}
	want := DataCatalogEntrySpec_ToProto(mapCtx, &a.desired.Spec)
	if err := mapCtx.Err(); err != nil {
		return err
	}
	// Update requests address the entry by its name field.
	want.Name = a.id.String()

	diffs, err := common.CompareProtoMessage(want, a.actual, common.BasicDiff)
	if err != nil {
		return fmt.Errorf("comparing desired and actual proto: %w", err)
	}

	// Fields treated as mutable, per the API documentation or by convention.
	// FQN is immutable after creation; the EntryType and System oneofs are
	// generally immutable. BusinessContext has its own modify methods and is
	// deliberately not listed.
	mutablePaths := []string{
		"display_name",
		"description",
		"schema",
		"source_system_timestamps",
		"labels",
		"gcs_fileset_spec", // assumed updatable when the type is FILESET
		// Members of the 'spec' oneof.
		"database_table_spec",
		"data_source_connection_spec",
		"routine_spec",
		"dataset_spec",
		"fileset_spec",
		"service_spec",
		"model_spec",
		"feature_online_store_spec",
		// Members of the 'system_spec' oneof (usually tied to the immutable
		// user_specified_system).
		"sql_database_system_spec",
		"looker_system_spec",
	}

	mask := &fieldmaskpb.FieldMask{}
	for _, path := range mutablePaths {
		// Add each changed path exactly once.
		if !diffs.Has(path) || slices.Contains(mask.Paths, path) {
			continue
		}
		mask.Paths = append(mask.Paths, path)
	}

	// Default to the cached entry so the status is still refreshed when there
	// is nothing to send.
	latest := a.actual
	if len(mask.Paths) > 0 {
		logger.V(2).Info("updating fields", "name", a.id, "fields", mask.Paths)
		latest, err = a.gcpClient.UpdateEntry(ctx, &pb.UpdateEntryRequest{
			Entry:      want,
			UpdateMask: mask,
		})
		if err != nil {
			return fmt.Errorf("updating datacatalog entry %s: %w", a.id.String(), err)
		}
		logger.V(2).Info("successfully updated datacatalog entry in gcp", "name", a.id)
	} else {
		logger.V(2).Info("no field needs update", "name", a.id)
	}

	observed := DataCatalogEntryObservedState_FromProto(mapCtx, latest)
	if err := mapCtx.Err(); err != nil {
		return err
	}

	status := &krm.DataCatalogEntryStatus{}
	status.ObservedState = observed
	return updateOp.UpdateStatus(ctx, status, nil)
}
// Export renders the entry cached by Find as an unstructured DataCatalogEntry.
//
// It returns an error if Find has not populated a.actual, if mapping the proto
// to the KRM spec fails, or if the typed object cannot be converted to
// unstructured form. The returned object carries the entry ID as its name, the
// namespace of the desired object (when one is present), the DataCatalogEntry
// GroupVersionKind, and no status.
func (a *entryAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) {
	if a.actual == nil {
		return nil, fmt.Errorf("Find() not called")
	}

	mapCtx := &direct.MapContext{}
	spec := DataCatalogEntrySpec_FromProto(mapCtx, a.actual)
	if err := mapCtx.Err(); err != nil {
		return nil, err
	}

	obj := &krm.DataCatalogEntry{}
	obj.Spec = direct.ValueOf(spec)

	// Populate the entry-group reference from the identity's parent.
	// NOTE(review): this assumes Spec.EntryGroupRef is addressable here (a
	// value field, or a pointer the mapper always sets) — confirm.
	if parent := a.id.Parent(); parent != nil {
		obj.Spec.EntryGroupRef.External = parent.String()
	}

	uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
	if err != nil {
		return nil, fmt.Errorf("converting to unstructured: %w", err)
	}

	// Install the converted content first. The metadata setters below write
	// into u.Object, so assigning the map afterwards would discard the name,
	// namespace, apiVersion and kind.
	u := &unstructured.Unstructured{Object: uObj}
	u.SetName(a.id.ID())
	if a.desired != nil {
		u.SetNamespace(a.desired.GetNamespace()) // preserve namespace
	}
	u.SetGroupVersionKind(krm.DataCatalogEntryGVK)

	// Exported objects carry only spec and metadata.
	unstructured.RemoveNestedField(u.Object, "status")
	return u, nil
}
// Delete removes the entry from Data Catalog when it is of a deletable kind.
//
// Deletability is derived from the entry cached by Find:
//   - entries from a known integrated system (BigQuery, Pub/Sub, ...) are
//     rejected with an error before any API call is made;
//   - entries with a user-specified system or user-specified type, or of type
//     FILESET, CLUSTER or DATA_STREAM, are considered deletable;
//   - when no entry is cached, the API call is attempted and the service
//     decides.
//
// Any other entry is rejected with an error. NotFound and PermissionDenied
// responses from DeleteEntry are treated as "already deleted" and reported as
// success.
func (a *entryAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("deleting datacatalog entry", "name", a.id)

	// "You can delete only the entries created by the CreateEntry method."
	// Without a cached entry there is nothing to inspect, so let the API call
	// determine deletability.
	deletable := a.actual == nil
	if entry := a.actual; entry != nil {
		switch entry.System.(type) {
		case *pb.Entry_UserSpecifiedSystem:
			deletable = true
		case *pb.Entry_IntegratedSystem:
			// INTEGRATED_SYSTEM_UNSPECIFIED and unlisted values fall through
			// without marking the entry deletable.
			switch system := entry.GetIntegratedSystem(); system {
			case pb.IntegratedSystem_BIGQUERY,
				pb.IntegratedSystem_CLOUD_PUBSUB,
				pb.IntegratedSystem_DATAPROC_METASTORE,
				pb.IntegratedSystem_DATAPLEX,
				pb.IntegratedSystem_CLOUD_SPANNER,
				pb.IntegratedSystem_CLOUD_BIGTABLE,
				pb.IntegratedSystem_CLOUD_SQL,
				pb.IntegratedSystem_LOOKER,
				pb.IntegratedSystem_VERTEX_AI:
				// Synced entries are refused outright; the entry-type check
				// below is never reached for them.
				logger.V(1).Info("Skipping deletion of synced datacatalog entry", "name", a.id, "system", system)
				return false, fmt.Errorf("entries synced from %s cannot be deleted directly via the Data Catalog API", system.String())
			}
		}

		// Per the CreateEntry docs, only FILESET, CLUSTER, DATA_STREAM or
		// custom types are creatable/deletable.
		switch entry.EntryType.(type) {
		case *pb.Entry_UserSpecifiedType:
			deletable = true
		case *pb.Entry_Type:
			if t := entry.GetType(); t == pb.EntryType_FILESET || t == pb.EntryType_CLUSTER || t == pb.EntryType_DATA_STREAM {
				deletable = true
			}
		}
	}

	if !deletable {
		// Can happen when the resource was created outside KCC for a
		// non-deletable type. The caller receives an error.
		logger.V(1).Info("Skipping deletion attempt for non-user-managed datacatalog entry", "name", a.id)
		return false, fmt.Errorf("entry %q is not of a type that can be deleted via the Data Catalog API", a.id.String())
	}

	err := a.gcpClient.DeleteEntry(ctx, &pb.DeleteEntryRequest{Name: a.id.String()})
	switch {
	case err == nil:
		logger.V(2).Info("successfully deleted datacatalog entry", "name", a.id)
		return true, nil
	case direct.IsNotFound(err):
		logger.V(2).Info("skipping delete for non-existent datacatalog entry, assuming it was already deleted", "name", a.id)
		return true, nil
	case direct.IsPermissionDenied(err):
		// PermissionDenied can also mean not found; treat it as deleted to
		// avoid endless loops if permissions are gone.
		logger.V(2).Info("skipping delete for datacatalog entry due to permission denied, potentially already deleted or access revoked", "name", a.id)
		return true, nil
	default:
		return false, fmt.Errorf("deleting datacatalog entry %s: %w", a.id.String(), err)
	}
}