func resourceAlicloudEmrV2ClusterUpdate()

in alicloud/resource_alicloud_emrv2_cluster.go [2009:2887]


func resourceAlicloudEmrV2ClusterUpdate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AliyunClient)
	var err error
	var response map[string]interface{}
	d.Partial(true)
	emrService := EmrService{client}
	if err := emrService.SetEmrClusterTagsNew(d); err != nil {
		return WrapError(err)
	}

	if d.HasChange("cluster_name") || d.HasChange("log_collect_strategy") || d.HasChange("deletion_protection") {
		action := "UpdateClusterAttribute"
		request := map[string]interface{}{
			"ClusterId": d.Id(),
			"RegionId":  client.RegionId,
		}
		if d.HasChange("cluster_name") && d.Get("cluster_name").(string) != "" {
			request["ClusterName"] = d.Get("cluster_name")
		}
		if d.HasChange("log_collect_strategy") && d.Get("log_collect_strategy").(string) != "" {
			request["LogCollectStrategy"] = d.Get("log_collect_strategy")
		}
		if d.HasChange("deletion_protection") {
			request["DeletionProtection"] = d.Get("deletion_protection")
		}
		wait := incrementalWait(3*time.Second, 3*time.Second)
		err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
			response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, false)
			if err != nil {
				if NeedRetry(err) {
					wait()
					return resource.RetryableError(err)
				}
				return resource.NonRetryableError(err)
			}
			addDebug(action, response, request)
			return nil
		})
		if err != nil {
			return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
		}
		d.SetPartial("cluster_name")
	}

	if d.HasChange("payment_type") {
		_, newPaymentType := d.GetChange("payment_type")
		if "PayAsYouGo" == newPaymentType.(string) {
			return WrapError(Error("EMR cluster can only change paymentType from PayAsYouGo to Subscription."))
		}
		if !d.HasChange("node_groups") {
			return WrapError(Error("Subscription paymentType of emr cluster can not contains PayAsYouGo node group with 'MASTER' or 'CORE'."))
		}
	}

	if d.HasChange("node_groups") {
		oldNodeGroupsList, newNodeGroupsList := d.GetChange("node_groups")

		listNodeGroupsRequest := map[string]interface{}{
			"ClusterId":  d.Id(),
			"RegionId":   client.RegionId,
			"MaxResults": PageSizeLarge,
		}
		action := "ListNodeGroups"
		wait := incrementalWait(3*time.Second, 5*time.Second)
		err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
			response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listNodeGroupsRequest, false)
			if err != nil {
				if NeedRetry(err) {
					wait()
					return resource.RetryableError(err)
				}
				return resource.NonRetryableError(err)
			}
			return nil
		})
		addDebug(action, response, listNodeGroupsRequest)
		if err != nil {
			return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
		}
		resp, err := jsonpath.Get("$.NodeGroups", response)

		if err != nil {
			return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response)
		}

		oldNodeGroupMap := map[string]map[string]interface{}{}
		for _, nodeGroupItem := range resp.([]interface{}) {
			oldNodeGroup := nodeGroupItem.(map[string]interface{})
			oldNodeGroupMap[oldNodeGroup["NodeGroupName"].(string)] = oldNodeGroup
		}

		originNodeGroupMap := map[string]map[string]interface{}{}
		for _, nodeGroupItem := range oldNodeGroupsList.([]interface{}) {
			originNodeGroup := nodeGroupItem.(map[string]interface{})
			originNodeGroupMap[originNodeGroup["node_group_name"].(string)] = originNodeGroup
		}

		newNodeGroupMap := map[string]map[string]interface{}{}
		for _, nodeGroupItem := range newNodeGroupsList.([]interface{}) {
			newNodeGroup := nodeGroupItem.(map[string]interface{})
			newNodeGroupMap[newNodeGroup["node_group_name"].(string)] = newNodeGroup
		}

		isUpdateClusterPaymentType := false
		if d.HasChange("payment_type") {
			action = "UpdateClusterPaymentType"
			updateClusterPaymentTypeRequest := map[string]interface{}{
				"ClusterId": d.Id(),
				"RegionId":  client.RegionId,
			}
			if sc, ok := d.GetOk("subscription_config"); ok {
				if autoPay, exists := sc.(*schema.Set).List()[0].(map[string]interface{})["auto_pay_order"]; exists {
					updateClusterPaymentTypeRequest["AutoPayOrder"] = autoPay.(bool)
				} else {
					updateClusterPaymentTypeRequest["AutoPayOrder"] = true
				}
				if autoRenew, exists := sc.(*schema.Set).List()[0].(map[string]interface{})["auto_renew"]; exists {
					updateClusterPaymentTypeRequest["AutoRenew"] = autoRenew.(bool)
				} else {
					updateClusterPaymentTypeRequest["AutoRenew"] = false
				}
			}
			var convertNodeGroups []map[string]interface{}
			for originNodeGroupName := range originNodeGroupMap {
				convertNodeGroup := map[string]interface{}{
					"NodeGroupId": oldNodeGroupMap[originNodeGroupName]["NodeGroupId"],
				}
				if newNodeGroup, ok := newNodeGroupMap[originNodeGroupName]; ok {
					if newNodeGroupValue, newNodeGroupExists := newNodeGroup["payment_type"]; newNodeGroupExists && "Subscription" == newNodeGroupValue {
						convertNodeGroup["PaymentType"] = newNodeGroupValue
					} else if "PayAsYouGo" == newNodeGroupValue && ("MASTER" == newNodeGroup["node_group_type"] || "CORE" == newNodeGroup["node_group_type"]) {
						return WrapError(Error("Subscription paymentType of emr cluster can not contains PayAsYouGo node group with 'MASTER' or 'CORE'."))
					} else {
						continue
					}
					if subscriptionConfig, exists := newNodeGroup["subscription_config"]; exists && len(subscriptionConfig.(*schema.Set).List()) > 0 {
						subscriptionConfigMap := subscriptionConfig.(*schema.Set).List()[0].(map[string]interface{})
						if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration"]; subscriptionConfigExists {
							convertNodeGroup["PaymentDuration"] = subscriptionConfigValue
						}
						if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration_unit"]; subscriptionConfigExists {
							convertNodeGroup["PaymentDurationUnit"] = subscriptionConfigValue
						}
					} else {
						return WrapError(Error("The '%s' nodeGroup: '%s' is needed parameter 'subscription_config' for changing paymentType.",
							newNodeGroup["node_group_type"], newNodeGroup["node_group_name"]))
					}
					convertNodeGroups = append(convertNodeGroups, convertNodeGroup)
				}
			}
			updateClusterPaymentTypeRequest["NodeGroups"] = convertNodeGroups
			wait = incrementalWait(3*time.Second, 5*time.Second)
			err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
				response, err = client.RpcPost("Emr", "2021-03-20", action, nil, updateClusterPaymentTypeRequest, false)
				if err != nil {
					if NeedRetry(err) {
						wait()
						return resource.RetryableError(err)
					}
					return resource.NonRetryableError(err)
				}
				return nil
			})
			addDebug(action, response, updateClusterPaymentTypeRequest)
			if err != nil {
				return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
			}
			// Wait for cluster payment type has been changed
			if err = resource.Retry(5*time.Minute, func() *resource.RetryError {
				if cluster, err := emrService.GetEmrV2Cluster(d.Id()); err != nil {
					return resource.NonRetryableError(err)
				} else if cluster["PaymentType"].(string) == "Subscription" {
					return nil
				}
				return resource.RetryableError(Error("Waiting for cluster %s payment type to be changed.", d.Id()))
			}); err != nil {
				return WrapError(err)
			}
			isUpdateClusterPaymentType = true
		}

		var increaseNodesGroups []map[string]interface{}
		var decreaseNodesGroups []map[string]interface{}

		for nodeGroupName, newNodeGroup := range newNodeGroupMap {
			if oldNodeGroup, ok := oldNodeGroupMap[nodeGroupName]; ok {
				if !reflect.DeepEqual(originNodeGroupMap[nodeGroupName]["auto_scaling_policy"], newNodeGroup["auto_scaling_policy"]) {
					removeScalingPolicy := false
					if len(newNodeGroup["auto_scaling_policy"].([]interface{})) > 0 {
						adaptedScalingPolicy := adaptAutoScalingPolicyRequest(newNodeGroup["auto_scaling_policy"].([]interface{})[0].(map[string]interface{}))
						rulesExists := false
						constraintsExists := false
						if aspValue, aspExists := adaptedScalingPolicy["scalingRules"]; aspExists {
							rulesExists = aspExists
							adaptedScalingPolicy["ScalingRules"] = aspValue
							delete(adaptedScalingPolicy, "scalingRules")
						}
						if aspValue, aspExists := adaptedScalingPolicy["constraints"]; aspExists {
							constraintsExists = aspExists
							adaptedScalingPolicy["Constraints"] = aspValue
							delete(adaptedScalingPolicy, "constraints")
						}
						if rulesExists || constraintsExists {
							adaptedScalingPolicy["RegionId"] = client.RegionId
							adaptedScalingPolicy["ClusterId"] = d.Id()
							adaptedScalingPolicy["NodeGroupId"] = oldNodeGroup["NodeGroupId"]

							action = "PutAutoScalingPolicy"
							err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
								response, err = client.RpcPost("Emr", "2021-03-20", action, nil, adaptedScalingPolicy, false)
								if err != nil {
									if NeedRetry(err) {
										wait()
										return resource.RetryableError(err)
									}
									return resource.NonRetryableError(err)
								}
								return nil
							})
							addDebug(action, response, adaptedScalingPolicy)
							if err != nil {
								return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
							}
						} else {
							removeScalingPolicy = true
						}
					} else {
						removeScalingPolicy = true
					}
					if removeScalingPolicy {
						removeScalingPolicyRequest := map[string]interface{}{
							"RegionId":    client.RegionId,
							"ClusterId":   d.Id(),
							"NodeGroupId": oldNodeGroup["NodeGroupId"],
						}
						action = "RemoveAutoScalingPolicy"
						err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
							response, err = client.RpcPost("Emr", "2021-03-20", action, nil, removeScalingPolicyRequest, false)
							if err != nil {
								if NeedRetry(err) {
									wait()
									return resource.RetryableError(err)
								}
								return resource.NonRetryableError(err)
							}
							return nil
						})
						addDebug(action, response, removeScalingPolicyRequest)
						if err != nil {
							return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
						}
					}
				}
				if !reflect.DeepEqual(originNodeGroupMap[nodeGroupName]["ack_config"], newNodeGroup["ack_config"]) && "K8S" == oldNodeGroup["IaaSType"] {
					ackConfigs := newNodeGroup["ack_config"].([]interface{})
					if len(ackConfigs) > 0 {
						updateNodeGroupAttributesRequest := map[string]interface{}{
							"RegionId":    client.RegionId,
							"ClusterId":   d.Id(),
							"NodeGroupId": oldNodeGroup["NodeGroupId"],
							"AckConfig":   adaptAckConfigRequest(ackConfigs[0].(map[string]interface{})),
						}
						action = "UpdateNodeGroupAttributes"
						err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
							response, err = client.RpcPost("Emr", "2021-03-20", action, nil, updateNodeGroupAttributesRequest, false)
							if err != nil {
								if NeedRetry(err) {
									wait()
									return resource.RetryableError(err)
								}
								return resource.NonRetryableError(err)
							}
							return nil
						})
						addDebug(action, response, updateNodeGroupAttributesRequest)
						if err != nil {
							return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
						}
					}
				}

				newNodeCount := formatInt(newNodeGroup["node_count"])
				oldNodeCount := formatInt(oldNodeGroup["RunningNodeCount"])

				// increase nodes
				if oldNodeCount < newNodeCount {
					if "MASTER" == newNodeGroup["node_group_type"].(string) {
						return WrapError(Error("EMR cluster can not increase the node group type of ['MASTER']."))
					}
					count := newNodeCount - oldNodeCount
					increaseNodesGroup := map[string]interface{}{}
					increaseNodesGroup["RegionId"] = client.RegionId
					increaseNodesGroup["ClusterId"] = d.Id()
					increaseNodesGroup["NodeGroupId"] = oldNodeGroup["NodeGroupId"]
					increaseNodesGroup["IncreaseNodeCount"] = count
					increaseNodesGroup["AutoRenew"] = false
					if "Subscription" == newNodeGroup["payment_type"].(string) {
						subscriptionConfig := newNodeGroup["subscription_config"].(*schema.Set).List()
						if len(subscriptionConfig) == 1 {
							configMap := subscriptionConfig[0].(map[string]interface{})
							increaseNodesGroup["PaymentDuration"] = configMap["payment_duration"]
							increaseNodesGroup["PaymentDurationUnit"] = configMap["payment_duration_unit"]
							if value, exists := configMap["auto_pay_order"]; exists {
								increaseNodesGroup["AutoPayOrder"] = value.(bool)
							} else {
								increaseNodesGroup["AutoPayOrder"] = true
							}
							if value, exists := configMap["auto_renew"]; exists {
								increaseNodesGroup["AutoRenew"] = value.(bool)
							}
						}
					}
					increaseNodesGroups = append(increaseNodesGroups, increaseNodesGroup)
				} else if oldNodeCount > newNodeCount { // decrease nodes
					// EMR cluster can only decrease 'TASK, GATEWAY' node group.
					nodeGroupType := newNodeGroup["node_group_type"].(string)
					if "TASK" != nodeGroupType && "GATEWAY" != nodeGroupType {
						return WrapError(Error("EMR cluster can only decrease the node group type of ['TASK', 'GATEWAY']."))
					}
					decreaseNodesGroup := map[string]interface{}{
						"ClusterId":         d.Id(),
						"RegionId":          client.RegionId,
						"DecreaseNodeCount": oldNodeCount - newNodeCount,
						"NodeGroupId":       oldNodeGroup["NodeGroupId"],
					}
					decreaseNodesGroups = append(decreaseNodesGroups, decreaseNodesGroup)
				}

				// increase node disk size, we can only support single disk type.
				currDataDisk := oldNodeGroup["DataDisks"].([]interface{})[0].(map[string]interface{})
				targetDataDisk := newNodeGroup["data_disks"].(*schema.Set).List()[0].(map[string]interface{})
				if formatInt(targetDataDisk["size"]) < formatInt(currDataDisk["Size"]) {
					return WrapError(Error("EMR cluster can only increase node disk, decrease node disk is not supported."))
				} else if formatInt(targetDataDisk["size"]) > formatInt(currDataDisk["Size"]) {
					if currDataDisk["Category"].(string) == "local_hdd_pro" {
						return WrapError(Error("EMR cluster can not support increase node disk with 'local_hdd_pro' disk type."))
					}
					action := "IncreaseNodesDiskSize"
					increaseNodeDiskSizeRequest := map[string]interface{}{
						"ClusterId":   d.Id(),
						"RegionId":    client.RegionId,
						"NodeGroupId": oldNodeGroup["NodeGroupId"],
						"DataDiskSizes": []map[string]interface{}{
							{
								"Category": currDataDisk["Category"].(string),
								"Size":     formatInt(targetDataDisk["size"]),
							},
						},
					}
					wait := incrementalWait(3*time.Second, 5*time.Second)
					err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
						response, err = client.RpcPost("Emr", "2021-03-20", action, nil, increaseNodeDiskSizeRequest, false)
						if err != nil {
							if NeedRetry(err) {
								wait()
								return resource.RetryableError(err)
							}
							return resource.NonRetryableError(err)
						}
						return nil
					})
					addDebug(action, response, increaseNodeDiskSizeRequest)
					if err != nil {
						return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
					}
				}
			} else { // 'Task' NodeGroupType may not exist when create emr_cluster
				subscriptionConfig := map[string]interface{}{}
				if "Subscription" == newNodeGroup["payment_type"] {
					subscriptionMap := newNodeGroup["subscription_config"].(*schema.Set).List()[0].(map[string]interface{})
					subscriptionConfig["PaymentDurationUnit"] = subscriptionMap["payment_duration_unit"]
					subscriptionConfig["PaymentDuration"] = subscriptionMap["payment_duration"]
					subscriptionConfig["AutoRenewDurationUnit"] = subscriptionMap["auto_renew_duration_unit"]
					subscriptionConfig["AutoRenewDuration"] = subscriptionMap["auto_renew_duration"]
					subscriptionConfig["AutoRenew"] = false
					if value, exists := subscriptionMap["auto_pay_order"]; exists {
						subscriptionConfig["AutoPayOrder"] = value.(bool)
					} else {
						subscriptionConfig["AutoPayOrder"] = true
					}
					if value, exists := subscriptionMap["auto_renew"]; exists {
						subscriptionConfig["AutoRenew"] = value.(bool)
					}
				}
				var spotBidPrices []map[string]interface{}
				for _, v := range newNodeGroup["spot_bid_prices"].(*schema.Set).List() {
					sbpMap := v.(map[string]interface{})
					spotBidPrices = append(spotBidPrices, map[string]interface{}{
						"InstanceType": sbpMap["instance_type"],
						"BidPrice":     sbpMap["bid_price"],
					})
				}
				systemDiskMap := newNodeGroup["system_disk"].(*schema.Set).List()[0].(map[string]interface{})
				var dataDisks []map[string]interface{}
				for _, v := range newNodeGroup["data_disks"].(*schema.Set).List() {
					dataDiskMap := v.(map[string]interface{})
					dataDisk := map[string]interface{}{
						"Category": dataDiskMap["category"],
						"Size":     dataDiskMap["size"],
						"Count":    dataDiskMap["count"],
					}
					if value, exists := dataDiskMap["performance_level"]; exists && value.(string) != "" {
						dataDisk["PerformanceLevel"] = value.(string)
					}
					dataDisks = append(dataDisks, dataDisk)
				}
				nodeGroupParam := map[string]interface{}{
					"NodeGroupType":      newNodeGroup["node_group_type"],
					"NodeGroupName":      nodeGroupName,
					"PaymentType":        newNodeGroup["payment_type"],
					"SubscriptionConfig": subscriptionConfig,
					"SpotBidPrices":      spotBidPrices,
					"WithPublicIp":       newNodeGroup["with_public_ip"],
					"NodeCount":          newNodeGroup["node_count"],
					"DataDisks":          dataDisks,
					"GracefulShutdown":   newNodeGroup["graceful_shutdown"],
					"SpotInstanceRemedy": newNodeGroup["spot_instance_remedy"],
				}
				if value, exists := newNodeGroup["auto_scaling_policy"]; exists && len(value.([]interface{})) > 0 {
					nodeGroupParam["AutoScalingPolicy"] = adaptAutoScalingPolicyRequest(value.([]interface{})[0].(map[string]interface{}))
				}
				if value, exists := newNodeGroup["deployment_set_strategy"]; exists && value.(string) != "" {
					nodeGroupParam["DeploymentSetStrategy"] = value.(string)
				}
				if value, exists := newNodeGroup["node_resize_strategy"]; exists && value.(string) != "" {
					nodeGroupParam["NodeResizeStrategy"] = value.(string)
				}
				if value, exists := newNodeGroup["spot_strategy"]; exists && value.(string) != "" {
					nodeGroupParam["SpotStrategy"] = value.(string)
				}
				vSwitchIDList := newNodeGroup["vswitch_ids"].(*schema.Set).List()
				if len(vSwitchIDList) > 0 {
					var vSwitchIDs []string
					for _, vSwitchID := range vSwitchIDList {
						vSwitchIDs = append(vSwitchIDs, vSwitchID.(string))
					}
					nodeGroupParam["VSwitchIds"] = vSwitchIDs
				}
				systemDisk := map[string]interface{}{
					"Category": systemDiskMap["category"],
					"Size":     systemDiskMap["size"],
					"Count":    systemDiskMap["count"],
				}
				if value, exists := systemDiskMap["performance_level"]; exists && value.(string) != "" {
					systemDisk["PerformanceLevel"] = value.(string)
				}
				nodeGroupParam["SystemDisk"] = systemDisk

				instanceTypeList := newNodeGroup["instance_types"].(*schema.Set).List()
				if len(instanceTypeList) > 0 {
					var instanceTypes []string
					for _, instanceType := range instanceTypeList {
						instanceTypes = append(instanceTypes, instanceType.(string))
					}
					nodeGroupParam["InstanceTypes"] = instanceTypes
				}
				if value, exists := newNodeGroup["ack_config"]; exists && len(value.([]interface{})) > 0 {
					nodeGroupParam["AckConfig"] = adaptAckConfigRequest(value.([]interface{})[0].(map[string]interface{}))
					nodeGroupParam["IaaSType"] = "K8S"
					delete(nodeGroupParam, "InstanceTypes")
				}

				addSecurityGroupIDList := newNodeGroup["additional_security_group_ids"].(*schema.Set).List()
				if len(addSecurityGroupIDList) > 0 {
					var addSecurityGroupIDs []string
					for _, addSecurityGroupID := range addSecurityGroupIDList {
						addSecurityGroupIDs = append(addSecurityGroupIDs, addSecurityGroupID.(string))
					}
					nodeGroupParam["AdditionalSecurityGroupIds"] = addSecurityGroupIDs
				}

				costOptimizedConfigList := newNodeGroup["cost_optimized_config"].(*schema.Set).List()
				if len(costOptimizedConfigList) > 0 {
					costOptimizedConfig := costOptimizedConfigList[0].(map[string]interface{})
					nodeGroupParam["CostOptimizedConfig"] = map[string]interface{}{
						"OnDemandBaseCapacity":                costOptimizedConfig["on_demand_base_capacity"],
						"OnDemandPercentageAboveBaseCapacity": costOptimizedConfig["on_demand_percentage_above_base_capacity"],
						"SpotInstancePools":                   costOptimizedConfig["spot_instance_pools"],
					}
				}
				createNodeGroupRequest := map[string]interface{}{
					"ClusterId": d.Id(),
					"RegionId":  client.RegionId,
					"NodeGroup": nodeGroupParam,
				}

				action = "CreateNodeGroup"
				err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
					response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createNodeGroupRequest, false)
					if err != nil {
						if NeedRetry(err) {
							wait()
							return resource.RetryableError(err)
						}
						return resource.NonRetryableError(err)
					}
					return nil
				})
				addDebug(action, response, createNodeGroupRequest)
				if err != nil {
					return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
				}

				listNodeGroupsRequest := map[string]interface{}{
					"ClusterId":      d.Id(),
					"NodeGroupNames": []string{nodeGroupName},
					"RegionId":       client.RegionId,
				}
				action = "ListNodeGroups"
				err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
					response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listNodeGroupsRequest, false)
					if err != nil {
						if NeedRetry(err) {
							wait()
							return resource.RetryableError(err)
						}
						return resource.NonRetryableError(err)
					}
					return nil
				})
				addDebug(action, response, listNodeGroupsRequest)
				if err != nil {
					return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
				}
				resp, err := jsonpath.Get("$.NodeGroups", response)
				if err != nil {
					return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response)
				}

				if len(resp.([]interface{})) == 0 {
					continue
				}

				nodeGroupId := resp.([]interface{})[0].(map[string]interface{})["NodeGroupId"].(string)

				newNodeCount := formatInt(newNodeGroup["node_count"])
				if newNodeCount > 0 {
					increaseNodesGroup := map[string]interface{}{}
					increaseNodesGroup["RegionId"] = client.RegionId
					increaseNodesGroup["ClusterId"] = d.Id()
					increaseNodesGroup["NodeGroupId"] = nodeGroupId
					increaseNodesGroup["IncreaseNodeCount"] = newNodeCount
					increaseNodesGroup["AutoPayOrder"] = true
					increaseNodesGroup["AutoRenew"] = false
					if "Subscription" == newNodeGroup["payment_type"].(string) {
						subscriptionConfig := newNodeGroup["subscription_config"].(*schema.Set).List()
						if len(subscriptionConfig) == 1 {
							configMap := subscriptionConfig[0].(map[string]interface{})
							increaseNodesGroup["PaymentDuration"] = configMap["payment_duration"]
							increaseNodesGroup["PaymentDurationUnit"] = configMap["payment_duration_unit"]
							if value, exists := configMap["auto_pay_order"]; exists {
								increaseNodesGroup["AutoPayOrder"] = value.(bool)
							}
							if value, exists := configMap["auto_renew"]; exists {
								increaseNodesGroup["AutoRenew"] = value.(bool)
							}
						}
					}
					increaseNodesGroups = append(increaseNodesGroups, increaseNodesGroup)
				}
			}
		}

		var deleteNodeGroups []map[string]interface{}
		for nodeGroupName, oldNodeGroup := range oldNodeGroupMap { // Delete empty nodeGroup
			if newNodeGroup, ok := newNodeGroupMap[nodeGroupName]; !ok {
				oldNodeCount := formatInt(oldNodeGroup["RunningNodeCount"])
				if oldNodeCount > 0 {
					return WrapError(Error("The [nodeGroup: %v, nodeGroupType: %v] can not delete cause exists running nodes", nodeGroupName, oldNodeGroup["NodeGroupType"].(string)))
				}
				deleteNodeGroups = append(deleteNodeGroups, map[string]interface{}{
					"ClusterId":   d.Id(),
					"NodeGroupId": oldNodeGroup["NodeGroupId"],
					"RegionId":    client.RegionId,
				})
			} else if newNodeGroup["payment_type"] == "Subscription" && oldNodeGroup["PaymentType"] == "PayAsYouGo" &&
				!isUpdateClusterPaymentType && d.Get("payment_type") == "Subscription" {
				action = "UpdateNodeGroupPaymentType"
				UpdateNodeGroupPaymentTypeRequest := map[string]interface{}{
					"ClusterId": d.Id(),
					"RegionId":  client.RegionId,
				}
				updateNodeGroupPaymentType := map[string]interface{}{
					"NodeGroupId": oldNodeGroup["NodeGroupId"],
				}
				if newNodeGroupValue, newNodeGroupExists := newNodeGroup["payment_type"]; newNodeGroupExists {
					updateNodeGroupPaymentType["PaymentType"] = newNodeGroupValue
				}
				if subscriptionConfig, exists := newNodeGroup["subscription_config"]; exists && len(subscriptionConfig.(*schema.Set).List()) > 0 {
					subscriptionConfigMap := subscriptionConfig.(*schema.Set).List()[0].(map[string]interface{})
					if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration"]; subscriptionConfigExists {
						updateNodeGroupPaymentType["PaymentDuration"] = subscriptionConfigValue
					}
					if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration_unit"]; subscriptionConfigExists {
						updateNodeGroupPaymentType["PaymentDurationUnit"] = subscriptionConfigValue
					}
					if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["auto_pay_order"]; subscriptionConfigExists {
						UpdateNodeGroupPaymentTypeRequest["AutoPayOrder"] = subscriptionConfigValue
					} else {
						UpdateNodeGroupPaymentTypeRequest["AutoPayOrder"] = true
					}
					if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["auto_renew"]; subscriptionConfigExists {
						UpdateNodeGroupPaymentTypeRequest["AutoRenew"] = subscriptionConfigValue
					} else {
						UpdateNodeGroupPaymentTypeRequest["AutoRenew"] = false
					}
				} else {
					return WrapError(Error("The '%s' nodeGroup: '%s' is needed parameter 'subscription_config' for changing paymentType.",
						newNodeGroup["node_group_type"], newNodeGroup["node_group_name"]))
				}
				UpdateNodeGroupPaymentTypeRequest["NodeGroup"] = updateNodeGroupPaymentType
				wait = incrementalWait(3*time.Second, 5*time.Second)
				err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
					response, err = client.RpcPost("Emr", "2021-03-20", action, nil, UpdateNodeGroupPaymentTypeRequest, false)
					if err != nil {
						if NeedRetry(err) {
							wait()
							return resource.RetryableError(err)
						}
						return resource.NonRetryableError(err)
					}
					return nil
				})
				addDebug(action, response, UpdateNodeGroupPaymentTypeRequest)
				if err != nil {
					return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
				}
				// Wait for node group payment type has been changed
				if err = resource.Retry(5*time.Minute, func() *resource.RetryError {
					nodeGroupId := updateNodeGroupPaymentType["NodeGroupId"].(string)
					if nodeGroups, err := emrService.ListEmrV2NodeGroups(d.Id(), []string{nodeGroupId}); err != nil {
						return resource.NonRetryableError(err)
					} else if len(nodeGroups) > 0 && "Subscription" == nodeGroups[0].(map[string]interface{})["PaymentType"].(string) {
						return nil
					}
					return resource.RetryableError(Error("Waiting for node group %s payment type to be changed.", nodeGroupId))
				}); err != nil {
					return WrapError(err)
				}
			} else if !isUpdateClusterPaymentType && d.Get("payment_type") == "Subscription" &&
				oldNodeGroup["PaymentType"] == "Subscription" && newNodeGroup["payment_type"] == "PayAsYouGo" {
				return WrapError(Error("EMR cluster can only change paymentType from PayAsYouGo to Subscription."))
			}
		}

		var wg sync.WaitGroup
		var cm sync.Map
		waitFlag := false
		for _, increaseNodesGroupRequest := range increaseNodesGroups {
			waitFlag = true
			action := "IncreaseNodes"
			wait := incrementalWait(3*time.Second, 5*time.Second)
			err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
				response, err = client.RpcPost("Emr", "2021-03-20", action, nil, increaseNodesGroupRequest, false)
				if err != nil {
					if NeedRetry(err) {
						wait()
						return resource.RetryableError(err)
					}
					return resource.NonRetryableError(err)
				}
				return nil
			})
			addDebug(action, response, increaseNodesGroupRequest)
			if err != nil {
				return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
			}
			resp, err := jsonpath.Get("$.OperationId", response)
			if err != nil {
				return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.OperationId", response)
			}
			wg.Add(1)
			go emrService.WaitForEmrV2Operation(d.Id(), increaseNodesGroupRequest["NodeGroupId"].(string), resp.(string), 2*Timeout5Minute, &wg, &cm)
		}

		for _, decreaseNodesGroupRequest := range decreaseNodesGroups {
			waitFlag = true
			action := "DecreaseNodes"
			wait := incrementalWait(3*time.Second, 5*time.Second)
			err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
				response, err = client.RpcPost("Emr", "2021-03-20", action, nil, decreaseNodesGroupRequest, false)
				if err != nil {
					if NeedRetry(err) {
						wait()
						return resource.RetryableError(err)
					}
					return resource.NonRetryableError(err)
				}
				return nil
			})
			addDebug(action, response, decreaseNodesGroupRequest)
			if err != nil {
				return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
			}
			resp, err := jsonpath.Get("$.OperationId", response)
			if err != nil {
				return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.OperationId", response)
			}
			wg.Add(1)
			go emrService.WaitForEmrV2Operation(d.Id(), decreaseNodesGroupRequest["NodeGroupId"].(string), resp.(string), 2*Timeout5Minute, &wg, &cm)
		}
		if waitFlag {
			wg.Wait()
			var failedNodeGroupId []string
			cm.Range(func(k, v interface{}) bool {
				if v != nil && v.(bool) == false {
					failedNodeGroupId = append(failedNodeGroupId, k.(string))
				}
				return true
			})
			if len(failedNodeGroupId) > 0 {
				return WrapError(Error("EMR cluster resize found error result with the failed nodeGroupIds: [%s].", strings.Join(failedNodeGroupId, ",")))
			}
		}

		for _, deleteNodeGroupRequest := range deleteNodeGroups {
			action := "DeleteNodeGroup"
			wait := incrementalWait(3*time.Second, 5*time.Second)
			err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
				response, err = client.RpcPost("Emr", "2021-03-20", action, nil, deleteNodeGroupRequest, false)
				if err != nil {
					if NeedRetry(err) {
						wait()
						return resource.RetryableError(err)
					}
					return resource.NonRetryableError(err)
				}
				return nil
			})
			addDebug(action, response, deleteNodeGroupRequest)
			if err != nil {
				return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
			}
		}

		d.SetPartial("node_groups")
	}

	if d.HasChange("bootstrap_scripts") {
		_, newBootstrapScripts := d.GetChange("bootstrap_scripts")
		listScriptsRequest := map[string]interface{}{
			"ClusterId":  d.Id(),
			"RegionId":   client.RegionId,
			"ScriptType": "BOOTSTRAP",
		}
		action := "ListScripts"
		wait := incrementalWait(3*time.Second, 5*time.Second)
		err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
			response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listScriptsRequest, false)
			if err != nil {
				if NeedRetry(err) {
					wait()
					return resource.RetryableError(err)
				}
				return resource.NonRetryableError(err)
			}
			return nil
		})
		addDebug(action, response, listScriptsRequest)
		if err != nil {
			return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
		}
		resp, err := jsonpath.Get("$.Scripts", response)

		if err != nil {
			return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.Scripts", response)
		}

		if resp != nil && len(resp.([]interface{})) > 0 {
			deleteScriptRequest := map[string]interface{}{
				"ClusterId":  d.Id(),
				"RegionId":   client.RegionId,
				"ScriptType": "BOOTSTRAP",
			}
			action = "DeleteScript"
			for _, v := range resp.([]interface{}) {
				scriptMap := v.(map[string]interface{})
				deleteScriptRequest["ScriptId"] = scriptMap["ScriptId"]

				err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
					response, err = client.RpcPost("Emr", "2021-03-20", action, nil, deleteScriptRequest, false)
					if err != nil {
						if NeedRetry(err) {
							wait()
							return resource.RetryableError(err)
						}
						return resource.NonRetryableError(err)
					}
					return nil
				})
				addDebug(action, response, deleteScriptRequest)
				if err != nil {
					return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
				}
			}
		}

		if newBootstrapScripts != nil && len(newBootstrapScripts.([]interface{})) > 0 {
			var newScripts []map[string]interface{}
			createScriptRequest := map[string]interface{}{
				"ClusterId":  d.Id(),
				"RegionId":   client.RegionId,
				"ScriptType": "BOOTSTRAP",
			}
			for _, bs := range newBootstrapScripts.([]interface{}) {
				newScriptMap := bs.(map[string]interface{})
				newScript := map[string]interface{}{
					"ScriptName":            newScriptMap["script_name"],
					"ScriptPath":            newScriptMap["script_path"],
					"ScriptArgs":            newScriptMap["script_args"],
					"ExecutionMoment":       newScriptMap["execution_moment"],
					"ExecutionFailStrategy": newScriptMap["execution_fail_strategy"],
				}
				if value, exists := newScriptMap["priority"]; exists && value.(int) > 0 {
					newScript["Priority"] = value.(int)
				}
				if value, exists := newScriptMap["node_selector"]; exists && len(value.(*schema.Set).List()) == 1 {
					nodeSelectorMap := value.(*schema.Set).List()[0].(map[string]interface{})
					nodeSelector := map[string]interface{}{
						"NodeSelectType": nodeSelectorMap["node_select_type"],
						"NodeGroupId":    nodeSelectorMap["node_group_id"],
						"NodeGroupName":  nodeSelectorMap["node_group_name"],
					}
					if v, ok := nodeSelectorMap["node_names"]; ok && len(v.([]interface{})) > 0 {
						var nodeNames []string
						for _, nn := range v.([]interface{}) {
							nodeNames = append(nodeNames, nn.(string))
						}
						nodeSelector["NodeNames"] = nodeNames
					}
					if v, ok := nodeSelectorMap["node_group_ids"]; ok && len(v.([]interface{})) > 0 {
						var nodeGroupIds []string
						for _, ngId := range v.([]interface{}) {
							nodeGroupIds = append(nodeGroupIds, ngId.(string))
						}
						nodeSelector["NodeGroupIds"] = nodeGroupIds
					}
					if v, ok := nodeSelectorMap["node_group_types"]; ok && len(v.([]interface{})) > 0 {
						var nodeGroupTypes []string
						for _, ngType := range v.([]interface{}) {
							nodeGroupTypes = append(nodeGroupTypes, ngType.(string))
						}
						nodeSelector["NodeGroupTypes"] = nodeGroupTypes
					}
					if v, ok := nodeSelectorMap["node_group_names"]; ok && len(v.([]interface{})) > 0 {
						var nodeGroupNames []string
						for _, ngName := range v.([]interface{}) {
							nodeGroupNames = append(nodeGroupNames, ngName.(string))
						}
						nodeSelector["NodeGroupNames"] = nodeGroupNames
					}
					newScript["NodeSelector"] = nodeSelector
				}
				newScripts = append(newScripts, newScript)
			}
			createScriptRequest["Scripts"] = newScripts
			action = "CreateScript"
			err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
				response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createScriptRequest, false)
				if err != nil {
					if NeedRetry(err) {
						wait()
						return resource.RetryableError(err)
					}
					return resource.NonRetryableError(err)
				}
				return nil
			})
			addDebug(action, response, createScriptRequest)
			if err != nil {
				return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
			}
		}
	}

	d.Partial(false)

	return nil
}