in pkg/controller/direct/alloydb/cluster_controller.go [303:439]
func (a *ClusterAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error {
log := klog.FromContext(ctx)
log.V(2).Info("creating Cluster", "name", a.id)
mapCtx := &direct.MapContext{}
// 1. Resolve reference fields.
if err := a.normalizeReferences(ctx); err != nil {
return fmt.Errorf("normalizing reference for creation: %w", err)
}
// 2. Resolve secret field.
if err := a.resolveInitialUserPasswordField(ctx); err != nil {
return err
}
// 3. Set default fields that were set by the Terraform library for compatibility.
a.resolveKRMDefaultsForCreate()
// 4. Validate mutually-exclusive fields.
if a.desired.Spec.RestoreBackupSource != nil && a.desired.Spec.RestoreContinuousBackupSource != nil {
return fmt.Errorf("only one of 'spec.restoreBackupSource' " +
"and 'spec.restoreContinuousBackupSource' can be configured: " +
"both are configured")
}
desired := a.desired.DeepCopy()
resource := AlloyDBClusterSpec_ToProto(mapCtx, &desired.Spec)
if mapCtx.Err() != nil {
return mapCtx.Err()
}
// 5. Handle labels.
resource.Labels = make(map[string]string)
for k, v := range a.desired.GetObjectMeta().GetLabels() {
resource.Labels[k] = v
}
resource.Labels["managed-by-cnrm"] = "true"
var created *alloydbpb.Cluster
if desired.Spec.RestoreBackupSource != nil || desired.Spec.RestoreContinuousBackupSource != nil {
req := &alloydbpb.RestoreClusterRequest{
Parent: a.id.Parent().String(),
ClusterId: a.id.ID(),
Cluster: resource,
}
if desired.Spec.RestoreBackupSource != nil {
backupSource := BackupSource_ToProto(mapCtx, desired.Spec.RestoreBackupSource)
if mapCtx.Err() != nil {
return mapCtx.Err()
}
createOp.RecordUpdatingEvent()
req.Source = &alloydbpb.RestoreClusterRequest_BackupSource{
BackupSource: backupSource,
}
op, err := a.gcpClient.RestoreCluster(ctx, req)
if err != nil {
log.V(2).Info("error creating Cluster based on a backup source", "name", a.id, "error", err)
return fmt.Errorf("creating Cluster %s based on a backup source: %w", a.id, err)
}
created, err = op.Wait(ctx)
if err != nil {
log.V(2).Info("error waiting for op creating Cluster based on a backup source", "name", a.id, "error", err)
return fmt.Errorf("waiting for op creating Cluster %s based on a backup source: %w", a.id, err)
}
log.V(2).Info("successfully creating Cluster based on a backup source", "name", a.id)
} else if desired.Spec.RestoreContinuousBackupSource != nil {
continuousBackupSource := ContinuousBackupSource_ToProto(mapCtx, desired.Spec.RestoreContinuousBackupSource)
if mapCtx.Err() != nil {
return mapCtx.Err()
}
createOp.RecordUpdatingEvent()
req.Source = &alloydbpb.RestoreClusterRequest_ContinuousBackupSource{
ContinuousBackupSource: continuousBackupSource,
}
op, err := a.gcpClient.RestoreCluster(ctx, req)
if err != nil {
log.V(2).Info("error creating Cluster based on a source cluster", "name", a.id, "error", err)
return fmt.Errorf("creating Cluster %s based on a source cluster: %w", a.id, err)
}
created, err = op.Wait(ctx)
if err != nil {
log.V(2).Info("error waiting for op creating Cluster based on a source cluster", "name", a.id, "error", err)
return fmt.Errorf("waiting for op creating Cluster %s based on a source cluster: %w", a.id, err)
}
log.V(2).Info("successfully creating Cluster based on a source cluster", "name", a.id)
}
return a.updateStatus(ctx, mapCtx, createOp, created)
}
if resource.ClusterType == alloydbpb.Cluster_SECONDARY {
if resource.SecondaryConfig == nil {
return fmt.Errorf("cannot create secondary cluster %s without secondaryConfig", a.id)
}
createOp.RecordUpdatingEvent()
req := &alloydbpb.CreateSecondaryClusterRequest{
Parent: a.id.Parent().String(),
ClusterId: a.id.ID(),
Cluster: resource,
}
op, err := a.gcpClient.CreateSecondaryCluster(ctx, req)
if err != nil {
log.V(2).Info("error creating secondary Cluster", "name", a.id, "error", err)
return fmt.Errorf("creating secondary Cluster %s: %w", a.id, err)
}
created, err = op.Wait(ctx)
if err != nil {
log.V(2).Info("error waiting for secondary Cluster creation op", "name", a.id, "error", err)
return fmt.Errorf("secondary Cluster %s waiting creation: %w", a.id, err)
}
log.V(2).Info("successfully created secondary Cluster", "name", a.id)
} else {
if resource.SecondaryConfig != nil {
return fmt.Errorf("cannot create primary cluster %s with secondaryConfig", a.id)
}
createOp.RecordUpdatingEvent()
req := &alloydbpb.CreateClusterRequest{
Parent: a.id.Parent().String(),
ClusterId: a.id.ID(),
Cluster: resource,
}
op, err := a.gcpClient.CreateCluster(ctx, req)
if err != nil {
log.V(2).Info("error creating primary Cluster", "name", a.id, "error", err)
return fmt.Errorf("creating primary Cluster %s: %w", a.id, err)
}
created, err = op.Wait(ctx)
if err != nil {
log.V(2).Info("error waiting for primary Cluster creation op", "name", a.id, "error", err)
return fmt.Errorf("primary Cluster %s waiting creation: %w", a.id, err)
}
log.V(2).Info("successfully created Cluster", "name", a.id)
}
return a.updateStatus(ctx, mapCtx, createOp, created)
}