func resourceAlicloudEmrV2ClusterCreate()

in alicloud/resource_alicloud_emrv2_cluster.go [887:1309]


func resourceAlicloudEmrV2ClusterCreate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	var response map[string]interface{}
	action := "RunCluster"
	var err error
	createClusterRequest := map[string]interface{}{
		"RegionId": client.RegionId,
	}
	if v, ok := d.GetOk("resource_group_id"); ok {
		createClusterRequest["ResourceGroupId"] = v
	}

	if v, ok := d.GetOk("payment_type"); ok {
		createClusterRequest["PaymentType"] = v
	}

	if v, ok := d.GetOk("subscription_config"); ok {
		subscriptionConfig := v.(*schema.Set).List()
		if len(subscriptionConfig) == 1 {
			subscriptionConfigSource := subscriptionConfig[0].(map[string]interface{})
			subscriptionConfigMap := map[string]interface{}{
				"PaymentDurationUnit":   subscriptionConfigSource["payment_duration_unit"],
				"PaymentDuration":       subscriptionConfigSource["payment_duration"],
				"AutoRenew":             subscriptionConfigSource["auto_renew"],
				"AutoRenewDurationUnit": subscriptionConfigSource["auto_renew_duration_unit"],
				"AutoRenewDuration":     subscriptionConfigSource["auto_renew_duration"],
			}
			if value, exists := subscriptionConfigSource["auto_pay_order"]; exists {
				subscriptionConfigMap["AutoPayOrder"] = value.(bool)
			} else {
				subscriptionConfigMap["AutoPayOrder"] = true
			}

			createClusterRequest["SubscriptionConfig"] = convertMapToJsonStringIgnoreError(subscriptionConfigMap)
		}
	}

	if v, ok := d.GetOk("cluster_type"); ok {
		createClusterRequest["ClusterType"] = v
	}

	if v, ok := d.GetOk("release_version"); ok {
		createClusterRequest["ReleaseVersion"] = v
	}

	if v, ok := d.GetOk("cluster_name"); ok {
		createClusterRequest["ClusterName"] = v
	}

	if v, ok := d.GetOk("deploy_mode"); ok {
		createClusterRequest["DeployMode"] = v
	}

	if v, ok := d.GetOk("security_mode"); ok {
		createClusterRequest["SecurityMode"] = v
	}

	if v, ok := d.GetOk("log_collect_strategy"); ok && v.(string) != "" {
		createClusterRequest["LogCollectStrategy"] = v.(string)
	}

	if v, ok := d.GetOk("deletion_protection"); ok {
		createClusterRequest["DeletionProtection"] = v
	}

	applications := make([]map[string]interface{}, 0)
	if apps, ok := d.GetOk("applications"); ok {
		for _, application := range apps.(*schema.Set).List() {
			applications = append(applications, map[string]interface{}{"ApplicationName": application.(string)})
		}
	}
	createClusterRequest["Applications"], _ = convertListMapToJsonString(applications)

	applicationConfigs := make([]map[string]interface{}, 0)
	if appConfigs, ok := d.GetOk("application_configs"); ok {
		for _, appConfig := range appConfigs.(*schema.Set).List() {
			applicationConfig := map[string]interface{}{}
			kv := appConfig.(map[string]interface{})
			if v, ok := kv["application_name"]; ok {
				applicationConfig["ApplicationName"] = v
			}
			if v, ok := kv["config_file_name"]; ok {
				applicationConfig["ConfigFileName"] = v
			}
			if v, ok := kv["config_item_key"]; ok {
				applicationConfig["ConfigItemKey"] = v
			}
			if v, ok := kv["config_item_value"]; ok {
				applicationConfig["ConfigItemValue"] = v
			}
			if v, ok := kv["config_scope"]; ok {
				applicationConfig["ConfigScope"] = v
			}
			if v, ok := kv["node_group_name"]; ok {
				applicationConfig["NodeGroupName"] = v
			}
			if v, ok := kv["node_group_id"]; ok {
				applicationConfig["NodeGroupId"] = v
			}
			applicationConfigs = append(applicationConfigs, applicationConfig)
		}
	}
	createClusterRequest["ApplicationConfigs"], _ = convertListMapToJsonString(applicationConfigs)

	if v, ok := d.GetOk("node_attributes"); ok {
		nodeAttributes := v.(*schema.Set).List()
		if len(nodeAttributes) == 1 {
			nodeAttributesSource := nodeAttributes[0].(map[string]interface{})
			nodeAttributesSourceMap := map[string]interface{}{
				"VpcId":             nodeAttributesSource["vpc_id"],
				"ZoneId":            nodeAttributesSource["zone_id"],
				"SecurityGroupId":   nodeAttributesSource["security_group_id"],
				"RamRole":           nodeAttributesSource["ram_role"],
				"KeyPairName":       nodeAttributesSource["key_pair_name"],
				"DataDiskEncrypted": nodeAttributesSource["data_disk_encrypted"],
				"DataDiskKMSKeyId":  nodeAttributesSource["data_disk_kms_key_id"],
			}
			if value, exists := nodeAttributesSource["system_disk_encrypted"]; exists {
				nodeAttributesSourceMap["SystemDiskEncrypted"] = value.(bool)
			}
			if value, exists := nodeAttributesSource["system_disk_kms_key_id"]; exists && value.(string) != "" {
				nodeAttributesSourceMap["SystemDiskKMSKeyId"] = value.(string)
			}
			createClusterRequest["NodeAttributes"] = convertMapToJsonStringIgnoreError(nodeAttributesSourceMap)
		}
	}

	nodeGroups := make([]map[string]interface{}, 0)
	if nodeGroupsList, ok := d.GetOk("node_groups"); ok {
		for _, nodeGroupItem := range nodeGroupsList.([]interface{}) {
			nodeGroup := map[string]interface{}{}
			kv := nodeGroupItem.(map[string]interface{})
			if v, ok := kv["node_group_type"]; ok {
				nodeGroup["NodeGroupType"] = v
			}
			if v, ok := kv["node_group_name"]; ok {
				nodeGroup["NodeGroupName"] = v
			}
			if v, ok := kv["payment_type"]; ok {
				nodeGroup["PaymentType"] = v
			}
			if v, ok := kv["deployment_set_strategy"]; ok && v.(string) != "" {
				nodeGroup["DeploymentSetStrategy"] = v.(string)
			}
			if v, ok := kv["node_resize_strategy"]; ok && v.(string) != "" {
				nodeGroup["NodeResizeStrategy"] = v.(string)
			}
			if v, ok := kv["spot_strategy"]; ok && v.(string) != "" {
				nodeGroup["SpotStrategy"] = v.(string)
			}
			if v, ok := kv["subscription_config"]; ok {
				subscriptionConfigs := v.(*schema.Set).List()
				if len(subscriptionConfigs) == 1 {
					subscriptionConfig := map[string]interface{}{}
					subscriptionConfigMap := subscriptionConfigs[0].(map[string]interface{})
					if value, exists := subscriptionConfigMap["payment_duration_unit"]; exists {
						subscriptionConfig["PaymentDurationUnit"] = value
					}
					if value, exists := subscriptionConfigMap["payment_duration"]; exists {
						subscriptionConfig["PaymentDuration"] = value
					}
					if value, exists := subscriptionConfigMap["auto_renew"]; exists {
						subscriptionConfig["AutoRenew"] = value
					}
					if value, exists := subscriptionConfigMap["auto_renew_duration_unit"]; exists {
						subscriptionConfig["AutoRenewDurationUnit"] = value
					}
					if value, exists := subscriptionConfigMap["auto_renew_duration"]; exists {
						subscriptionConfig["AutoRenewDuration"] = value
					}
					if value, exists := subscriptionConfigMap["auto_pay_order"]; exists {
						subscriptionConfig["AutoPayOrder"] = value.(bool)
					} else {
						subscriptionConfig["AutoPayOrder"] = true
					}
					nodeGroup["SubscriptionConfig"] = subscriptionConfig
				}
			}
			if v, ok := kv["spot_bid_prices"]; ok {
				spotBidPriceList := v.(*schema.Set).List()
				if len(spotBidPriceList) > 0 {
					spotBidPrices := make([]map[string]interface{}, 0)
					for _, spotBidPriceSource := range spotBidPriceList {
						spotBidPrice := map[string]interface{}{}
						spotBidPriceMap := spotBidPriceSource.(map[string]interface{})
						if value, exists := spotBidPriceMap["instance_type"]; exists {
							spotBidPrice["InstanceType"] = value
						}
						if value, exists := spotBidPriceMap["bid_price"]; exists {
							spotBidPrice["BidPrice"] = value
						}
						spotBidPrices = append(spotBidPrices, spotBidPrice)
					}
					nodeGroup["SpotBidPrices"] = spotBidPrices
				}
			}
			if v, ok := kv["vswitch_ids"]; ok {
				var vSwitchIds []string
				for _, vSwitchId := range v.(*schema.Set).List() {
					vSwitchIds = append(vSwitchIds, vSwitchId.(string))
				}
				nodeGroup["VSwitchIds"] = vSwitchIds
			}
			if v, ok := kv["with_public_ip"]; ok {
				nodeGroup["WithPublicIp"] = v
			}
			if v, ok := kv["additional_security_group_ids"]; ok {
				var additionalSecurityGroupIds []string
				for _, additionalSecurityGroupId := range v.(*schema.Set).List() {
					additionalSecurityGroupIds = append(additionalSecurityGroupIds, additionalSecurityGroupId.(string))
				}
				nodeGroup["AdditionalSecurityGroupIds"] = additionalSecurityGroupIds
			}
			if v, ok := kv["instance_types"]; ok {
				var instanceTypes []string
				for _, instanceType := range v.(*schema.Set).List() {
					instanceTypes = append(instanceTypes, instanceType.(string))
				}
				nodeGroup["InstanceTypes"] = instanceTypes
			}
			if v, ok := kv["node_count"]; ok {
				nodeGroup["NodeCount"] = v
			}
			if v, ok := kv["system_disk"]; ok {
				systemDisks := v.(*schema.Set).List()
				if len(systemDisks) == 1 {
					systemDisk := map[string]interface{}{}
					systemDiskMap := systemDisks[0].(map[string]interface{})
					if value, exists := systemDiskMap["category"]; exists {
						systemDisk["Category"] = value
					}
					if value, exists := systemDiskMap["size"]; exists {
						systemDisk["Size"] = value
					}
					if value, exists := systemDiskMap["performance_level"]; exists && value.(string) != "" {
						systemDisk["PerformanceLevel"] = value
					}
					if value, exists := systemDiskMap["count"]; exists {
						systemDisk["Count"] = value
					}
					nodeGroup["SystemDisk"] = systemDisk
				}
			}
			if v, ok := kv["data_disks"]; ok {
				dataDiskList := v.(*schema.Set).List()
				if len(dataDiskList) > 0 {
					dataDisks := make([]map[string]interface{}, 0)
					for _, dataDiskSource := range dataDiskList {
						dataDisk := map[string]interface{}{}
						dataDiskMap := dataDiskSource.(map[string]interface{})
						if value, exists := dataDiskMap["category"]; exists {
							dataDisk["Category"] = value
						}
						if value, exists := dataDiskMap["size"]; exists {
							dataDisk["Size"] = value
						}
						if value, exists := dataDiskMap["performance_level"]; exists && value.(string) != "" {
							dataDisk["PerformanceLevel"] = value
						}
						if value, exists := dataDiskMap["count"]; exists {
							dataDisk["Count"] = value
						}
						dataDisks = append(dataDisks, dataDisk)
					}
					nodeGroup["DataDisks"] = dataDisks
				}
			}
			if v, ok := kv["graceful_shutdown"]; ok {
				nodeGroup["GracefulShutdown"] = v
			}
			if v, ok := kv["spot_instance_remedy"]; ok {
				nodeGroup["SpotInstanceRemedy"] = v
			}
			if v, ok := kv["cost_optimized_config"]; ok {
				costOptimizedConfigs := v.(*schema.Set).List()
				if len(costOptimizedConfigs) == 1 {
					costOptimizedConfig := map[string]interface{}{}
					costOptimizedConfigMap := costOptimizedConfigs[0].(map[string]interface{})
					if value, exists := costOptimizedConfigMap["on_demand_base_capacity"]; exists {
						costOptimizedConfig["OnDemandBaseCapacity"] = value
					}
					if value, exists := costOptimizedConfigMap["on_demand_percentage_above_base_capacity"]; exists {
						costOptimizedConfig["OnDemandPercentageAboveBaseCapacity"] = value
					}
					if value, exists := costOptimizedConfigMap["spot_instance_pools"]; exists {
						costOptimizedConfig["SpotInstancePools"] = value
					}
					nodeGroup["CostOptimizedConfig"] = costOptimizedConfig
				}
			}
			if v, ok := kv["auto_scaling_policy"]; ok {
				scalingPolicies := v.([]interface{})
				if len(scalingPolicies) == 1 {
					nodeGroup["AutoScalingPolicy"] = adaptAutoScalingPolicyRequest(scalingPolicies[0].(map[string]interface{}))
				}
			}
			if v, ok := kv["ack_config"]; ok {
				ackConfig := v.([]interface{})
				if len(ackConfig) == 1 {
					nodeGroup["AckConfig"] = adaptAckConfigRequest(ackConfig[0].(map[string]interface{}))
					nodeGroup["IaaSType"] = "K8S"
					delete(nodeGroup, "InstanceTypes")
				}
			}
			nodeGroups = append(nodeGroups, nodeGroup)
		}
	}
	createClusterRequest["NodeGroups"], _ = convertListMapToJsonString(nodeGroups)

	if scripts, ok := d.GetOk("bootstrap_scripts"); ok {
		bootstrapScripts := make([]map[string]interface{}, 0)
		for _, script := range scripts.([]interface{}) {
			kv := script.(map[string]interface{})
			bootstrapScript := map[string]interface{}{}
			if v, ok := kv["script_name"]; ok {
				bootstrapScript["ScriptName"] = v
			}
			if v, ok := kv["script_path"]; ok {
				bootstrapScript["ScriptPath"] = v
			}
			if v, ok := kv["script_args"]; ok {
				bootstrapScript["ScriptArgs"] = v
			}
			if v, ok := kv["priority"]; ok {
				bootstrapScript["Priority"] = v
			}
			if v, ok := kv["execution_moment"]; ok {
				bootstrapScript["ExecutionMoment"] = v
			}
			if v, ok := kv["execution_fail_strategy"]; ok {
				bootstrapScript["ExecutionFailStrategy"] = v
			}
			if v, ok := kv["node_selector"]; ok {
				nodeSelectors := v.(*schema.Set).List()
				if len(nodeSelectors) == 1 {
					nodeSelector := map[string]interface{}{}
					nodeSelectorMap := nodeSelectors[0].(map[string]interface{})
					if value, exists := nodeSelectorMap["node_select_type"]; exists {
						nodeSelector["NodeSelectType"] = value
					}
					if value, exists := nodeSelectorMap["node_names"]; exists {
						var nodeNames []string
						for _, nodeName := range value.([]interface{}) {
							nodeNames = append(nodeNames, nodeName.(string))
						}
						nodeSelector["NodeNames"] = nodeNames
					}
					if value, exists := nodeSelectorMap["node_group_id"]; exists {
						nodeSelector["NodeGroupId"] = value
					}
					if value, exists := nodeSelectorMap["node_group_ids"]; exists {
						var nodeGroupIds []string
						for _, ngId := range value.([]interface{}) {
							nodeGroupIds = append(nodeGroupIds, ngId.(string))
						}
						nodeSelector["NodeGroupIds"] = nodeGroupIds
					}
					if value, exists := nodeSelectorMap["node_group_types"]; exists {
						var nodeGroupTypes []string
						for _, nodeGroupType := range value.([]interface{}) {
							nodeGroupTypes = append(nodeGroupTypes, nodeGroupType.(string))
						}
						nodeSelector["NodeGroupTypes"] = nodeGroupTypes
					}
					if value, exists := nodeSelectorMap["node_group_name"]; exists {
						nodeSelector["NodeGroupName"] = value
					}
					if value, exists := nodeSelectorMap["node_group_names"]; exists {
						var nodeGroupNames []string
						for _, ngName := range value.([]interface{}) {
							nodeGroupNames = append(nodeGroupNames, ngName.(string))
						}
						nodeSelector["NodeGroupNames"] = nodeGroupNames
					}
					bootstrapScript["NodeSelector"] = nodeSelector
				}
			}
			bootstrapScripts = append(bootstrapScripts, bootstrapScript)
		}
		createClusterRequest["BootstrapScripts"], _ = convertListMapToJsonString(bootstrapScripts)
	}

	if v, ok := d.GetOk("tags"); ok {
		tags := make([]map[string]interface{}, 0)
		for key, value := range v.(map[string]interface{}) {
			tags = append(tags, map[string]interface{}{
				"Key":   key,
				"Value": value,
			})
		}
		createClusterRequest["Tags"], _ = convertListMapToJsonString(tags)
	}

	wait := incrementalWait(3*time.Second, 5*time.Second)
	err = resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
		response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createClusterRequest, true)
		if err != nil {
			if NeedRetry(err) {
				wait()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})
	addDebug(action, response, createClusterRequest)

	if err != nil {
		return WrapErrorf(err, DefaultErrorMsg, "alicloud_emrv2_cluster", action, AlibabaCloudSdkGoERROR)
	}

	d.SetId(fmt.Sprint(response["ClusterId"]))

	emrService := EmrService{client}
	stateConf := BuildStateConf([]string{"STARTING"}, []string{"RUNNING"}, d.Timeout(schema.TimeoutCreate),
		90*time.Second, emrService.EmrV2ClusterStateRefreshFunc(d.Id(), []string{"START_FAILED", "TERMINATED_WITH_ERRORS", "TERMINATED"}))
	stateConf.PollInterval = 10 * time.Second
	if _, err := stateConf.WaitForState(); err != nil {
		return WrapErrorf(err, IdMsg, d.Id())
	}

	return resourceAlicloudEmrV2ClusterRead(d, meta)
}