in internal/client/integrations/overrides.go [91:293]
// mergeOverrides applies the overrides in o to eversion and returns the
// updated integration version.
//
// Overrides are applied in this order: triggers, tasks, integration
// parameters, connections (skipped when apiclient.DryRun() reports true) and
// finally the integration-level settings. An override that matches nothing in
// eversion is reported through clilog.Warning and otherwise ignored.
//
// grantPermission applies to CLOUD_PUBSUB_EXTERNAL triggers only: when true,
// apiclient.CreateServiceAccount is called for a service account named
// explicitly in the override, and apiclient.SetIntegrationInvokerPermission
// is called for the trigger's service account. It is evaluated independently
// for each trigger.
//
// A non-nil error is returned when a mandatory override field is missing or
// when one of the helper lookups fails; eversion is returned alongside it in
// whatever state it had reached.
func mergeOverrides(eversion integrationVersionExternal, o overrides, grantPermission bool) (integrationVersionExternal, error) {
	// apply trigger overrides
	for _, triggerOverride := range o.TriggerOverrides {
		foundOverride := false
		for triggerIndex, trigger := range eversion.TriggerConfigs {
			if triggerOverride.TriggerNumber != trigger.TriggerNumber {
				continue
			}
			switch trigger.TriggerType {
			case "CLOUD_PUBSUB_EXTERNAL":
				if triggerOverride.ProjectId == nil || triggerOverride.TopicName == nil {
					return eversion, fmt.Errorf("projectid and topicName are mandatory in the overrides")
				}
				trigger.TriggerId = fmt.Sprintf(pubsubTrigger, *triggerOverride.ProjectId, *triggerOverride.ProjectId, *triggerOverride.TopicName)
				// NOTE(review): assumes trigger.Properties is a non-nil map for
				// Pub/Sub triggers; a nil map would panic here - confirm.
				trigger.Properties["Subscription name"] = *triggerOverride.ProjectId + "_" + *triggerOverride.TopicName
				trigger.Properties["IP Project name"] = *triggerOverride.ProjectId

				// Service-account state is scoped to this trigger so that the
				// choice made for one trigger (config variable, default SA)
				// cannot suppress SA creation or permission grants for the
				// triggers processed after it.
				var (
					serviceAccountName string
					err                error
				)
				grant := grantPermission
				userDefinedSA := false
				switch {
				case triggerOverride.ServiceAccount == nil:
					// no SA in the override: fall back to the default compute engine SA
					serviceAccountName, err = apiclient.GetComputeEngineDefaultServiceAccount(apiclient.GetProjectID())
					if err != nil {
						return eversion, fmt.Errorf("unable to get default compute engine service account: %w", err)
					}
					trigger.Properties["Service account"] = serviceAccountName
				case strings.HasPrefix(*triggerOverride.ServiceAccount, configVarPrefix):
					// it is a config variable. Do not set anything.
					grant = false
					clilog.Debug.Printf("config variable detected, skipping grantPermission\n")
				default:
					serviceAccountName = fmt.Sprintf("%s@%s.iam.gserviceaccount.com", *triggerOverride.ServiceAccount, *triggerOverride.ProjectId)
					trigger.Properties["Service account"] = serviceAccountName
					userDefinedSA = true
				}

				if grant {
					if userDefinedSA {
						// create the SA if it doesn't exist
						if err := apiclient.CreateServiceAccount(serviceAccountName); err != nil {
							return eversion, err
						}
					}
					// a failure to grant the permission is deliberately non-fatal
					if err := apiclient.SetIntegrationInvokerPermission(*triggerOverride.ProjectId, serviceAccountName); err != nil {
						clilog.Warning.Printf("Unable to update permissions for the service account: %v\n", err)
					}
				}
			case "API":
				if triggerOverride.APIPath == nil {
					return eversion, fmt.Errorf("the field apiPath is missing from the API Trigger in overrides")
				}
				trigger.TriggerId = apiTrigger + *triggerOverride.APIPath
				if len(triggerOverride.Properties) > 0 {
					trigger.Properties = triggerOverride.Properties
				}
			case "CLOUD_SCHEDULER":
				// only the scheduler fields present in the override are replaced
				if triggerOverride.CloudSchedulerServiceAccount != nil {
					trigger.CloudSchedulerConfig.ServiceAccountEmail = *triggerOverride.CloudSchedulerServiceAccount
				}
				if triggerOverride.CloudSchedulerCronTab != nil {
					trigger.CloudSchedulerConfig.CronTab = *triggerOverride.CloudSchedulerCronTab
				}
				if triggerOverride.CloudSchedulerLocation != nil {
					trigger.CloudSchedulerConfig.Location = *triggerOverride.CloudSchedulerLocation
				}
			case "INTEGRATION_CONNECTOR_TRIGGER":
				if len(triggerOverride.Properties) > 0 {
					trigger.TriggerId = fmt.Sprintf("integration_connector_trigger/projects/%s/locations/%s/connections/%s/eventSubscriptions/%s",
						triggerOverride.Properties["Project name"],
						triggerOverride.Properties["Region"], triggerOverride.Properties["Connection name"],
						triggerOverride.Properties["Subscription name"])
					trigger.Properties = triggerOverride.Properties
				}
			default:
				clilog.Warning.Printf("unsupported trigger type %s\n", trigger.TriggerType)
			}
			eversion.TriggerConfigs[triggerIndex] = trigger
			foundOverride = true
		}
		if !foundOverride {
			clilog.Warning.Printf("trigger override id %s was not found in the integration json\n",
				triggerOverride.TriggerNumber)
		}
	}

	// apply task overrides; GenericConnectorTask entries are excluded here and
	// handled by the connection overrides below
	for _, taskOverride := range o.TaskOverrides {
		foundOverride := false
		for taskIndex, task := range eversion.TaskConfigs {
			if taskOverride.TaskId == task.TaskId && taskOverride.Task == task.Task && task.Task != "GenericConnectorTask" {
				task.Parameters = overrideParameters(taskOverride.Parameters, task.Parameters)
				eversion.TaskConfigs[taskIndex] = task
				foundOverride = true
			}
		}
		if !foundOverride {
			clilog.Warning.Printf("task override %s with id %s was not found in the integration json\n",
				taskOverride.DisplayName, taskOverride.TaskId)
		}
	}

	// apply integration parameter overrides
	for _, paramOverride := range o.ParamOverrides {
		foundOverride := false
		for ipIndex, ip := range eversion.IntegrationParameters {
			if paramOverride.Key != ip.Key {
				continue
			}
			// only a key match counts as found, so the warning below is
			// emitted for overrides that reference an unknown parameter
			ip.DefaultValue = paramOverride.DefaultValue
			eversion.IntegrationParameters[ipIndex] = ip
			foundOverride = true
		}
		if !foundOverride {
			clilog.Warning.Printf("param override key %s with dataType %s was not found in the integration json\n",
				paramOverride.Key, paramOverride.DataType)
		}
	}

	// apply connection overrides
	if !apiclient.DryRun() {
		for _, connectionOverride := range o.ConnectionOverrides {
			// tracked per override so that every unmatched override is reported
			foundOverride := false
			for taskIndex, task := range eversion.TaskConfigs {
				if connectionOverride.TaskId != task.TaskId || connectionOverride.Task != task.Task {
					continue
				}
				// resolved once and shared by both connector flavours below
				newcp, err := getNewConnectionParams(connectionOverride.Parameters.ConnectionName,
					connectionOverride.Parameters.ConnectionLocation)
				if err != nil {
					return eversion, err
				}

				// Google built connector
				cparams := task.Parameters["config"]
				if cparams.Value.JsonValue != nil {
					cd, err := getConnectionDetails(*cparams.Value.JsonValue)
					if err != nil {
						return eversion, err
					}
					cd.Connection.ConnectionName = newcp.ConnectionName
					cd.Connection.ConnectorVersion = newcp.ConnectorVersion
					cd.Connection.ServiceName = newcp.ServiceName
					jsonValue, err := stringifyValue(cd)
					if err != nil {
						return eversion, err
					}
					*cparams.Value.JsonValue = jsonValue
					task.Parameters["config"] = cparams
					if connectionOverride.Parameters.EntityType != nil {
						task.Parameters["entityType"] = *connectionOverride.Parameters.EntityType
					}
					eversion.TaskConfigs[taskIndex] = task
					foundOverride = true
				}

				// custom connector
				cversion := task.Parameters["connectionVersion"]
				if cversion.Value.StringValue != nil {
					cname := task.Parameters["connectionName"]
					if cname.Value.StringValue == nil {
						// a missing connectionName would otherwise be a nil dereference
						return eversion, fmt.Errorf("task with id %s has a connectionVersion parameter but no connectionName parameter",
							connectionOverride.TaskId)
					}
					*cversion.Value.StringValue = newcp.ConnectorVersion
					*cname.Value.StringValue = newcp.ConnectionName
					if connectionOverride.Parameters.EntityType != nil {
						task.Parameters["entityType"] = *connectionOverride.Parameters.EntityType
					}
					eversion.TaskConfigs[taskIndex] = task
					foundOverride = true
				}
			}
			if !foundOverride {
				clilog.Warning.Printf("task override with id %s was not found in the integration json\n",
					connectionOverride.TaskId)
			}
		}
	}

	// apply integration overrides
	if o.IntegrationOverrides.DatabasePersistencePolicy != "" {
		eversion.DatabasePersistencePolicy = o.IntegrationOverrides.DatabasePersistencePolicy
	}
	// NOTE(review): the cloud logging settings and EnableVariableMasking are
	// copied unconditionally, so values absent from the overrides replace the
	// ones in the integration with their zero values - confirm this is intended.
	eversion.CloudLoggingDetails.CloudLoggingSeverity = o.IntegrationOverrides.CloudLoggingDetails.CloudLoggingSeverity
	eversion.CloudLoggingDetails.EnableCloudLogging = o.IntegrationOverrides.CloudLoggingDetails.EnableCloudLogging
	if o.IntegrationOverrides.RunAsServiceAccount != nil {
		eversion.RunAsServiceAccount = *o.IntegrationOverrides.RunAsServiceAccount
	}
	eversion.EnableVariableMasking = o.IntegrationOverrides.EnableVariableMasking
	return eversion, nil
}