alicloud/resource_alicloud_emrv2_cluster.go (3,205 lines of code) (raw):

package alicloud

import (
	"fmt"
	"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
	"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"
	"github.com/PaesslerAG/jsonpath"
	"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
	"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
	"github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity"
	"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
)

// resourceAlicloudEmrV2Cluster returns the schema.Resource describing the EMR v2
// cluster resource. It wires the Create/Read/Update/Delete handlers, uses the
// passthrough importer, declares default timeouts (10 minutes for create,
// 5 minutes for delete) and defines the attribute schema.
func resourceAlicloudEmrV2Cluster() *schema.Resource {
	return &schema.Resource{
		Create: resourceAlicloudEmrV2ClusterCreate,
		Read: resourceAlicloudEmrV2ClusterRead,
		Update: resourceAlicloudEmrV2ClusterUpdate,
		Delete: resourceAlicloudEmrV2ClusterDelete,
		Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, },
		Timeouts: &schema.ResourceTimeout{ Create: schema.DefaultTimeout(10 * time.Minute), Delete: schema.DefaultTimeout(5 * time.Minute), },
		Schema: map[string]*schema.Schema{
			// ---- Cluster-level attributes ----
			"resource_group_id": { Type: schema.TypeString, Optional: true, Computed: true, },
			// payment_type accepts only "PayAsYouGo" or "Subscription".
			"payment_type": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"PayAsYouGo", "Subscription"}, false), },
			// subscription_config: at most one block (MaxItems: 1); duration units are
			// limited to "Month"/"Year" and durations to the listed integer values.
			"subscription_config": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
				"payment_duration_unit": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"Month", "Year"}, false), },
				"payment_duration": { Type: schema.TypeInt, Required: true, ValidateFunc: validation.IntInSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 24, 36, 48, 60}), },
				"auto_renew": { Type: schema.TypeBool, Optional: true, },
				"auto_pay_order": { Type: schema.TypeBool, Optional: true, },
				"auto_renew_duration_unit": { Type: schema.TypeString, Optional: true, ValidateFunc: validation.StringInSlice([]string{"Month", "Year"}, false), },
				"auto_renew_duration": { Type: schema.TypeInt, Optional: true, ValidateFunc: validation.IntInSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 24, 36, 48, 60}), },
			}, }, MaxItems: 1, },
			// cluster_type and release_version are ForceNew: changing them recreates the resource.
			"cluster_type": { Type: schema.TypeString, Required: true, ForceNew: true, ValidateFunc: validation.StringInSlice([]string{"DATALAKE", "OLAP", "DATAFLOW", "DATASERVING", "CUSTOM"}, false), },
			"release_version": { Type: schema.TypeString, Required: true, ForceNew: true, },
			"cluster_name": { Type: schema.TypeString, Required: true, },
			"log_collect_strategy": { Type: schema.TypeString, Optional: true, Computed: true, },
			"deletion_protection": { Type: schema.TypeBool, Optional: true, },
			"deploy_mode": { Type: schema.TypeString, Optional: true, ForceNew: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"NORMAL", "HA"}, false), },
			"security_mode": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"NORMAL", "KERBEROS"}, false), },
			// applications: required, ForceNew set of application names with at least one entry.
			"applications": { Type: schema.TypeSet, Required: true, ForceNew: true, Elem: &schema.Schema{Type: schema.TypeString}, MinItems: 1, },
			// application_configs: optional set of per-application configuration items;
			// config_scope is limited to "CLUSTER" or "NODE_GROUP".
			"application_configs": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
				"application_name": { Type: schema.TypeString, Required: true, },
				"config_file_name": { Type: schema.TypeString, Required: true, },
				"config_item_key": { Type: schema.TypeString, Required: true, },
				"config_item_value": { Type: schema.TypeString, Required: true, },
				"config_scope": { Type: schema.TypeString, Optional: true, ValidateFunc: validation.StringInSlice([]string{"CLUSTER", "NODE_GROUP"}, false), },
				"config_description": { Type: schema.TypeString, Optional: true, },
				"node_group_name": { Type: schema.TypeString, Optional: true, },
				"node_group_id": { Type: schema.TypeString, Optional: true, },
			}, }, },
			// node_attributes: required; the block itself and every field inside it are ForceNew.
			"node_attributes": { Type: schema.TypeSet, Required: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
				"vpc_id": { Type: schema.TypeString, Required: true, ForceNew: true, },
				"zone_id": { Type: schema.TypeString, Required: true, ForceNew: true, },
				"security_group_id": { Type: schema.TypeString, Required: true, ForceNew: true, },
				"ram_role": { Type: schema.TypeString, Required: true, ForceNew: true, },
				"key_pair_name": { Type: schema.TypeString, Required: true, ForceNew: true, },
				"data_disk_encrypted": { Type: schema.TypeBool, Optional: true, Computed: true, ForceNew: true, },
				"data_disk_kms_key_id": { Type: schema.TypeString, Optional: true, Computed: true, ForceNew: true, ValidateFunc: validation.NoZeroValues, },
				"system_disk_encrypted": { Type: schema.TypeBool, Optional: true, ForceNew: true, },
				"system_disk_kms_key_id": { Type: schema.TypeString, Optional: true, ForceNew: true, ValidateFunc: validation.NoZeroValues, },
			}, }, ForceNew: true, },
			// ---- node_groups: required ordered list with at least one entry (MinItems: 1) ----
			"node_groups": { Type: schema.TypeList, Required: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
				"node_group_type": { Type: schema.TypeString, Required: true, ValidateFunc: StringInSlice([]string{"MASTER", "CORE", "TASK", "GATEWAY", "MASTER-EXTEND"}, false), },
				"node_group_name": { Type: schema.TypeString, Required: true, },
				"payment_type": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"PayAsYouGo", "Subscription"}, false), },
				"deployment_set_strategy": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"NONE", "CLUSTER", "NODE_GROUP"}, false), },
				"node_resize_strategy": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"PRIORITY", "COST_OPTIMIZED"}, false), },
				// Per-node-group subscription settings; same shape as the cluster-level block.
				"subscription_config": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
					"payment_duration_unit": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"Month", "Year"}, false), },
					"payment_duration": { Type: schema.TypeInt, Required: true, ValidateFunc: validation.IntInSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 24, 36, 48, 60}), },
					"auto_renew": { Type: schema.TypeBool, Optional: true, },
					"auto_pay_order": { Type: schema.TypeBool, Optional: true, },
					"auto_renew_duration_unit": { Type: schema.TypeString, Optional: true, ValidateFunc: validation.StringInSlice([]string{"Month", "Year"}, false), },
					"auto_renew_duration": { Type: schema.TypeInt, Optional: true, ValidateFunc: validation.IntInSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 24, 36, 48, 60}), },
				}, }, MaxItems: 1, },
				"spot_strategy": { Type: schema.TypeString, Optional: true, ValidateFunc: StringInSlice([]string{"NoSpot", "SpotWithPriceLimit", "SpotAsPriceGo"}, false), },
				"spot_bid_prices": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "instance_type": { Type: schema.TypeString, Required: true, }, "bid_price": { Type: schema.TypeInt, Required: true, }, }, }, },
				"vswitch_ids": { Type: schema.TypeSet, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, },
				"with_public_ip": { Type: schema.TypeBool, Optional: true, Computed: true, },
				"additional_security_group_ids": { Type: schema.TypeSet, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, },
				"instance_types": { Type: schema.TypeSet, Required: true, Elem: &schema.Schema{Type: schema.TypeString}, },
				"node_count": { Type: schema.TypeInt, Required: true, },
				// system_disk: exactly one block is expected (Required, MaxItems: 1).
				"system_disk": { Type: schema.TypeSet, Required: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "category": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"cloud_essd", "cloud_efficiency", "cloud_ssd"}, false), }, "size": { Type: schema.TypeInt, Required: true, }, "performance_level": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"PL0", "PL1", "PL2", "PL3"}, false), }, "count": { Type: schema.TypeInt, Optional: true, Computed: true, }, }, }, MaxItems: 1, },
				// data_disks: required set; accepts more disk categories than system_disk and has no MaxItems.
				"data_disks": { Type: schema.TypeSet, Required: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "category": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"cloud_efficiency", "cloud_ssd", "cloud_essd", "cloud", "local_hdd_pro", "local_disk", "local_ssd_pro"}, false), }, "size": { Type: schema.TypeInt, Required: true, }, "performance_level": { Type: schema.TypeString, Optional: true, Computed: true, ValidateFunc: validation.StringInSlice([]string{"PL0", "PL1", "PL2", "PL3"}, false), }, "count": { Type: schema.TypeInt, Optional: true, Computed: true, }, }, }, },
				"graceful_shutdown": { Type: schema.TypeBool, Optional: true, Computed: true, },
				"spot_instance_remedy": { Type: schema.TypeBool, Optional: true, Computed: true, },
				"cost_optimized_config": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "on_demand_base_capacity": { Type: schema.TypeInt, Required: true, }, "on_demand_percentage_above_base_capacity": { Type: schema.TypeInt, Required: true, }, "spot_instance_pools": { Type: schema.TypeInt, Required: true, }, }, }, MaxItems: 1, },
				// auto_scaling_policy: at most one block, holding a list of scaling rules and
				// an optional capacity constraints block.
				"auto_scaling_policy": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
					"scaling_rules": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
						"rule_name": { Type: schema.TypeString, Required: true, },
						"trigger_type": { Type: schema.TypeString, Required: true, ValidateFunc: StringInSlice([]string{"TIME_TRIGGER", "METRICS_TRIGGER"}, false), },
						"activity_type": { Type: schema.TypeString, Required: true, ValidateFunc: StringInSlice([]string{"SCALE_OUT", "SCALE_IN"}, false), },
						"adjustment_type": { Type: schema.TypeString, Optional: true, ValidateFunc: StringInSlice([]string{"CHANGE_IN_CAPACITY", "EXACT_CAPACITY"}, false), },
						"adjustment_value": { Type: schema.TypeInt, Required: true, ValidateFunc: IntBetween(1, 5000), },
						"min_adjustment_value": { Type: schema.TypeInt, Optional: true, },
						// time_trigger: at most one block (MaxItems: 1).
						"time_trigger": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "launch_time": { Type: schema.TypeString, Required: true, }, "start_time": { Type: schema.TypeString, Optional: true, }, "end_time": { Type: schema.TypeString, Optional: true, }, "launch_expiration_time": { Type: schema.TypeInt, Optional: true, ValidateFunc: IntBetween(0, 3600), }, "recurrence_type": { Type: schema.TypeString, Optional: true, ValidateFunc: StringInSlice([]string{"MINUTELY", "HOURLY", "DAILY", "WEEKLY", "MONTHLY"}, false), }, "recurrence_value": { Type: schema.TypeString, Optional: true, }, }, }, MaxItems: 1, },
						// metrics_trigger: at most one block (MaxItems: 1) with nested time constraints and conditions.
						"metrics_trigger": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "time_window": { Type: schema.TypeInt, Required: true, ValidateFunc: IntBetween(30, 1800), }, "evaluation_count": { Type: schema.TypeInt, Required: true, ValidateFunc: IntBetween(1, 5), }, "cool_down_interval": { Type: schema.TypeInt, Optional: true, ValidateFunc: IntBetween(0, 10800), }, "condition_logic_operator": { Type: schema.TypeString, Optional: true, ValidateFunc: StringInSlice([]string{"And", "Or"}, false), }, "time_constraints": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "start_time": { Type: schema.TypeString, Optional: true, }, "end_time": { Type: schema.TypeString, Optional: true, }, }, }, }, "conditions": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "metric_name": { Type: schema.TypeString, Required: true, }, "statistics": { Type: schema.TypeString, Required: true, ValidateFunc: StringInSlice([]string{"MAX", "MIN", "AVG"}, false), }, "comparison_operator": { Type: schema.TypeString, Required: true, ValidateFunc: StringInSlice([]string{"EQ", "NE", "GT", "LT", "GE", "LE"}, false), }, "threshold": { Type: schema.TypeFloat, Required: true, }, "tags": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "key": { Type: schema.TypeString, Required: true, }, "value": { Type: schema.TypeString, Optional: true, }, }, }, }, }, }, }, }, }, MaxItems: 1, },
					}, }, },
					"constraints": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "max_capacity": { Type: schema.TypeInt, Optional: true, ValidateFunc: IntBetween(0, 2000), }, "min_capacity": { Type: schema.TypeInt, Optional: true, ValidateFunc: IntBetween(0, 2000), }, }, }, MaxItems: 1, },
				}, }, MaxItems: 1, },
				// ack_config: at most one block (MaxItems: 1) of ACK-related settings for the node group.
				"ack_config": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
					"ack_instance_id": { Type: schema.TypeString, Required: true, },
					"node_selectors": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "key": { Type: schema.TypeString, Required: true, }, "value": { Type: schema.TypeString, Optional: true, }, }, }, },
					"tolerations": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "key": { Type: schema.TypeString, Optional: true, }, "value": { Type: schema.TypeString, Optional: true, }, "operator": { Type: schema.TypeString, Optional: true, }, "effect": { Type: schema.TypeString, Optional: true, }, }, }, },
					"namespace": { Type: schema.TypeString, Required: true, },
					"request_cpu": { Type: schema.TypeFloat, Required: true, },
					"request_memory": { Type: schema.TypeFloat, Required: true, },
					"limit_cpu": { Type: schema.TypeFloat, Required: true, },
					"limit_memory": { Type: schema.TypeFloat, Required: true, },
					"custom_labels": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "key": { Type: schema.TypeString, Required: true, }, "value": { Type: schema.TypeString, Optional: true, }, }, }, },
					"custom_annotations": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "key": { Type: schema.TypeString, Required: true, }, "value": { Type: schema.TypeString, Optional: true, }, }, }, },
					"pvcs": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "name": { Type: schema.TypeString, Required: true, }, "path": { Type: schema.TypeString, Required: true, }, "data_disk_storage_class": { Type: schema.TypeString, Required: true, }, "data_disk_size": { Type: schema.TypeInt, Required: true, }, }, }, },
					"volumes": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "name": { Type: schema.TypeString, Required: true, }, "path": { Type: schema.TypeString, Required: true, }, "type": { Type: schema.TypeString, Required: true, }, }, }, },
					"volume_mounts": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "name": { Type: schema.TypeString, Required: true, }, "path": { Type: schema.TypeString, Required: true, }, }, }, },
					"pre_start_command": { Type: schema.TypeList, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, },
					"pod_affinity": { Type: schema.TypeString, Optional: true, },
					"pod_anti_affinity": { Type: schema.TypeString, Optional: true, },
					"node_affinity": { Type: schema.TypeString, Optional: true, },
				}, }, MaxItems: 1, },
			}, }, MinItems: 1, },
			// ---- bootstrap_scripts: optional ordered list of scripts ----
			"bootstrap_scripts": { Type: schema.TypeList, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
				"script_name": { Type: schema.TypeString, Required: true, },
				"script_path": { Type: schema.TypeString, Required: true, },
				"script_args": { Type: schema.TypeString, Required: true, },
				// priority is still accepted but marked deprecated.
				"priority": { Type: schema.TypeInt, Optional: true, Deprecated: "Field 'priority' has been deprecated from provider version 1.227.0.", },
				"execution_moment": { Type: schema.TypeString, Required: true, ValidateFunc: StringInSlice([]string{"BEFORE_INSTALL", "AFTER_STARTED", "BEFORE_START"}, false), },
				"execution_fail_strategy": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"FAILED_CONTINUE", "FAILED_BLOCK"}, false), },
				// node_selector: required, at most one block; the singular node_group_id /
				// node_group_name fields are deprecated in favour of their plural list forms.
				"node_selector": { Type: schema.TypeSet, Required: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{
					"node_select_type": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"CLUSTER", "NODE_GROUP", "NODE"}, false), },
					"node_names": { Type: schema.TypeList, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, },
					"node_group_id": { Type: schema.TypeString, Optional: true, Deprecated: "Field 'node_group_id' has been deprecated from provider version 1.227.0. New field 'node_group_ids' replaces it.", },
					"node_group_ids": { Type: schema.TypeList, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, },
					"node_group_types": { Type: schema.TypeList, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, },
					"node_group_name": { Type: schema.TypeString, Optional: true, Deprecated: "Field 'node_group_name' has been deprecated from provider version 1.227.0. 
New field 'node_group_names' replaces it.", }, "node_group_names": { Type: schema.TypeList, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, }, }, }, MaxItems: 1, }, }, }, }, "tags": tagsSchema(), }, } }

// resourceAlicloudEmrV2ClusterCreate creates an EMR v2 cluster.
//
// It translates the resource configuration in d into a "RunCluster" request
// (API version 2021-03-20), posts it with retries for the duration of the
// create timeout, stores the returned ClusterId as the resource ID, waits for
// the cluster to move from STARTING to RUNNING and finally delegates to
// resourceAlicloudEmrV2ClusterRead to populate state.
//
// An error is returned when a list parameter cannot be serialized to JSON,
// when the API call fails, when the response carries no ClusterId, or when the
// cluster reaches START_FAILED, TERMINATED_WITH_ERRORS or TERMINATED (or the
// wait times out).
func resourceAlicloudEmrV2ClusterCreate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	var response map[string]interface{}
	action := "RunCluster"
	var err error
	createClusterRequest := map[string]interface{}{
		"RegionId": client.RegionId,
	}

	// copyField copies src[srcKey] to dst[dstKey] when the key is present.
	copyField := func(dst map[string]interface{}, dstKey string, src map[string]interface{}, srcKey string) {
		if value, exists := src[srcKey]; exists {
			dst[dstKey] = value
		}
	}
	// copyNonEmptyString copies a string field only when it is present and non-empty.
	copyNonEmptyString := func(dst map[string]interface{}, dstKey string, src map[string]interface{}, srcKey string) {
		if value, exists := src[srcKey]; exists && value.(string) != "" {
			dst[dstKey] = value.(string)
		}
	}
	// toStrings converts a list of string-typed interface values; an empty
	// input yields a nil slice, matching what the request carried before.
	toStrings := func(items []interface{}) []string {
		var out []string
		for _, item := range items {
			out = append(out, item.(string))
		}
		return out
	}
	// copyStrings copies a TypeList-of-strings field when the key is present.
	copyStrings := func(dst map[string]interface{}, dstKey string, src map[string]interface{}, srcKey string) {
		if value, exists := src[srcKey]; exists {
			dst[dstKey] = toStrings(value.([]interface{}))
		}
	}
	// setAutoPayOrder forwards auto_pay_order and falls back to true when the
	// key is absent.
	// NOTE(review): for schema-backed set elements the key is presumably always
	// present, so the `true` fallback may never apply — confirm.
	setAutoPayOrder := func(dst, src map[string]interface{}) {
		if value, exists := src["auto_pay_order"]; exists {
			dst["AutoPayOrder"] = value.(bool)
		} else {
			dst["AutoPayOrder"] = true
		}
	}
	// buildDisk maps one system_disk/data_disks element to its request form.
	buildDisk := func(src map[string]interface{}) map[string]interface{} {
		disk := map[string]interface{}{}
		copyField(disk, "Category", src, "category")
		copyField(disk, "Size", src, "size")
		copyNonEmptyString(disk, "PerformanceLevel", src, "performance_level")
		copyField(disk, "Count", src, "count")
		return disk
	}
	// setJSONList serializes items into the request under key and surfaces the
	// conversion error instead of silently sending a broken parameter.
	setJSONList := func(key string, items []map[string]interface{}) error {
		encoded, convErr := convertListMapToJsonString(items)
		if convErr != nil {
			return WrapError(convErr)
		}
		createClusterRequest[key] = encoded
		return nil
	}

	// Scalar attributes are forwarded only when set to a non-zero value
	// (d.GetOk reports ok == false for zero values).
	for _, field := range []struct{ attr, param string }{
		{"resource_group_id", "ResourceGroupId"},
		{"payment_type", "PaymentType"},
		{"cluster_type", "ClusterType"},
		{"release_version", "ReleaseVersion"},
		{"cluster_name", "ClusterName"},
		{"deploy_mode", "DeployMode"},
		{"security_mode", "SecurityMode"},
		{"log_collect_strategy", "LogCollectStrategy"},
		{"deletion_protection", "DeletionProtection"},
	} {
		if v, ok := d.GetOk(field.attr); ok {
			createClusterRequest[field.param] = v
		}
	}

	if v, ok := d.GetOk("subscription_config"); ok {
		if items := v.(*schema.Set).List(); len(items) == 1 {
			src := items[0].(map[string]interface{})
			subscription := map[string]interface{}{
				"PaymentDurationUnit":   src["payment_duration_unit"],
				"PaymentDuration":       src["payment_duration"],
				"AutoRenew":             src["auto_renew"],
				"AutoRenewDurationUnit": src["auto_renew_duration_unit"],
				"AutoRenewDuration":     src["auto_renew_duration"],
			}
			setAutoPayOrder(subscription, src)
			createClusterRequest["SubscriptionConfig"] = convertMapToJsonStringIgnoreError(subscription)
		}
	}

	// Applications and ApplicationConfigs are always sent, even when empty.
	applications := make([]map[string]interface{}, 0)
	if apps, ok := d.GetOk("applications"); ok {
		for _, application := range apps.(*schema.Set).List() {
			applications = append(applications, map[string]interface{}{"ApplicationName": application.(string)})
		}
	}
	if err = setJSONList("Applications", applications); err != nil {
		return err
	}

	applicationConfigs := make([]map[string]interface{}, 0)
	if appConfigs, ok := d.GetOk("application_configs"); ok {
		for _, rawConfig := range appConfigs.(*schema.Set).List() {
			src := rawConfig.(map[string]interface{})
			applicationConfig := map[string]interface{}{}
			copyField(applicationConfig, "ApplicationName", src, "application_name")
			copyField(applicationConfig, "ConfigFileName", src, "config_file_name")
			copyField(applicationConfig, "ConfigItemKey", src, "config_item_key")
			copyField(applicationConfig, "ConfigItemValue", src, "config_item_value")
			copyField(applicationConfig, "ConfigScope", src, "config_scope")
			copyField(applicationConfig, "NodeGroupName", src, "node_group_name")
			copyField(applicationConfig, "NodeGroupId", src, "node_group_id")
			applicationConfigs = append(applicationConfigs, applicationConfig)
		}
	}
	if err = setJSONList("ApplicationConfigs", applicationConfigs); err != nil {
		return err
	}

	if v, ok := d.GetOk("node_attributes"); ok {
		if items := v.(*schema.Set).List(); len(items) == 1 {
			src := items[0].(map[string]interface{})
			attrs := map[string]interface{}{
				"VpcId":             src["vpc_id"],
				"ZoneId":            src["zone_id"],
				"SecurityGroupId":   src["security_group_id"],
				"RamRole":           src["ram_role"],
				"KeyPairName":       src["key_pair_name"],
				"DataDiskEncrypted": src["data_disk_encrypted"],
				"DataDiskKMSKeyId":  src["data_disk_kms_key_id"],
			}
			if value, exists := src["system_disk_encrypted"]; exists {
				attrs["SystemDiskEncrypted"] = value.(bool)
			}
			copyNonEmptyString(attrs, "SystemDiskKMSKeyId", src, "system_disk_kms_key_id")
			createClusterRequest["NodeAttributes"] = convertMapToJsonStringIgnoreError(attrs)
		}
	}

	nodeGroups := make([]map[string]interface{}, 0)
	if rawGroups, ok := d.GetOk("node_groups"); ok {
		for _, rawGroup := range rawGroups.([]interface{}) {
			group := rawGroup.(map[string]interface{})
			nodeGroup := map[string]interface{}{}

			copyField(nodeGroup, "NodeGroupType", group, "node_group_type")
			copyField(nodeGroup, "NodeGroupName", group, "node_group_name")
			copyField(nodeGroup, "PaymentType", group, "payment_type")
			copyNonEmptyString(nodeGroup, "DeploymentSetStrategy", group, "deployment_set_strategy")
			copyNonEmptyString(nodeGroup, "NodeResizeStrategy", group, "node_resize_strategy")
			copyNonEmptyString(nodeGroup, "SpotStrategy", group, "spot_strategy")

			if v, ok := group["subscription_config"]; ok {
				if items := v.(*schema.Set).List(); len(items) == 1 {
					src := items[0].(map[string]interface{})
					subscription := map[string]interface{}{}
					copyField(subscription, "PaymentDurationUnit", src, "payment_duration_unit")
					copyField(subscription, "PaymentDuration", src, "payment_duration")
					copyField(subscription, "AutoRenew", src, "auto_renew")
					copyField(subscription, "AutoRenewDurationUnit", src, "auto_renew_duration_unit")
					copyField(subscription, "AutoRenewDuration", src, "auto_renew_duration")
					setAutoPayOrder(subscription, src)
					nodeGroup["SubscriptionConfig"] = subscription
				}
			}

			if v, ok := group["spot_bid_prices"]; ok {
				if items := v.(*schema.Set).List(); len(items) > 0 {
					spotBidPrices := make([]map[string]interface{}, 0)
					for _, item := range items {
						src := item.(map[string]interface{})
						spotBidPrice := map[string]interface{}{}
						copyField(spotBidPrice, "InstanceType", src, "instance_type")
						copyField(spotBidPrice, "BidPrice", src, "bid_price")
						spotBidPrices = append(spotBidPrices, spotBidPrice)
					}
					nodeGroup["SpotBidPrices"] = spotBidPrices
				}
			}

			if v, ok := group["vswitch_ids"]; ok {
				nodeGroup["VSwitchIds"] = toStrings(v.(*schema.Set).List())
			}
			copyField(nodeGroup, "WithPublicIp", group, "with_public_ip")
			if v, ok := group["additional_security_group_ids"]; ok {
				nodeGroup["AdditionalSecurityGroupIds"] = toStrings(v.(*schema.Set).List())
			}
			if v, ok := group["instance_types"]; ok {
				nodeGroup["InstanceTypes"] = toStrings(v.(*schema.Set).List())
			}
			copyField(nodeGroup, "NodeCount", group, "node_count")

			if v, ok := group["system_disk"]; ok {
				if items := v.(*schema.Set).List(); len(items) == 1 {
					nodeGroup["SystemDisk"] = buildDisk(items[0].(map[string]interface{}))
				}
			}
			if v, ok := group["data_disks"]; ok {
				if items := v.(*schema.Set).List(); len(items) > 0 {
					dataDisks := make([]map[string]interface{}, 0)
					for _, item := range items {
						dataDisks = append(dataDisks, buildDisk(item.(map[string]interface{})))
					}
					nodeGroup["DataDisks"] = dataDisks
				}
			}

			copyField(nodeGroup, "GracefulShutdown", group, "graceful_shutdown")
			copyField(nodeGroup, "SpotInstanceRemedy", group, "spot_instance_remedy")

			if v, ok := group["cost_optimized_config"]; ok {
				if items := v.(*schema.Set).List(); len(items) == 1 {
					src := items[0].(map[string]interface{})
					costOptimizedConfig := map[string]interface{}{}
					copyField(costOptimizedConfig, "OnDemandBaseCapacity", src, "on_demand_base_capacity")
					copyField(costOptimizedConfig, "OnDemandPercentageAboveBaseCapacity", src, "on_demand_percentage_above_base_capacity")
					copyField(costOptimizedConfig, "SpotInstancePools", src, "spot_instance_pools")
					nodeGroup["CostOptimizedConfig"] = costOptimizedConfig
				}
			}

			if v, ok := group["auto_scaling_policy"]; ok {
				if policies := v.([]interface{}); len(policies) == 1 {
					nodeGroup["AutoScalingPolicy"] = adaptAutoScalingPolicyRequest(policies[0].(map[string]interface{}))
				}
			}

			// A node group with an ack_config is sent with IaaSType "K8S" and
			// without InstanceTypes.
			if v, ok := group["ack_config"]; ok {
				if ackConfig := v.([]interface{}); len(ackConfig) == 1 {
					nodeGroup["AckConfig"] = adaptAckConfigRequest(ackConfig[0].(map[string]interface{}))
					nodeGroup["IaaSType"] = "K8S"
					delete(nodeGroup, "InstanceTypes")
				}
			}

			nodeGroups = append(nodeGroups, nodeGroup)
		}
	}
	if err = setJSONList("NodeGroups", nodeGroups); err != nil {
		return err
	}

	if scripts, ok := d.GetOk("bootstrap_scripts"); ok {
		bootstrapScripts := make([]map[string]interface{}, 0)
		for _, rawScript := range scripts.([]interface{}) {
			src := rawScript.(map[string]interface{})
			bootstrapScript := map[string]interface{}{}
			copyField(bootstrapScript, "ScriptName", src, "script_name")
			copyField(bootstrapScript, "ScriptPath", src, "script_path")
			copyField(bootstrapScript, "ScriptArgs", src, "script_args")
			copyField(bootstrapScript, "Priority", src, "priority")
			copyField(bootstrapScript, "ExecutionMoment", src, "execution_moment")
			copyField(bootstrapScript, "ExecutionFailStrategy", src, "execution_fail_strategy")

			if v, ok := src["node_selector"]; ok {
				if selectors := v.(*schema.Set).List(); len(selectors) == 1 {
					selector := selectors[0].(map[string]interface{})
					nodeSelector := map[string]interface{}{}
					copyField(nodeSelector, "NodeSelectType", selector, "node_select_type")
					copyStrings(nodeSelector, "NodeNames", selector, "node_names")
					copyField(nodeSelector, "NodeGroupId", selector, "node_group_id")
					copyStrings(nodeSelector, "NodeGroupIds", selector, "node_group_ids")
					copyStrings(nodeSelector, "NodeGroupTypes", selector, "node_group_types")
					copyField(nodeSelector, "NodeGroupName", selector, "node_group_name")
					copyStrings(nodeSelector, "NodeGroupNames", selector, "node_group_names")
					bootstrapScript["NodeSelector"] = nodeSelector
				}
			}
			bootstrapScripts = append(bootstrapScripts, bootstrapScript)
		}
		if err = setJSONList("BootstrapScripts", bootstrapScripts); err != nil {
			return err
		}
	}

	if v, ok := d.GetOk("tags"); ok {
		tags := make([]map[string]interface{}, 0)
		for key, value := range v.(map[string]interface{}) {
			tags = append(tags, map[string]interface{}{
				"Key":   key,
				"Value": value,
			})
		}
		if err = setJSONList("Tags", tags); err != nil {
			return err
		}
	}

	wait := incrementalWait(3*time.Second, 5*time.Second)
	err = resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
		response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createClusterRequest, true)
		if err != nil {
			if NeedRetry(err) {
				wait()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})
	addDebug(action, response, createClusterRequest)
	if err != nil {
		return WrapErrorf(err, DefaultErrorMsg, "alicloud_emrv2_cluster", action, AlibabaCloudSdkGoERROR)
	}

	// Refuse to record an ID when the response carries none; fmt.Sprint(nil)
	// would otherwise store the literal string "<nil>" as the resource ID.
	clusterID, found := response["ClusterId"]
	if !found || clusterID == nil || fmt.Sprint(clusterID) == "" {
		return WrapError(fmt.Errorf("%s response does not contain a ClusterId: %v", action, response))
	}
	d.SetId(fmt.Sprint(clusterID))

	emrService := EmrService{client}
	stateConf := BuildStateConf([]string{"STARTING"}, []string{"RUNNING"}, d.Timeout(schema.TimeoutCreate), 90*time.Second, emrService.EmrV2ClusterStateRefreshFunc(d.Id(), []string{"START_FAILED", "TERMINATED_WITH_ERRORS", "TERMINATED"}))
	stateConf.PollInterval = 10 * time.Second
	if _, err := stateConf.WaitForState(); err != nil {
		return WrapErrorf(err, IdMsg, d.Id())
	}
	return resourceAlicloudEmrV2ClusterRead(d, meta)
}

func resourceAlicloudEmrV2ClusterRead(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	emrService := EmrService{client}
	object, err := emrService.GetEmrV2Cluster(d.Id())
	if err != nil {
		if NotFoundError(err) {
			d.SetId("")
			return nil
		}
		return WrapError(err)
	}
	d.Set("cluster_name", object["ClusterName"])
	d.Set("cluster_type", object["ClusterType"])
	d.Set("payment_type", object["PaymentType"])
	d.Set("release_version", object["ReleaseVersion"])
	d.Set("deploy_mode", object["DeployMode"])
	d.Set("security_mode", object["SecurityMode"])
	d.Set("resource_group_id", object["ResourceGroupId"])
	d.Set("log_collect_strategy", object["LogCollectStrategy"])
	d.Set("deletion_protection", object["DeletionProtection"])
	if _, ok := object["SubscriptionConfig"]; ok {
		sc := d.Get("subscription_config").(*schema.Set).List()
		if len(sc) > 0 {
			d.Set("subscription_config", []map[string]interface{}{sc[0].(map[string]interface{})})
		}
	}
	var nodeAttributes []map[string]interface{}
	if v, ok := object["NodeAttributes"]; ok {
		nodeAttributesMap := v.(map[string]interface{})
		nodeAttribute := map[string]interface{}{ "vpc_id": nodeAttributesMap["VpcId"], "zone_id": nodeAttributesMap["ZoneId"], "security_group_id": nodeAttributesMap["SecurityGroupId"], "ram_role": nodeAttributesMap["RamRole"], "key_pair_name": nodeAttributesMap["KeyPairName"], "data_disk_encrypted": nodeAttributesMap["DataDiskEncrypted"],
"system_disk_encrypted": nodeAttributesMap["SystemDiskEncrypted"], } if v, exists := nodeAttributesMap["DataDiskKMSKeyId"]; exists && v.(string) != "" { nodeAttribute["data_disk_kms_key_id"] = v } if v, exists := nodeAttributesMap["SystemDiskKMSKeyId"]; exists && v.(string) != "" { nodeAttribute["system_disk_kms_key_id"] = v } oldNodeAttributes := d.Get("node_attributes") if oldNodeAttributes != nil && oldNodeAttributes.(*schema.Set).Len() > 0 { oldNodeAttributesMap := d.Get("node_attributes").(*schema.Set).List()[0].(map[string]interface{}) if _, exists := oldNodeAttributesMap["system_disk_encrypted"]; exists { nodeAttribute["system_disk_encrypted"] = nodeAttributesMap["SystemDiskEncrypted"] } if value, exists := oldNodeAttributesMap["system_disk_kms_key_id"]; exists && value != "" { nodeAttribute["system_disk_kms_key_id"] = nodeAttributesMap["SystemDiskKMSKeyId"] } } nodeAttributes = append(nodeAttributes, nodeAttribute) d.Set("node_attributes", nodeAttributes) } var response map[string]interface{} action := "ListApplications" request := map[string]interface{}{ "RegionId": client.RegionId, "ClusterId": d.Id(), } wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(5*time.Minute, func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, request) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } v, err := jsonpath.Get("$.Applications", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.Applications", response) } if v != nil && len(v.([]interface{})) > 0 { emrInternalApp := map[string]struct{}{ "TRE-EXTENSION": {}, "KERBEROS": {}, "METASTORE": {}, "MYSQL": {}, "SPARK-EXTENSION": {}, "SPARK-NATIVE": {}, "TAIHAODOCTOR": {}, "EMRHOOK": {}, "JINDOSDK": {}, 
"TRE": {}, "EMRRUNTIME": {}, "EMRRUNTIME-EXTENSION": {}, } var applications []string for _, item := range v.([]interface{}) { app := strings.ToUpper(item.(map[string]interface{})["ApplicationName"].(string)) if _, ok := emrInternalApp[app]; ok { continue } applications = append(applications, app) } d.Set("applications", applications) } if v, ok := d.GetOk("application_configs"); ok && len(v.(*schema.Set).List()) > 0 { var applicationConfigs []map[string]interface{} for _, ac := range v.(*schema.Set).List() { applicationConfigs = append(applicationConfigs, ac.(map[string]interface{})) } d.Set("application_configs", applicationConfigs) } action = "ListScripts" listScriptsRequest := map[string]interface{}{ "RegionId": client.RegionId, "ClusterId": d.Id(), "ScriptType": "BOOTSTRAP", } err = resource.Retry(5*time.Minute, func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listScriptsRequest, true) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, request) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } v, err = jsonpath.Get("$.Scripts", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.Scripts", response) } if v != nil && len(v.([]interface{})) > 0 { scriptsMaps := v.([]interface{}) var scripts []map[string]interface{} for _, item := range scriptsMaps { scriptMap := item.(map[string]interface{}) script := map[string]interface{}{ "script_name": scriptMap["ScriptName"], "script_path": scriptMap["ScriptPath"], "script_args": scriptMap["ScriptArgs"], "execution_moment": scriptMap["ExecutionMoment"], "execution_fail_strategy": scriptMap["ExecutionFailStrategy"], } if v, ok := scriptMap["NodeSelector"]; ok && len(v.(map[string]interface{})) > 0 { nodeSelectorMap := v.(map[string]interface{}) nodeSelector := map[string]interface{}{ 
"node_select_type": nodeSelectorMap["NodeSelectType"], } if nodeGroupId, exists := nodeSelectorMap["NodeGroupId"]; exists && nodeGroupId.(string) != "" { nodeSelector["node_group_id"] = nodeGroupId } if ngIDs, exists := nodeSelectorMap["NodeGroupIds"]; exists && len(ngIDs.([]interface{})) > 0 { var nodeGroupIDs []string for _, ngID := range ngIDs.([]interface{}) { nodeGroupIDs = append(nodeGroupIDs, ngID.(string)) } nodeSelector["node_group_ids"] = nodeGroupIDs } if nodeGroupName, exists := nodeSelectorMap["NodeGroupName"]; exists && nodeGroupName.(string) != "" { nodeSelector["node_group_name"] = nodeGroupName } if ngNames, exists := nodeSelectorMap["NodeGroupNames"]; exists && len(ngNames.([]interface{})) > 0 { var nodeGroupNames []string for _, ngName := range ngNames.([]interface{}) { nodeGroupNames = append(nodeGroupNames, ngName.(string)) } nodeSelector["node_group_names"] = nodeGroupNames } if ngTypes, exists := nodeSelectorMap["NodeGroupTypes"]; exists && len(ngTypes.([]interface{})) > 0 { var nodeGroupTypes []string for _, ngType := range ngTypes.([]interface{}) { nodeGroupTypes = append(nodeGroupTypes, ngType.(string)) } nodeSelector["node_group_types"] = nodeGroupTypes } if nn, exists := nodeSelectorMap["NodeNames"]; exists && len(nn.([]interface{})) > 0 { var nodeNames []string for _, nodeName := range nn.([]interface{}) { nodeNames = append(nodeNames, nodeName.(string)) } nodeSelector["node_names"] = nodeNames } script["node_selector"] = []map[string]interface{}{nodeSelector} } scripts = append(scripts, script) } d.Set("bootstrap_scripts", scripts) } action = "ListNodeGroups" request["MaxResults"] = PageSizeLarge request["NodeGroupStates"] = []string{"RUNNING"} var nodeGroupObjects []interface{} for { err = resource.Retry(5*time.Minute, func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return 
resource.NonRetryableError(err) } return nil }) addDebug(action, response, request) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } nodeGroupResp, err := jsonpath.Get("$.NodeGroups", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response) } if nodeGroupResp != nil && len(nodeGroupResp.([]interface{})) > 0 { for _, ngr := range nodeGroupResp.([]interface{}) { nodeGroupObjects = append(nodeGroupObjects, ngr) } _, nextTokenExists := response["NextToken"] if len(nodeGroupResp.([]interface{})) < PageSizeLarge { break } else if len(nodeGroupResp.([]interface{})) == PageSizeLarge && !nextTokenExists { break } } else { break } nextToken, err := jsonpath.Get("$.NextToken", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, action, "$.NextToken", response) } request["NextToken"] = nextToken } if len(nodeGroupObjects) > 0 { v = nodeGroupObjects } delete(request, "NextToken") if v != nil && len(v.([]interface{})) > 0 { oldNodeGroupsMap := map[string]map[string]interface{}{} indexNodeGroupMap := map[string]int{} if oldNodeGroups, ok := d.GetOk("node_groups"); ok { for index, item := range oldNodeGroups.([]interface{}) { oldNodeGroup := item.(map[string]interface{}) oldNodeGroupsMap[oldNodeGroup["node_group_name"].(string)] = oldNodeGroup indexNodeGroupMap[oldNodeGroup["node_group_name"].(string)] = index } } nodeGroupMaps := v.([]interface{}) var nodeGroups []map[string]interface{} for _, item := range nodeGroupMaps { nodeGroupMap := item.(map[string]interface{}) nodeGroup := map[string]interface{}{ "node_group_type": nodeGroupMap["NodeGroupType"], "node_group_name": nodeGroupMap["NodeGroupName"], "payment_type": nodeGroupMap["PaymentType"], "with_public_ip": nodeGroupMap["WithPublicIp"], "graceful_shutdown": nodeGroupMap["GracefulShutdown"], "spot_instance_remedy": nodeGroupMap["SpotInstanceRemedy"], "node_count": nodeGroupMap["RunningNodeCount"], } if 
oldNodeGroup, exists := oldNodeGroupsMap[nodeGroupMap["NodeGroupName"].(string)]; exists { if v, ok := oldNodeGroup["subscription_config"]; ok { var subscriptionConfig []map[string]interface{} for _, item := range v.(*schema.Set).List() { subscriptionConfig = append(subscriptionConfig, item.(map[string]interface{})) } nodeGroup["subscription_config"] = subscriptionConfig } if v, ok := oldNodeGroup["spot_bid_prices"]; ok { var spotBidPrices []map[string]interface{} for _, item := range v.(*schema.Set).List() { spotBidPrices = append(spotBidPrices, item.(map[string]interface{})) } nodeGroup["spot_bid_prices"] = spotBidPrices } if v, ok := oldNodeGroup["auto_scaling_policy"]; ok && len(v.([]interface{})) == 1 { var scalingPolicies []map[string]interface{} scalingPolicy := map[string]interface{}{} scalingPolicyMap := v.([]interface{})[0].(map[string]interface{}) if scalingPolicyValue, scalingPolicyExists := scalingPolicyMap["scaling_rules"]; scalingPolicyExists && len(scalingPolicyValue.([]interface{})) > 0 { var scalingRules []map[string]interface{} for _, sr := range scalingPolicyValue.([]interface{}) { scalingRule := map[string]interface{}{} scalingRuleMap := sr.(map[string]interface{}) if scalingRuleValue, scalingRuleExists := scalingRuleMap["rule_name"]; scalingRuleExists && scalingRuleValue.(string) != "" { scalingRule["rule_name"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["trigger_type"]; scalingRuleExists && scalingRuleValue.(string) != "" { scalingRule["trigger_type"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["activity_type"]; scalingRuleExists && scalingRuleValue.(string) != "" { scalingRule["activity_type"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["adjustment_type"]; scalingRuleExists && scalingRuleValue.(string) != "" { scalingRule["adjustment_type"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["adjustment_value"]; 
scalingRuleExists && scalingRuleValue.(int) != 0 { scalingRule["adjustment_value"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["min_adjustment_value"]; scalingRuleExists { scalingRule["min_adjustment_value"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["time_trigger"]; scalingRuleExists && len(scalingRuleValue.([]interface{})) > 0 { var timeTriggers []map[string]interface{} for _, tt := range scalingRuleValue.([]interface{}) { timeTrigger := map[string]interface{}{} timeTriggerMap := tt.(map[string]interface{}) if timeTriggerValue, timeTriggerExists := timeTriggerMap["launch_time"]; timeTriggerExists && timeTriggerValue.(string) != "" { timeTrigger["launch_time"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["start_time"]; timeTriggerExists && timeTriggerValue.(string) != "" { timeTrigger["start_time"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["end_time"]; timeTriggerExists && timeTriggerValue.(string) != "" { timeTrigger["end_time"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["launch_expiration_time"]; timeTriggerExists && timeTriggerValue.(int) != 0 { timeTrigger["launch_expiration_time"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["recurrence_type"]; timeTriggerExists && timeTriggerValue.(string) != "" { timeTrigger["recurrence_type"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["recurrence_value"]; timeTriggerExists && timeTriggerValue.(string) != "" { timeTrigger["recurrence_value"] = timeTriggerValue } timeTriggers = append(timeTriggers, timeTrigger) } scalingRule["time_trigger"] = timeTriggers } if scalingRuleValue, scalingRuleExists := scalingRuleMap["metrics_trigger"]; scalingRuleExists && len(scalingRuleValue.([]interface{})) > 0 { var metricsTriggers []map[string]interface{} for _, mt := range 
scalingRuleValue.([]interface{}) { metricsTrigger := map[string]interface{}{} metricsTriggerMap := mt.(map[string]interface{}) if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["time_window"]; metricsTriggerExists && metricsTriggerValue.(int) != 0 { metricsTrigger["time_window"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["evaluation_count"]; metricsTriggerExists && metricsTriggerValue.(int) != 0 { metricsTrigger["evaluation_count"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["cool_down_interval"]; metricsTriggerExists && metricsTriggerValue.(int) != 0 { metricsTrigger["cool_down_interval"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["condition_logic_operator"]; metricsTriggerExists && metricsTriggerValue.(string) != "" { metricsTrigger["condition_logic_operator"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["time_constraints"]; metricsTriggerExists && len(metricsTriggerValue.([]interface{})) > 0 { var timeConstraints []map[string]interface{} for _, tc := range metricsTriggerValue.([]interface{}) { timeConstraint := map[string]interface{}{} timeConstraintMap := tc.(map[string]interface{}) if timeConstraintValue, timeConstraintExists := timeConstraintMap["start_time"]; timeConstraintExists && timeConstraintValue.(string) != "" { timeConstraint["start_time"] = timeConstraintValue } if timeConstraintValue, timeConstraintExists := timeConstraintMap["end_time"]; timeConstraintExists && timeConstraintValue.(string) != "" { timeConstraint["end_time"] = timeConstraintValue } timeConstraints = append(timeConstraints, timeConstraint) } metricsTrigger["time_constraints"] = timeConstraints } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["conditions"]; metricsTriggerExists && len(metricsTriggerValue.([]interface{})) > 0 { var conditions []map[string]interface{} 
for _, cd := range metricsTriggerValue.([]interface{}) { condition := map[string]interface{}{} conditionMap := cd.(map[string]interface{}) if conditionValue, conditionExists := conditionMap["metric_name"]; conditionExists && conditionValue.(string) != "" { condition["metric_name"] = conditionValue } if conditionValue, conditionExists := conditionMap["statistics"]; conditionExists && conditionValue.(string) != "" { condition["statistics"] = conditionValue } if conditionValue, conditionExists := conditionMap["comparison_operator"]; conditionExists && conditionValue.(string) != "" { condition["comparison_operator"] = conditionValue } if conditionValue, conditionExists := conditionMap["threshold"]; conditionExists && conditionValue.(float64) != 0.0 { condition["threshold"] = conditionValue } if conditionValue, conditionExists := conditionMap["tags"]; conditionExists && len(conditionValue.([]interface{})) > 0 { var tags []map[string]interface{} for _, tg := range conditionValue.([]interface{}) { tag := map[string]interface{}{} tagMap := tg.(map[string]interface{}) if tagValue, tagExists := tagMap["key"]; tagExists && tagValue.(string) != "" { tag["key"] = tagValue } if tagValue, tagExists := tagMap["value"]; tagExists && tagValue.(string) != "" { tag["value"] = tagValue } tags = append(tags, tag) } condition["tags"] = tags } conditions = append(conditions, condition) } metricsTrigger["conditions"] = conditions } metricsTriggers = append(metricsTriggers, metricsTrigger) } scalingRule["metrics_trigger"] = metricsTriggers } scalingRules = append(scalingRules, scalingRule) } scalingPolicy["scaling_rules"] = scalingRules } if scalingPolicyValue, scalingPolicyExists := scalingPolicyMap["constraints"]; scalingPolicyExists && len(scalingPolicyValue.([]interface{})) > 0 { var constraints []map[string]interface{} for _, ct := range scalingPolicyValue.([]interface{}) { constraint := map[string]interface{}{} constraintMap := ct.(map[string]interface{}) if constraintValue, 
constraintExists := constraintMap["max_capacity"]; constraintExists && constraintValue.(int) != 0 { constraint["max_capacity"] = constraintValue } if constraintValue, constraintExists := constraintMap["min_capacity"]; constraintExists && constraintValue.(int) != 0 { constraint["min_capacity"] = constraintValue } constraints = append(constraints, constraint) } scalingPolicy["constraints"] = constraints } scalingPolicies = append(scalingPolicies, scalingPolicy) nodeGroup["auto_scaling_policy"] = scalingPolicies } } if v, ok := nodeGroupMap["VSwitchIds"]; ok && len(v.([]interface{})) > 0 { var vSwitchIDs []string for _, item := range v.([]interface{}) { vSwitchIDs = append(vSwitchIDs, item.(string)) } nodeGroup["vswitch_ids"] = vSwitchIDs } if v, ok := nodeGroupMap["AdditionalSecurityGroupIds"]; ok && len(v.([]interface{})) > 0 { var additionalSecurityGroupIDs []string for _, item := range v.([]interface{}) { additionalSecurityGroupIDs = append(additionalSecurityGroupIDs, item.(string)) } nodeGroup["additional_security_group_ids"] = additionalSecurityGroupIDs } if v, ok := nodeGroupMap["InstanceTypes"]; ok && len(v.([]interface{})) > 0 { var instanceTypes []string for _, item := range v.([]interface{}) { instanceTypes = append(instanceTypes, item.(string)) } nodeGroup["instance_types"] = instanceTypes } if v, ok := nodeGroupMap["AckConfig"]; ok && len(v.(map[string]interface{})) > 0 { var ackConfigs []map[string]interface{} ackConfig := map[string]interface{}{} m := v.(map[string]interface{}) ackConfig["ack_instance_id"] = m["AckInstanceId"] if value, exists := m["NodeSelectors"]; exists && len(value.([]interface{})) > 0 { var nodeSelectors []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { nodeSelectors = append(nodeSelectors, map[string]interface{}{ "key": ackConfigValue.(map[string]interface{})["Key"], "value": ackConfigValue.(map[string]interface{})["Value"], }) } ackConfig["node_selectors"] = nodeSelectors } if value, exists := 
m["Tolerations"]; exists && len(value.([]interface{})) > 0 { var tolerations []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { tolerations = append(tolerations, map[string]interface{}{ "key": ackConfigValue.(map[string]interface{})["Key"], "value": ackConfigValue.(map[string]interface{})["Value"], "operator": ackConfigValue.(map[string]interface{})["Operator"], "effect": ackConfigValue.(map[string]interface{})["Effect"], }) } ackConfig["tolerations"] = tolerations } if value, exists := m["Namespace"]; exists && value.(string) != "" { ackConfig["namespace"] = value } if value, exists := m["RequestCpu"]; exists { ackConfig["request_cpu"] = value } if value, exists := m["RequestMemory"]; exists { ackConfig["request_memory"] = value } if value, exists := m["LimitCpu"]; exists { ackConfig["limit_cpu"] = value } if value, exists := m["LimitMemory"]; exists { ackConfig["limit_memory"] = value } if value, exists := m["CustomLabels"]; exists && len(value.([]interface{})) > 0 { var customLabels []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { customLabels = append(customLabels, map[string]interface{}{ "key": ackConfigValue.(map[string]interface{})["Key"], "value": ackConfigValue.(map[string]interface{})["Value"], }) } ackConfig["custom_labels"] = customLabels } if value, exists := m["CustomAnnotations"]; exists && len(value.([]interface{})) > 0 { var customAnnotations []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { customAnnotations = append(customAnnotations, map[string]interface{}{ "key": ackConfigValue.(map[string]interface{})["Key"], "value": ackConfigValue.(map[string]interface{})["Value"], }) } ackConfig["custom_annotations"] = customAnnotations } if value, exists := m["Pvcs"]; exists && len(value.([]interface{})) > 0 { var pvcs []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { pvcs = append(pvcs, map[string]interface{}{ "name": 
ackConfigValue.(map[string]interface{})["Name"], "path": ackConfigValue.(map[string]interface{})["Path"], "data_disk_storage_class": ackConfigValue.(map[string]interface{})["DataDiskStorageClass"], "data_disk_size": ackConfigValue.(map[string]interface{})["DataDiskSize"], }) } ackConfig["pvcs"] = pvcs } if value, exists := m["Volumes"]; exists && len(value.([]interface{})) > 0 { var volumes []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { volumes = append(volumes, map[string]interface{}{ "name": ackConfigValue.(map[string]interface{})["Name"], "path": ackConfigValue.(map[string]interface{})["Path"], "type": ackConfigValue.(map[string]interface{})["Type"], }) } ackConfig["volumes"] = volumes } if value, exists := m["VolumeMounts"]; exists && len(value.([]interface{})) > 0 { var volumeMounts []map[string]interface{} for _, ackConfigValue := range value.([]interface{}) { volumeMounts = append(volumeMounts, map[string]interface{}{ "name": ackConfigValue.(map[string]interface{})["Name"], "path": ackConfigValue.(map[string]interface{})["Path"], }) } ackConfig["volume_mounts"] = volumeMounts } if value, exists := m["PreStartCommand"]; exists && len(value.([]interface{})) > 0 { var preStartCommands []string for _, ackConfigValue := range value.([]interface{}) { preStartCommands = append(preStartCommands, ackConfigValue.(string)) } ackConfig["pre_start_command"] = preStartCommands } if value, exists := m["PodAffinity"]; exists && value.(string) != "" { ackConfig["pod_affinity"] = value } if value, exists := m["PodAntiAffinity"]; exists && value.(string) != "" { ackConfig["pod_anti_affinity"] = value } if value, exists := m["NodeAffinity"]; exists && value.(string) != "" { ackConfig["node_affinity"] = value } ackConfigs = append(ackConfigs, ackConfig) nodeGroup["ack_config"] = ackConfigs if ong, exists := oldNodeGroupsMap[nodeGroupMap["NodeGroupName"].(string)]; exists { if instValue, instExists := ong["instance_types"]; instExists { var 
instanceTypes []string for _, item := range instValue.(*schema.Set).List() { instanceTypes = append(instanceTypes, item.(string)) } nodeGroup["instance_types"] = instanceTypes } } } if oldNodeGroup, exists := oldNodeGroupsMap[nodeGroupMap["NodeGroupName"].(string)]; exists { if v, ok := oldNodeGroup["deployment_set_strategy"]; ok && v.(string) != "" { deploymentSetStrategy := v.(string) if st, stExists := nodeGroupMap["DeploymentSetStrategy"]; stExists && st.(string) != "" { deploymentSetStrategy = st.(string) } nodeGroup["deployment_set_strategy"] = deploymentSetStrategy } } if oldNodeGroup, exists := oldNodeGroupsMap[nodeGroupMap["NodeGroupName"].(string)]; exists { if v, ok := oldNodeGroup["node_resize_strategy"]; ok && v.(string) != "" { nodeGroup["node_resize_strategy"] = v.(string) } if v, ok := oldNodeGroup["spot_strategy"]; ok && v.(string) != "" { nodeGroup["spot_strategy"] = v.(string) } if v, ok := oldNodeGroup["cost_optimized_config"]; ok && len(v.(*schema.Set).List()) > 0 { var costOptimizedConfig []map[string]interface{} for _, coc := range v.(*schema.Set).List() { costOptimizedConfig = append(costOptimizedConfig, coc.(map[string]interface{})) } nodeGroup["cost_optimized_config"] = costOptimizedConfig } } if v, ok := nodeGroupMap["SystemDisk"]; ok { systemDiskMap := v.(map[string]interface{}) systemDisk := map[string]interface{}{ "category": systemDiskMap["Category"], "size": formatInt(systemDiskMap["Size"]), } if oldNodeGroup, exists := oldNodeGroupsMap[nodeGroupMap["NodeGroupName"].(string)]; exists { oldSystemDisk := oldNodeGroup["system_disk"].(*schema.Set).List()[0].(map[string]interface{}) if v, exists := oldSystemDisk["performance_level"]; exists && v != nil && v.(string) != "" { systemDisk["performance_level"] = v } if v, exists := oldSystemDisk["count"]; exists && v != nil { if count := formatInt(v); count > 0 { systemDisk["count"] = count } } } nodeGroup["system_disk"] = []map[string]interface{}{systemDisk} } if v, ok := 
nodeGroupMap["DataDisks"]; ok && len(v.([]interface{})) > 0 { var dataDisks []map[string]interface{} for _, item := range v.([]interface{}) { dataDisksMap := item.(map[string]interface{}) dataDisk := map[string]interface{}{ "category": dataDisksMap["Category"], "size": formatInt(dataDisksMap["Size"]), "count": formatInt(dataDisksMap["Count"]), } if oldNodeGroup, exists := oldNodeGroupsMap[nodeGroupMap["NodeGroupName"].(string)]; exists { oldDataDisk := oldNodeGroup["data_disks"].(*schema.Set).List()[0].(map[string]interface{}) if v, exists := oldDataDisk["performance_level"]; exists && v != nil && v.(string) != "" { dataDisk["performance_level"] = v } if v, exists := oldDataDisk["count"]; exists && v != nil && v.(int) != 0 { if count := formatInt(v); count > 0 { dataDisk["count"] = formatInt(v) } } } dataDisks = append(dataDisks, dataDisk) } nodeGroup["data_disks"] = dataDisks } nodeGroups = append(nodeGroups, nodeGroup) } sort.Slice(nodeGroups, func(i, j int) bool { return indexNodeGroupMap[nodeGroups[i]["node_group_name"].(string)] < indexNodeGroupMap[nodeGroups[j]["node_group_name"].(string)] }) d.Set("node_groups", nodeGroups) } tags, err := emrService.ListTagResourcesNew(d.Id(), string(TagResourceCluster)) if err != nil { return WrapError(err) } d.Set("tags", tagsToMap(tags)) return nil } func resourceAlicloudEmrV2ClusterUpdate(d *schema.ResourceData, meta interface{}) error { client := meta.(*connectivity.AliyunClient) var err error var response map[string]interface{} d.Partial(true) emrService := EmrService{client} if err := emrService.SetEmrClusterTagsNew(d); err != nil { return WrapError(err) } if d.HasChange("cluster_name") || d.HasChange("log_collect_strategy") || d.HasChange("deletion_protection") { action := "UpdateClusterAttribute" request := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, } if d.HasChange("cluster_name") && d.Get("cluster_name").(string) != "" { request["ClusterName"] = d.Get("cluster_name") } if 
d.HasChange("log_collect_strategy") && d.Get("log_collect_strategy").(string) != "" { request["LogCollectStrategy"] = d.Get("log_collect_strategy") } if d.HasChange("deletion_protection") { request["DeletionProtection"] = d.Get("deletion_protection") } wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug(action, response, request) return nil }) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } d.SetPartial("cluster_name") } if d.HasChange("payment_type") { _, newPaymentType := d.GetChange("payment_type") if "PayAsYouGo" == newPaymentType.(string) { return WrapError(Error("EMR cluster can only change paymentType from PayAsYouGo to Subscription.")) } if !d.HasChange("node_groups") { return WrapError(Error("Subscription paymentType of emr cluster can not contains PayAsYouGo node group with 'MASTER' or 'CORE'.")) } } if d.HasChange("node_groups") { oldNodeGroupsList, newNodeGroupsList := d.GetChange("node_groups") listNodeGroupsRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "MaxResults": PageSizeLarge, } action := "ListNodeGroups" wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listNodeGroupsRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, listNodeGroupsRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } resp, err := jsonpath.Get("$.NodeGroups", response) if 
err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response) } oldNodeGroupMap := map[string]map[string]interface{}{} for _, nodeGroupItem := range resp.([]interface{}) { oldNodeGroup := nodeGroupItem.(map[string]interface{}) oldNodeGroupMap[oldNodeGroup["NodeGroupName"].(string)] = oldNodeGroup } originNodeGroupMap := map[string]map[string]interface{}{} for _, nodeGroupItem := range oldNodeGroupsList.([]interface{}) { originNodeGroup := nodeGroupItem.(map[string]interface{}) originNodeGroupMap[originNodeGroup["node_group_name"].(string)] = originNodeGroup } newNodeGroupMap := map[string]map[string]interface{}{} for _, nodeGroupItem := range newNodeGroupsList.([]interface{}) { newNodeGroup := nodeGroupItem.(map[string]interface{}) newNodeGroupMap[newNodeGroup["node_group_name"].(string)] = newNodeGroup } isUpdateClusterPaymentType := false if d.HasChange("payment_type") { action = "UpdateClusterPaymentType" updateClusterPaymentTypeRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, } if sc, ok := d.GetOk("subscription_config"); ok { if autoPay, exists := sc.(*schema.Set).List()[0].(map[string]interface{})["auto_pay_order"]; exists { updateClusterPaymentTypeRequest["AutoPayOrder"] = autoPay.(bool) } else { updateClusterPaymentTypeRequest["AutoPayOrder"] = true } if autoRenew, exists := sc.(*schema.Set).List()[0].(map[string]interface{})["auto_renew"]; exists { updateClusterPaymentTypeRequest["AutoRenew"] = autoRenew.(bool) } else { updateClusterPaymentTypeRequest["AutoRenew"] = false } } var convertNodeGroups []map[string]interface{} for originNodeGroupName := range originNodeGroupMap { convertNodeGroup := map[string]interface{}{ "NodeGroupId": oldNodeGroupMap[originNodeGroupName]["NodeGroupId"], } if newNodeGroup, ok := newNodeGroupMap[originNodeGroupName]; ok { if newNodeGroupValue, newNodeGroupExists := newNodeGroup["payment_type"]; newNodeGroupExists && "Subscription" == newNodeGroupValue { 
convertNodeGroup["PaymentType"] = newNodeGroupValue } else if "PayAsYouGo" == newNodeGroupValue && ("MASTER" == newNodeGroup["node_group_type"] || "CORE" == newNodeGroup["node_group_type"]) { return WrapError(Error("Subscription paymentType of emr cluster can not contains PayAsYouGo node group with 'MASTER' or 'CORE'.")) } else { continue } if subscriptionConfig, exists := newNodeGroup["subscription_config"]; exists && len(subscriptionConfig.(*schema.Set).List()) > 0 { subscriptionConfigMap := subscriptionConfig.(*schema.Set).List()[0].(map[string]interface{}) if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration"]; subscriptionConfigExists { convertNodeGroup["PaymentDuration"] = subscriptionConfigValue } if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration_unit"]; subscriptionConfigExists { convertNodeGroup["PaymentDurationUnit"] = subscriptionConfigValue } } else { return WrapError(Error("The '%s' nodeGroup: '%s' is needed parameter 'subscription_config' for changing paymentType.", newNodeGroup["node_group_type"], newNodeGroup["node_group_name"])) } convertNodeGroups = append(convertNodeGroups, convertNodeGroup) } } updateClusterPaymentTypeRequest["NodeGroups"] = convertNodeGroups wait = incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, updateClusterPaymentTypeRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, updateClusterPaymentTypeRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } // Wait for cluster payment type has been changed if err = resource.Retry(5*time.Minute, func() *resource.RetryError { if cluster, err := emrService.GetEmrV2Cluster(d.Id()); 
err != nil { return resource.NonRetryableError(err) } else if cluster["PaymentType"].(string) == "Subscription" { return nil } return resource.RetryableError(Error("Waiting for cluster %s payment type to be changed.", d.Id())) }); err != nil { return WrapError(err) } isUpdateClusterPaymentType = true } var increaseNodesGroups []map[string]interface{} var decreaseNodesGroups []map[string]interface{} for nodeGroupName, newNodeGroup := range newNodeGroupMap { if oldNodeGroup, ok := oldNodeGroupMap[nodeGroupName]; ok { if !reflect.DeepEqual(originNodeGroupMap[nodeGroupName]["auto_scaling_policy"], newNodeGroup["auto_scaling_policy"]) { removeScalingPolicy := false if len(newNodeGroup["auto_scaling_policy"].([]interface{})) > 0 { adaptedScalingPolicy := adaptAutoScalingPolicyRequest(newNodeGroup["auto_scaling_policy"].([]interface{})[0].(map[string]interface{})) rulesExists := false constraintsExists := false if aspValue, aspExists := adaptedScalingPolicy["scalingRules"]; aspExists { rulesExists = aspExists adaptedScalingPolicy["ScalingRules"] = aspValue delete(adaptedScalingPolicy, "scalingRules") } if aspValue, aspExists := adaptedScalingPolicy["constraints"]; aspExists { constraintsExists = aspExists adaptedScalingPolicy["Constraints"] = aspValue delete(adaptedScalingPolicy, "constraints") } if rulesExists || constraintsExists { adaptedScalingPolicy["RegionId"] = client.RegionId adaptedScalingPolicy["ClusterId"] = d.Id() adaptedScalingPolicy["NodeGroupId"] = oldNodeGroup["NodeGroupId"] action = "PutAutoScalingPolicy" err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, adaptedScalingPolicy, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, adaptedScalingPolicy) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) 
} } else { removeScalingPolicy = true } } else { removeScalingPolicy = true } if removeScalingPolicy { removeScalingPolicyRequest := map[string]interface{}{ "RegionId": client.RegionId, "ClusterId": d.Id(), "NodeGroupId": oldNodeGroup["NodeGroupId"], } action = "RemoveAutoScalingPolicy" err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, removeScalingPolicyRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, removeScalingPolicyRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } } } if !reflect.DeepEqual(originNodeGroupMap[nodeGroupName]["ack_config"], newNodeGroup["ack_config"]) && "K8S" == oldNodeGroup["IaaSType"] { ackConfigs := newNodeGroup["ack_config"].([]interface{}) if len(ackConfigs) > 0 { updateNodeGroupAttributesRequest := map[string]interface{}{ "RegionId": client.RegionId, "ClusterId": d.Id(), "NodeGroupId": oldNodeGroup["NodeGroupId"], "AckConfig": adaptAckConfigRequest(ackConfigs[0].(map[string]interface{})), } action = "UpdateNodeGroupAttributes" err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, updateNodeGroupAttributesRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, updateNodeGroupAttributesRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } } } newNodeCount := formatInt(newNodeGroup["node_count"]) oldNodeCount := formatInt(oldNodeGroup["RunningNodeCount"]) // increase nodes if oldNodeCount < newNodeCount { if "MASTER" == newNodeGroup["node_group_type"].(string) { return WrapError(Error("EMR 
cluster can not increase the node group type of ['MASTER'].")) } count := newNodeCount - oldNodeCount increaseNodesGroup := map[string]interface{}{} increaseNodesGroup["RegionId"] = client.RegionId increaseNodesGroup["ClusterId"] = d.Id() increaseNodesGroup["NodeGroupId"] = oldNodeGroup["NodeGroupId"] increaseNodesGroup["IncreaseNodeCount"] = count increaseNodesGroup["AutoRenew"] = false if "Subscription" == newNodeGroup["payment_type"].(string) { subscriptionConfig := newNodeGroup["subscription_config"].(*schema.Set).List() if len(subscriptionConfig) == 1 { configMap := subscriptionConfig[0].(map[string]interface{}) increaseNodesGroup["PaymentDuration"] = configMap["payment_duration"] increaseNodesGroup["PaymentDurationUnit"] = configMap["payment_duration_unit"] if value, exists := configMap["auto_pay_order"]; exists { increaseNodesGroup["AutoPayOrder"] = value.(bool) } else { increaseNodesGroup["AutoPayOrder"] = true } if value, exists := configMap["auto_renew"]; exists { increaseNodesGroup["AutoRenew"] = value.(bool) } } } increaseNodesGroups = append(increaseNodesGroups, increaseNodesGroup) } else if oldNodeCount > newNodeCount { // decrease nodes // EMR cluster can only decrease 'TASK, GATEWAY' node group. nodeGroupType := newNodeGroup["node_group_type"].(string) if "TASK" != nodeGroupType && "GATEWAY" != nodeGroupType { return WrapError(Error("EMR cluster can only decrease the node group type of ['TASK', 'GATEWAY'].")) } decreaseNodesGroup := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "DecreaseNodeCount": oldNodeCount - newNodeCount, "NodeGroupId": oldNodeGroup["NodeGroupId"], } decreaseNodesGroups = append(decreaseNodesGroups, decreaseNodesGroup) } // increase node disk size, we can only support single disk type. 
currDataDisk := oldNodeGroup["DataDisks"].([]interface{})[0].(map[string]interface{}) targetDataDisk := newNodeGroup["data_disks"].(*schema.Set).List()[0].(map[string]interface{}) if formatInt(targetDataDisk["size"]) < formatInt(currDataDisk["Size"]) { return WrapError(Error("EMR cluster can only increase node disk, decrease node disk is not supported.")) } else if formatInt(targetDataDisk["size"]) > formatInt(currDataDisk["Size"]) { if currDataDisk["Category"].(string) == "local_hdd_pro" { return WrapError(Error("EMR cluster can not support increase node disk with 'local_hdd_pro' disk type.")) } action := "IncreaseNodesDiskSize" increaseNodeDiskSizeRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "NodeGroupId": oldNodeGroup["NodeGroupId"], "DataDiskSizes": []map[string]interface{}{ { "Category": currDataDisk["Category"].(string), "Size": formatInt(targetDataDisk["size"]), }, }, } wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, increaseNodeDiskSizeRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, increaseNodeDiskSizeRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } } } else { // 'Task' NodeGroupType may not exist when create emr_cluster subscriptionConfig := map[string]interface{}{} if "Subscription" == newNodeGroup["payment_type"] { subscriptionMap := newNodeGroup["subscription_config"].(*schema.Set).List()[0].(map[string]interface{}) subscriptionConfig["PaymentDurationUnit"] = subscriptionMap["payment_duration_unit"] subscriptionConfig["PaymentDuration"] = subscriptionMap["payment_duration"] subscriptionConfig["AutoRenewDurationUnit"] = subscriptionMap["auto_renew_duration_unit"] 
subscriptionConfig["AutoRenewDuration"] = subscriptionMap["auto_renew_duration"] subscriptionConfig["AutoRenew"] = false if value, exists := subscriptionMap["auto_pay_order"]; exists { subscriptionConfig["AutoPayOrder"] = value.(bool) } else { subscriptionConfig["AutoPayOrder"] = true } if value, exists := subscriptionMap["auto_renew"]; exists { subscriptionConfig["AutoRenew"] = value.(bool) } } var spotBidPrices []map[string]interface{} for _, v := range newNodeGroup["spot_bid_prices"].(*schema.Set).List() { sbpMap := v.(map[string]interface{}) spotBidPrices = append(spotBidPrices, map[string]interface{}{ "InstanceType": sbpMap["instance_type"], "BidPrice": sbpMap["bid_price"], }) } systemDiskMap := newNodeGroup["system_disk"].(*schema.Set).List()[0].(map[string]interface{}) var dataDisks []map[string]interface{} for _, v := range newNodeGroup["data_disks"].(*schema.Set).List() { dataDiskMap := v.(map[string]interface{}) dataDisk := map[string]interface{}{ "Category": dataDiskMap["category"], "Size": dataDiskMap["size"], "Count": dataDiskMap["count"], } if value, exists := dataDiskMap["performance_level"]; exists && value.(string) != "" { dataDisk["PerformanceLevel"] = value.(string) } dataDisks = append(dataDisks, dataDisk) } nodeGroupParam := map[string]interface{}{ "NodeGroupType": newNodeGroup["node_group_type"], "NodeGroupName": nodeGroupName, "PaymentType": newNodeGroup["payment_type"], "SubscriptionConfig": subscriptionConfig, "SpotBidPrices": spotBidPrices, "WithPublicIp": newNodeGroup["with_public_ip"], "NodeCount": newNodeGroup["node_count"], "DataDisks": dataDisks, "GracefulShutdown": newNodeGroup["graceful_shutdown"], "SpotInstanceRemedy": newNodeGroup["spot_instance_remedy"], } if value, exists := newNodeGroup["auto_scaling_policy"]; exists && len(value.([]interface{})) > 0 { nodeGroupParam["AutoScalingPolicy"] = adaptAutoScalingPolicyRequest(value.([]interface{})[0].(map[string]interface{})) } if value, exists := 
newNodeGroup["deployment_set_strategy"]; exists && value.(string) != "" { nodeGroupParam["DeploymentSetStrategy"] = value.(string) } if value, exists := newNodeGroup["node_resize_strategy"]; exists && value.(string) != "" { nodeGroupParam["NodeResizeStrategy"] = value.(string) } if value, exists := newNodeGroup["spot_strategy"]; exists && value.(string) != "" { nodeGroupParam["SpotStrategy"] = value.(string) } vSwitchIDList := newNodeGroup["vswitch_ids"].(*schema.Set).List() if len(vSwitchIDList) > 0 { var vSwitchIDs []string for _, vSwitchID := range vSwitchIDList { vSwitchIDs = append(vSwitchIDs, vSwitchID.(string)) } nodeGroupParam["VSwitchIds"] = vSwitchIDs } systemDisk := map[string]interface{}{ "Category": systemDiskMap["category"], "Size": systemDiskMap["size"], "Count": systemDiskMap["count"], } if value, exists := systemDiskMap["performance_level"]; exists && value.(string) != "" { systemDisk["PerformanceLevel"] = value.(string) } nodeGroupParam["SystemDisk"] = systemDisk instanceTypeList := newNodeGroup["instance_types"].(*schema.Set).List() if len(instanceTypeList) > 0 { var instanceTypes []string for _, instanceType := range instanceTypeList { instanceTypes = append(instanceTypes, instanceType.(string)) } nodeGroupParam["InstanceTypes"] = instanceTypes } if value, exists := newNodeGroup["ack_config"]; exists && len(value.([]interface{})) > 0 { nodeGroupParam["AckConfig"] = adaptAckConfigRequest(value.([]interface{})[0].(map[string]interface{})) nodeGroupParam["IaaSType"] = "K8S" delete(nodeGroupParam, "InstanceTypes") } addSecurityGroupIDList := newNodeGroup["additional_security_group_ids"].(*schema.Set).List() if len(addSecurityGroupIDList) > 0 { var addSecurityGroupIDs []string for _, addSecurityGroupID := range addSecurityGroupIDList { addSecurityGroupIDs = append(addSecurityGroupIDs, addSecurityGroupID.(string)) } nodeGroupParam["AdditionalSecurityGroupIds"] = addSecurityGroupIDs } costOptimizedConfigList := 
newNodeGroup["cost_optimized_config"].(*schema.Set).List() if len(costOptimizedConfigList) > 0 { costOptimizedConfig := costOptimizedConfigList[0].(map[string]interface{}) nodeGroupParam["CostOptimizedConfig"] = map[string]interface{}{ "OnDemandBaseCapacity": costOptimizedConfig["on_demand_base_capacity"], "OnDemandPercentageAboveBaseCapacity": costOptimizedConfig["on_demand_percentage_above_base_capacity"], "SpotInstancePools": costOptimizedConfig["spot_instance_pools"], } } createNodeGroupRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "NodeGroup": nodeGroupParam, } action = "CreateNodeGroup" err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createNodeGroupRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, createNodeGroupRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } listNodeGroupsRequest := map[string]interface{}{ "ClusterId": d.Id(), "NodeGroupNames": []string{nodeGroupName}, "RegionId": client.RegionId, } action = "ListNodeGroups" err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listNodeGroupsRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, listNodeGroupsRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } resp, err := jsonpath.Get("$.NodeGroups", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response) } if len(resp.([]interface{})) == 0 { continue } nodeGroupId := 
resp.([]interface{})[0].(map[string]interface{})["NodeGroupId"].(string) newNodeCount := formatInt(newNodeGroup["node_count"]) if newNodeCount > 0 { increaseNodesGroup := map[string]interface{}{} increaseNodesGroup["RegionId"] = client.RegionId increaseNodesGroup["ClusterId"] = d.Id() increaseNodesGroup["NodeGroupId"] = nodeGroupId increaseNodesGroup["IncreaseNodeCount"] = newNodeCount increaseNodesGroup["AutoPayOrder"] = true increaseNodesGroup["AutoRenew"] = false if "Subscription" == newNodeGroup["payment_type"].(string) { subscriptionConfig := newNodeGroup["subscription_config"].(*schema.Set).List() if len(subscriptionConfig) == 1 { configMap := subscriptionConfig[0].(map[string]interface{}) increaseNodesGroup["PaymentDuration"] = configMap["payment_duration"] increaseNodesGroup["PaymentDurationUnit"] = configMap["payment_duration_unit"] if value, exists := configMap["auto_pay_order"]; exists { increaseNodesGroup["AutoPayOrder"] = value.(bool) } if value, exists := configMap["auto_renew"]; exists { increaseNodesGroup["AutoRenew"] = value.(bool) } } } increaseNodesGroups = append(increaseNodesGroups, increaseNodesGroup) } } } var deleteNodeGroups []map[string]interface{} for nodeGroupName, oldNodeGroup := range oldNodeGroupMap { // Delete empty nodeGroup if newNodeGroup, ok := newNodeGroupMap[nodeGroupName]; !ok { oldNodeCount := formatInt(oldNodeGroup["RunningNodeCount"]) if oldNodeCount > 0 { return WrapError(Error("The [nodeGroup: %v, nodeGroupType: %v] can not delete cause exists running nodes", nodeGroupName, oldNodeGroup["NodeGroupType"].(string))) } deleteNodeGroups = append(deleteNodeGroups, map[string]interface{}{ "ClusterId": d.Id(), "NodeGroupId": oldNodeGroup["NodeGroupId"], "RegionId": client.RegionId, }) } else if newNodeGroup["payment_type"] == "Subscription" && oldNodeGroup["PaymentType"] == "PayAsYouGo" && !isUpdateClusterPaymentType && d.Get("payment_type") == "Subscription" { action = "UpdateNodeGroupPaymentType" 
UpdateNodeGroupPaymentTypeRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, } updateNodeGroupPaymentType := map[string]interface{}{ "NodeGroupId": oldNodeGroup["NodeGroupId"], } if newNodeGroupValue, newNodeGroupExists := newNodeGroup["payment_type"]; newNodeGroupExists { updateNodeGroupPaymentType["PaymentType"] = newNodeGroupValue } if subscriptionConfig, exists := newNodeGroup["subscription_config"]; exists && len(subscriptionConfig.(*schema.Set).List()) > 0 { subscriptionConfigMap := subscriptionConfig.(*schema.Set).List()[0].(map[string]interface{}) if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration"]; subscriptionConfigExists { updateNodeGroupPaymentType["PaymentDuration"] = subscriptionConfigValue } if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration_unit"]; subscriptionConfigExists { updateNodeGroupPaymentType["PaymentDurationUnit"] = subscriptionConfigValue } if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["auto_pay_order"]; subscriptionConfigExists { UpdateNodeGroupPaymentTypeRequest["AutoPayOrder"] = subscriptionConfigValue } else { UpdateNodeGroupPaymentTypeRequest["AutoPayOrder"] = true } if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["auto_renew"]; subscriptionConfigExists { UpdateNodeGroupPaymentTypeRequest["AutoRenew"] = subscriptionConfigValue } else { UpdateNodeGroupPaymentTypeRequest["AutoRenew"] = false } } else { return WrapError(Error("The '%s' nodeGroup: '%s' is needed parameter 'subscription_config' for changing paymentType.", newNodeGroup["node_group_type"], newNodeGroup["node_group_name"])) } UpdateNodeGroupPaymentTypeRequest["NodeGroup"] = updateNodeGroupPaymentType wait = incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", 
action, nil, UpdateNodeGroupPaymentTypeRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, UpdateNodeGroupPaymentTypeRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } // Wait for node group payment type has been changed if err = resource.Retry(5*time.Minute, func() *resource.RetryError { nodeGroupId := updateNodeGroupPaymentType["NodeGroupId"].(string) if nodeGroups, err := emrService.ListEmrV2NodeGroups(d.Id(), []string{nodeGroupId}); err != nil { return resource.NonRetryableError(err) } else if len(nodeGroups) > 0 && "Subscription" == nodeGroups[0].(map[string]interface{})["PaymentType"].(string) { return nil } return resource.RetryableError(Error("Waiting for node group %s payment type to be changed.", nodeGroupId)) }); err != nil { return WrapError(err) } } else if !isUpdateClusterPaymentType && d.Get("payment_type") == "Subscription" && oldNodeGroup["PaymentType"] == "Subscription" && newNodeGroup["payment_type"] == "PayAsYouGo" { return WrapError(Error("EMR cluster can only change paymentType from PayAsYouGo to Subscription.")) } } var wg sync.WaitGroup var cm sync.Map waitFlag := false for _, increaseNodesGroupRequest := range increaseNodesGroups { waitFlag = true action := "IncreaseNodes" wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, increaseNodesGroupRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, increaseNodesGroupRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } resp, err := jsonpath.Get("$.OperationId", response) if err != nil { 
return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.OperationId", response) } wg.Add(1) go emrService.WaitForEmrV2Operation(d.Id(), increaseNodesGroupRequest["NodeGroupId"].(string), resp.(string), 2*Timeout5Minute, &wg, &cm) } for _, decreaseNodesGroupRequest := range decreaseNodesGroups { waitFlag = true action := "DecreaseNodes" wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, decreaseNodesGroupRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, decreaseNodesGroupRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } resp, err := jsonpath.Get("$.OperationId", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.OperationId", response) } wg.Add(1) go emrService.WaitForEmrV2Operation(d.Id(), decreaseNodesGroupRequest["NodeGroupId"].(string), resp.(string), 2*Timeout5Minute, &wg, &cm) } if waitFlag { wg.Wait() var failedNodeGroupId []string cm.Range(func(k, v interface{}) bool { if v != nil && v.(bool) == false { failedNodeGroupId = append(failedNodeGroupId, k.(string)) } return true }) if len(failedNodeGroupId) > 0 { return WrapError(Error("EMR cluster resize found error result with the failed nodeGroupIds: [%s].", strings.Join(failedNodeGroupId, ","))) } } for _, deleteNodeGroupRequest := range deleteNodeGroups { action := "DeleteNodeGroup" wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, deleteNodeGroupRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil 
}) addDebug(action, response, deleteNodeGroupRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } } d.SetPartial("node_groups") } if d.HasChange("bootstrap_scripts") { _, newBootstrapScripts := d.GetChange("bootstrap_scripts") listScriptsRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "ScriptType": "BOOTSTRAP", } action := "ListScripts" wait := incrementalWait(3*time.Second, 5*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listScriptsRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, listScriptsRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } resp, err := jsonpath.Get("$.Scripts", response) if err != nil { return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.Scripts", response) } if resp != nil && len(resp.([]interface{})) > 0 { deleteScriptRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "ScriptType": "BOOTSTRAP", } action = "DeleteScript" for _, v := range resp.([]interface{}) { scriptMap := v.(map[string]interface{}) deleteScriptRequest["ScriptId"] = scriptMap["ScriptId"] err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, deleteScriptRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, deleteScriptRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } } } if newBootstrapScripts != nil && len(newBootstrapScripts.([]interface{})) > 0 { var newScripts 
[]map[string]interface{} createScriptRequest := map[string]interface{}{ "ClusterId": d.Id(), "RegionId": client.RegionId, "ScriptType": "BOOTSTRAP", } for _, bs := range newBootstrapScripts.([]interface{}) { newScriptMap := bs.(map[string]interface{}) newScript := map[string]interface{}{ "ScriptName": newScriptMap["script_name"], "ScriptPath": newScriptMap["script_path"], "ScriptArgs": newScriptMap["script_args"], "ExecutionMoment": newScriptMap["execution_moment"], "ExecutionFailStrategy": newScriptMap["execution_fail_strategy"], } if value, exists := newScriptMap["priority"]; exists && value.(int) > 0 { newScript["Priority"] = value.(int) } if value, exists := newScriptMap["node_selector"]; exists && len(value.(*schema.Set).List()) == 1 { nodeSelectorMap := value.(*schema.Set).List()[0].(map[string]interface{}) nodeSelector := map[string]interface{}{ "NodeSelectType": nodeSelectorMap["node_select_type"], "NodeGroupId": nodeSelectorMap["node_group_id"], "NodeGroupName": nodeSelectorMap["node_group_name"], } if v, ok := nodeSelectorMap["node_names"]; ok && len(v.([]interface{})) > 0 { var nodeNames []string for _, nn := range v.([]interface{}) { nodeNames = append(nodeNames, nn.(string)) } nodeSelector["NodeNames"] = nodeNames } if v, ok := nodeSelectorMap["node_group_ids"]; ok && len(v.([]interface{})) > 0 { var nodeGroupIds []string for _, ngId := range v.([]interface{}) { nodeGroupIds = append(nodeGroupIds, ngId.(string)) } nodeSelector["NodeGroupIds"] = nodeGroupIds } if v, ok := nodeSelectorMap["node_group_types"]; ok && len(v.([]interface{})) > 0 { var nodeGroupTypes []string for _, ngType := range v.([]interface{}) { nodeGroupTypes = append(nodeGroupTypes, ngType.(string)) } nodeSelector["NodeGroupTypes"] = nodeGroupTypes } if v, ok := nodeSelectorMap["node_group_names"]; ok && len(v.([]interface{})) > 0 { var nodeGroupNames []string for _, ngName := range v.([]interface{}) { nodeGroupNames = append(nodeGroupNames, ngName.(string)) } 
nodeSelector["NodeGroupNames"] = nodeGroupNames } newScript["NodeSelector"] = nodeSelector } newScripts = append(newScripts, newScript) } createScriptRequest["Scripts"] = newScripts action = "CreateScript" err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createScriptRequest, false) if err != nil { if NeedRetry(err) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }) addDebug(action, response, createScriptRequest) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } } } d.Partial(false) return nil }

// resourceAlicloudEmrV2ClusterDelete deletes the EMR cluster identified by d.Id().
//
// When the resource's payment_type is "Subscription", every node group whose
// PaymentType is "Subscription" has its nodes listed (ListNodes) and handed to
// deleteSubscriptionInstances before the cluster itself is removed. The
// function then issues DeleteCluster, waits while the cluster reports
// TERMINATING, and finally waits through WaitForEmrV2Cluster for the Deleted
// status.
//
// It returns a wrapped error as soon as any API call or wait fails.
func resourceAlicloudEmrV2ClusterDelete(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	emrService := EmrService{client}
	var response map[string]interface{}
	var err error

	if paymentType, ok := d.GetOk("payment_type"); ok && paymentType.(string) == "Subscription" {
		nodeGroups, listErr := emrService.ListEmrV2NodeGroups(d.Id(), []string{})
		if listErr != nil {
			return WrapError(listErr)
		}

		action := "ListNodes"
		// NOTE(review): only one ListNodes call is made per node group, so at
		// most MaxResults (100) nodes are handled here — confirm whether the
		// API pages its results and whether larger groups need a paging loop.
		request := map[string]interface{}{
			"ClusterId":  d.Id(),
			"RegionId":   client.RegionId,
			"MaxResults": 100,
		}
		wait := incrementalWait(3*time.Second, 5*time.Second)
		for _, item := range nodeGroups {
			nodeGroup, ok := item.(map[string]interface{})
			if !ok {
				continue
			}
			// Only subscription node groups need their instances handled
			// before the cluster is deleted.
			if groupPaymentType, ok := nodeGroup["PaymentType"].(string); !ok || groupPaymentType != "Subscription" {
				continue
			}
			nodeGroupID, ok := nodeGroup["NodeGroupId"].(string)
			if !ok {
				continue
			}
			request["NodeGroupIds"] = []string{nodeGroupID}

			err = resource.Retry(d.Timeout(schema.TimeoutDelete), func() *resource.RetryError {
				response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true)
				if err != nil {
					if NeedRetry(err) {
						wait()
						return resource.RetryableError(err)
					}
					return resource.NonRetryableError(err)
				}
				return nil
			})
			addDebug(action, response, request)
			if err != nil {
				return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
			}

			nodes, pathErr := jsonpath.Get("$.Nodes", response)
			if pathErr != nil {
				return WrapErrorf(pathErr, FailedGetAttributeMsg, d.Id(), "$.Nodes", response)
			}
			// A comma-ok assertion keeps an unexpected payload shape from
			// panicking the provider.
			if nodeList, ok := nodes.([]interface{}); ok && len(nodeList) > 0 {
				if err = deleteSubscriptionInstances(d, meta, nodeList); err != nil {
					return WrapError(err)
				}
			}
		}
	}

	action := "DeleteCluster"
	request := map[string]interface{}{
		"ClusterId": d.Id(),
		"RegionId":  client.RegionId,
	}
	wait := incrementalWait(3*time.Second, 5*time.Second)
	err = resource.Retry(d.Timeout(schema.TimeoutDelete), func() *resource.RetryError {
		response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true)
		if err != nil {
			// The pre-paid node group message is retried as well, so the call
			// is attempted again until it succeeds or the delete timeout ends.
			if NeedRetry(err) || strings.Contains(err.Error(), "cluster exists nonempty pre-paid nodeGroups") {
				wait()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})
	addDebug(action, response, request)
	if err != nil {
		return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
	}

	stateConf := BuildStateConf([]string{"TERMINATING"}, []string{}, d.Timeout(schema.TimeoutDelete), 1*time.Millisecond, emrService.EmrV2ClusterStateRefreshFunc(d.Id(), []string{"TERMINATE_FAILED"}))
	stateConf.PollInterval = 5 * time.Second
	if _, err = stateConf.WaitForState(); err != nil {
		return WrapErrorf(err, IdMsg, d.Id())
	}
	return WrapError(emrService.WaitForEmrV2Cluster(d.Id(), Deleted, DefaultTimeoutMedium))
}

// deleteSubscriptionInstances switches the given nodes to the PostPaid charge
// type with a single ModifyInstanceChargeType call and then force-deletes each
// one through the ECS API, waiting after every delete until the instance is no
// longer reported.
//
// instances is the "$.Nodes" list of a ListNodes response; each entry is
// expected to be a map carrying a string "NodeId". Entries without a usable
// NodeId are skipped, and an empty list is a no-op.
//
// An instance that ECS already reports as not found is skipped so that the
// remaining instances are still deleted.
func deleteSubscriptionInstances(d *schema.ResourceData, meta interface{}, instances []interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	ecsService := EcsService{client}

	instanceIDs := make([]interface{}, 0, len(instances))
	for _, item := range instances {
		node, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		if nodeID, ok := node["NodeId"].(string); ok && nodeID != "" {
			instanceIDs = append(instanceIDs, nodeID)
		}
	}
	if len(instanceIDs) == 0 {
		return nil
	}

	request := ecs.CreateModifyInstanceChargeTypeRequest()
	request.InstanceIds = convertListToJsonString(instanceIDs)
	request.AutoPay = requests.NewBoolean(true)
	request.DryRun = requests.NewBoolean(false)
	request.InstanceChargeType = string(PostPaid)

	// NOTE(review): this retry is bounded by the update timeout although the
	// caller visible in this file is the delete path — confirm whether
	// schema.TimeoutDelete was intended.
	if err := resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
		raw, err := client.WithEcsClient(func(ecsClient *ecs.Client) (interface{}, error) {
			return ecsClient.ModifyInstanceChargeType(request)
		})
		if err != nil {
			if NeedRetry(err) || IsExpectedErrors(err, []string{"InternalError"}) {
				time.Sleep(3 * time.Second)
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		addDebug(request.GetActionName(), raw, request.RpcRequest, request)
		return nil
	}); err != nil {
		return WrapErrorf(err, DefaultErrorMsg, d.Id(), request.GetActionName(), AlibabaCloudSdkGoERROR)
	}

	for _, rawID := range instanceIDs {
		instanceID := rawID.(string) // only strings were appended above

		deleteRequest := ecs.CreateDeleteInstanceRequest()
		deleteRequest.InstanceId = instanceID
		deleteRequest.Force = requests.NewBoolean(true)

		wait := incrementalWait(1*time.Second, 1*time.Second)
		err := resource.Retry(d.Timeout(schema.TimeoutDelete), func() *resource.RetryError {
			raw, err := client.WithEcsClient(func(ecsClient *ecs.Client) (interface{}, error) {
				return ecsClient.DeleteInstance(deleteRequest)
			})
			if err != nil {
				if IsExpectedErrors(err, []string{"IncorrectInstanceStatus", "DependencyViolation.RouteEntry", "IncorrectInstanceStatus.Initializing"}) {
					return resource.RetryableError(err)
				}
				if IsExpectedErrors(err, []string{Throttling, "LastTokenProcessing"}) {
					wait()
					return resource.RetryableError(err)
				}
				return resource.NonRetryableError(err)
			}
			addDebug(deleteRequest.GetActionName(), raw)
			return nil
		})
		if err != nil {
			if IsExpectedErrors(err, EcsNotFound) {
				// This instance is already gone; keep going so the remaining
				// instances are still deleted.
				continue
			}
			return WrapErrorf(err, DefaultErrorMsg, d.Id(), deleteRequest.GetActionName(), AlibabaCloudSdkGoERROR)
		}

		// Poll the ECS instance that was just deleted. d.Id() is the EMR
		// cluster ID and must not be used as the instance ID here.
		stateConf := BuildStateConf([]string{"Pending", "Running", "Stopped", "Stopping"}, []string{}, d.Timeout(schema.TimeoutDelete), 10*time.Second, ecsService.InstanceStateRefreshFunc(instanceID, []string{}))
		if _, err = stateConf.WaitForState(); err != nil {
			return WrapErrorf(err, IdMsg, d.Id())
		}
	}
	return nil
}

func adaptAutoScalingPolicyRequest(r 
map[string]interface{}) map[string]interface{} { scalingPolicy := map[string]interface{}{} if value, exists := r["constraints"]; exists { constraints := value.([]interface{}) if len(constraints) == 1 { constraint := map[string]interface{}{} constraintMap := constraints[0].(map[string]interface{}) if constraintValue, constraintExists := constraintMap["max_capacity"]; constraintExists { constraint["maxCapacity"] = constraintValue constraint["MaxCapacity"] = constraintValue } if constraintValue, constraintExists := constraintMap["min_capacity"]; constraintExists { constraint["minCapacity"] = constraintValue constraint["MinCapacity"] = constraintValue } scalingPolicy["constraints"] = constraint scalingPolicy["Constraints"] = constraint } } if value, exists := r["scaling_rules"]; exists { var scalingRules []map[string]interface{} for _, sr := range value.([]interface{}) { scalingRule := map[string]interface{}{} scalingRuleMap := sr.(map[string]interface{}) if scalingRuleValue, scalingRuleExists := scalingRuleMap["rule_name"]; scalingRuleExists { scalingRule["RuleName"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["trigger_type"]; scalingRuleExists { scalingRule["TriggerType"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["activity_type"]; scalingRuleExists { scalingRule["ActivityType"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["adjustment_type"]; scalingRuleExists { scalingRule["AdjustmentType"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["adjustment_value"]; scalingRuleExists { scalingRule["AdjustmentValue"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["min_adjustment_value"]; scalingRuleExists { scalingRule["MinAdjustmentValue"] = scalingRuleValue } if scalingRuleValue, scalingRuleExists := scalingRuleMap["time_trigger"]; scalingRuleExists { timeTriggers := scalingRuleValue.([]interface{}) if 
len(timeTriggers) == 1 { timeTrigger := map[string]interface{}{} timeTriggerMap := timeTriggers[0].(map[string]interface{}) if timeTriggerValue, timeTriggerExists := timeTriggerMap["launch_time"]; timeTriggerExists { timeTrigger["LaunchTime"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["start_time"]; timeTriggerExists { timeTrigger["StartTime"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["end_time"]; timeTriggerExists { timeTrigger["EndTime"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["launch_expiration_time"]; timeTriggerExists { timeTrigger["LaunchExpirationTime"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["recurrence_type"]; timeTriggerExists { timeTrigger["RecurrenceType"] = timeTriggerValue } if timeTriggerValue, timeTriggerExists := timeTriggerMap["recurrence_value"]; timeTriggerExists { timeTrigger["RecurrenceValue"] = timeTriggerValue } scalingRule["TimeTrigger"] = timeTrigger } } if scalingRuleValue, scalingRuleExists := scalingRuleMap["metrics_trigger"]; scalingRuleExists { metricsTriggers := scalingRuleValue.([]interface{}) if len(metricsTriggers) == 1 { metricsTrigger := map[string]interface{}{} metricsTriggerMap := metricsTriggers[0].(map[string]interface{}) if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["time_window"]; metricsTriggerExists { metricsTrigger["TimeWindow"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["evaluation_count"]; metricsTriggerExists { metricsTrigger["EvaluationCount"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["cool_down_interval"]; metricsTriggerExists { metricsTrigger["CoolDownInterval"] = metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["condition_logic_operator"]; metricsTriggerExists { metricsTrigger["ConditionLogicOperator"] = 
metricsTriggerValue } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["time_constraints"]; metricsTriggerExists { var timeConstraints []map[string]interface{} for _, tc := range metricsTriggerValue.([]interface{}) { timeConstraint := map[string]interface{}{} timeConstraintMap := tc.(map[string]interface{}) if timeConstraintValue, timeConstraintExists := timeConstraintMap["start_time"]; timeConstraintExists { timeConstraint["StartTime"] = timeConstraintValue } if timeConstraintValue, timeConstraintExists := timeConstraintMap["end_time"]; timeConstraintExists { timeConstraint["EndTime"] = timeConstraintValue } timeConstraints = append(timeConstraints, timeConstraint) } metricsTrigger["TimeConstraints"] = timeConstraints } if metricsTriggerValue, metricsTriggerExists := metricsTriggerMap["conditions"]; metricsTriggerExists { var conditions []map[string]interface{} for _, cd := range metricsTriggerValue.([]interface{}) { condition := map[string]interface{}{} conditionMap := cd.(map[string]interface{}) if conditionValue, conditionExists := conditionMap["metric_name"]; conditionExists { condition["MetricName"] = conditionValue } if conditionValue, conditionExists := conditionMap["statistics"]; conditionExists { condition["Statistics"] = conditionValue } if conditionValue, conditionExists := conditionMap["comparison_operator"]; conditionExists { condition["ComparisonOperator"] = conditionValue } if conditionValue, conditionExists := conditionMap["threshold"]; conditionExists { condition["Threshold"] = conditionValue } if conditionValue, conditionExists := conditionMap["tags"]; conditionExists { var tags []map[string]interface{} for _, t := range conditionValue.([]interface{}) { tag := map[string]interface{}{} tagMap := t.(map[string]interface{}) if tagValue, tagExists := tagMap["key"]; tagExists { tag["Key"] = tagValue } if tagValue, tagExists := tagMap["value"]; tagExists { tag["Value"] = tagValue } tags = append(tags, tag) } condition["Tags"] = tags } 
// adaptAckConfigRequest converts the snake_case entries of r into a new map
// keyed by the corresponding PascalCase names (presumably the field names of
// the ACK configuration request — confirm against the caller).
//
// Scalar entries (ack_instance_id, namespace, request_*/limit_*, *_affinity)
// are copied unchanged whenever their key is present in r, even if the value
// is nil.
//
// Collection entries are emitted only when they yield at least one element.
// A collection may be supplied either as a []interface{} or as any value that
// exposes List() []interface{} (which is how *schema.Set is read here).
// Entries whose value is nil or of another type, and elements that are nil or
// of an unexpected type, are skipped instead of triggering a type-assertion
// panic. Object collections are returned as []map[string]interface{};
// pre_start_command is returned as []string.
func adaptAckConfigRequest(r map[string]interface{}) map[string]interface{} {
	ackConfig := map[string]interface{}{}

	// elements returns the raw items stored under key, or nil when the key is
	// absent, nil, or holds something that is not list-like.
	elements := func(key string) []interface{} {
		switch v := r[key].(type) {
		case []interface{}:
			return v
		case interface{ List() []interface{} }:
			return v.List()
		}
		return nil
	}

	// copyScalar forwards r[key] untouched under the target name.
	copyScalar := func(key, target string) {
		if v, ok := r[key]; ok {
			ackConfig[target] = v
		}
	}

	// copyObjects rewrites every object under key into a map holding exactly
	// the listed fields; each pair is {target field, source field}. A source
	// field that is missing yields a nil value, as a plain map lookup would.
	copyObjects := func(key, target string, fields ...[2]string) {
		raw := elements(key)
		converted := make([]map[string]interface{}, 0, len(raw))
		for _, e := range raw {
			src, ok := e.(map[string]interface{})
			if !ok {
				// nil or malformed element: nothing to translate.
				continue
			}
			item := make(map[string]interface{}, len(fields))
			for _, f := range fields {
				item[f[0]] = src[f[1]]
			}
			converted = append(converted, item)
		}
		if len(converted) > 0 {
			ackConfig[target] = converted
		}
	}

	keyValue := [][2]string{{"Key", "key"}, {"Value", "value"}}

	copyScalar("ack_instance_id", "AckInstanceId")
	copyObjects("node_selectors", "NodeSelectors", keyValue...)
	copyObjects("tolerations", "Tolerations",
		[2]string{"Key", "key"},
		[2]string{"Value", "value"},
		[2]string{"Operator", "operator"},
		[2]string{"Effect", "effect"},
	)
	copyScalar("namespace", "Namespace")
	copyScalar("request_cpu", "RequestCpu")
	copyScalar("request_memory", "RequestMemory")
	copyScalar("limit_cpu", "LimitCpu")
	copyScalar("limit_memory", "LimitMemory")
	copyObjects("custom_labels", "CustomLabels", keyValue...)
	copyObjects("custom_annotations", "CustomAnnotations", keyValue...)
	copyObjects("pvcs", "Pvcs",
		[2]string{"DataDiskStorageClass", "data_disk_storage_class"},
		[2]string{"DataDiskSize", "data_disk_size"},
		[2]string{"Path", "path"},
		[2]string{"Name", "name"},
	)
	copyObjects("volumes", "Volumes",
		[2]string{"Name", "name"},
		[2]string{"Path", "path"},
		[2]string{"Type", "type"},
	)
	copyObjects("volume_mounts", "VolumeMounts",
		[2]string{"Name", "name"},
		[2]string{"Path", "path"},
	)

	// pre_start_command is a list of plain strings rather than objects.
	rawCommands := elements("pre_start_command")
	commands := make([]string, 0, len(rawCommands))
	for _, c := range rawCommands {
		if s, ok := c.(string); ok {
			commands = append(commands, s)
		}
	}
	if len(commands) > 0 {
		ackConfig["PreStartCommand"] = commands
	}

	copyScalar("pod_affinity", "PodAffinity")
	copyScalar("pod_anti_affinity", "PodAntiAffinity")
	copyScalar("node_affinity", "NodeAffinity")

	return ackConfig
}