alicloud/service_alicloud_emr.go (690 lines of code) (raw):
package alicloud
import (
"fmt"
"sync"
"time"
"github.com/PaesslerAG/jsonpath"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/aliyun/alibaba-cloud-sdk-go/services/emr"
"github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity"
)
type EmrService struct {
client *connectivity.AliyunClient
}
func (s *EmrService) DescribeEmrCluster(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "DescribeClusterV2"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"Id": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2016-04-08", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.ClusterInfo", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.ClusterInfo", response)
}
if v.(map[string]interface{})["Status"] == "RELEASED" {
return object, WrapErrorf(NotFoundErr("EmrCluster", id), NotFoundMsg, AlibabaCloudSdkGoERROR)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *EmrService) GetEmrV2Cluster(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "GetCluster"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ClusterId": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Cluster", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Cluster", response)
}
if v.(map[string]interface{})["ClusterState"] == "TERMINATED" {
return object, WrapErrorf(NotFoundErr("EmrCluster", id), NotFoundWithResponse, response)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *EmrService) GetEmrV2Operation(id string, operationID string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "GetOperation"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ClusterId": id,
"OperationId": operationID,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(3*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Operation", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Operation", response)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *EmrService) ListEmrV2NodeGroups(clusterId string, nodeGroupIds []string) (object []interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "ListNodeGroups"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ClusterId": clusterId,
"NodeGroupStates": []string{"RUNNING"},
"MaxResults": PageSizeXLarge,
}
if nodeGroupIds != nil && len(nodeGroupIds) > 0 {
request["NodeGroupIds"] = nodeGroupIds
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2021-03-20", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, clusterId, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.NodeGroups", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, clusterId, "$.NodeGroups", response)
}
if v != nil {
object = v.([]interface{})
}
return object, nil
}
func (s *EmrService) DataSourceDescribeEmrCluster(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "DescribeClusterV2"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"Id": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2016-04-08", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.ClusterInfo", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.ClusterInfo", response)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *EmrService) WaitForEmrCluster(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, err := s.DescribeEmrCluster(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
if object["Id"].(string) == id && status != Deleted {
break
}
time.Sleep(DefaultIntervalShort * time.Second)
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object["Id"].(string), id, ProviderERROR)
}
}
return nil
}
func (s *EmrService) WaitForEmrV2Cluster(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, err := s.GetEmrV2Cluster(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
if object["ClusterId"].(string) == id && status != Deleted {
break
}
time.Sleep(DefaultIntervalShort * time.Second)
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object["ClusterId"].(string), id, ProviderERROR)
}
}
return nil
}
func (s *EmrService) WaitForEmrV2Operation(id string, nodeGroupId string, operationID string, timeout int, wg *sync.WaitGroup, cm *sync.Map) {
defer wg.Done()
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, _ := s.GetEmrV2Operation(id, operationID)
if object != nil {
if object["OperationState"].(string) == "COMPLETED" || object["OperationState"].(string) == "PARTIAL_COMPLETED" {
cm.Store(nodeGroupId, true)
return
}
if object["OperationState"].(string) == "FAILED" || object["OperationState"].(string) == "TERMINATED" {
cm.Store(nodeGroupId, false)
return
}
}
time.Sleep(DefaultIntervalMedium * time.Second)
if time.Now().After(deadline) {
cm.Store(nodeGroupId, false)
return
}
}
}
func (s *EmrService) EmrClusterStateRefreshFunc(id string, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeEmrCluster(id)
if err != nil {
if NotFoundError(err) {
// Set this to nil as if we didn't find anything.
return nil, "", nil
}
return nil, "", WrapError(err)
}
for _, failState := range failStates {
if object["Status"].(string) == failState {
return object, object["Status"].(string), WrapError(Error(FailedToReachTargetStatus, object["Status"].(string)))
}
}
return object, object["Status"].(string), nil
}
}
func (s *EmrService) EmrV2ClusterStateRefreshFunc(id string, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.GetEmrV2Cluster(id)
if err != nil {
if NotFoundError(err) {
// Set this to nil as if we didn't find anything.
return nil, "", nil
}
return nil, "", WrapError(err)
}
for _, failState := range failStates {
if object["ClusterState"].(string) == failState {
return object, object["ClusterState"].(string), WrapError(Error(FailedToReachTargetStatus, object["ClusterState"].(string)))
}
}
return object, object["ClusterState"].(string), nil
}
}
func (s *EmrService) setEmrClusterTags(d *schema.ResourceData) error {
if d.HasChange("tags") {
oraw, nraw := d.GetChange("tags")
o := oraw.(map[string]interface{})
n := nraw.(map[string]interface{})
create, remove := s.diffTags(s.tagsFromMap(o), s.tagsFromMap(n))
if len(remove) > 0 {
var tagKey []string
for _, v := range remove {
tagKey = append(tagKey, v.Key)
}
request := emr.CreateUntagResourcesRequest()
request.ResourceId = &[]string{d.Id()}
request.ResourceType = string(TagResourceCluster)
request.TagKey = &tagKey
request.RegionId = s.client.RegionId
raw, err := s.client.WithEmrClient(func(client *emr.Client) (interface{}, error) {
return client.UntagResources(request)
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), request.GetActionName(), AlibabaCloudSdkGoERROR)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
}
if len(create) > 0 {
request := emr.CreateTagResourcesRequest()
request.ResourceId = &[]string{d.Id()}
request.Tag = &create
request.ResourceType = string(TagResourceCluster)
request.RegionId = s.client.RegionId
raw, err := s.client.WithEmrClient(func(client *emr.Client) (interface{}, error) {
return client.TagResources(request)
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), request.GetActionName(), AlibabaCloudSdkGoERROR)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
}
d.SetPartial("tags")
}
return nil
}
func (s *EmrService) DescribeEmrClusterTags(resourceId string, resourceType TagResourceType) (tags []emr.TagResource, err error) {
request := emr.CreateListTagResourcesRequest()
request.RegionId = s.client.RegionId
request.ResourceType = string(resourceType)
request.ResourceId = &[]string{resourceId}
raw, err := s.client.WithEmrClient(func(client *emr.Client) (interface{}, error) {
return client.ListTagResources(request)
})
if err != nil {
err = WrapErrorf(err, DefaultErrorMsg, resourceId, request.GetActionName(), AlibabaCloudSdkGoERROR)
return
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
response, _ := raw.(*emr.ListTagResourcesResponse)
tags = response.TagResources.TagResource
return
}
func (s *EmrService) diffTags(oldTags, newTags []emr.TagResourcesTag) ([]emr.TagResourcesTag, []emr.TagResourcesTag) {
// First, we're creating everything we have
create := make(map[string]interface{})
for _, t := range newTags {
create[t.Key] = t.Value
}
// Build the list of what to remove
var remove []emr.TagResourcesTag
for _, t := range oldTags {
old, ok := create[t.Key]
if !ok || old != t.Value {
// Delete it!
remove = append(remove, t)
}
}
return s.tagsFromMap(create), remove
}
func (s *EmrService) tagsFromMap(m map[string]interface{}) []emr.TagResourcesTag {
result := make([]emr.TagResourcesTag, 0, len(m))
for k, v := range m {
result = append(result, emr.TagResourcesTag{
Key: k,
Value: v.(string),
})
}
return result
}
func (s *EmrService) tagsToMap(tags []emr.TagResource) map[string]string {
result := make(map[string]string)
for _, t := range tags {
result[t.TagKey] = t.TagValue
}
return result
}
func (s *EmrService) ListTagResources(id string, resourceType string) (object interface{}, err error) {
client := s.client
action := "ListTagResources"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": resourceType,
"ResourceId.1": id,
}
tags := make([]interface{}, 0)
var response map[string]interface{}
for {
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err := client.RpcPost("Emr", "2016-04-08", action, nil, request, false)
if err != nil {
if IsExpectedErrors(err, []string{Throttling}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
v, err := jsonpath.Get("$.TagResources.TagResource", response)
if err != nil {
return resource.NonRetryableError(WrapErrorf(err, FailedGetAttributeMsg, id, "$.TagResources.TagResource", response))
}
if v != nil {
tags = append(tags, v.([]interface{})...)
}
return nil
})
if err != nil {
err = WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
return
}
if response["NextToken"] == nil {
break
}
request["NextToken"] = response["NextToken"]
}
return tags, nil
}
func (s *EmrService) SetEmrClusterTagsNew(d *schema.ResourceData) error {
if d.HasChange("tags") {
client := s.client
_, nraw := d.GetChange("tags")
var createTags []map[string]interface{}
newTagMap := nraw.(map[string]interface{})
for newKey, newValue := range newTagMap {
createTags = append(createTags, map[string]interface{}{
"Key": newKey,
"Value": newValue,
})
}
tags, err := s.ListTagResourcesNew(d.Id(), string(TagResourceCluster))
if err != nil {
return WrapError(err)
}
var deleteTagKeys []string
for oldKey, oldValue := range tagsToMap(tags) {
newValue, ok := newTagMap[oldKey]
if !ok || oldValue != newValue {
deleteTagKeys = append(deleteTagKeys, oldKey)
}
}
if len(createTags) > 0 {
action := "TagResources"
tagResourcesRequest := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": string(TagResourceCluster),
"ResourceIds": []string{d.Id()},
"Tags": createTags,
}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
_, err = client.RpcPost("Emr", "2021-03-20", action, nil, tagResourcesRequest, false)
if err != nil {
return resource.NonRetryableError(err)
}
addDebug(action, d.Id(), tagResourcesRequest)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
if len(deleteTagKeys) > 0 {
action := "UntagResources"
unTagResourcesRequest := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": string(TagResourceCluster),
"ResourceIds": []string{d.Id()},
"Tags": deleteTagKeys,
}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
_, err = client.RpcPost("Emr", "2021-03-20", action, nil, unTagResourcesRequest, false)
if err != nil {
return resource.NonRetryableError(err)
}
addDebug(action, d.Id(), unTagResourcesRequest)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
d.SetPartial("tags")
}
return nil
}
func (s *EmrService) ListTagResourcesNew(id string, resourceType string) (object interface{}, err error) {
client := s.client
action := "ListTagResources"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": resourceType,
"ResourceIds": []string{id},
}
tags := make([]interface{}, 0)
var response map[string]interface{}
for {
wait := incrementalWait(3*time.Second, 5*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err := client.RpcPost("Emr", "2021-03-20", action, nil, request, false)
if err != nil {
if IsExpectedErrors(err, []string{Throttling}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
v, err := jsonpath.Get("$.TagResources", response)
if err != nil {
return resource.NonRetryableError(WrapErrorf(err, FailedGetAttributeMsg, id, "$.TagResources", response))
}
if v != nil {
tags = append(tags, v.([]interface{})...)
}
return nil
})
if err != nil {
err = WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
return
}
if response["NextToken"] == nil {
break
}
request["NextToken"] = response["NextToken"]
}
return tags, nil
}
func (s *EmrService) SetResourceTags(d *schema.ResourceData, resourceType string) error {
if d.HasChange("tags") {
added, removed := parsingTags(d)
client := s.client
removedTagKeys := make([]string, 0)
for _, v := range removed {
if !ignoredTags(v, "") {
removedTagKeys = append(removedTagKeys, v)
}
}
if len(removedTagKeys) > 0 {
action := "UntagResources"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": resourceType,
"ResourceId.1": d.Id(),
}
for i, key := range removedTagKeys {
request[fmt.Sprintf("TagKey.%d", i+1)] = key
}
wait := incrementalWait(2*time.Second, 1*time.Second)
err := resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err := client.RpcPost("Emr", "2016-04-08", action, nil, request, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
if len(added) > 0 {
action := "TagResources"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": resourceType,
"ResourceId.1": d.Id(),
}
count := 1
for key, value := range added {
request[fmt.Sprintf("Tag.%d.Key", count)] = key
request[fmt.Sprintf("Tag.%d.Value", count)] = value
count++
}
wait := incrementalWait(2*time.Second, 1*time.Second)
err := resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err := client.RpcPost("Emr", "2016-04-08", action, nil, request, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
d.SetPartial("tags")
}
return nil
}
func (s *EmrService) DescribeClusterBasicInfo(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "DescribeClusterBasicInfo"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ClusterId": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2016-04-08", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.ClusterInfo", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.ClusterInfo", response)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *EmrService) DescribeClusterV2(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "DescribeClusterV2"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"Id": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2016-04-08", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.ClusterInfo", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.ClusterInfo", response)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *EmrService) DescribeEmrMainVersionClusterTypes(id string) (object []interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "DescribeEmrMainVersion"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"EmrVersion": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("Emr", "2016-04-08", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.EmrMainVersion.ClusterTypeInfoList.ClusterTypeInfo", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.EmrMainVersion.ClusterTypeInfoList.ClusterTypeInfo", response)
}
object = v.([]interface{})
return object, nil
}