internal/client/integrations/overrides.go (546 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integrations
import (
"encoding/json"
"errors"
"fmt"
"internal/apiclient"
"internal/client/authconfigs"
"internal/client/connections"
"internal/clilog"
"regexp"
"strings"
)
// overrides is the JSON document produced by extractOverrides and consumed by
// mergeOverrides. It groups every setting of an integration version that can
// be replaced without editing the integration itself.
type overrides struct {
// TriggerOverrides are matched against triggers by TriggerNumber.
TriggerOverrides []triggeroverrides `json:"trigger_overrides,omitempty"`
// TaskOverrides are matched against tasks by TaskId and Task;
// mergeOverrides does not apply them to GenericConnectorTask tasks.
TaskOverrides []taskconfig `json:"task_overrides,omitempty"`
// ConnectionOverrides are matched against tasks by TaskId and Task.
ConnectionOverrides []connectionoverrides `json:"connection_overrides,omitempty"`
// ParamOverrides replace the DefaultValue of the integration parameter
// with the same Key.
ParamOverrides []parameterExternal `json:"param_overrides,omitempty"`
// IntegrationOverrides holds settings of the integration version itself.
IntegrationOverrides integrationoverrides `json:"integration_overrides,omitempty"`
}
// integrationoverrides holds the integration-version level settings handled
// by mergeOverrides and extractOverrides.
type integrationoverrides struct {
// RunAsServiceAccount is applied by mergeOverrides only when non-nil.
RunAsServiceAccount *string `json:"runAsServiceAccount,omitempty"`
// DatabasePersistencePolicy is applied by mergeOverrides only when it is
// not the empty string.
DatabasePersistencePolicy string `json:"databasePersistencePolicy"`
// EnableVariableMasking is always copied onto the integration version by
// mergeOverrides.
EnableVariableMasking bool `json:"enableVariableMasking"`
// CloudLoggingDetails: its severity and enable flag are always copied
// onto the integration version by mergeOverrides.
CloudLoggingDetails cloudLoggingDetails `json:"cloudLoggingDetails,omitempty"`
}
// triggeroverrides describes the replacement settings for one trigger. Which
// fields are read depends on the type of the trigger it is matched with.
type triggeroverrides struct {
// TriggerNumber selects the trigger the override applies to.
TriggerNumber string `json:"triggerNumber,omitempty"`
// TriggerType is recorded by extractOverrides for
// INTEGRATION_CONNECTOR_TRIGGER; mergeOverrides switches on the type of
// the matched trigger, not on this field.
TriggerType string `json:"triggerType,omitempty"`
// ProjectId and TopicName are mandatory for CLOUD_PUBSUB_EXTERNAL.
ProjectId *string `json:"projectId,omitempty"`
TopicName *string `json:"topicName,omitempty"`
// APIPath is mandatory for API triggers; it is appended to apiTrigger to
// form the trigger id.
APIPath *string `json:"apiPath,omitempty"`
// ServiceAccount (CLOUD_PUBSUB_EXTERNAL) is the account name without the
// domain; mergeOverrides expands it to
// <name>@<ProjectId>.iam.gserviceaccount.com unless it starts with
// configVarPrefix. When nil, the default Compute Engine service account
// is looked up instead.
ServiceAccount *string `json:"serviceAccount,omitempty"`
// Properties, when non-empty, replaces the trigger properties of API and
// INTEGRATION_CONNECTOR_TRIGGER triggers.
Properties map[string]string `json:"properties,omitempty"`
// CloudScheduler* fields are applied to CLOUD_SCHEDULER triggers; each
// one is applied only when non-nil.
CloudSchedulerServiceAccount *string `json:"cloudSchedulerServiceAccount,omitempty"`
CloudSchedulerLocation *string `json:"cloudSchedulerLocation,omitempty"`
CloudSchedulerCronTab *string `json:"cloudSchedulerCronTab,omitempty"`
}
// connectionoverrides points a connector task at a different connection.
type connectionoverrides struct {
// TaskId and Task together select the task the override applies to.
TaskId string `json:"taskId,omitempty"`
Task string `json:"task,omitempty"`
// Parameters names the replacement connection.
Parameters connectionoverrideparams `json:"parameters,omitempty"`
}
// connectionoverrideparams identifies the replacement connection of a
// connection override.
type connectionoverrideparams struct {
// ConnectionName and ConnectionLocation are passed to
// getNewConnectionParams to look the connection up. An empty
// ConnectionLocation leaves the client's current region in place.
ConnectionName string `json:"connectionName,omitempty"`
ConnectionLocation string `json:"connectionLocation,omitempty"`
// EntityType, when non-nil, replaces the task's "entityType" parameter.
EntityType *eventparameter `json:"entityType,omitempty"`
}
// connectiondetails is the structure decoded from (getConnectionDetails) and
// encoded back into (stringifyValue) the JSON value of a connector task's
// "config" parameter.
type connectiondetails struct {
// Type carries the "@type" member of the JSON value.
Type string `json:"@type,omitempty"`
Connection connectionparams `json:"connection,omitempty"`
Operation string `json:"operation,omitempty"`
}
// connectionparams holds the connection attributes that mergeOverrides swaps
// in a connector task. getNewConnectionParams fills it from the
// "serviceDirectory", "name" and "connectorVersion" members of the
// connection lookup response.
type connectionparams struct {
ServiceName string `json:"serviceName,omitempty"`
ConnectionName string `json:"connectionName,omitempty"`
ConnectorVersion string `json:"connectorVersion,omitempty"`
}
const (
// pubsubTrigger is the format of a Pub/Sub trigger id. mergeOverrides
// fills it with the project id, the project id again, and the topic name.
pubsubTrigger = "cloud_pubsub_external_trigger/projects/%s/subscriptions/%s_%s"
// apiTrigger is the prefix of an API trigger id; the API path is appended.
apiTrigger = "api_trigger/"
)
// authConfigValue is the leading part of the JSON written into a task's
// authConfig parameter. overrideParameters completes it by appending the auth
// config identifier followed by `"}`.
const authConfigValue = "{ \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.authconfig.AuthConfigTaskParam\",\"authConfigId\": \""
// configVarPrefix is the prefix that marks a parameter value as a reference
// to a config variable rather than a literal value.
const configVarPrefix = "$`CONFIG_"
// mergeOverrides applies the overrides in o to eversion and returns the
// updated integration version.
//
// Overrides are applied in this order:
//   - triggers, matched on TriggerNumber;
//   - tasks, matched on TaskId and Task (GenericConnectorTask is excluded);
//   - integration parameters, matched on Key;
//   - connections, matched on TaskId and Task (skipped entirely when
//     apiclient.DryRun() reports true);
//   - integration-level settings.
//
// An override that matches nothing is reported with a warning and otherwise
// ignored. When grantPermission is true, the service account of every
// CLOUD_PUBSUB_EXTERNAL trigger is granted the invoker permission; an account
// named explicitly in the override is created first. Triggers whose service
// account is a config variable are skipped for that step.
//
// An error is returned when a mandatory override field is missing or when a
// lookup needed to build the new value fails.
func mergeOverrides(eversion integrationVersionExternal, o overrides, grantPermission bool) (integrationVersionExternal, error) {
	// apply trigger overrides
	for _, triggerOverride := range o.TriggerOverrides {
		foundOverride := false
		for triggerIndex, trigger := range eversion.TriggerConfigs {
			if triggerOverride.TriggerNumber != trigger.TriggerNumber {
				continue
			}
			switch trigger.TriggerType {
			case "CLOUD_PUBSUB_EXTERNAL":
				if triggerOverride.ProjectId == nil || triggerOverride.TopicName == nil {
					return eversion, fmt.Errorf("projectid and topicName are mandatory in the overrides")
				}
				projectID := *triggerOverride.ProjectId
				topicName := *triggerOverride.TopicName

				trigger.TriggerId = fmt.Sprintf(pubsubTrigger, projectID, projectID, topicName)
				// a trigger without properties would otherwise panic on the writes below
				if trigger.Properties == nil {
					trigger.Properties = map[string]string{}
				}
				trigger.Properties["Subscription name"] = projectID + "_" + topicName
				trigger.Properties["IP Project name"] = projectID

				// Service account state is tracked per trigger so that the
				// choice made for one trigger does not leak into the next.
				var serviceAccountName string
				userDefinedSA := false
				grant := grantPermission
				switch {
				case triggerOverride.ServiceAccount == nil:
					var err error
					serviceAccountName, err = apiclient.GetComputeEngineDefaultServiceAccount(apiclient.GetProjectID())
					if err != nil {
						return eversion, fmt.Errorf("unable to get default compute engine service account: %w", err)
					}
					trigger.Properties["Service account"] = serviceAccountName
				case strings.HasPrefix(*triggerOverride.ServiceAccount, configVarPrefix):
					// it is a config variable. Do not set anything.
					grant = false
					clilog.Debug.Printf("config variable detected, skipping grantPermission\n")
				default:
					serviceAccountName = fmt.Sprintf("%s@%s.iam.gserviceaccount.com", *triggerOverride.ServiceAccount, projectID)
					trigger.Properties["Service account"] = serviceAccountName
					userDefinedSA = true
				}

				if grant {
					if userDefinedSA {
						// create the SA if it doesn't exist
						if err := apiclient.CreateServiceAccount(serviceAccountName); err != nil {
							return eversion, err
						}
					}
					// failing to grant the permission is not fatal
					if err := apiclient.SetIntegrationInvokerPermission(projectID, serviceAccountName); err != nil {
						clilog.Warning.Printf("Unable to update permissions for the service account: %v\n", err)
					}
				}
			case "API":
				if triggerOverride.APIPath == nil {
					return eversion, fmt.Errorf("the field apiPath is missing from the API Trigger in overrides")
				}
				trigger.TriggerId = apiTrigger + *triggerOverride.APIPath
				if len(triggerOverride.Properties) > 0 {
					trigger.Properties = triggerOverride.Properties
				}
			case "CLOUD_SCHEDULER":
				if triggerOverride.CloudSchedulerServiceAccount != nil {
					trigger.CloudSchedulerConfig.ServiceAccountEmail = *triggerOverride.CloudSchedulerServiceAccount
				}
				if triggerOverride.CloudSchedulerCronTab != nil {
					trigger.CloudSchedulerConfig.CronTab = *triggerOverride.CloudSchedulerCronTab
				}
				if triggerOverride.CloudSchedulerLocation != nil {
					trigger.CloudSchedulerConfig.Location = *triggerOverride.CloudSchedulerLocation
				}
			case "INTEGRATION_CONNECTOR_TRIGGER":
				if len(triggerOverride.Properties) > 0 {
					trigger.TriggerId = fmt.Sprintf("integration_connector_trigger/projects/%s/locations/%s/connections/%s/eventSubscriptions/%s",
						triggerOverride.Properties["Project name"],
						triggerOverride.Properties["Region"], triggerOverride.Properties["Connection name"],
						triggerOverride.Properties["Subscription name"])
					trigger.Properties = triggerOverride.Properties
				}
			default:
				clilog.Warning.Printf("unsupported trigger type %s\n", trigger.TriggerType)
			}
			eversion.TriggerConfigs[triggerIndex] = trigger
			foundOverride = true
		}
		if !foundOverride {
			clilog.Warning.Printf("trigger override id %s was not found in the integration json\n",
				triggerOverride.TriggerNumber)
		}
	}

	// apply task overrides
	for _, taskOverride := range o.TaskOverrides {
		foundOverride := false
		for taskIndex, task := range eversion.TaskConfigs {
			if taskOverride.TaskId == task.TaskId && taskOverride.Task == task.Task && task.Task != "GenericConnectorTask" {
				task.Parameters = overrideParameters(taskOverride.Parameters, task.Parameters)
				eversion.TaskConfigs[taskIndex] = task
				foundOverride = true
			}
		}
		if !foundOverride {
			clilog.Warning.Printf("task override %s with id %s was not found in the integration json\n",
				taskOverride.DisplayName, taskOverride.TaskId)
		}
	}

	// apply parameter overrides
	for _, paramOverride := range o.ParamOverrides {
		foundOverride := false
		for ipIndex, ip := range eversion.IntegrationParameters {
			if paramOverride.Key != ip.Key {
				continue
			}
			ip.DefaultValue = paramOverride.DefaultValue
			eversion.IntegrationParameters[ipIndex] = ip
			// only a key match counts as found, otherwise the warning below never fires
			foundOverride = true
		}
		if !foundOverride {
			clilog.Warning.Printf("param override key %s with dataTpe %s was not found in the integration json\n",
				paramOverride.Key, paramOverride.DataType)
		}
	}

	// apply connection overrides
	if !apiclient.DryRun() {
		for _, connectionOverride := range o.ConnectionOverrides {
			// tracked per override so that every unmatched override is reported
			foundOverride := false
			for taskIndex, task := range eversion.TaskConfigs {
				if connectionOverride.TaskId != task.TaskId || connectionOverride.Task != task.Task {
					continue
				}
				// one lookup serves both the Google built and the custom connector case
				newcp, err := getNewConnectionParams(connectionOverride.Parameters.ConnectionName,
					connectionOverride.Parameters.ConnectionLocation)
				if err != nil {
					return eversion, err
				}

				// Google built connector
				cparams := task.Parameters["config"]
				if cparams.Value.JsonValue != nil {
					cd, err := getConnectionDetails(*cparams.Value.JsonValue)
					if err != nil {
						return eversion, err
					}
					cd.Connection.ConnectionName = newcp.ConnectionName
					cd.Connection.ConnectorVersion = newcp.ConnectorVersion
					cd.Connection.ServiceName = newcp.ServiceName
					jsonValue, err := stringifyValue(cd)
					if err != nil {
						return eversion, err
					}
					*cparams.Value.JsonValue = jsonValue
					task.Parameters["config"] = cparams
					if connectionOverride.Parameters.EntityType != nil {
						task.Parameters["entityType"] = *connectionOverride.Parameters.EntityType
					}
					eversion.TaskConfigs[taskIndex] = task
					foundOverride = true
				}

				// custom connector
				cversion := task.Parameters["connectionVersion"]
				if cversion.Value.StringValue != nil {
					*cversion.Value.StringValue = newcp.ConnectorVersion
					if cname := task.Parameters["connectionName"]; cname.Value.StringValue != nil {
						*cname.Value.StringValue = newcp.ConnectionName
					} else {
						clilog.Warning.Printf("task with id %s has no connectionName parameter to override\n", task.TaskId)
					}
					if connectionOverride.Parameters.EntityType != nil {
						task.Parameters["entityType"] = *connectionOverride.Parameters.EntityType
					}
					eversion.TaskConfigs[taskIndex] = task
					foundOverride = true
				}
			}
			if !foundOverride {
				clilog.Warning.Printf("task override with id %s was not found in the integration json\n",
					connectionOverride.TaskId)
			}
		}
	}

	// apply integration overrides
	if o.IntegrationOverrides.DatabasePersistencePolicy != "" {
		eversion.DatabasePersistencePolicy = o.IntegrationOverrides.DatabasePersistencePolicy
	}
	eversion.CloudLoggingDetails.CloudLoggingSeverity = o.IntegrationOverrides.CloudLoggingDetails.CloudLoggingSeverity
	eversion.CloudLoggingDetails.EnableCloudLogging = o.IntegrationOverrides.CloudLoggingDetails.EnableCloudLogging
	if o.IntegrationOverrides.RunAsServiceAccount != nil {
		eversion.RunAsServiceAccount = *o.IntegrationOverrides.RunAsServiceAccount
	}
	eversion.EnableVariableMasking = o.IntegrationOverrides.EnableVariableMasking

	return eversion, nil
}
// extractOverrides builds the overrides document for iversion.
//
// It collects:
//   - connection, REST and Cloud Function task settings via the handle*
//     helpers;
//   - integration parameters whose key starts with "_" and that are not
//     input/output variables;
//   - settings of CLOUD_PUBSUB_EXTERNAL, CLOUD_SCHEDULER and
//     INTEGRATION_CONNECTOR_TRIGGER triggers;
//   - the integration-level settings, falling back to the *_UNSPECIFIED
//     values when the integration does not set them.
//
// An error is returned when a task helper fails or when a Pub/Sub trigger's
// "Subscription name" property is not of the form {projectId}_{topicName}.
func extractOverrides(iversion integrationVersion) (overrides, error) {
	extracted := overrides{
		IntegrationOverrides: integrationoverrides{
			DatabasePersistencePolicy: "DATABASE_PERSISTENCE_POLICY_UNSPECIFIED",
			CloudLoggingDetails: cloudLoggingDetails{
				CloudLoggingSeverity: "CLOUD_LOGGING_SEVERITY_UNSPECIFIED",
			},
		},
	}

	for _, task := range iversion.TaskConfigs {
		var err error
		switch task.Task {
		case "GenericConnectorTask":
			err = handleGenericConnectorTask(task, &extracted, iversion.IntegrationConfigParameters)
		case "GenericRestV2Task":
			err = handleGenericRestV2Task(task, &extracted)
		case "CloudFunctionTask":
			err = handleCloudFunctionTask(task, &extracted)
		}
		if err != nil {
			return extracted, err
		}
	}

	for _, param := range iversion.IntegrationParameters {
		if !strings.HasPrefix(param.Key, "_") || inputOutputVariable(param.InputOutputType) {
			continue
		}
		ip := parameterExternal{}
		ip.Key = param.Key
		if param.DefaultValue != nil {
			ip.DefaultValue = param.DefaultValue
		}
		extracted.ParamOverrides = append(extracted.ParamOverrides, ip)
	}

	for _, triggerConfig := range iversion.TriggerConfigs {
		switch triggerConfig.TriggerType {
		case "CLOUD_PUBSUB_EXTERNAL":
			subscription := triggerConfig.Properties["Subscription name"]
			// Split on the first underscore only: the part after it is the
			// topic name, which may itself contain underscores.
			parts := strings.SplitN(subscription, "_", 2)
			if len(parts) != 2 {
				return extracted, fmt.Errorf("trigger %s: subscription name %q is not in the format {projectId}_{topicName}",
					triggerConfig.TriggerNumber, subscription)
			}
			projectID, topicName := parts[0], parts[1]

			triggerOverride := triggeroverrides{}
			triggerOverride.ProjectId = &projectID
			triggerOverride.TopicName = &topicName
			triggerOverride.TriggerNumber = triggerConfig.TriggerNumber

			// record the service account only when it differs from the default one
			if triggerSA := triggerConfig.Properties["Service account"]; triggerSA != "" {
				defaultSA, err := apiclient.GetComputeEngineDefaultServiceAccount(apiclient.GetProjectID())
				if err != nil {
					clilog.Warning.Printf("unable to get default Compute Engine Service Account, %v\n", err)
				} else if defaultSA != triggerSA {
					// keep the account name only; the domain is rebuilt on merge
					saName := strings.Split(triggerSA, "@")[0]
					triggerOverride.ServiceAccount = &saName
				}
			}
			extracted.TriggerOverrides = append(extracted.TriggerOverrides, triggerOverride)
		case "CLOUD_SCHEDULER":
			serviceAccount := triggerConfig.CloudSchedulerConfig.ServiceAccountEmail
			location := triggerConfig.CloudSchedulerConfig.Location
			cronTab := triggerConfig.CloudSchedulerConfig.CronTab

			triggerOverride := triggeroverrides{}
			// mergeOverrides matches on the trigger number, so it must be recorded
			triggerOverride.TriggerNumber = triggerConfig.TriggerNumber
			triggerOverride.CloudSchedulerServiceAccount = &serviceAccount
			triggerOverride.CloudSchedulerLocation = &location
			triggerOverride.CloudSchedulerCronTab = &cronTab
			extracted.TriggerOverrides = append(extracted.TriggerOverrides, triggerOverride)
		case "INTEGRATION_CONNECTOR_TRIGGER":
			triggerOverride := triggeroverrides{}
			triggerOverride.Properties = triggerConfig.Properties
			triggerOverride.TriggerNumber = triggerConfig.TriggerNumber
			triggerOverride.TriggerType = triggerConfig.TriggerType
			extracted.TriggerOverrides = append(extracted.TriggerOverrides, triggerOverride)
		}
	}

	// handle integration overrides
	if iversion.DatabasePersistencePolicy != "" {
		extracted.IntegrationOverrides.DatabasePersistencePolicy = iversion.DatabasePersistencePolicy
	}
	if iversion.RunAsServiceAccount != "" {
		runAs := iversion.RunAsServiceAccount
		extracted.IntegrationOverrides.RunAsServiceAccount = &runAs
	}
	if iversion.EnableVariableMasking {
		extracted.IntegrationOverrides.EnableVariableMasking = iversion.EnableVariableMasking
	}
	if iversion.CloudLoggingDetails.CloudLoggingSeverity != "" {
		extracted.IntegrationOverrides.CloudLoggingDetails.CloudLoggingSeverity = iversion.CloudLoggingDetails.CloudLoggingSeverity
	}
	if iversion.CloudLoggingDetails.EnableCloudLogging {
		extracted.IntegrationOverrides.CloudLoggingDetails.EnableCloudLogging = iversion.CloudLoggingDetails.EnableCloudLogging
	}
	return extracted, nil
}
// inputOutputVariable reports whether variable is one of the input/output
// type markers "IN", "OUT" or "IN_OUT".
func inputOutputVariable(variable string) bool {
	switch variable {
	case "IN", "OUT", "IN_OUT":
		return true
	default:
		return false
	}
}
// handleGenericRestV2Task records the overridable settings of a
// GenericRestV2Task in taskOverrides.
//
// The "url" parameter is recorded only when it holds a literal value, i.e. a
// string or int value that does not start with configVarPrefix. The
// "authConfig" parameter is recorded as the display name of the referenced
// auth config; it is skipped when the parameter carries no JSON value or the
// display name is empty. Nothing is appended when neither setting was
// recorded.
//
// An error is returned when the display name lookup fails.
func handleGenericRestV2Task(taskConfig taskconfig, taskOverrides *overrides) error {
	tc := taskconfig{}
	tc.TaskId = taskConfig.TaskId
	tc.Task = taskConfig.Task
	tc.Parameters = map[string]eventparameter{}

	// store in overrides only if config variables are not used
	isLiteral := func(v *string) bool {
		return v != nil && !strings.HasPrefix(*v, configVarPrefix)
	}
	urlEventParam := taskConfig.Parameters["url"]
	if isLiteral(urlEventParam.Value.StringValue) || isLiteral(urlEventParam.Value.IntValue) {
		tc.Parameters["url"] = urlEventParam
	}

	// guard against an authConfig parameter without a JSON value
	if authParam, ok := taskConfig.Parameters["authConfig"]; ok && authParam.Value.JsonValue != nil {
		displayName, err := authconfigs.GetDisplayName(getAuthConfigUuid(*authParam.Value.JsonValue))
		if err != nil {
			return err
		}
		if displayName != "" {
			eventparam := eventparameter{}
			eventparam.Key = authParam.Key
			eventparam.Value.StringValue = &displayName
			tc.Parameters["authConfig"] = eventparam
		}
	}

	if len(tc.Parameters) > 0 {
		taskOverrides.TaskOverrides = append(taskOverrides.TaskOverrides, tc)
	}
	return nil
}
// handleCloudFunctionTask records the overridable settings of a
// CloudFunctionTask in taskOverrides.
//
// The "TriggerUrl" parameter is always copied (as a zero value when the task
// does not define it). The "authConfig" parameter is recorded as the display
// name of the referenced auth config; it is skipped when the parameter
// carries no JSON value or the display name is empty.
//
// An error is returned when the display name lookup fails.
func handleCloudFunctionTask(taskConfig taskconfig, taskOverrides *overrides) error {
	tc := taskconfig{}
	tc.TaskId = taskConfig.TaskId
	tc.Task = taskConfig.Task
	tc.Parameters = map[string]eventparameter{}
	tc.Parameters["TriggerUrl"] = taskConfig.Parameters["TriggerUrl"]

	// guard against an authConfig parameter without a JSON value
	if authParam, ok := taskConfig.Parameters["authConfig"]; ok && authParam.Value.JsonValue != nil {
		displayName, err := authconfigs.GetDisplayName(getAuthConfigUuid(*authParam.Value.JsonValue))
		if err != nil {
			return err
		}
		if displayName != "" {
			eventparam := eventparameter{}
			eventparam.Key = authParam.Key
			eventparam.Value.StringValue = &displayName
			tc.Parameters["authConfig"] = eventparam
		}
	}

	taskOverrides.TaskOverrides = append(taskOverrides.TaskOverrides, tc)
	return nil
}
// handleGenericConnectorTask appends a connection override for a
// GenericConnectorTask to taskOverrides.
//
// The connection is taken from the "connectionName" parameter when that
// parameter's key is set, resolving config variables through iconfigParam;
// otherwise it is taken from the JSON value of the "config" parameter. Only
// the last "/"-separated segment of the connection name is recorded. A task
// that has neither parameter is skipped without an entry.
func handleGenericConnectorTask(taskConfig taskconfig, taskOverrides *overrides, iconfigParam []parameterConfig) error {
	configParam, hasConfig := taskConfig.Parameters["config"]
	nameParam, hasName := taskConfig.Parameters["connectionName"]
	if !(hasConfig || hasName) {
		return nil
	}

	// fullName stays empty when no connection could be determined
	var fullName string
	switch {
	case nameParam.Key == "connectionName":
		if nameParam.Value.StringValue != nil {
			resolved, err := getConnectionStringFromConnectionName(*nameParam.Value.StringValue, iconfigParam)
			if err != nil {
				return err
			}
			fullName = resolved
		}
	case (eventparameter{}) != configParam && hasConfig:
		if configParam.Value.JsonValue != nil {
			details, err := getConnectionDetails(*configParam.Value.JsonValue)
			if err != nil {
				return err
			}
			fullName = details.Connection.ConnectionName
		}
	}

	entry := connectionoverrides{}
	entry.TaskId = taskConfig.TaskId
	entry.Task = taskConfig.Task
	// keep everything after the last "/" (the whole string when there is none)
	entry.Parameters.ConnectionName = fullName[strings.LastIndex(fullName, "/")+1:]

	taskOverrides.ConnectionOverrides = append(taskOverrides.ConnectionOverrides, entry)
	return nil
}
// overrideParameters applies overrideParameters onto taskParameters and
// returns taskParameters, which is modified in place.
//
// An override whose Key is "authConfig" carries the auth config name in its
// string value; the name is resolved with authconfigs.Find and the task
// parameter's JSON value is rewritten to reference the result. Every other
// override replaces the task parameter of the same name. An override that
// cannot be applied (no such task parameter, no name given, lookup failure)
// is logged as a warning and skipped; the remaining overrides are still
// applied.
func overrideParameters(overrideParameters map[string]eventparameter,
	taskParameters map[string]eventparameter,
) map[string]eventparameter {
	for overrideParamName, overrideParam := range overrideParameters {
		target, found := taskParameters[overrideParamName]
		if !found {
			clilog.Warning.Printf("override param %s was not found\n", overrideParamName)
			continue
		}
		if overrideParam.Key != "authConfig" {
			taskParameters[overrideParamName] = overrideParam
			continue
		}

		if overrideParam.Value.StringValue == nil {
			clilog.Warning.Printf("override param %s does not name an auth config\n", overrideParamName)
			continue
		}
		// silence HTTP response printing for the lookup only
		apiclient.ClientPrintHttpResponse.Set(false)
		acversion, err := authconfigs.Find(*overrideParam.Value.StringValue, "")
		apiclient.ClientPrintHttpResponse.Set(apiclient.GetCmdPrintHttpResponseSetting())
		if err != nil {
			// Map iteration order is random: returning here would drop an
			// arbitrary subset of the remaining overrides.
			clilog.Warning.Println(err)
			continue
		}

		jsonValue := fmt.Sprintf("%s%s\"}", authConfigValue, acversion)
		if target.Value.JsonValue != nil {
			*target.Value.JsonValue = jsonValue
		} else {
			target.Value.JsonValue = &jsonValue
			taskParameters[overrideParamName] = target
		}
	}
	return taskParameters
}
// getNewConnectionParams looks up the connection connectionName and returns
// its connector version, service directory and full name.
//
// When connectionLocation is not empty the client region is switched to it
// for the lookup and switched back afterwards, whether or not the lookup
// succeeded. HTTP response printing is disabled for the duration of the call.
// Response members that are absent are returned as empty strings.
//
// An error is returned when a region switch, the lookup, or decoding the
// response fails.
func getNewConnectionParams(connectionName string, connectionLocation string) (cp connectionparams, err error) {
	var connectionVersionResponse map[string]interface{}

	apiclient.ClientPrintHttpResponse.Set(false)
	defer apiclient.ClientPrintHttpResponse.Set(apiclient.GetCmdPrintHttpResponseSetting())

	var integrationRegion string
	if connectionLocation != "" {
		integrationRegion = apiclient.GetRegion() // store the integration location
		if err = apiclient.SetRegion(connectionLocation); err != nil { // set the connector region
			return cp, err
		}
	}

	// Keep the lookup error in its own variable so that restoring the region
	// below cannot overwrite it.
	connResp, getErr := connections.Get(connectionName, "BASIC", false, false) // get connector details

	if connectionLocation != "" {
		if err = apiclient.SetRegion(integrationRegion); err != nil { // set the integration region back
			return cp, err
		}
	}
	if getErr != nil {
		return cp, getErr
	}

	if err = json.Unmarshal(connResp, &connectionVersionResponse); err != nil {
		return cp, err
	}

	// field renders a response member as a string; a missing or null member
	// becomes "" instead of the literal "<nil>".
	field := func(key string) string {
		if v := connectionVersionResponse[key]; v != nil {
			return fmt.Sprintf("%v", v)
		}
		return ""
	}
	cp.ConnectorVersion = field("connectorVersion")
	cp.ServiceName = field("serviceDirectory")
	cp.ConnectionName = field("name")
	return cp, nil
}
// getConnectionDetails decodes jsonValue into a connectiondetails value.
// Every newline and every backslash character is removed from the input
// before it is handed to the JSON decoder.
func getConnectionDetails(jsonValue string) (connectiondetails, error) {
	var details connectiondetails
	// both removals are single characters, so one pass equals two sequential ones
	cleaned := strings.NewReplacer("\n", "", "\\", "").Replace(jsonValue)
	if err := json.Unmarshal([]byte(cleaned), &details); err != nil {
		return details, err
	}
	return details, nil
}
// stringifyValue encodes cd as JSON and returns it as a string. On an
// encoding error the returned string is empty.
func stringifyValue(cd connectiondetails) (string, error) {
	encoded, err := json.Marshal(cd)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}
// connectionNameFormatRe matches a connection resource name. It is not
// anchored, so the pattern may match anywhere inside the string.
var connectionNameFormatRe = regexp.MustCompile(`projects/(.*)/locations/(.*)/connections/(.*)`)

// getConnectionStringFromConnectionName resolves connectionName to a
// connection resource name.
//
// A value starting with configVarPrefix is treated as a config variable: it
// is looked up in iconfigParam under the key obtained by removing every "$"
// from connectionName, using the parameter's value or, failing that, its
// default value. Any other value is used as is.
//
// An error is returned when the resolved name does not contain
// projects/{projectId}/locations/{locationId}/connections/{connectionId}.
func getConnectionStringFromConnectionName(connectionName string, iconfigParam []parameterConfig) (connection string, err error) {
	name := connectionName
	if strings.HasPrefix(connectionName, configVarPrefix) {
		// an unresolved config variable must fail validation below
		name = ""
		key := strings.ReplaceAll(connectionName, "$", "")
		for _, param := range iconfigParam {
			if param.Parameter.Key != key {
				continue
			}
			// nil string values are skipped rather than dereferenced
			switch {
			case param.Value != nil && param.Value.StringValue != nil:
				name = *param.Value.StringValue
			case param.Parameter.DefaultValue != nil && param.Parameter.DefaultValue.StringValue != nil:
				name = *param.Parameter.DefaultValue.StringValue
			}
		}
	}

	if !connectionNameFormatRe.MatchString(name) {
		return "", errors.New("Connection Name is not valid. Connection name should be in the format: projects/{projectId}/locations/{locationId}/connections/{connectionId}")
	}
	return name, nil
}
// ExtractCode decodes content as an integration version and returns the
// source code embedded in its tasks.
//
// The result is keyed by task type and then by task id. It always contains
// the "JavaScriptTask" and "JsonnetMapperTask" entries, holding the string
// value of each such task's "script" and "template" parameter respectively.
// A task without that string value is left out. An error is returned when
// content cannot be decoded.
func ExtractCode(content []byte) (codeMap map[string]map[string]string, err error) {
	iversion := integrationVersion{}
	if err = json.Unmarshal(content, &iversion); err != nil {
		return nil, err
	}

	codeMap = make(map[string]map[string]string)
	codeMap["JavaScriptTask"] = make(map[string]string)
	codeMap["JsonnetMapperTask"] = make(map[string]string)

	for _, task := range iversion.TaskConfigs {
		var paramName string
		switch task.Task {
		case "JavaScriptTask":
			paramName = "script"
		case "JsonnetMapperTask":
			paramName = "template"
		default:
			continue
		}
		// a missing parameter yields a zero value whose StringValue is nil
		if code := task.Parameters[paramName].Value.StringValue; code != nil {
			codeMap[task.Task][task.TaskId] = *code
		}
	}
	return codeMap, nil
}
// SetCode decodes content as an integration version, writes the code from
// codeMap back into its tasks, and returns the re-encoded integration.
//
// codeMap is keyed by task type and then by task id. For each
// "JavaScriptTask" the "script" parameter is updated, and for each
// "JsonnetMapperTask" the "template" parameter; every literal `\n` sequence
// in the code is turned into a newline first. Tasks with no (or empty) code
// in codeMap, and tasks that do not have the parameter, are left unchanged.
// An error is returned when content cannot be decoded or the result cannot be
// encoded.
func SetCode(content []byte, codeMap map[string]map[string]string) (integrationBytes []byte, err error) {
	iversion := integrationVersion{}
	if err = json.Unmarshal(content, &iversion); err != nil {
		return nil, err
	}

	for _, task := range iversion.TaskConfigs {
		var paramName string
		switch task.Task {
		case "JavaScriptTask":
			paramName = "script"
		case "JsonnetMapperTask":
			paramName = "template"
		default:
			continue
		}

		code := codeMap[task.Task][task.TaskId]
		if code == "" {
			continue
		}
		param, ok := task.Parameters[paramName]
		if !ok {
			continue
		}
		code = strings.ReplaceAll(code, "\\n", "\n")
		if param.Value.StringValue != nil {
			*param.Value.StringValue = code
		} else {
			// the parameter exists but has no string value yet
			param.Value.StringValue = &code
			task.Parameters[paramName] = param
		}
	}
	return json.Marshal(iversion)
}