in pkg/controller/direct/bigquerydatatransfer/bigquerydatatransferconfig_controller.go [72:190]
// AdapterForObject builds the directbase.Adapter used to reconcile the
// BigQueryDataTransferConfig described by u.
//
// The steps, in order, are:
//  1. Convert u into a typed krm.BigQueryDataTransferConfig.
//  2. Determine the resource ID: spec.resourceID when set, otherwise the ID
//     returned by parseServiceGeneratedIDFromExternalRef.
//  3. Require a non-empty spec.location.
//  4. Resolve the optional references (Pub/Sub topic, Pub/Sub subscription,
//     dataset, KMS crypto key, service account) and the project reference,
//     storing the resolved values back on the object.
//  5. Build the identity from project/location/resource ID, or, when
//     status.externalRef is already set, parse it and verify that project,
//     location and resource ID still match it.
//  6. Obtain the GCP client via m.client.
//
// The first failing step aborts the call and its error is returned as-is
// (only the conversion error is wrapped). On success the returned adapter
// carries the identity, the GCP client and the resolved object as the desired
// state.
func (m *model) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) {
	obj := &krm.BigQueryDataTransferConfig{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil {
		return nil, fmt.Errorf("error converting to %T: %w", obj, err)
	}

	// Get ResourceID. When the user did not specify one, fall back to the ID
	// derived from the object's external reference.
	resourceID := direct.ValueOf(obj.Spec.ResourceID)
	if resourceID == "" {
		serviceGeneratedID, err := parseServiceGeneratedIDFromExternalRef(obj)
		if err != nil {
			return nil, err
		}
		resourceID = serviceGeneratedID
	}

	location := obj.Spec.Location
	if location == "" {
		return nil, fmt.Errorf("cannot resolve location")
	}

	// Resolve PubSubTopic Ref
	if obj.Spec.PubSubTopicRef != nil {
		topic, err := refv1beta1.ResolvePubSubTopic(ctx, reader, obj, obj.Spec.PubSubTopicRef)
		if err != nil {
			return nil, err
		}
		obj.Spec.PubSubTopicRef.External = topic.String()
	}

	// Resolve PubSubSubscription Ref
	if obj.Spec.ScheduleOptionsV2 != nil &&
		obj.Spec.ScheduleOptionsV2.EventDrivenSchedule != nil &&
		obj.Spec.ScheduleOptionsV2.EventDrivenSchedule.PubSubSubscriptionRef != nil {
		subscription, err := refv1beta1.ResolvePubSubSubscription(ctx, reader, obj, obj.Spec.ScheduleOptionsV2.EventDrivenSchedule.PubSubSubscriptionRef)
		if err != nil {
			return nil, err
		}
		obj.Spec.ScheduleOptionsV2.EventDrivenSchedule.PubSubSubscriptionRef.External = subscription.String()
	}

	// Resolve BigQueryDataSet Ref
	if obj.Spec.DatasetRef != nil {
		dataset, err := obj.Spec.DatasetRef.NormalizedExternal(ctx, reader, obj.GetNamespace())
		if err != nil {
			return nil, err
		}
		// For backwards compatibility and to satisfy the GCP API constraints, we must overwrite the
		// external reference in the payloads to just the resource ID of the dataset.
		_, id, err := bigquerykrmapi.ParseDatasetExternal(dataset)
		if err != nil {
			return nil, err
		}
		obj.Spec.DatasetRef.External = id
	}

	// Resolve KMSCryptoKey Ref
	if obj.Spec.EncryptionConfiguration != nil {
		kmsCryptoKeyRef, err := refv1beta1.ResolveKMSCryptoKeyRef(ctx, reader, obj, obj.Spec.EncryptionConfiguration.KmsKeyRef)
		if err != nil {
			return nil, err
		}
		obj.Spec.EncryptionConfiguration.KmsKeyRef = kmsCryptoKeyRef
	}

	// Resolve ServiceAccount Ref
	if obj.Spec.ServiceAccountRef != nil {
		err := obj.Spec.ServiceAccountRef.Resolve(ctx, reader, obj)
		if err != nil {
			return nil, err
		}
	}

	// Resolve Project Ref
	projectRef, err := refv1beta1.ResolveProject(ctx, reader, obj.GetNamespace(), obj.Spec.ProjectRef)
	if err != nil {
		return nil, err
	}
	// Defensive: do not dereference projectRef if the resolver handed back nil
	// without an error; report it the same way as an empty project ID.
	if projectRef == nil || projectRef.ProjectID == "" {
		return nil, fmt.Errorf("cannot resolve project")
	}
	projectID := projectRef.ProjectID

	var id *BigQueryDataTransferConfigIdentity
	externalRef := direct.ValueOf(obj.Status.ExternalRef)
	if externalRef == "" {
		// No recorded external reference yet: derive the identity from the spec.
		id = BuildID(projectID, location, resourceID)
	} else {
		// An external reference is already recorded: it is authoritative, and
		// the spec values must still agree with it.
		id, err = asID(externalRef)
		if err != nil {
			return nil, err
		}
		if id.projectID != projectID {
			return nil, fmt.Errorf("BigQueryDataTransferConfig %s/%s has spec.projectRef changed, expect %s, got %s",
				u.GetNamespace(), u.GetName(), id.projectID, projectID)
		}
		if id.location != location {
			return nil, fmt.Errorf("BigQueryDataTransferConfig %s/%s has spec.location changed, expect %s, got %s",
				u.GetNamespace(), u.GetName(), id.location, location)
		}
		if id.transferConfigID != resourceID {
			return nil, fmt.Errorf("BigQueryDataTransferConfig %s/%s spec.resourceID changed or does not match the service generated ID, expect %s, got %s",
				u.GetNamespace(), u.GetName(), id.transferConfigID, resourceID)
		}
	}

	// Get bigquerydatatransfer GCP client
	gcpClient, err := m.client(ctx)
	if err != nil {
		return nil, err
	}
	return &Adapter{
		id:        id,
		gcpClient: gcpClient,
		desired:   obj,
	}, nil
}