in alicloud/resource_alicloud_emrv2_cluster.go [2009:2887]
// resourceAlicloudEmrV2ClusterUpdate applies the changed attributes of an EMR v2
// cluster resource by calling the "Emr" product API, version "2021-03-20".
//
// The work is done in this order:
//  1. Tag handling is delegated to emrService.SetEmrClusterTagsNew.
//  2. If cluster_name, log_collect_strategy or deletion_protection changed,
//     UpdateClusterAttribute is called.
//  3. A payment_type change is validated: a new value of "PayAsYouGo" is
//     rejected, and the change must come together with a node_groups change.
//  4. If node_groups changed: the current node groups are listed, the cluster
//     payment type is optionally converted (UpdateClusterPaymentType), every
//     configured node group is reconciled (auto scaling policy, ACK config,
//     node count, data disk size) or created (CreateNodeGroup), per-group
//     payment types are converted (UpdateNodeGroupPaymentType), the collected
//     IncreaseNodes / DecreaseNodes requests are sent and awaited, and finally
//     node groups that are no longer configured are removed (DeleteNodeGroup).
//  5. If bootstrap_scripts changed: every existing BOOTSTRAP script is deleted
//     and the configured scripts are created again (CreateScript).
//
// d is the Terraform resource data; meta must be a *connectivity.AliyunClient.
// The function returns nil on success, or a wrapped error for the first step
// that fails. Steps already executed are not rolled back.
func resourceAlicloudEmrV2ClusterUpdate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*connectivity.AliyunClient)
var err error
var response map[string]interface{}
// Partial mode stays enabled for the whole update and is switched off right
// before the successful return at the end of the function.
d.Partial(true)
emrService := EmrService{client}
if err := emrService.SetEmrClusterTagsNew(d); err != nil {
return WrapError(err)
}
// Step 2: basic cluster attributes. Only the attributes that actually changed
// are put into the request; empty strings are never sent for the two string
// attributes.
if d.HasChange("cluster_name") || d.HasChange("log_collect_strategy") || d.HasChange("deletion_protection") {
action := "UpdateClusterAttribute"
request := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
}
if d.HasChange("cluster_name") && d.Get("cluster_name").(string) != "" {
request["ClusterName"] = d.Get("cluster_name")
}
if d.HasChange("log_collect_strategy") && d.Get("log_collect_strategy").(string) != "" {
request["LogCollectStrategy"] = d.Get("log_collect_strategy")
}
if d.HasChange("deletion_protection") {
request["DeletionProtection"] = d.Get("deletion_protection")
}
// wait is called before every retryable attempt; the call is retried until
// the resource's update timeout while NeedRetry(err) reports true.
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
// NOTE(review): only "cluster_name" is marked here although this branch also
// sends log_collect_strategy and deletion_protection — confirm this is intended.
d.SetPartial("cluster_name")
}
// Step 3: validate a payment type change before touching any node group.
if d.HasChange("payment_type") {
_, newPaymentType := d.GetChange("payment_type")
if "PayAsYouGo" == newPaymentType.(string) {
return WrapError(Error("EMR cluster can only change paymentType from PayAsYouGo to Subscription."))
}
// The conversion itself is performed inside the node_groups branch below, so a
// payment type change without a node_groups change is rejected.
if !d.HasChange("node_groups") {
return WrapError(Error("Subscription paymentType of emr cluster can not contains PayAsYouGo node group with 'MASTER' or 'CORE'."))
}
}
// Step 4: node groups.
if d.HasChange("node_groups") {
oldNodeGroupsList, newNodeGroupsList := d.GetChange("node_groups")
// Fetch the node groups as the service currently reports them.
// NOTE(review): a single call with MaxResults is made; no pagination handling
// is visible here — confirm that one page is always enough.
listNodeGroupsRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"MaxResults": PageSizeLarge,
}
action := "ListNodeGroups"
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listNodeGroupsRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, listNodeGroupsRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.NodeGroups", response)
if err != nil {
return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response)
}
// Three views of the node groups, all keyed by node group name:
//   oldNodeGroupMap    - what the API returned (API field names, e.g. "NodeGroupId")
//   originNodeGroupMap - the previous value of the node_groups attribute
//   newNodeGroupMap    - the new value of the node_groups attribute
oldNodeGroupMap := map[string]map[string]interface{}{}
for _, nodeGroupItem := range resp.([]interface{}) {
oldNodeGroup := nodeGroupItem.(map[string]interface{})
oldNodeGroupMap[oldNodeGroup["NodeGroupName"].(string)] = oldNodeGroup
}
originNodeGroupMap := map[string]map[string]interface{}{}
for _, nodeGroupItem := range oldNodeGroupsList.([]interface{}) {
originNodeGroup := nodeGroupItem.(map[string]interface{})
originNodeGroupMap[originNodeGroup["node_group_name"].(string)] = originNodeGroup
}
newNodeGroupMap := map[string]map[string]interface{}{}
for _, nodeGroupItem := range newNodeGroupsList.([]interface{}) {
newNodeGroup := nodeGroupItem.(map[string]interface{})
newNodeGroupMap[newNodeGroup["node_group_name"].(string)] = newNodeGroup
}
// isUpdateClusterPaymentType records that the cluster-level conversion ran in
// this update, which disables the per-node-group conversion further below.
isUpdateClusterPaymentType := false
if d.HasChange("payment_type") {
action = "UpdateClusterPaymentType"
updateClusterPaymentTypeRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
}
// AutoPayOrder / AutoRenew are only sent when a cluster-level
// subscription_config is set; they default to true / false when the keys are
// missing from its first element.
if sc, ok := d.GetOk("subscription_config"); ok {
if autoPay, exists := sc.(*schema.Set).List()[0].(map[string]interface{})["auto_pay_order"]; exists {
updateClusterPaymentTypeRequest["AutoPayOrder"] = autoPay.(bool)
} else {
updateClusterPaymentTypeRequest["AutoPayOrder"] = true
}
if autoRenew, exists := sc.(*schema.Set).List()[0].(map[string]interface{})["auto_renew"]; exists {
updateClusterPaymentTypeRequest["AutoRenew"] = autoRenew.(bool)
} else {
updateClusterPaymentTypeRequest["AutoRenew"] = false
}
}
// Build the list of node groups to convert. Only groups present in both the
// previous and the new configuration are considered:
//   - new payment_type "Subscription": included, subscription_config required
//   - new payment_type "PayAsYouGo" on a MASTER or CORE group: error
//   - anything else: left out of the request
var convertNodeGroups []map[string]interface{}
for originNodeGroupName := range originNodeGroupMap {
// If the API did not report a group with this name the lookup yields a nil
// map and NodeGroupId is nil.
convertNodeGroup := map[string]interface{}{
"NodeGroupId": oldNodeGroupMap[originNodeGroupName]["NodeGroupId"],
}
if newNodeGroup, ok := newNodeGroupMap[originNodeGroupName]; ok {
if newNodeGroupValue, newNodeGroupExists := newNodeGroup["payment_type"]; newNodeGroupExists && "Subscription" == newNodeGroupValue {
convertNodeGroup["PaymentType"] = newNodeGroupValue
} else if "PayAsYouGo" == newNodeGroupValue && ("MASTER" == newNodeGroup["node_group_type"] || "CORE" == newNodeGroup["node_group_type"]) {
return WrapError(Error("Subscription paymentType of emr cluster can not contains PayAsYouGo node group with 'MASTER' or 'CORE'."))
} else {
continue
}
if subscriptionConfig, exists := newNodeGroup["subscription_config"]; exists && len(subscriptionConfig.(*schema.Set).List()) > 0 {
subscriptionConfigMap := subscriptionConfig.(*schema.Set).List()[0].(map[string]interface{})
if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration"]; subscriptionConfigExists {
convertNodeGroup["PaymentDuration"] = subscriptionConfigValue
}
if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration_unit"]; subscriptionConfigExists {
convertNodeGroup["PaymentDurationUnit"] = subscriptionConfigValue
}
} else {
return WrapError(Error("The '%s' nodeGroup: '%s' is needed parameter 'subscription_config' for changing paymentType.",
newNodeGroup["node_group_type"], newNodeGroup["node_group_name"]))
}
convertNodeGroups = append(convertNodeGroups, convertNodeGroup)
}
}
updateClusterPaymentTypeRequest["NodeGroups"] = convertNodeGroups
// wait is reassigned (not redeclared), so later closures in this branch that
// call wait() use this new function.
wait = incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, updateClusterPaymentTypeRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, updateClusterPaymentTypeRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
// Wait (up to 5 minutes) until the cluster reports payment type "Subscription".
if err = resource.Retry(5*time.Minute, func() *resource.RetryError {
if cluster, err := emrService.GetEmrV2Cluster(d.Id()); err != nil {
return resource.NonRetryableError(err)
} else if cluster["PaymentType"].(string) == "Subscription" {
return nil
}
return resource.RetryableError(Error("Waiting for cluster %s payment type to be changed.", d.Id()))
}); err != nil {
return WrapError(err)
}
isUpdateClusterPaymentType = true
}
// Resize requests are only collected in the loop below; they are sent after all
// node groups have been inspected.
var increaseNodesGroups []map[string]interface{}
var decreaseNodesGroups []map[string]interface{}
// Reconcile every configured node group. Go does not define map iteration
// order, so the order in which groups are processed is not fixed.
for nodeGroupName, newNodeGroup := range newNodeGroupMap {
if oldNodeGroup, ok := oldNodeGroupMap[nodeGroupName]; ok {
// The node group already exists on the service side.
// (a) Auto scaling policy: compared against the previous configuration value.
if !reflect.DeepEqual(originNodeGroupMap[nodeGroupName]["auto_scaling_policy"], newNodeGroup["auto_scaling_policy"]) {
removeScalingPolicy := false
if len(newNodeGroup["auto_scaling_policy"].([]interface{})) > 0 {
adaptedScalingPolicy := adaptAutoScalingPolicyRequest(newNodeGroup["auto_scaling_policy"].([]interface{})[0].(map[string]interface{}))
rulesExists := false
constraintsExists := false
// Move the lower-case keys produced by the adapter to the capitalised keys
// used in the PutAutoScalingPolicy request.
if aspValue, aspExists := adaptedScalingPolicy["scalingRules"]; aspExists {
rulesExists = aspExists
adaptedScalingPolicy["ScalingRules"] = aspValue
delete(adaptedScalingPolicy, "scalingRules")
}
if aspValue, aspExists := adaptedScalingPolicy["constraints"]; aspExists {
constraintsExists = aspExists
adaptedScalingPolicy["Constraints"] = aspValue
delete(adaptedScalingPolicy, "constraints")
}
// A policy with neither rules nor constraints is treated as a removal.
if rulesExists || constraintsExists {
adaptedScalingPolicy["RegionId"] = client.RegionId
adaptedScalingPolicy["ClusterId"] = d.Id()
adaptedScalingPolicy["NodeGroupId"] = oldNodeGroup["NodeGroupId"]
action = "PutAutoScalingPolicy"
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, adaptedScalingPolicy, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, adaptedScalingPolicy)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
} else {
removeScalingPolicy = true
}
} else {
removeScalingPolicy = true
}
if removeScalingPolicy {
removeScalingPolicyRequest := map[string]interface{}{
"RegionId": client.RegionId,
"ClusterId": d.Id(),
"NodeGroupId": oldNodeGroup["NodeGroupId"],
}
action = "RemoveAutoScalingPolicy"
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, removeScalingPolicyRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, removeScalingPolicyRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
}
// (b) ACK config: only pushed when the service reports IaaSType "K8S" for the
// group and the new configuration contains an ack_config entry.
if !reflect.DeepEqual(originNodeGroupMap[nodeGroupName]["ack_config"], newNodeGroup["ack_config"]) && "K8S" == oldNodeGroup["IaaSType"] {
ackConfigs := newNodeGroup["ack_config"].([]interface{})
if len(ackConfigs) > 0 {
updateNodeGroupAttributesRequest := map[string]interface{}{
"RegionId": client.RegionId,
"ClusterId": d.Id(),
"NodeGroupId": oldNodeGroup["NodeGroupId"],
"AckConfig": adaptAckConfigRequest(ackConfigs[0].(map[string]interface{})),
}
action = "UpdateNodeGroupAttributes"
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, updateNodeGroupAttributesRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, updateNodeGroupAttributesRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
}
// (c) Node count: the configured node_count is compared with the
// RunningNodeCount reported by the service.
newNodeCount := formatInt(newNodeGroup["node_count"])
oldNodeCount := formatInt(oldNodeGroup["RunningNodeCount"])
// increase nodes
if oldNodeCount < newNodeCount {
if "MASTER" == newNodeGroup["node_group_type"].(string) {
return WrapError(Error("EMR cluster can not increase the node group type of ['MASTER']."))
}
count := newNodeCount - oldNodeCount
increaseNodesGroup := map[string]interface{}{}
increaseNodesGroup["RegionId"] = client.RegionId
increaseNodesGroup["ClusterId"] = d.Id()
increaseNodesGroup["NodeGroupId"] = oldNodeGroup["NodeGroupId"]
increaseNodesGroup["IncreaseNodeCount"] = count
increaseNodesGroup["AutoRenew"] = false
// Payment parameters (including AutoPayOrder) are only added for Subscription
// groups that carry exactly one subscription_config element.
if "Subscription" == newNodeGroup["payment_type"].(string) {
subscriptionConfig := newNodeGroup["subscription_config"].(*schema.Set).List()
if len(subscriptionConfig) == 1 {
configMap := subscriptionConfig[0].(map[string]interface{})
increaseNodesGroup["PaymentDuration"] = configMap["payment_duration"]
increaseNodesGroup["PaymentDurationUnit"] = configMap["payment_duration_unit"]
if value, exists := configMap["auto_pay_order"]; exists {
increaseNodesGroup["AutoPayOrder"] = value.(bool)
} else {
increaseNodesGroup["AutoPayOrder"] = true
}
if value, exists := configMap["auto_renew"]; exists {
increaseNodesGroup["AutoRenew"] = value.(bool)
}
}
}
increaseNodesGroups = append(increaseNodesGroups, increaseNodesGroup)
} else if oldNodeCount > newNodeCount { // decrease nodes
// EMR cluster can only decrease 'TASK, GATEWAY' node group.
nodeGroupType := newNodeGroup["node_group_type"].(string)
if "TASK" != nodeGroupType && "GATEWAY" != nodeGroupType {
return WrapError(Error("EMR cluster can only decrease the node group type of ['TASK', 'GATEWAY']."))
}
decreaseNodesGroup := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"DecreaseNodeCount": oldNodeCount - newNodeCount,
"NodeGroupId": oldNodeGroup["NodeGroupId"],
}
decreaseNodesGroups = append(decreaseNodesGroups, decreaseNodesGroup)
}
// (d) Data disk size: only the first data disk on each side is compared, and
// only growing is supported.
// NOTE(review): both lists are indexed at [0] without a length check, which
// would panic on an empty list — confirm the API/schema guarantee at least one
// data disk.
currDataDisk := oldNodeGroup["DataDisks"].([]interface{})[0].(map[string]interface{})
targetDataDisk := newNodeGroup["data_disks"].(*schema.Set).List()[0].(map[string]interface{})
if formatInt(targetDataDisk["size"]) < formatInt(currDataDisk["Size"]) {
return WrapError(Error("EMR cluster can only increase node disk, decrease node disk is not supported."))
} else if formatInt(targetDataDisk["size"]) > formatInt(currDataDisk["Size"]) {
if currDataDisk["Category"].(string) == "local_hdd_pro" {
return WrapError(Error("EMR cluster can not support increase node disk with 'local_hdd_pro' disk type."))
}
// action and wait are declared with := here, so they shadow the outer
// variables for the rest of this branch only.
action := "IncreaseNodesDiskSize"
increaseNodeDiskSizeRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"NodeGroupId": oldNodeGroup["NodeGroupId"],
"DataDiskSizes": []map[string]interface{}{
{
"Category": currDataDisk["Category"].(string),
"Size": formatInt(targetDataDisk["size"]),
},
},
}
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, increaseNodeDiskSizeRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, increaseNodeDiskSizeRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
} else { // 'Task' NodeGroupType may not exist when create emr_cluster
// The configured node group is not reported by the service: build a
// CreateNodeGroup request from the configuration.
subscriptionConfig := map[string]interface{}{}
if "Subscription" == newNodeGroup["payment_type"] {
subscriptionMap := newNodeGroup["subscription_config"].(*schema.Set).List()[0].(map[string]interface{})
subscriptionConfig["PaymentDurationUnit"] = subscriptionMap["payment_duration_unit"]
subscriptionConfig["PaymentDuration"] = subscriptionMap["payment_duration"]
subscriptionConfig["AutoRenewDurationUnit"] = subscriptionMap["auto_renew_duration_unit"]
subscriptionConfig["AutoRenewDuration"] = subscriptionMap["auto_renew_duration"]
subscriptionConfig["AutoRenew"] = false
if value, exists := subscriptionMap["auto_pay_order"]; exists {
subscriptionConfig["AutoPayOrder"] = value.(bool)
} else {
subscriptionConfig["AutoPayOrder"] = true
}
if value, exists := subscriptionMap["auto_renew"]; exists {
subscriptionConfig["AutoRenew"] = value.(bool)
}
}
var spotBidPrices []map[string]interface{}
for _, v := range newNodeGroup["spot_bid_prices"].(*schema.Set).List() {
sbpMap := v.(map[string]interface{})
spotBidPrices = append(spotBidPrices, map[string]interface{}{
"InstanceType": sbpMap["instance_type"],
"BidPrice": sbpMap["bid_price"],
})
}
systemDiskMap := newNodeGroup["system_disk"].(*schema.Set).List()[0].(map[string]interface{})
var dataDisks []map[string]interface{}
for _, v := range newNodeGroup["data_disks"].(*schema.Set).List() {
dataDiskMap := v.(map[string]interface{})
dataDisk := map[string]interface{}{
"Category": dataDiskMap["category"],
"Size": dataDiskMap["size"],
"Count": dataDiskMap["count"],
}
if value, exists := dataDiskMap["performance_level"]; exists && value.(string) != "" {
dataDisk["PerformanceLevel"] = value.(string)
}
dataDisks = append(dataDisks, dataDisk)
}
nodeGroupParam := map[string]interface{}{
"NodeGroupType": newNodeGroup["node_group_type"],
"NodeGroupName": nodeGroupName,
"PaymentType": newNodeGroup["payment_type"],
"SubscriptionConfig": subscriptionConfig,
"SpotBidPrices": spotBidPrices,
"WithPublicIp": newNodeGroup["with_public_ip"],
"NodeCount": newNodeGroup["node_count"],
"DataDisks": dataDisks,
"GracefulShutdown": newNodeGroup["graceful_shutdown"],
"SpotInstanceRemedy": newNodeGroup["spot_instance_remedy"],
}
// Optional parameters are only added when they are set to a non-empty value.
if value, exists := newNodeGroup["auto_scaling_policy"]; exists && len(value.([]interface{})) > 0 {
nodeGroupParam["AutoScalingPolicy"] = adaptAutoScalingPolicyRequest(value.([]interface{})[0].(map[string]interface{}))
}
if value, exists := newNodeGroup["deployment_set_strategy"]; exists && value.(string) != "" {
nodeGroupParam["DeploymentSetStrategy"] = value.(string)
}
if value, exists := newNodeGroup["node_resize_strategy"]; exists && value.(string) != "" {
nodeGroupParam["NodeResizeStrategy"] = value.(string)
}
if value, exists := newNodeGroup["spot_strategy"]; exists && value.(string) != "" {
nodeGroupParam["SpotStrategy"] = value.(string)
}
vSwitchIDList := newNodeGroup["vswitch_ids"].(*schema.Set).List()
if len(vSwitchIDList) > 0 {
var vSwitchIDs []string
for _, vSwitchID := range vSwitchIDList {
vSwitchIDs = append(vSwitchIDs, vSwitchID.(string))
}
nodeGroupParam["VSwitchIds"] = vSwitchIDs
}
systemDisk := map[string]interface{}{
"Category": systemDiskMap["category"],
"Size": systemDiskMap["size"],
"Count": systemDiskMap["count"],
}
if value, exists := systemDiskMap["performance_level"]; exists && value.(string) != "" {
systemDisk["PerformanceLevel"] = value.(string)
}
nodeGroupParam["SystemDisk"] = systemDisk
instanceTypeList := newNodeGroup["instance_types"].(*schema.Set).List()
if len(instanceTypeList) > 0 {
var instanceTypes []string
for _, instanceType := range instanceTypeList {
instanceTypes = append(instanceTypes, instanceType.(string))
}
nodeGroupParam["InstanceTypes"] = instanceTypes
}
// With an ack_config the group is created with IaaSType "K8S" and any
// InstanceTypes set above are dropped from the request.
if value, exists := newNodeGroup["ack_config"]; exists && len(value.([]interface{})) > 0 {
nodeGroupParam["AckConfig"] = adaptAckConfigRequest(value.([]interface{})[0].(map[string]interface{}))
nodeGroupParam["IaaSType"] = "K8S"
delete(nodeGroupParam, "InstanceTypes")
}
addSecurityGroupIDList := newNodeGroup["additional_security_group_ids"].(*schema.Set).List()
if len(addSecurityGroupIDList) > 0 {
var addSecurityGroupIDs []string
for _, addSecurityGroupID := range addSecurityGroupIDList {
addSecurityGroupIDs = append(addSecurityGroupIDs, addSecurityGroupID.(string))
}
nodeGroupParam["AdditionalSecurityGroupIds"] = addSecurityGroupIDs
}
costOptimizedConfigList := newNodeGroup["cost_optimized_config"].(*schema.Set).List()
if len(costOptimizedConfigList) > 0 {
costOptimizedConfig := costOptimizedConfigList[0].(map[string]interface{})
nodeGroupParam["CostOptimizedConfig"] = map[string]interface{}{
"OnDemandBaseCapacity": costOptimizedConfig["on_demand_base_capacity"],
"OnDemandPercentageAboveBaseCapacity": costOptimizedConfig["on_demand_percentage_above_base_capacity"],
"SpotInstancePools": costOptimizedConfig["spot_instance_pools"],
}
}
createNodeGroupRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"NodeGroup": nodeGroupParam,
}
action = "CreateNodeGroup"
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createNodeGroupRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, createNodeGroupRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
// Look the new group up by name to obtain its NodeGroupId. This request map
// (and resp/err below) shadow the outer variables of the same name.
listNodeGroupsRequest := map[string]interface{}{
"ClusterId": d.Id(),
"NodeGroupNames": []string{nodeGroupName},
"RegionId": client.RegionId,
}
action = "ListNodeGroups"
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listNodeGroupsRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, listNodeGroupsRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.NodeGroups", response)
if err != nil {
return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.NodeGroups", response)
}
// NOTE(review): if the group just created is not in the listing it is skipped
// without an error and no nodes are requested for it — confirm this is intended.
if len(resp.([]interface{})) == 0 {
continue
}
nodeGroupId := resp.([]interface{})[0].(map[string]interface{})["NodeGroupId"].(string)
// Queue an IncreaseNodes request for the full configured node_count of the
// newly created group.
newNodeCount := formatInt(newNodeGroup["node_count"])
if newNodeCount > 0 {
increaseNodesGroup := map[string]interface{}{}
increaseNodesGroup["RegionId"] = client.RegionId
increaseNodesGroup["ClusterId"] = d.Id()
increaseNodesGroup["NodeGroupId"] = nodeGroupId
increaseNodesGroup["IncreaseNodeCount"] = newNodeCount
increaseNodesGroup["AutoPayOrder"] = true
increaseNodesGroup["AutoRenew"] = false
if "Subscription" == newNodeGroup["payment_type"].(string) {
subscriptionConfig := newNodeGroup["subscription_config"].(*schema.Set).List()
if len(subscriptionConfig) == 1 {
configMap := subscriptionConfig[0].(map[string]interface{})
increaseNodesGroup["PaymentDuration"] = configMap["payment_duration"]
increaseNodesGroup["PaymentDurationUnit"] = configMap["payment_duration_unit"]
if value, exists := configMap["auto_pay_order"]; exists {
increaseNodesGroup["AutoPayOrder"] = value.(bool)
}
if value, exists := configMap["auto_renew"]; exists {
increaseNodesGroup["AutoRenew"] = value.(bool)
}
}
}
increaseNodesGroups = append(increaseNodesGroups, increaseNodesGroup)
}
}
}
// Second pass, over the groups reported by the service:
//   - group no longer configured: queue it for deletion (it must have no
//     running nodes)
//   - group switched from PayAsYouGo to Subscription while the cluster itself is
//     Subscription and was not converted above: convert the single group
//   - group switched from Subscription to PayAsYouGo under the same conditions:
//     error
var deleteNodeGroups []map[string]interface{}
for nodeGroupName, oldNodeGroup := range oldNodeGroupMap { // Delete empty nodeGroup
if newNodeGroup, ok := newNodeGroupMap[nodeGroupName]; !ok {
oldNodeCount := formatInt(oldNodeGroup["RunningNodeCount"])
if oldNodeCount > 0 {
return WrapError(Error("The [nodeGroup: %v, nodeGroupType: %v] can not delete cause exists running nodes", nodeGroupName, oldNodeGroup["NodeGroupType"].(string)))
}
deleteNodeGroups = append(deleteNodeGroups, map[string]interface{}{
"ClusterId": d.Id(),
"NodeGroupId": oldNodeGroup["NodeGroupId"],
"RegionId": client.RegionId,
})
} else if newNodeGroup["payment_type"] == "Subscription" && oldNodeGroup["PaymentType"] == "PayAsYouGo" &&
!isUpdateClusterPaymentType && d.Get("payment_type") == "Subscription" {
action = "UpdateNodeGroupPaymentType"
UpdateNodeGroupPaymentTypeRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
}
updateNodeGroupPaymentType := map[string]interface{}{
"NodeGroupId": oldNodeGroup["NodeGroupId"],
}
if newNodeGroupValue, newNodeGroupExists := newNodeGroup["payment_type"]; newNodeGroupExists {
updateNodeGroupPaymentType["PaymentType"] = newNodeGroupValue
}
// Duration settings go into the nested NodeGroup object, while AutoPayOrder and
// AutoRenew are set on the top-level request. A subscription_config is
// mandatory for this conversion.
if subscriptionConfig, exists := newNodeGroup["subscription_config"]; exists && len(subscriptionConfig.(*schema.Set).List()) > 0 {
subscriptionConfigMap := subscriptionConfig.(*schema.Set).List()[0].(map[string]interface{})
if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration"]; subscriptionConfigExists {
updateNodeGroupPaymentType["PaymentDuration"] = subscriptionConfigValue
}
if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["payment_duration_unit"]; subscriptionConfigExists {
updateNodeGroupPaymentType["PaymentDurationUnit"] = subscriptionConfigValue
}
if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["auto_pay_order"]; subscriptionConfigExists {
UpdateNodeGroupPaymentTypeRequest["AutoPayOrder"] = subscriptionConfigValue
} else {
UpdateNodeGroupPaymentTypeRequest["AutoPayOrder"] = true
}
if subscriptionConfigValue, subscriptionConfigExists := subscriptionConfigMap["auto_renew"]; subscriptionConfigExists {
UpdateNodeGroupPaymentTypeRequest["AutoRenew"] = subscriptionConfigValue
} else {
UpdateNodeGroupPaymentTypeRequest["AutoRenew"] = false
}
} else {
return WrapError(Error("The '%s' nodeGroup: '%s' is needed parameter 'subscription_config' for changing paymentType.",
newNodeGroup["node_group_type"], newNodeGroup["node_group_name"]))
}
UpdateNodeGroupPaymentTypeRequest["NodeGroup"] = updateNodeGroupPaymentType
wait = incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, UpdateNodeGroupPaymentTypeRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, UpdateNodeGroupPaymentTypeRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
// Wait (up to 5 minutes) until the node group reports payment type "Subscription".
if err = resource.Retry(5*time.Minute, func() *resource.RetryError {
nodeGroupId := updateNodeGroupPaymentType["NodeGroupId"].(string)
if nodeGroups, err := emrService.ListEmrV2NodeGroups(d.Id(), []string{nodeGroupId}); err != nil {
return resource.NonRetryableError(err)
} else if len(nodeGroups) > 0 && "Subscription" == nodeGroups[0].(map[string]interface{})["PaymentType"].(string) {
return nil
}
return resource.RetryableError(Error("Waiting for node group %s payment type to be changed.", nodeGroupId))
}); err != nil {
return WrapError(err)
}
} else if !isUpdateClusterPaymentType && d.Get("payment_type") == "Subscription" &&
oldNodeGroup["PaymentType"] == "Subscription" && newNodeGroup["payment_type"] == "PayAsYouGo" {
return WrapError(Error("EMR cluster can only change paymentType from PayAsYouGo to Subscription."))
}
}
// Send the collected resize requests. Each accepted request returns an
// OperationId that is handed, together with wg and cm, to
// emrService.WaitForEmrV2Operation running in its own goroutine.
// presumably that helper calls wg.Done and stores a per-node-group result in
// cm — verify against its implementation.
// NOTE(review): an early error return inside these two loops does not wait for
// waiter goroutines that were already started.
var wg sync.WaitGroup
var cm sync.Map
waitFlag := false
for _, increaseNodesGroupRequest := range increaseNodesGroups {
waitFlag = true
action := "IncreaseNodes"
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, increaseNodesGroupRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, increaseNodesGroupRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.OperationId", response)
if err != nil {
return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.OperationId", response)
}
wg.Add(1)
go emrService.WaitForEmrV2Operation(d.Id(), increaseNodesGroupRequest["NodeGroupId"].(string), resp.(string), 2*Timeout5Minute, &wg, &cm)
}
for _, decreaseNodesGroupRequest := range decreaseNodesGroups {
waitFlag = true
action := "DecreaseNodes"
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, decreaseNodesGroupRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, decreaseNodesGroupRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.OperationId", response)
if err != nil {
return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.OperationId", response)
}
wg.Add(1)
go emrService.WaitForEmrV2Operation(d.Id(), decreaseNodesGroupRequest["NodeGroupId"].(string), resp.(string), 2*Timeout5Minute, &wg, &cm)
}
// Block until every waiter has finished, then fail if any entry in cm holds the
// boolean value false; the keys of those entries are reported as the failed
// node group IDs.
if waitFlag {
wg.Wait()
var failedNodeGroupId []string
cm.Range(func(k, v interface{}) bool {
if v != nil && v.(bool) == false {
failedNodeGroupId = append(failedNodeGroupId, k.(string))
}
return true
})
if len(failedNodeGroupId) > 0 {
return WrapError(Error("EMR cluster resize found error result with the failed nodeGroupIds: [%s].", strings.Join(failedNodeGroupId, ",")))
}
}
// Deletions run last, after all resize operations have completed.
for _, deleteNodeGroupRequest := range deleteNodeGroups {
action := "DeleteNodeGroup"
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, deleteNodeGroupRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, deleteNodeGroupRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
d.SetPartial("node_groups")
}
// Step 5: bootstrap scripts are replaced wholesale — list the existing
// BOOTSTRAP scripts, delete each of them, then create the configured set.
if d.HasChange("bootstrap_scripts") {
_, newBootstrapScripts := d.GetChange("bootstrap_scripts")
listScriptsRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"ScriptType": "BOOTSTRAP",
}
action := "ListScripts"
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, listScriptsRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, listScriptsRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.Scripts", response)
if err != nil {
return WrapErrorf(err, FailedGetAttributeMsg, d.Id(), "$.Scripts", response)
}
if resp != nil && len(resp.([]interface{})) > 0 {
// One request map is reused for all deletions; only ScriptId is overwritten
// per iteration.
deleteScriptRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"ScriptType": "BOOTSTRAP",
}
action = "DeleteScript"
for _, v := range resp.([]interface{}) {
scriptMap := v.(map[string]interface{})
deleteScriptRequest["ScriptId"] = scriptMap["ScriptId"]
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, deleteScriptRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, deleteScriptRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
}
// All configured scripts are sent in a single CreateScript request.
if newBootstrapScripts != nil && len(newBootstrapScripts.([]interface{})) > 0 {
var newScripts []map[string]interface{}
createScriptRequest := map[string]interface{}{
"ClusterId": d.Id(),
"RegionId": client.RegionId,
"ScriptType": "BOOTSTRAP",
}
for _, bs := range newBootstrapScripts.([]interface{}) {
newScriptMap := bs.(map[string]interface{})
newScript := map[string]interface{}{
"ScriptName": newScriptMap["script_name"],
"ScriptPath": newScriptMap["script_path"],
"ScriptArgs": newScriptMap["script_args"],
"ExecutionMoment": newScriptMap["execution_moment"],
"ExecutionFailStrategy": newScriptMap["execution_fail_strategy"],
}
// Priority is only sent when it is greater than zero.
if value, exists := newScriptMap["priority"]; exists && value.(int) > 0 {
newScript["Priority"] = value.(int)
}
// A node selector is only sent when exactly one node_selector element is
// configured; its list-valued fields are added only when non-empty.
if value, exists := newScriptMap["node_selector"]; exists && len(value.(*schema.Set).List()) == 1 {
nodeSelectorMap := value.(*schema.Set).List()[0].(map[string]interface{})
nodeSelector := map[string]interface{}{
"NodeSelectType": nodeSelectorMap["node_select_type"],
"NodeGroupId": nodeSelectorMap["node_group_id"],
"NodeGroupName": nodeSelectorMap["node_group_name"],
}
if v, ok := nodeSelectorMap["node_names"]; ok && len(v.([]interface{})) > 0 {
var nodeNames []string
for _, nn := range v.([]interface{}) {
nodeNames = append(nodeNames, nn.(string))
}
nodeSelector["NodeNames"] = nodeNames
}
if v, ok := nodeSelectorMap["node_group_ids"]; ok && len(v.([]interface{})) > 0 {
var nodeGroupIds []string
for _, ngId := range v.([]interface{}) {
nodeGroupIds = append(nodeGroupIds, ngId.(string))
}
nodeSelector["NodeGroupIds"] = nodeGroupIds
}
if v, ok := nodeSelectorMap["node_group_types"]; ok && len(v.([]interface{})) > 0 {
var nodeGroupTypes []string
for _, ngType := range v.([]interface{}) {
nodeGroupTypes = append(nodeGroupTypes, ngType.(string))
}
nodeSelector["NodeGroupTypes"] = nodeGroupTypes
}
if v, ok := nodeSelectorMap["node_group_names"]; ok && len(v.([]interface{})) > 0 {
var nodeGroupNames []string
for _, ngName := range v.([]interface{}) {
nodeGroupNames = append(nodeGroupNames, ngName.(string))
}
nodeSelector["NodeGroupNames"] = nodeGroupNames
}
newScript["NodeSelector"] = nodeSelector
}
newScripts = append(newScripts, newScript)
}
createScriptRequest["Scripts"] = newScripts
action = "CreateScript"
err = resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, createScriptRequest, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, createScriptRequest)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
}
d.Partial(false)
return nil
}