in alicloud/resource_alicloud_emrv2_cluster.go [887:1309]
// resourceAlicloudEmrV2ClusterCreate builds a RunCluster request from the
// resource configuration, posts it to the "Emr" product (API version
// 2021-03-20), stores the returned ClusterId as the resource ID, waits until
// the cluster reports RUNNING and then delegates to the read function.
//
// Nested values are encoded through the convert*JsonString helpers before
// being placed in the request. An error is returned when a list-valued field
// cannot be encoded, when the API call fails, when the response carries no
// ClusterId, or when the cluster does not reach RUNNING within the create
// timeout.
func resourceAlicloudEmrV2ClusterCreate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	action := "RunCluster"
	var response map[string]interface{}
	var err error

	createClusterRequest := map[string]interface{}{
		"RegionId": client.RegionId,
	}

	// copyIfPresent copies src[pair[0]] to dst[pair[1]] for every pair whose
	// source key exists.
	copyIfPresent := func(dst, src map[string]interface{}, pairs [][2]string) {
		for _, pair := range pairs {
			if v, ok := src[pair[0]]; ok {
				dst[pair[1]] = v
			}
		}
	}

	// copyNonEmptyString is like copyIfPresent but skips empty strings, so
	// unset optional string attributes are left out of the request.
	copyNonEmptyString := func(dst, src map[string]interface{}, pairs [][2]string) {
		for _, pair := range pairs {
			if v, ok := src[pair[0]]; ok && v.(string) != "" {
				dst[pair[1]] = v.(string)
			}
		}
	}

	// toStrings converts a list of string-typed values. The result is
	// intentionally left nil (not an empty slice) for an empty input so the
	// encoded request stays the same as before this helper existed.
	toStrings := func(items []interface{}) []string {
		var out []string
		for _, item := range items {
			out = append(out, item.(string))
		}
		return out
	}

	// autoPayOrder returns the configured auto_pay_order flag, defaulting to
	// true when the key is absent.
	autoPayOrder := func(src map[string]interface{}) bool {
		if value, exists := src["auto_pay_order"]; exists {
			return value.(bool)
		}
		return true
	}

	// buildDisk maps a system_disk / data_disks element to its request form.
	buildDisk := func(src map[string]interface{}) map[string]interface{} {
		disk := map[string]interface{}{}
		copyIfPresent(disk, src, [][2]string{
			{"category", "Category"},
			{"size", "Size"},
			{"count", "Count"},
		})
		copyNonEmptyString(disk, src, [][2]string{
			{"performance_level", "PerformanceLevel"},
		})
		return disk
	}

	// setEncodedList encodes items and stores the result under field. The
	// encoding error is surfaced instead of being dropped, so a broken
	// payload is never sent to the API.
	setEncodedList := func(field string, items []map[string]interface{}) error {
		encoded, encodeErr := convertListMapToJsonString(items)
		if encodeErr != nil {
			return fmt.Errorf("alicloud_emrv2_cluster %s: encoding %s: %w", action, field, encodeErr)
		}
		createClusterRequest[field] = encoded
		return nil
	}

	// Plain top-level attributes are forwarded only when set to a non-zero
	// value (GetOk semantics).
	for _, field := range [][2]string{
		{"resource_group_id", "ResourceGroupId"},
		{"payment_type", "PaymentType"},
		{"cluster_type", "ClusterType"},
		{"release_version", "ReleaseVersion"},
		{"cluster_name", "ClusterName"},
		{"deploy_mode", "DeployMode"},
		{"security_mode", "SecurityMode"},
		{"deletion_protection", "DeletionProtection"},
	} {
		if v, ok := d.GetOk(field[0]); ok {
			createClusterRequest[field[1]] = v
		}
	}
	if v, ok := d.GetOk("log_collect_strategy"); ok && v.(string) != "" {
		createClusterRequest["LogCollectStrategy"] = v.(string)
	}

	if v, ok := d.GetOk("subscription_config"); ok {
		if subscriptionConfig := v.(*schema.Set).List(); len(subscriptionConfig) == 1 {
			source := subscriptionConfig[0].(map[string]interface{})
			createClusterRequest["SubscriptionConfig"] = convertMapToJsonStringIgnoreError(map[string]interface{}{
				"PaymentDurationUnit":   source["payment_duration_unit"],
				"PaymentDuration":       source["payment_duration"],
				"AutoRenew":             source["auto_renew"],
				"AutoRenewDurationUnit": source["auto_renew_duration_unit"],
				"AutoRenewDuration":     source["auto_renew_duration"],
				"AutoPayOrder":          autoPayOrder(source),
			})
		}
	}

	// Applications and ApplicationConfigs are always sent, even when empty.
	applications := make([]map[string]interface{}, 0)
	if apps, ok := d.GetOk("applications"); ok {
		for _, application := range apps.(*schema.Set).List() {
			applications = append(applications, map[string]interface{}{"ApplicationName": application.(string)})
		}
	}
	if err = setEncodedList("Applications", applications); err != nil {
		return err
	}

	applicationConfigs := make([]map[string]interface{}, 0)
	if appConfigs, ok := d.GetOk("application_configs"); ok {
		for _, appConfig := range appConfigs.(*schema.Set).List() {
			applicationConfig := map[string]interface{}{}
			copyIfPresent(applicationConfig, appConfig.(map[string]interface{}), [][2]string{
				{"application_name", "ApplicationName"},
				{"config_file_name", "ConfigFileName"},
				{"config_item_key", "ConfigItemKey"},
				{"config_item_value", "ConfigItemValue"},
				{"config_scope", "ConfigScope"},
				{"node_group_name", "NodeGroupName"},
				{"node_group_id", "NodeGroupId"},
			})
			applicationConfigs = append(applicationConfigs, applicationConfig)
		}
	}
	if err = setEncodedList("ApplicationConfigs", applicationConfigs); err != nil {
		return err
	}

	if v, ok := d.GetOk("node_attributes"); ok {
		if nodeAttributes := v.(*schema.Set).List(); len(nodeAttributes) == 1 {
			source := nodeAttributes[0].(map[string]interface{})
			nodeAttributesMap := map[string]interface{}{
				"VpcId":             source["vpc_id"],
				"ZoneId":            source["zone_id"],
				"SecurityGroupId":   source["security_group_id"],
				"RamRole":           source["ram_role"],
				"KeyPairName":       source["key_pair_name"],
				"DataDiskEncrypted": source["data_disk_encrypted"],
				"DataDiskKMSKeyId":  source["data_disk_kms_key_id"],
			}
			if value, exists := source["system_disk_encrypted"]; exists {
				nodeAttributesMap["SystemDiskEncrypted"] = value.(bool)
			}
			copyNonEmptyString(nodeAttributesMap, source, [][2]string{
				{"system_disk_kms_key_id", "SystemDiskKMSKeyId"},
			})
			createClusterRequest["NodeAttributes"] = convertMapToJsonStringIgnoreError(nodeAttributesMap)
		}
	}

	// NodeGroups is always sent, even when empty.
	nodeGroups := make([]map[string]interface{}, 0)
	if nodeGroupsList, ok := d.GetOk("node_groups"); ok {
		for _, nodeGroupItem := range nodeGroupsList.([]interface{}) {
			kv := nodeGroupItem.(map[string]interface{})
			nodeGroup := map[string]interface{}{}

			copyIfPresent(nodeGroup, kv, [][2]string{
				{"node_group_type", "NodeGroupType"},
				{"node_group_name", "NodeGroupName"},
				{"payment_type", "PaymentType"},
				{"with_public_ip", "WithPublicIp"},
				{"node_count", "NodeCount"},
				{"graceful_shutdown", "GracefulShutdown"},
				{"spot_instance_remedy", "SpotInstanceRemedy"},
			})
			copyNonEmptyString(nodeGroup, kv, [][2]string{
				{"deployment_set_strategy", "DeploymentSetStrategy"},
				{"node_resize_strategy", "NodeResizeStrategy"},
				{"spot_strategy", "SpotStrategy"},
			})

			if v, ok := kv["subscription_config"]; ok {
				if subscriptionConfigs := v.(*schema.Set).List(); len(subscriptionConfigs) == 1 {
					source := subscriptionConfigs[0].(map[string]interface{})
					subscriptionConfig := map[string]interface{}{}
					copyIfPresent(subscriptionConfig, source, [][2]string{
						{"payment_duration_unit", "PaymentDurationUnit"},
						{"payment_duration", "PaymentDuration"},
						{"auto_renew", "AutoRenew"},
						{"auto_renew_duration_unit", "AutoRenewDurationUnit"},
						{"auto_renew_duration", "AutoRenewDuration"},
					})
					subscriptionConfig["AutoPayOrder"] = autoPayOrder(source)
					nodeGroup["SubscriptionConfig"] = subscriptionConfig
				}
			}

			if v, ok := kv["spot_bid_prices"]; ok {
				if spotBidPriceList := v.(*schema.Set).List(); len(spotBidPriceList) > 0 {
					spotBidPrices := make([]map[string]interface{}, 0)
					for _, spotBidPriceSource := range spotBidPriceList {
						spotBidPrice := map[string]interface{}{}
						copyIfPresent(spotBidPrice, spotBidPriceSource.(map[string]interface{}), [][2]string{
							{"instance_type", "InstanceType"},
							{"bid_price", "BidPrice"},
						})
						spotBidPrices = append(spotBidPrices, spotBidPrice)
					}
					nodeGroup["SpotBidPrices"] = spotBidPrices
				}
			}

			if v, ok := kv["vswitch_ids"]; ok {
				nodeGroup["VSwitchIds"] = toStrings(v.(*schema.Set).List())
			}
			if v, ok := kv["additional_security_group_ids"]; ok {
				nodeGroup["AdditionalSecurityGroupIds"] = toStrings(v.(*schema.Set).List())
			}
			if v, ok := kv["instance_types"]; ok {
				nodeGroup["InstanceTypes"] = toStrings(v.(*schema.Set).List())
			}

			if v, ok := kv["system_disk"]; ok {
				if systemDisks := v.(*schema.Set).List(); len(systemDisks) == 1 {
					nodeGroup["SystemDisk"] = buildDisk(systemDisks[0].(map[string]interface{}))
				}
			}
			if v, ok := kv["data_disks"]; ok {
				if dataDiskList := v.(*schema.Set).List(); len(dataDiskList) > 0 {
					dataDisks := make([]map[string]interface{}, 0)
					for _, dataDiskSource := range dataDiskList {
						dataDisks = append(dataDisks, buildDisk(dataDiskSource.(map[string]interface{})))
					}
					nodeGroup["DataDisks"] = dataDisks
				}
			}

			if v, ok := kv["cost_optimized_config"]; ok {
				if costOptimizedConfigs := v.(*schema.Set).List(); len(costOptimizedConfigs) == 1 {
					costOptimizedConfig := map[string]interface{}{}
					copyIfPresent(costOptimizedConfig, costOptimizedConfigs[0].(map[string]interface{}), [][2]string{
						{"on_demand_base_capacity", "OnDemandBaseCapacity"},
						{"on_demand_percentage_above_base_capacity", "OnDemandPercentageAboveBaseCapacity"},
						{"spot_instance_pools", "SpotInstancePools"},
					})
					nodeGroup["CostOptimizedConfig"] = costOptimizedConfig
				}
			}

			if v, ok := kv["auto_scaling_policy"]; ok {
				if scalingPolicies := v.([]interface{}); len(scalingPolicies) == 1 {
					nodeGroup["AutoScalingPolicy"] = adaptAutoScalingPolicyRequest(scalingPolicies[0].(map[string]interface{}))
				}
			}

			// Must stay after instance_types handling: a node group with an
			// ack_config is marked as K8S and its InstanceTypes entry is
			// removed again.
			if v, ok := kv["ack_config"]; ok {
				if ackConfig := v.([]interface{}); len(ackConfig) == 1 {
					nodeGroup["AckConfig"] = adaptAckConfigRequest(ackConfig[0].(map[string]interface{}))
					nodeGroup["IaaSType"] = "K8S"
					delete(nodeGroup, "InstanceTypes")
				}
			}

			nodeGroups = append(nodeGroups, nodeGroup)
		}
	}
	if err = setEncodedList("NodeGroups", nodeGroups); err != nil {
		return err
	}

	// BootstrapScripts is only sent when at least one script is configured.
	if scripts, ok := d.GetOk("bootstrap_scripts"); ok {
		bootstrapScripts := make([]map[string]interface{}, 0)
		for _, script := range scripts.([]interface{}) {
			kv := script.(map[string]interface{})
			bootstrapScript := map[string]interface{}{}
			copyIfPresent(bootstrapScript, kv, [][2]string{
				{"script_name", "ScriptName"},
				{"script_path", "ScriptPath"},
				{"script_args", "ScriptArgs"},
				{"priority", "Priority"},
				{"execution_moment", "ExecutionMoment"},
				{"execution_fail_strategy", "ExecutionFailStrategy"},
			})

			if v, ok := kv["node_selector"]; ok {
				if nodeSelectors := v.(*schema.Set).List(); len(nodeSelectors) == 1 {
					source := nodeSelectors[0].(map[string]interface{})
					nodeSelector := map[string]interface{}{}
					copyIfPresent(nodeSelector, source, [][2]string{
						{"node_select_type", "NodeSelectType"},
						{"node_group_id", "NodeGroupId"},
						{"node_group_name", "NodeGroupName"},
					})
					// List-valued selector fields are converted to []string.
					for _, pair := range [][2]string{
						{"node_names", "NodeNames"},
						{"node_group_ids", "NodeGroupIds"},
						{"node_group_types", "NodeGroupTypes"},
						{"node_group_names", "NodeGroupNames"},
					} {
						if value, exists := source[pair[0]]; exists {
							nodeSelector[pair[1]] = toStrings(value.([]interface{}))
						}
					}
					bootstrapScript["NodeSelector"] = nodeSelector
				}
			}

			bootstrapScripts = append(bootstrapScripts, bootstrapScript)
		}
		if err = setEncodedList("BootstrapScripts", bootstrapScripts); err != nil {
			return err
		}
	}

	if v, ok := d.GetOk("tags"); ok {
		tags := make([]map[string]interface{}, 0)
		for key, value := range v.(map[string]interface{}) {
			tags = append(tags, map[string]interface{}{
				"Key":   key,
				"Value": value,
			})
		}
		if err = setEncodedList("Tags", tags); err != nil {
			return err
		}
	}

	// Submit the request, retrying for as long as NeedRetry classifies the
	// error as retryable and the create timeout has not elapsed.
	wait := incrementalWait(3*time.Second, 5*time.Second)
	err = resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
		response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createClusterRequest, true)
		if err != nil {
			if NeedRetry(err) {
				wait()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})
	addDebug(action, response, createClusterRequest)
	if err != nil {
		return WrapErrorf(err, DefaultErrorMsg, "alicloud_emrv2_cluster", action, AlibabaCloudSdkGoERROR)
	}

	// Without this guard a response lacking ClusterId would produce the
	// literal resource ID "<nil>".
	clusterID := response["ClusterId"]
	if clusterID == nil {
		return WrapErrorf(fmt.Errorf("%s response does not contain a ClusterId", action),
			DefaultErrorMsg, "alicloud_emrv2_cluster", action, AlibabaCloudSdkGoERROR)
	}
	d.SetId(fmt.Sprint(clusterID))

	// Wait for STARTING -> RUNNING; the listed states are passed to the
	// refresh function as failure states.
	emrService := EmrService{client}
	stateConf := BuildStateConf([]string{"STARTING"}, []string{"RUNNING"}, d.Timeout(schema.TimeoutCreate),
		90*time.Second, emrService.EmrV2ClusterStateRefreshFunc(d.Id(), []string{"START_FAILED", "TERMINATED_WITH_ERRORS", "TERMINATED"}))
	stateConf.PollInterval = 10 * time.Second
	if _, err := stateConf.WaitForState(); err != nil {
		return WrapErrorf(err, IdMsg, d.Id())
	}

	return resourceAlicloudEmrV2ClusterRead(d, meta)
}