func()

in pkg/controller/direct/alloydb/cluster_controller.go [303:439]


func (a *ClusterAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error {
	log := klog.FromContext(ctx)
	log.V(2).Info("creating Cluster", "name", a.id)
	mapCtx := &direct.MapContext{}

	// 1. Resolve reference fields.
	if err := a.normalizeReferences(ctx); err != nil {
		return fmt.Errorf("normalizing reference for creation: %w", err)
	}
	// 2. Resolve secret field.
	if err := a.resolveInitialUserPasswordField(ctx); err != nil {
		return err
	}
	// 3. Set default fields that were set by the Terraform library for compatibility.
	a.resolveKRMDefaultsForCreate()
	// 4. Validate mutually-exclusive fields.
	if a.desired.Spec.RestoreBackupSource != nil && a.desired.Spec.RestoreContinuousBackupSource != nil {
		return fmt.Errorf("only one of 'spec.restoreBackupSource' " +
			"and 'spec.restoreContinuousBackupSource' can be configured: " +
			"both are configured")
	}

	desired := a.desired.DeepCopy()
	resource := AlloyDBClusterSpec_ToProto(mapCtx, &desired.Spec)
	if mapCtx.Err() != nil {
		return mapCtx.Err()
	}

	// 5. Handle labels.
	resource.Labels = make(map[string]string)
	for k, v := range a.desired.GetObjectMeta().GetLabels() {
		resource.Labels[k] = v
	}
	resource.Labels["managed-by-cnrm"] = "true"

	var created *alloydbpb.Cluster
	if desired.Spec.RestoreBackupSource != nil || desired.Spec.RestoreContinuousBackupSource != nil {
		req := &alloydbpb.RestoreClusterRequest{
			Parent:    a.id.Parent().String(),
			ClusterId: a.id.ID(),
			Cluster:   resource,
		}
		if desired.Spec.RestoreBackupSource != nil {
			backupSource := BackupSource_ToProto(mapCtx, desired.Spec.RestoreBackupSource)
			if mapCtx.Err() != nil {
				return mapCtx.Err()
			}

			createOp.RecordUpdatingEvent()
			req.Source = &alloydbpb.RestoreClusterRequest_BackupSource{
				BackupSource: backupSource,
			}
			op, err := a.gcpClient.RestoreCluster(ctx, req)
			if err != nil {
				log.V(2).Info("error creating Cluster based on a backup source", "name", a.id, "error", err)
				return fmt.Errorf("creating Cluster  %s based on a backup source: %w", a.id, err)
			}
			created, err = op.Wait(ctx)
			if err != nil {
				log.V(2).Info("error waiting for op creating Cluster based on a backup source", "name", a.id, "error", err)
				return fmt.Errorf("waiting for op creating Cluster %s based on a backup source: %w", a.id, err)
			}
			log.V(2).Info("successfully creating Cluster based on a backup source", "name", a.id)

		} else if desired.Spec.RestoreContinuousBackupSource != nil {
			continuousBackupSource := ContinuousBackupSource_ToProto(mapCtx, desired.Spec.RestoreContinuousBackupSource)
			if mapCtx.Err() != nil {
				return mapCtx.Err()
			}

			createOp.RecordUpdatingEvent()
			req.Source = &alloydbpb.RestoreClusterRequest_ContinuousBackupSource{
				ContinuousBackupSource: continuousBackupSource,
			}
			op, err := a.gcpClient.RestoreCluster(ctx, req)
			if err != nil {
				log.V(2).Info("error creating Cluster based on a source cluster", "name", a.id, "error", err)
				return fmt.Errorf("creating Cluster %s based on a source cluster: %w", a.id, err)
			}
			created, err = op.Wait(ctx)
			if err != nil {
				log.V(2).Info("error waiting for op creating Cluster based on a source cluster", "name", a.id, "error", err)
				return fmt.Errorf("waiting for op creating Cluster %s based on a source cluster: %w", a.id, err)
			}
			log.V(2).Info("successfully creating Cluster based on a source cluster", "name", a.id)
		}
		return a.updateStatus(ctx, mapCtx, createOp, created)
	}

	if resource.ClusterType == alloydbpb.Cluster_SECONDARY {
		if resource.SecondaryConfig == nil {
			return fmt.Errorf("cannot create secondary cluster %s without secondaryConfig", a.id)
		}

		createOp.RecordUpdatingEvent()
		req := &alloydbpb.CreateSecondaryClusterRequest{
			Parent:    a.id.Parent().String(),
			ClusterId: a.id.ID(),
			Cluster:   resource,
		}
		op, err := a.gcpClient.CreateSecondaryCluster(ctx, req)
		if err != nil {
			log.V(2).Info("error creating secondary Cluster", "name", a.id, "error", err)
			return fmt.Errorf("creating secondary Cluster %s: %w", a.id, err)
		}
		created, err = op.Wait(ctx)
		if err != nil {
			log.V(2).Info("error waiting for secondary Cluster creation op", "name", a.id, "error", err)
			return fmt.Errorf("secondary Cluster %s waiting creation: %w", a.id, err)
		}
		log.V(2).Info("successfully created secondary Cluster", "name", a.id)
	} else {
		if resource.SecondaryConfig != nil {
			return fmt.Errorf("cannot create primary cluster %s with secondaryConfig", a.id)
		}

		createOp.RecordUpdatingEvent()
		req := &alloydbpb.CreateClusterRequest{
			Parent:    a.id.Parent().String(),
			ClusterId: a.id.ID(),
			Cluster:   resource,
		}
		op, err := a.gcpClient.CreateCluster(ctx, req)
		if err != nil {
			log.V(2).Info("error creating primary Cluster", "name", a.id, "error", err)
			return fmt.Errorf("creating primary Cluster %s: %w", a.id, err)
		}

		created, err = op.Wait(ctx)
		if err != nil {
			log.V(2).Info("error waiting for primary Cluster creation op", "name", a.id, "error", err)
			return fmt.Errorf("primary Cluster %s waiting creation: %w", a.id, err)
		}
		log.V(2).Info("successfully created Cluster", "name", a.id)
	}
	return a.updateStatus(ctx, mapCtx, createOp, created)
}