pkg/controller/tf/controller.go (512 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tf
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/apis/core/v1beta1"
"github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/kccstate"
corekccv1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/core/v1alpha1"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
kccpredicate "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourceactuation"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/krmtotf"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/lease/leaser"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/resourceoverrides"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/resourceoverrides/operations"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/structuredreporting"
tfresource "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/tf/resource"
"github.com/go-logr/logr"
tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
klog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// logger is the package-level base logger. NewReconciler derives a
// per-controller named logger from it.
var logger = klog.Log
// Reconciler reconciles resources of a single GroupVersionKind by translating
// them to Terraform resource configuration and applying it through the
// Terraform provider. It also implements k8s.SchemaReferenceUpdater so that
// the schema it reconciles against can be replaced at runtime.
type Reconciler struct {
// Embedded lifecycle handler; built in NewReconciler from the manager's
// client and an event recorder named after the controller.
lifecyclehandler.LifecycleHandler
// Embedded reconcile metrics recorder used by Reconcile.
metrics.ReconcilerMetrics
// resourceLeaser is used to check whether a kind is leasable and to obtain
// leases when the resource-level conflict prevention policy is set.
resourceLeaser *leaser.ResourceLeaser
// defaulters are applied, in order, to every resource before actuation.
defaulters []k8s.Defaulter
// mgr supplies the Kubernetes client and REST config used by this reconciler.
mgr manager.Manager
// schemaRef holds the CRD, its JSON schema and the GVK being reconciled.
// Access is guarded by schemaRefMu.
schemaRef *k8s.SchemaReference
// schemaRefMu is read-locked for the duration of Reconcile and
// write-locked by UpdateSchema.
schemaRefMu sync.RWMutex
// provider is the Terraform provider used to read, diff and apply resources.
provider *tfschema.Provider
// smLoader resolves the ServiceMapping for a resource's API group.
smLoader *servicemappingloader.ServiceMappingLoader
// logger is the controller-scoped logger (named "<kind>-controller").
logger logr.Logger
// jitterGenerator produces the jittered requeue periods and the jittered
// timeout used when watching unready dependencies.
jitterGenerator jitter.Generator
// Fields used for triggering reconciliations when dependencies are ready
// immediateReconcileRequests is the channel on which generic events are
// sent to enqueue a resource for immediate reconciliation. A nil channel
// disables immediate reconciliations.
immediateReconcileRequests chan event.GenericEvent
resourceWatcherRoutines *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies
}
// Add constructs a Reconciler for the kind described by crd and registers it
// with mgr as a controller named "<lowercased kind>-controller".
//
// The controller is fed by two sources: metadata-only watch events for objects
// of the CRD's kind, filtered by
// kccpredicate.UnderlyingResourceOutOfSyncPredicate plus additionalPredicate
// when it is non-nil, and generic events sent on the channel the Reconciler
// uses to request immediate reconciliations.
//
// On success the Reconciler is returned as a k8s.SchemaReferenceUpdater. An
// error is returned if the Reconciler cannot be created or the controller
// cannot be built.
func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
	var (
		kind       = crd.Spec.Names.Kind
		apiVersion = k8s.GetAPIVersionFromCRD(crd)
		name       = fmt.Sprintf("%v-controller", strings.ToLower(kind))
	)

	// The channel and the semaphore are shared between the controller's event
	// source and the Reconciler that feeds it.
	requests := make(chan event.GenericEvent, k8s.ImmediateReconcileRequestsBufferSize)
	watcherSlots := semaphore.NewWeighted(k8s.MaxNumResourceWatcherRoutines)
	reconciler, err := NewReconciler(mgr, crd, provider, smLoader, requests, watcherSlots, defaulters, jitterGenerator)
	if err != nil {
		return nil, err
	}

	// Only kind and apiVersion are needed to tell the builder what to watch.
	target := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": apiVersion,
			"kind":       kind,
		},
	}

	predicates := make([]predicate.Predicate, 0, 2)
	predicates = append(predicates, kccpredicate.UnderlyingResourceOutOfSyncPredicate{})
	if additionalPredicate != nil {
		predicates = append(predicates, additionalPredicate)
	}

	opts := controller.Options{
		MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles,
		RateLimiter:             ratelimiter.NewRateLimiter(),
	}
	if _, err := builder.
		ControllerManagedBy(mgr).
		Named(name).
		WithOptions(opts).
		WatchesRawSource(&source.Channel{Source: requests}, &handler.EnqueueRequestForObject{}).
		For(target, builder.OnlyMetadata, builder.WithPredicates(predicates...)).
		Build(reconciler); err != nil {
		return nil, fmt.Errorf("error creating new controller: %w", err)
	}

	mgr.GetLogger().Info("Registered controller", "kind", kind, "apiVersion", apiVersion)
	return reconciler, nil
}
// NewReconciler returns a Reconciler for the kind described by crd.
//
// The schema reference (CRD, JSON schema and GVK) is derived from crd; the
// client and event recorder come from mgr. immediateReconcileRequests and
// resourceWatcherRoutines are stored as given and used when waiting on
// unready dependencies. An error is returned when jitterGenerator is nil.
func NewReconciler(mgr manager.Manager,
	crd *apiextensions.CustomResourceDefinition,
	p *tfschema.Provider,
	smLoader *servicemappingloader.ServiceMappingLoader,
	immediateReconcileRequests chan event.GenericEvent,
	resourceWatcherRoutines *semaphore.Weighted,
	defaulters []k8s.Defaulter,
	jitterGenerator jitter.Generator) (*Reconciler, error) {
	if jitterGenerator == nil {
		return nil, fmt.Errorf("jitterGenerator must not be nil")
	}

	name := fmt.Sprintf("%v-controller", strings.ToLower(crd.Spec.Names.Kind))

	handler := lifecyclehandler.NewLifecycleHandler(
		mgr.GetClient(),
		mgr.GetEventRecorderFor(name),
	)
	resourceLeaser := leaser.NewResourceLeaser(p, smLoader, mgr.GetClient())

	gvk := schema.GroupVersionKind{
		Group:   crd.Spec.Group,
		Version: k8s.GetVersionFromCRD(crd),
		Kind:    crd.Spec.Names.Kind,
	}
	schemaRef := &k8s.SchemaReference{
		CRD:        crd,
		JSONSchema: k8s.GetOpenAPIV3SchemaFromCRD(crd),
		GVK:        gvk,
	}

	reconciler := &Reconciler{
		LifecycleHandler: handler,
		ReconcilerMetrics: metrics.ReconcilerMetrics{
			ResourceNameLabel: metrics.ResourceNameLabel,
		},
		resourceLeaser:             resourceLeaser,
		defaulters:                 defaulters,
		mgr:                        mgr,
		schemaRef:                  schemaRef,
		provider:                   p,
		smLoader:                   smLoader,
		logger:                     logger.WithName(name),
		jitterGenerator:            jitterGenerator,
		immediateReconcileRequests: immediateReconcileRequests,
		resourceWatcherRoutines:    resourceWatcherRoutines,
	}
	return reconciler, nil
}
// Reconcile brings the underlying resource for the object named by req in
// line with its declared state.
//
// The flow is: fetch the object (a missing object ends the reconcile without
// error), skip if resourceactuation.ShouldSkip says so, build the
// Terraform-backed resource, apply defaults and backwards-compatibility
// changes, decide the actuation mode, and then either
//   - Paused: ensure finalizers on non-deleted objects and requeue after a
//     jittered period without actuating, or
//   - Reconciling: run the pre-actuation transform and sync the resource.
//
// A successful sync is requeued after a jittered period; sync may also ask
// for an immediate requeue. The schema read lock is held for the whole call,
// and reconcile metrics are recorded on exit using the named err result.
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
	r.schemaRefMu.RLock()
	defer r.schemaRefMu.RUnlock()

	r.logger.Info("starting reconcile", "resource", req.NamespacedName)
	begin := time.Now()
	// The read lock is held until return, so the GVK can be read once.
	gvk := r.schemaRef.GVK
	r.RecordReconcileWorkers(ctx, gvk)
	defer r.AfterReconcile()
	defer r.RecordReconcileMetrics(ctx, gvk, req.Namespace, req.Name, begin, &err)

	obj := &unstructured.Unstructured{}
	obj.SetGroupVersionKind(gvk)
	if getErr := r.Get(ctx, req.NamespacedName, obj); getErr != nil {
		if !apierrors.IsNotFound(getErr) {
			return reconcile.Result{}, getErr
		}
		r.logger.Info("resource not found in API server; finishing reconcile", "resource", req.NamespacedName)
		return reconcile.Result{}, nil
	}
	structuredreporting.ReportReconcileStart(ctx, obj)

	skip, err := resourceactuation.ShouldSkip(obj)
	if err != nil {
		return reconcile.Result{}, err
	}
	if skip {
		r.logger.Info("Skipping reconcile as nothing has changed and 0 reconcile period is set", "resource", req.NamespacedName)
		return reconcile.Result{}, nil
	}

	sm, err := r.smLoader.GetServiceMapping(obj.GroupVersionKind().Group)
	if err != nil {
		return reconcile.Result{}, err
	}
	obj, err = k8s.TriggerManagedFieldsMetadata(ctx, r.Client, obj)
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("error triggering Server-Side Apply (SSA) metadata: %w", err)
	}
	resource, err := krmtotf.NewResource(obj, sm, r.provider)
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("could not parse resource %s: %w", req.NamespacedName.String(), err)
	}

	if defaultsErr := r.handleDefaults(ctx, resource); defaultsErr != nil {
		return reconcile.Result{}, fmt.Errorf("error handling default values for resource '%v': %w", k8s.GetNamespacedName(resource), defaultsErr)
	}
	if compatErr := r.applyChangesForBackwardsCompatibility(ctx, resource); compatErr != nil {
		return reconcile.Result{}, fmt.Errorf("error applying changes to resource '%v' for backwards compatibility: %w", k8s.GetNamespacedName(resource), compatErr)
	}

	cc, ccc, err := kccstate.FetchLiveKCCState(ctx, r.mgr.GetClient(), req.NamespacedName)
	if err != nil {
		return reconcile.Result{}, err
	}
	mode := resourceactuation.DecideActuationMode(cc, ccc)
	if mode == v1beta1.Paused {
		pausedPeriod, jitterErr := r.jitterGenerator.JitteredReenqueue(gvk, obj)
		if jitterErr != nil {
			return reconcile.Result{}, jitterErr
		}
		// add finalizers for deletion defender to make sure we don't delete cloud provider resources when uninstalling
		if resource.GetDeletionTimestamp().IsZero() {
			if finErr := r.EnsureFinalizers(ctx, resource.Original, &resource.Resource, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName); finErr != nil {
				return reconcile.Result{}, finErr
			}
		}
		r.logger.Info("Skipping actuation of resource as actuation mode is \"Paused\"", "resource", req.NamespacedName, "time to next reconciliation", pausedPeriod)
		return reconcile.Result{RequeueAfter: pausedPeriod}, nil
	}
	if mode != v1beta1.Reconciling {
		return reconcile.Result{}, fmt.Errorf("unknown actuation mode %v", mode)
	}
	r.logger.V(2).Info("Actuating a resource as actuation mode is \"Reconciling\"", "resource", req.NamespacedName)

	// Apply pre-actuation transformation.
	if transformErr := resourceoverrides.Handler.PreActuationTransform(&resource.Resource); transformErr != nil {
		return reconcile.Result{}, r.HandlePreActuationTransformFailed(ctx, &resource.Resource, fmt.Errorf("error applying pre-actuation transformation to resource '%v': %w", req.NamespacedName.String(), transformErr))
	}

	requeue, err := r.sync(ctx, resource)
	switch {
	case err != nil:
		return reconcile.Result{}, err
	case requeue:
		return reconcile.Result{Requeue: true}, nil
	}

	nextPeriod, err := r.jitterGenerator.JitteredReenqueue(gvk, obj)
	if err != nil {
		return reconcile.Result{}, err
	}
	r.logger.Info("successfully finished reconcile", "resource", k8s.GetNamespacedName(resource), "time to next reconciliation", nextPeriod)
	return reconcile.Result{RequeueAfter: nextPeriod}, nil
}
// sync actuates krmResource against the underlying API through the Terraform
// provider and reports whether the caller should requeue immediately.
//
// Deletion (deletion timestamp set):
//   - no controller finalizer: nothing to do;
//   - deletion-defender finalizer still present: requeue;
//   - abandon annotation, orphaned resource, or underlying resource already
//     gone: finalize without calling the API;
//   - parent exists but is not ready (and must be): requeue with an
//     unresolvable-dependency status;
//   - otherwise: obtain the lease if needed, destroy the underlying resource
//     and finalize.
//
// Create/update: fetch live state, obtain the lease if needed, refuse to
// recreate a server-generated-ID resource that has disappeared, expand the
// desired configuration, diff it against live state, reject immutable-field
// changes on existing resources, ensure finalizers, and apply the diff when
// it is non-empty.
//
// Panics raised while syncing are converted into the returned error by the
// deferred execution.RecoverWithInternalError.
func (r *Reconciler) sync(ctx context.Context, krmResource *krmtotf.Resource) (requeue bool, err error) {
	// isolate any panics to only this function
	defer execution.RecoverWithInternalError(&err)
	if !krmResource.GetDeletionTimestamp().IsZero() {
		// Deleting
		r.logger.Info("finalizing resource deletion", "resource", k8s.GetNamespacedName(krmResource))
		if !k8s.HasFinalizer(krmResource, k8s.ControllerFinalizerName) {
			r.logger.Info("no controller finalizer is present; no finalization necessary",
				"resource", k8s.GetNamespacedName(krmResource))
			return false, nil
		}
		if k8s.HasFinalizer(krmResource, k8s.DeletionDefenderFinalizerName) {
			r.logger.Info("deletion defender has not yet been finalized; requeuing", "resource", k8s.GetNamespacedName(krmResource))
			return true, nil
		}
		if err := r.HandleDeleting(ctx, &krmResource.Resource); err != nil {
			return false, err
		}
		if k8s.HasAbandonAnnotation(krmResource) {
			r.logger.Info("deletion policy set to abandon; abandoning underlying resource", "resource", k8s.GetNamespacedName(krmResource))
			return false, r.handleDeleted(ctx, krmResource)
		}
		if krmtotf.ShouldResolveParentForDelete(krmResource) {
			orphaned, parent, err := r.isOrphaned(krmResource)
			// Handle orphaned resources
			if err != nil {
				return false, err
			}
			if orphaned {
				r.logger.Info("resource has been orphaned; no API call necessary", "resource", k8s.GetNamespacedName(krmResource))
				return false, r.handleDeleted(ctx, krmResource)
			}
			if parent != nil && !k8s.IsResourceReady(parent) {
				if krmtotf.ShouldCheckParentReadyForDelete(krmResource, parent) {
					// If this resource has a parent and is not orphaned, ensure its parent
					// is ready before attempting deletion.
					// Requeue resource for reconciliation with exponential backoff applied
					return true, r.HandleUnresolvableDeps(ctx, &krmResource.Resource, k8s.NewReferenceNotReadyErrorForResource(parent))
				}
			}
		}
		liveState, err := krmtotf.FetchLiveStateForDelete(ctx, krmResource, r.provider, r, r.smLoader)
		if err != nil {
			return false, r.HandleDeleteFailed(ctx, &krmResource.Resource, fmt.Errorf("error fetching live state: %w", err))
		}
		if liveState.Empty() {
			r.logger.Info("underlying resource does not exist; no API call necessary", "resource", k8s.GetNamespacedName(krmResource))
			return false, r.handleDeleted(ctx, krmResource)
		}
		if err := r.obtainResourceLeaseIfNecessary(ctx, krmResource, liveState); err != nil {
			return false, err
		}
		r.logger.Info("deleting underlying resource", "resource", k8s.GetNamespacedName(krmResource))
		// Apply returns diagnostics, not an error. Convert them the same way
		// the create/update path below does, so that a non-nil diagnostics
		// value that carries no error is not reported as a delete failure.
		// NOTE(review): assumes krmtotf.NewErrorFromDiagnostics returns nil
		// when the diagnostics contain no error — confirm.
		_, diagnostics := krmResource.TFResource.Apply(ctx, liveState, &terraform.InstanceDiff{Destroy: true}, r.provider.Meta())
		if err := krmtotf.NewErrorFromDiagnostics(diagnostics); err != nil {
			return false, r.HandleDeleteFailed(ctx, &krmResource.Resource, fmt.Errorf("error deleting resource: %w", err))
		}
		return false, r.handleDeleted(ctx, krmResource)
	}

	liveState, err := krmtotf.FetchLiveStateForCreateAndUpdate(ctx, krmResource, r.provider, r, r.smLoader)
	if err != nil {
		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
			r.logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(krmResource))
			return r.handleUnresolvableDeps(ctx, &krmResource.Resource, unwrappedErr)
		}
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource, fmt.Errorf("error fetching live state: %w", err))
	}
	if err := r.obtainResourceLeaseIfNecessary(ctx, krmResource, liveState); err != nil {
		return false, err
	}
	ok, err := r.hasServerGeneratedIDAndHadBeenCreatedOnceAlready(krmResource)
	if err != nil {
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource, err)
	}
	if ok && liveState.Empty() {
		// GCP resource with server-generated ID had been created once already,
		// but no longer exists. Don't "recreate" the resource, since
		// "recreating" resources with server-generated IDs technically creates
		// a brand new resource instead.
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource,
			fmt.Errorf("underlying resource no longer exists and can't be recreated without creating a brand new resource"))
	}
	config, secretVersions, err := krmtotf.KRMResourceToTFResourceConfigFull(
		krmResource, r, r.smLoader, liveState, r.schemaRef.JSONSchema, true,
	)
	if err != nil {
		if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
			r.logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(krmResource))
			return r.handleUnresolvableDeps(ctx, &krmResource.Resource, unwrappedErr)
		}
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource, fmt.Errorf("error expanding resource configuration for kind %s: %w", krmResource.Kind, err))
	}
	// Apply last-minute apply overrides
	if err := resourceoverrides.Handler.PreTerraformApply(ctx, krmResource.GroupVersionKind(), &operations.PreTerraformApply{KRMResource: krmResource, TerraformConfig: config, LiveState: liveState}); err != nil {
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource, fmt.Errorf("error applying pre-apply transformation to resource: %w", err))
	}
	diff, err := krmResource.TFResource.Diff(ctx, liveState, config, r.provider.Meta())
	if err != nil {
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource, fmt.Errorf("error calculating diff: %w", err))
	}
	// A diff that requires replacement of an existing resource means an
	// immutable field was changed; surface that instead of recreating.
	if !liveState.Empty() && diff.RequiresNew() {
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource,
			k8s.NewImmutableFieldsMutationError(tfresource.ImmutableFieldsFromDiff(diff)))
	}
	if err := r.EnsureFinalizers(ctx, krmResource.Original, &krmResource.Resource, k8s.ControllerFinalizerName, k8s.DeletionDefenderFinalizerName); err != nil {
		return false, err
	}
	if diff.Empty() {
		r.logger.Info("underlying resource already up to date", "resource", k8s.GetNamespacedName(krmResource))
		return false, r.handleUpToDate(ctx, krmResource, liveState, secretVersions)
	}
	// Report diff to structured-reporting subsystem
	{
		report := &structuredreporting.Diff{}
		u, marshalErr := krmResource.MarshalAsUnstructured()
		if marshalErr != nil {
			// Reporting is best-effort: log the failure and still send the
			// report with whatever object was returned.
			log.FromContext(ctx).Error(marshalErr, "error reporting diff")
		}
		report.Object = u
		if diff != nil {
			for k, attr := range diff.Attributes {
				report.Fields = append(report.Fields, structuredreporting.DiffField{
					ID:  k,
					Old: attr.Old,
					New: attr.New,
				})
			}
		}
		report.IsNewObject = liveState.Empty()
		structuredreporting.ReportDiff(ctx, report)
	}
	r.logger.Info("creating/updating underlying resource", "resource", k8s.GetNamespacedName(krmResource))
	if err := r.HandleUpdating(ctx, &krmResource.Resource); err != nil {
		return false, err
	}
	// If creating a new resource, turn off RequiresNew in the diff. This is
	// done to prevent TF from clearing providerMeta (which contains blueprint
	// attribution information) during Apply() (b/193567082#comment15).
	if liveState.Empty() {
		for _, d := range diff.Attributes {
			d.RequiresNew = false
		}
	}
	newState, diagnostics := krmResource.TFResource.Apply(ctx, liveState, diff, r.provider.Meta())
	if err := krmtotf.NewErrorFromDiagnostics(diagnostics); err != nil {
		r.logger.Error(err, "error applying desired state", "resource", krmResource.GetNamespacedName())
		return false, r.HandleUpdateFailed(ctx, &krmResource.Resource, fmt.Errorf("error applying desired state: %w", err))
	}
	return false, r.handleUpToDate(ctx, krmResource, newState, secretVersions)
}
// supportsImmediateReconciliations reports whether this reconciler was given
// a channel on which immediate reconcile requests can be sent.
func (r *Reconciler) supportsImmediateReconciliations() bool {
	requests := r.immediateReconcileRequests
	return requests != nil
}
// handleUnresolvableDeps records that resource cannot be actuated because of
// originErr and decides how it gets reconciled again.
//
// When originErr is caused by an unready or nonexistent resource reference,
// immediate reconciliations are supported and a watcher slot is free, a
// goroutine is started that waits (with a jittered timeout) for the
// referenced resource to become ready and then enqueues resource for
// immediate reconciliation; in that case requeue is false. In every other
// case requeue is true so the resource is retried with backoff.
//
// The returned error is whatever the lifecycle handler returns while
// recording the condition.
func (r *Reconciler) handleUnresolvableDeps(ctx context.Context, resource *k8s.Resource, originErr error) (requeue bool, err error) {
	refGVK, refNN, ok := lifecyclehandler.CausedByUnreadyOrNonexistentResourceRefs(originErr)
	if !ok || !r.supportsImmediateReconciliations() {
		// Requeue resource for reconciliation with exponential backoff applied
		return true, r.HandleUnresolvableDeps(ctx, resource, originErr)
	}
	// Don't start a watch on the reference if there
	// are too many ongoing watches already
	if !r.resourceWatcherRoutines.TryAcquire(1) {
		// Requeue resource for reconciliation with exponential backoff applied
		return true, r.HandleUnresolvableDeps(ctx, resource, originErr)
	}
	logger := r.logger.WithValues(
		"resource", resource.GetNamespacedName(),
		"resourceGVK", resource.GroupVersionKind(),
		"reference", refNN,
		"referenceGVK", refGVK)
	// Create a logger for ResourceWatcher that contains info
	// about the referencing resource. This is done since the
	// messages logged by ResourceWatcher only include the
	// information of the resource it is watching by default.
	watcherLogger := r.logger.WithValues(
		"referencingResource", resource.GetNamespacedName(),
		"referencingResourceGVK", resource.GroupVersionKind())
	watcher, err := resourcewatcher.New(r.mgr.GetConfig(), watcherLogger)
	if err != nil {
		// No goroutine will be started, so the slot acquired above must be
		// released here; otherwise every failure would permanently reduce
		// the number of watches that can run.
		r.resourceWatcherRoutines.Release(1)
		return false, r.HandleUpdateFailed(ctx, resource, fmt.Errorf("error initializing new resourcewatcher: %w", err))
	}
	go func() {
		// Decrement the count of active resource watches after
		// the watch finishes
		defer r.resourceWatcherRoutines.Release(1)
		timeoutPeriod := r.jitterGenerator.WatchJitteredTimeout()
		// NOTE(review): the wait context derives from the reconcile ctx, so
		// it assumes that ctx outlives the Reconcile call — confirm.
		watchCtx, cancel := context.WithTimeout(ctx, timeoutPeriod)
		defer cancel()
		logger.Info("starting wait with timeout on resource's reference", "timeout", timeoutPeriod)
		if err := watcher.WaitForResourceToBeReady(watchCtx, refNN, refGVK); err != nil {
			logger.Error(err, "error while waiting for resource's reference to be ready")
			return
		}
		logger.Info("enqueuing resource for immediate reconciliation now that its reference is ready")
		r.enqueueForImmediateReconciliation(resource.GetNamespacedName())
	}()
	// Do not requeue resource for immediate reconciliation. Wait for either
	// the next periodic reconciliation or for the referenced resource to be ready (which
	// triggers a reconciliation), whichever comes first.
	return false, r.HandleUnresolvableDeps(ctx, resource, originErr)
}
// enqueueForImmediateReconciliation requests an immediate reconciliation of
// the resource identified by resourceNN by sending a generic event on the
// reconciler's request channel. Only the namespace and name are needed: the
// controller this reconciler belongs to handles exactly one GVK, so the kind
// is implied.
func (r *Reconciler) enqueueForImmediateReconciliation(resourceNN types.NamespacedName) {
	target := &unstructured.Unstructured{}
	target.SetNamespace(resourceNN.Namespace)
	target.SetName(resourceNN.Name)
	r.immediateReconcileRequests <- event.GenericEvent{Object: target}
}
// handleDefaults runs every configured defaulter against resource, in order,
// and stops at the first one that fails, returning its error.
func (r *Reconciler) handleDefaults(ctx context.Context, resource *krmtotf.Resource) error {
	for i := range r.defaulters {
		_, err := r.defaulters[i].ApplyDefaults(ctx, resource)
		if err != nil {
			return err
		}
	}
	return nil
}
// applyChangesForBackwardsCompatibility fills in fields that newer resources
// get from defaulting webhooks but older resources may lack: the
// management-conflict-prevention-policy annotation and the hierarchical
// reference. Either step's failure is returned wrapped with the resource's
// namespaced name.
func (r *Reconciler) applyChangesForBackwardsCompatibility(ctx context.Context, resource *krmtotf.Resource) error {
	resourceConfig := resource.ResourceConfig

	// Resources created before the annotation-defaulting webhook existed may
	// not carry a management-conflict-prevention-policy annotation.
	err := k8s.EnsureManagementConflictPreventionAnnotationForTFBasedResource(ctx, r.Client, resource, &resourceConfig, r.provider.ResourcesMap)
	if err != nil {
		return fmt.Errorf("error ensuring resource '%v' has a management conflict policy: %w", k8s.GetNamespacedName(resource), err)
	}

	// Resources created before the hierarchical-reference-defaulting webhook
	// existed may not carry a hierarchical reference.
	err = k8s.EnsureHierarchicalReference(ctx, &resource.Resource, resourceConfig.HierarchicalReferences, resourceConfig.Containers, r.Client)
	if err != nil {
		return fmt.Errorf("error ensuring resource '%v' has a hierarchical reference: %w", k8s.GetNamespacedName(resource), err)
	}
	return nil
}
// obtainResourceLeaseIfNecessary soft-obtains a lease on krmResource when its
// management conflict prevention policy is the resource-level policy; for any
// other policy it does nothing. An error is returned when the policy cannot
// be read, when the kind is not leasable, or when obtaining the lease fails
// (in which case the failure is also recorded through the lifecycle handler).
func (r *Reconciler) obtainResourceLeaseIfNecessary(ctx context.Context, krmResource *krmtotf.Resource, liveState *terraform.InstanceState) error {
	policy, err := k8s.GetManagementConflictPreventionAnnotationValue(krmResource)
	switch {
	case err != nil:
		return err
	case policy != k8s.ManagementConflictPreventionPolicyResource:
		return nil
	}

	leasable, err := r.resourceLeaser.IsLeasable(krmResource)
	if err != nil {
		return err
	}
	if !leasable {
		return fmt.Errorf("kind '%v' does not support usage of %v of '%v'", krmResource.GroupVersionKind(),
			k8s.ManagementConflictPreventionPolicyAnnotation, policy)
	}

	// SoftObtain (rather than Obtain) only updates the 'labels' value on the
	// local krmResource and does not write to GCP, which cuts down on GCP
	// writes and the errors that come with them. The labels reach GCP through
	// the main sync(...) function, because the label change shows up in the diff.
	liveLabels := krmtotf.GetLabelsFromState(krmResource, liveState)
	err = r.resourceLeaser.SoftObtain(ctx, &krmResource.Resource, liveLabels)
	if err == nil {
		return nil
	}
	return r.HandleObtainLeaseFailed(ctx, &krmResource.Resource, fmt.Errorf("error obtaining lease on '%v': %w",
		k8s.GetNamespacedName(krmResource), err))
}
// handleDeleted runs the post-actuation transform for a resource whose
// underlying resource is gone (no live state is passed) and then hands the
// resource to the lifecycle handler's HandleDeleted. A transform failure is
// recorded through HandlePostActuationTransformFailed instead.
func (r *Reconciler) handleDeleted(ctx context.Context, resource *krmtotf.Resource) error {
	err := resourceoverrides.Handler.PostActuationTransform(resource.Original, &resource.Resource, nil, nil)
	if err == nil {
		return r.HandleDeleted(ctx, &resource.Resource)
	}
	wrapped := fmt.Errorf("error applying post-actuation transformation to resource '%v': %w", resource.GetNamespacedName(), err)
	return r.HandlePostActuationTransformFailed(ctx, &resource.Resource, wrapped)
}
// handleUpToDate finalizes a successful create/update (or a no-op sync).
//
// It resolves the resource's spec and status from liveState, refreshes the
// mutable-but-unreadable-fields and observed-secret-versions annotations, and
// runs the post-actuation transform. The lifecycle handler's HandleUpToDate is
// only invoked when spec/status or annotations changed relative to the
// original object, or when the Ready condition does not already report
// up-to-date; otherwise nothing is written.
func (r *Reconciler) handleUpToDate(ctx context.Context, resource *krmtotf.Resource, liveState *terraform.InstanceState, secretVersions map[string]string) error {
	resource.Spec, resource.Status = krmtotf.ResolveSpecAndStatusWithResourceID(resource, liveState)

	if err := updateMutableButUnreadableFieldsAnnotationFor(resource); err != nil {
		return err
	}
	if err := updateObservedSecretVersionsAnnotationFor(resource, secretVersions); err != nil {
		return err
	}

	transformErr := resourceoverrides.Handler.PostActuationTransform(resource.Original, &resource.Resource, liveState, nil)
	if transformErr != nil {
		return r.HandlePostActuationTransformFailed(ctx, &resource.Resource, fmt.Errorf("error applying post-actuation transformation to resource '%v': %w", resource.GetNamespacedName(), transformErr))
	}

	// Short-circuit order matches the checks' cost: spec/status, then
	// annotations, then the Ready condition.
	needsWrite := k8s.IsSpecOrStatusUpdateRequired(&resource.Resource, resource.Original) ||
		k8s.IsAnnotationsUpdateRequired(&resource.Resource, resource.Original) ||
		!k8s.ReadyConditionMatches(&resource.Resource, corev1.ConditionTrue, k8s.UpToDate, k8s.UpToDateMessage)
	if needsWrite {
		return r.HandleUpToDate(ctx, &resource.Resource)
	}
	return nil
}
// isOrphaned reports whether resource has been orphaned, i.e. whether its
// parent Kubernetes resource has already been deleted, and returns the parent
// when one was found. Notes:
//   - A resource with no parent is never orphaned.
//   - A resource whose parent is an external resource is never orphaned.
//   - Hierarchical resources also count as parents.
//
// Parent and hierarchical references are assumed to always be at the top
// level.
func (r *Reconciler) isOrphaned(resource *krmtotf.Resource) (orphaned bool, parent *k8s.Resource, err error) {
	// Currently, it's assumed that parent reference fields only support one resource type.
	var parentConfigs []corekccv1alpha1.TypeConfig
	for _, ref := range resource.ResourceConfig.ResourceReferences {
		if !krmtotf.IsRequiredParentReference(ref, resource) {
			continue
		}
		parentConfigs = append(parentConfigs, ref.TypeConfig)
	}
	if len(parentConfigs) > 0 {
		return lifecyclehandler.IsOrphaned(&resource.Resource, parentConfigs, r.Client)
	}
	return false, nil, nil
}
// hasServerGeneratedIDAndHadBeenCreatedOnceAlready reports whether resource
// uses a server-generated ID field and already has a non-empty value recorded
// for it. A "not found" error while reading the ID is treated as "not created
// yet"; any other error is returned.
func (r *Reconciler) hasServerGeneratedIDAndHadBeenCreatedOnceAlready(resource *krmtotf.Resource) (bool, error) {
	if !resource.HasServerGeneratedIDField() {
		return false, nil
	}
	id, err := resource.GetServerGeneratedID()
	if err == nil {
		return id != "", nil
	}
	if _, notFound := k8s.AsServerGeneratedIDNotFoundError(err); notFound {
		return false, nil
	}
	return false, err
}
// Compile-time check that *Reconciler implements k8s.SchemaReferenceUpdater.
var _ k8s.SchemaReferenceUpdater = (*Reconciler)(nil)

// UpdateSchema updates the reconciler's schema reference from crd. It takes
// the schema write lock, so it waits for in-flight reconciles (which hold the
// read lock) to finish before the update is applied.
func (r *Reconciler) UpdateSchema(crd *apiextensions.CustomResourceDefinition) error {
	r.schemaRefMu.Lock()
	defer r.schemaRefMu.Unlock()

	err := k8s.UpdateSchema(r.schemaRef, crd)
	return err
}
// updateMutableButUnreadableFieldsAnnotationFor keeps the
// mutable-but-unreadable-fields annotation on resource in sync with its
// resource config: the annotation is removed when the config declares no such
// fields, and otherwise set to the freshly computed value. An error is
// returned only when the annotation value cannot be constructed.
func updateMutableButUnreadableFieldsAnnotationFor(resource *krmtotf.Resource) error {
	key := k8s.MutableButUnreadableFieldsAnnotation

	// Only resources that have mutable-but-unreadable fields carry the annotation.
	hasFields := len(resource.ResourceConfig.MutableButUnreadableFields) != 0
	if !hasFields {
		k8s.RemoveAnnotation(key, resource)
		return nil
	}

	value, err := krmtotf.MutableButUnreadableFieldsAnnotationFor(resource)
	if err != nil {
		return fmt.Errorf("error constructing value for %v: %w", key, err)
	}
	k8s.SetAnnotation(key, value, resource)
	return nil
}
// updateObservedSecretVersionsAnnotationFor updates or removes the
// observed-secret-versions annotation on resource, passing along
// secretVersions and whether the resource's Terraform schema has sensitive
// fields.
func updateObservedSecretVersionsAnnotationFor(resource *krmtotf.Resource, secretVersions map[string]string) error {
	return k8s.UpdateOrRemoveObservedSecretVersionsAnnotation(
		&resource.Resource,
		secretVersions,
		tfresource.TFResourceHasSensitiveFields(resource.TFResource),
	)
}