alibabacloudstack/service_apsarastack_alikafka.go (678 lines of code) (raw):
package alibabacloudstack
import (
"encoding/json"
"log"
"regexp"
"time"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
"github.com/aliyun/alibaba-cloud-sdk-go/services/alikafka"
"github.com/aliyun/terraform-provider-alibabacloudstack/alibabacloudstack/connectivity"
"github.com/aliyun/terraform-provider-alibabacloudstack/alibabacloudstack/errmsgs"
)
// TopicListResponse is the JSON envelope that DescribeAlikafkaTopic decodes the
// body of a GetTopicList common request into. Each field is bound to the JSON
// key named in its struct tag; note that the tags mix lower- and upper-camel
// case keys.
type TopicListResponse struct {
EagleEyeTraceId string `json:"eagleEyeTraceId"`
AsapiSuccess bool `json:"asapiSuccess"`
RequestId string `json:"RequestId"`
Message string `json:"Message"`
PageSize int `json:"PageSize"`
Code int `json:"Code"`
Success bool `json:"Success"`
ResponseVersion string `json:"responseVersion"`
CurrentPage int `json:"CurrentPage"`
Total int `json:"Total"`
// TopicList holds the topic entries; DescribeAlikafkaTopic scans it for a
// matching Topic name.
TopicList []AliKafkaTopic `json:"TopicList"`
}
// AliKafkaTopic is one element of TopicListResponse.TopicList. Each field is
// bound to the JSON key named in its struct tag.
type AliKafkaTopic struct {
// InstanceDo is decoded untyped; its shape is not described in this file.
InstanceDo interface{} `json:"instanceDo"`
RoleList []string `json:"roleList"`
Tags []string `json:"tags"`
LocalTopic bool `json:"localTopic"`
// InstanceId and Topic are joined as "InstanceId:Topic" by
// WaitForAlikafkaTopic when it compares against a resource ID.
InstanceId string `json:"instanceId"`
RelationName string `json:"relationName"`
HaveAlarm bool `json:"haveAlarm"`
StatusName string `json:"statusName"`
AlarmList []string `json:"alarmList"`
// Topic is the name DescribeAlikafkaTopic matches against.
Topic string `json:"topic"`
ChannelName string `json:"channelName"`
AuthType int `json:"authType"`
Status int `json:"status"`
}
// AlikafkaService groups the Alikafka describe/wait/tag helpers in this file.
// Every method issues its requests through the wrapped client.
type AlikafkaService struct {
// client initialises requests and provides WithAlikafkaClient, through which
// all SDK calls in this file are made.
client *connectivity.AlibabacloudStackClient
}
// DescribeAlikafkaInstance calls GetInstanceList and returns the entry whose
// InstanceId equals instanceId, ignoring entries whose ServiceStatus is 10.
//
// Requests failing with errmsgs.ThrottlingUser or "ONS_SYSTEM_FLOW_CONTROL" are
// retried (resource.Retry, 10 minute budget); any other error aborts at once.
// The returned pointer is never nil: on a request failure or when no entry
// matches, an empty InstanceVO is returned together with the error. Callers in
// this file rely on that when they read fields after a not-found error.
func (alikafkaService *AlikafkaService) DescribeAlikafkaInstance(instanceId string) (*alikafka.InstanceVO, error) {
	empty := &alikafka.InstanceVO{}

	request := alikafka.CreateGetInstanceListRequest()
	alikafkaService.client.InitRpcRequest(*request.RpcRequest)
	request.QueryParams["Product"] = "alikafka"

	backoff := incrementalWait(2*time.Second, 1*time.Second)
	var (
		raw interface{}
		err error
	)
	err = resource.Retry(10*time.Minute, func() *resource.RetryError {
		raw, err = alikafkaService.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
			return client.GetInstanceList(request)
		})
		if err == nil {
			addDebug(request.GetActionName(), raw, request.RpcRequest, request)
			return nil
		}
		if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
			// Throttled: pause before asking resource.Retry for another attempt.
			backoff()
			return resource.RetryableError(err)
		}
		return resource.NonRetryableError(err)
	})

	response, typed := raw.(*alikafka.GetInstanceListResponse)
	if err != nil {
		var errmsg string
		if typed {
			errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
		}
		return empty, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, instanceId, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}
	addDebug(request.GetActionName(), raw, request.RpcRequest, request)

	instances := response.InstanceList.InstanceVO
	for i := range instances {
		if instances[i].InstanceId != instanceId || instances[i].ServiceStatus == 10 {
			continue
		}
		// Hand back a copy so the caller does not alias the response slice.
		found := instances[i]
		return &found, nil
	}
	return empty, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaInstance", instanceId)), errmsgs.NotFoundMsg, errmsgs.ProviderERROR)
}
// DescribeAlikafkaInstanceByOrderId polls GetInstanceList filtered by orderId
// and returns the first instance in the response.
//
// timeout is in seconds and bounds the polling: once the deadline has passed
// and a poll still returned no instance, a not-found error is returned. Each
// poll retries throttling errors (errmsgs.ThrottlingUser,
// "ONS_SYSTEM_FLOW_CONTROL") via resource.Retry with a 10 minute budget; any
// other request error is returned immediately. The returned pointer is never
// nil; an empty InstanceVO accompanies every error.
func (alikafkaService *AlikafkaService) DescribeAlikafkaInstanceByOrderId(orderId string, timeout int) (*alikafka.InstanceVO, error) {
	empty := &alikafka.InstanceVO{}

	request := alikafka.CreateGetInstanceListRequest()
	alikafkaService.client.InitRpcRequest(*request.RpcRequest)
	request.OrderId = orderId
	request.QueryParams["Product"] = "alikafka"

	deadline := time.Now().Add(time.Duration(timeout) * time.Second)
	for {
		// A fresh wait helper is created for every poll, as in the original flow.
		backoff := incrementalWait(2*time.Second, 1*time.Second)
		var (
			raw interface{}
			err error
		)
		err = resource.Retry(10*time.Minute, func() *resource.RetryError {
			raw, err = alikafkaService.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
				return client.GetInstanceList(request)
			})
			if err == nil {
				addDebug(request.GetActionName(), raw, request.RpcRequest, request)
				return nil
			}
			if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
				backoff()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		})

		response, typed := raw.(*alikafka.GetInstanceListResponse)
		if err != nil {
			var errmsg string
			if typed {
				errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
			}
			return empty, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, orderId, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
		}
		addDebug(request.GetActionName(), raw, request.RpcRequest, request)

		// The request is already filtered by order ID, so the first entry wins.
		if instances := response.InstanceList.InstanceVO; len(instances) > 0 {
			first := instances[0]
			return &first, nil
		}

		if time.Now().After(deadline) {
			return empty, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaInstance", orderId)), errmsgs.NotFoundMsg, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// DescribeAlikafkaConsumerGroup issues GetConsumerList for the instance encoded
// in id (a two-part resource ID; only the first part, the instance ID, is used).
//
// Throttling errors (errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") are
// retried via resource.Retry with a 10 minute budget. The returned pointer is
// never nil; an empty ConsumerVO accompanies every error.
//
// NOTE(review): the response is never searched for the consumer group. The
// original matching loop is disabled (kept below for reference), so after a
// successful request this function always returns a not-found error. Confirm
// whether that is intended before relying on it.
func (alikafkaService *AlikafkaService) DescribeAlikafkaConsumerGroup(id string) (*alikafka.ConsumerVO, error) {
	empty := &alikafka.ConsumerVO{}

	parts, err := ParseResourceId(id, 2)
	if err != nil {
		return empty, errmsgs.WrapError(err)
	}

	request := alikafka.CreateGetConsumerListRequest()
	alikafkaService.client.InitRpcRequest(*request.RpcRequest)
	request.InstanceId = parts[0]
	request.QueryParams["Product"] = "alikafka"

	backoff := incrementalWait(2*time.Second, 1*time.Second)
	var raw interface{}
	err = resource.Retry(10*time.Minute, func() *resource.RetryError {
		raw, err = alikafkaService.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
			return client.GetConsumerList(request)
		})
		if err == nil {
			addDebug(request.GetActionName(), raw, request.RpcRequest, request)
			return nil
		}
		if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
			backoff()
			return resource.RetryableError(err)
		}
		return resource.NonRetryableError(err)
	})

	response, typed := raw.(*alikafka.GetConsumerListResponse)
	if err != nil {
		var errmsg string
		if typed {
			errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
		}
		return empty, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, id, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}
	addDebug(request.GetActionName(), raw, request.RpcRequest, request)

	// Disabled lookup, preserved as found:
	//for _, v := range consumerListResp.ConsumerList {
	//	if v.ConsumerId == consumerId {
	//		return &v, nil
	//	}
	//}
	return empty, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaConsumerGroup", id)), errmsgs.NotFoundMsg, errmsgs.ProviderERROR)
}
// DescribeAlikafkaTopicStatus fetches the status of the topic identified by id,
// a two-part resource ID holding the instance ID and the topic name.
//
// Throttling errors (errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") are
// retried via resource.Retry with a 5 minute budget. A response whose
// OffsetTable.OffsetTableItem is nil is reported as an
// errmsgs.ResourceNotfound error. The returned pointer is never nil; an empty
// TopicStatus accompanies every error.
func (alikafkaService *AlikafkaService) DescribeAlikafkaTopicStatus(id string) (*alikafka.TopicStatus, error) {
	empty := &alikafka.TopicStatus{}

	parts, err := ParseResourceId(id, 2)
	if err != nil {
		return empty, errmsgs.WrapError(err)
	}

	request := alikafka.CreateGetTopicStatusRequest()
	alikafkaService.client.InitRpcRequest(*request.RpcRequest)
	request.InstanceId, request.Topic = parts[0], parts[1]
	request.QueryParams["Product"] = "alikafka"

	backoff := incrementalWait(3*time.Second, 5*time.Second)
	var raw interface{}
	err = resource.Retry(5*time.Minute, func() *resource.RetryError {
		raw, err = alikafkaService.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
			return alikafkaClient.GetTopicStatus(request)
		})
		if err == nil {
			addDebug(request.GetActionName(), raw, request.RpcRequest, request)
			return nil
		}
		if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
			backoff()
			return resource.RetryableError(err)
		}
		return resource.NonRetryableError(err)
	})

	response, typed := raw.(*alikafka.GetTopicStatusResponse)
	if err != nil {
		var errmsg string
		if typed {
			errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
		}
		return empty, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, id, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}

	if response.TopicStatus.OffsetTable.OffsetTableItem == nil {
		// KafkaTopicStatusRefreshFunc recognises this error by errmsgs.ResourceNotfound.
		return empty, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaTopicStatus "+errmsgs.ResourceNotfound, id)), errmsgs.ResourceNotfound)
	}
	return &response.TopicStatus, nil
}
// DoAlikafkaGettopiclistRequest is an alias for DescribeAlikafkaTopic: it
// forwards id unchanged and returns that call's results as-is.
func (alikafkaService *AlikafkaService) DoAlikafkaGettopiclistRequest(id string) (*AliKafkaTopic, error) {
	topic, err := alikafkaService.DescribeAlikafkaTopic(id)
	return topic, err
}
// DescribeAlikafkaTopic looks up a topic through a GetTopicList common request.
//
// id is a two-part resource ID holding the instance ID and the topic name. The
// response body is decoded into TopicListResponse and the entry whose Topic
// equals the requested name is returned.
//
// Throttling errors (errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") are
// retried via resource.Retry with a 5 minute budget. Any remaining request
// error is returned to the caller instead of being discarded. The returned
// pointer is never nil; an empty AliKafkaTopic accompanies every error,
// including the not-found error returned when no entry matches.
func (alikafkaService *AlikafkaService) DescribeAlikafkaTopic(id string) (*AliKafkaTopic, error) {
	alikafkaTopic := &AliKafkaTopic{}
	parts, err := ParseResourceId(id, 2)
	if err != nil {
		return alikafkaTopic, errmsgs.WrapError(err)
	}
	instanceId := parts[0]
	topic := parts[1]

	request := alikafkaService.client.NewCommonRequest("POST", "alikafka", "2019-09-16", "GetTopicList", "")
	request.QueryParams["InstanceId"] = instanceId

	wait := incrementalWait(3*time.Second, 5*time.Second)
	var raw interface{}
	err = resource.Retry(5*time.Minute, func() *resource.RetryError {
		raw, err = alikafkaService.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
			return alikafkaClient.ProcessCommonRequest(request)
		})
		addDebug(request.GetActionName(), raw, request, request.QueryParams)
		if err != nil {
			if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
				wait()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})

	bresponse, ok := raw.(*responses.CommonResponse)
	// Report the request failure before touching the body. Previously this
	// error was overwritten by the JSON decode result and never surfaced.
	if err != nil {
		errmsg := ""
		if ok && bresponse != nil {
			errmsg = errmsgs.GetBaseResponseErrorMessage(bresponse.BaseResponse)
		}
		return alikafkaTopic, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, id, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}
	// Guard against dereferencing a missing response; the body cannot be read
	// without it.
	if !ok || bresponse == nil {
		return alikafkaTopic, errmsgs.WrapErrorf(errmsgs.Error("unexpected response type for GetTopicList"), errmsgs.RequestV1ErrorMsg, id, request.GetActionName(), errmsgs.ProviderERROR, "")
	}

	topicListResp := TopicListResponse{}
	if decodeErr := json.Unmarshal(bresponse.GetHttpContentBytes(), &topicListResp); decodeErr != nil {
		// encoding/json keeps decoding the remaining fields after a type
		// mismatch, so TopicList may still be usable. Log the problem and
		// carry on with whatever was decoded rather than failing the lookup.
		log.Printf("[WARN] %s: decoding response for %s failed: %v", request.GetActionName(), id, decodeErr)
	}

	for i := range topicListResp.TopicList {
		if topicListResp.TopicList[i].Topic == topic {
			found := topicListResp.TopicList[i]
			return &found, nil
		}
	}
	return alikafkaTopic, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaTopic", id)), errmsgs.NotFoundMsg, errmsgs.ProviderERROR)
}
// DescribeAlikafkaSaslUser lists the SASL users of an instance and returns the
// one whose Username matches.
//
// id is parsed as a three-part resource ID; the first part is the instance ID
// and the second the username (the third part is not read here). Throttling
// errors (errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") are retried via
// resource.Retry with a 5 minute budget. The returned pointer is never nil; an
// empty SaslUserVO accompanies every error.
func (alikafkaService *AlikafkaService) DescribeAlikafkaSaslUser(id string) (*alikafka.SaslUserVO, error) {
	empty := &alikafka.SaslUserVO{}

	parts, err := ParseResourceId(id, 3)
	if err != nil {
		return empty, errmsgs.WrapError(err)
	}
	wantedUser := parts[1]

	request := alikafka.CreateDescribeSaslUsersRequest()
	alikafkaService.client.InitRpcRequest(*request.RpcRequest)
	request.InstanceId = parts[0]

	backoff := incrementalWait(3*time.Second, 5*time.Second)
	var raw interface{}
	err = resource.Retry(5*time.Minute, func() *resource.RetryError {
		raw, err = alikafkaService.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
			return alikafkaClient.DescribeSaslUsers(request)
		})
		if err == nil {
			addDebug(request.GetActionName(), raw, request.RpcRequest, request)
			return nil
		}
		if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
			backoff()
			return resource.RetryableError(err)
		}
		return resource.NonRetryableError(err)
	})

	response, typed := raw.(*alikafka.DescribeSaslUsersResponse)
	if err != nil {
		var errmsg string
		if typed {
			errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
		}
		return empty, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, id, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}
	addDebug(request.GetActionName(), raw, request.RpcRequest, request)

	users := response.SaslUserList.SaslUserVO
	for i := range users {
		if users[i].Username != wantedUser {
			continue
		}
		found := users[i]
		return &found, nil
	}
	return empty, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaSaslUser", id)), errmsgs.NotFoundMsg, errmsgs.ProviderERROR)
}
// DescribeAlikafkaSaslAcl queries DescribeAcls and returns the ACL entry whose
// pattern type and operation type match the resource ID.
//
// id is a six-part resource ID: instance ID, username, ACL resource type, ACL
// resource name, ACL resource pattern type and ACL operation type. The first
// five parts are sent as request filters; the operation type is only compared
// against the response entries.
//
// Throttling errors (errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") are
// retried via resource.Retry with a 5 minute budget. Request errors matching
// "BIZ_SUBSCRIPTION_NOT_FOUND" or "BIZ_TOPIC_NOT_FOUND" are wrapped with
// errmsgs.NotFoundMsg. The returned pointer is never nil; an empty KafkaAclVO
// accompanies every error.
func (alikafkaService *AlikafkaService) DescribeAlikafkaSaslAcl(id string) (*alikafka.KafkaAclVO, error) {
	empty := &alikafka.KafkaAclVO{}

	parts, err := ParseResourceId(id, 6)
	if err != nil {
		return empty, errmsgs.WrapError(err)
	}
	patternType, operationType := parts[4], parts[5]

	request := alikafka.CreateDescribeAclsRequest()
	alikafkaService.client.InitRpcRequest(*request.RpcRequest)
	request.InstanceId = parts[0]
	request.Username = parts[1]
	request.AclResourceType = parts[2]
	request.AclResourceName = parts[3]
	request.AclResourcePatternType = patternType

	backoff := incrementalWait(3*time.Second, 5*time.Second)
	var raw interface{}
	err = resource.Retry(5*time.Minute, func() *resource.RetryError {
		raw, err = alikafkaService.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
			return alikafkaClient.DescribeAcls(request)
		})
		if err == nil {
			addDebug(request.GetActionName(), raw, request.RpcRequest, request)
			return nil
		}
		if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
			backoff()
			return resource.RetryableError(err)
		}
		return resource.NonRetryableError(err)
	})

	response, typed := raw.(*alikafka.DescribeAclsResponse)
	if err != nil {
		var errmsg string
		if typed {
			errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
		}
		if errmsgs.IsExpectedErrors(err, []string{"BIZ_SUBSCRIPTION_NOT_FOUND", "BIZ_TOPIC_NOT_FOUND"}) {
			return empty, errmsgs.WrapErrorf(err, errmsgs.NotFoundMsg, errmsgs.AlibabacloudStackSdkGoERROR)
		}
		return empty, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, id, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}
	addDebug(request.GetActionName(), raw, request.RpcRequest, request)

	acls := response.KafkaAclList.KafkaAclVO
	for i := range acls {
		if acls[i].AclResourcePatternType != patternType || acls[i].AclOperationType != operationType {
			continue
		}
		found := acls[i]
		return &found, nil
	}
	return empty, errmsgs.WrapErrorf(errmsgs.Error(errmsgs.GetNotFoundMessage("AlikafkaSaslAcl", id)), errmsgs.NotFoundMsg, errmsgs.ProviderERROR)
}
// WaitForAlikafkaInstanceUpdated polls DescribeAlikafkaInstance until the
// instance reports the requested topic quota, disk size, IO max, EIP max, paid
// type and spec type.
//
// timeout is in seconds. Any describe error aborts the wait. Once the deadline
// has passed and the values still differ, a errmsgs.WaitTimeoutMsg error is
// returned.
func (s *AlikafkaService) WaitForAlikafkaInstanceUpdated(id string, topicQuota int, diskSize int, ioMax int, eipMax int, paidType int, specType string, timeout int) error {
	deadline := time.Now().Add(time.Duration(timeout) * time.Second)
	for {
		object, err := s.DescribeAlikafkaInstance(id)
		if err != nil {
			return errmsgs.WrapError(err)
		}

		capacityApplied := object.TopicNumLimit == topicQuota &&
			object.DiskSize == diskSize &&
			object.IoMax == ioMax &&
			object.EipMax == eipMax
		billingApplied := object.PaidType == paidType && object.SpecType == specType
		if object.InstanceId == id && capacityApplied && billingApplied {
			return nil
		}

		if time.Now().After(deadline) {
			return errmsgs.WrapErrorf(err, errmsgs.WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceId, id, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// WaitForAlikafkaInstance polls DescribeAlikafkaInstance until the instance
// reaches the requested status or timeout (seconds) elapses.
//
// Outcomes per poll:
//   - a not-found error while waiting for Deleted ends the wait successfully;
//   - any other describe error aborts the wait;
//   - waiting for Running succeeds once ServiceStatus is 5;
//   - waiting for Deleted succeeds once ServiceStatus is 10;
//   - any other status succeeds as soon as the instance is returned.
func (s *AlikafkaService) WaitForAlikafkaInstance(id string, status Status, timeout int) error {
	deadline := time.Now().Add(time.Duration(timeout) * time.Second)
	for {
		object, err := s.DescribeAlikafkaInstance(id)
		if err != nil {
			if !errmsgs.NotFoundError(err) {
				return errmsgs.WrapError(err)
			}
			if status == Deleted {
				return nil
			}
			// Not found while waiting for another status: keep polling.
		}

		if object.InstanceId == id {
			switch status {
			case Running:
				if object.ServiceStatus == 5 {
					return nil
				}
			case Deleted:
				if object.ServiceStatus == 10 {
					return nil
				}
			default:
				return nil
			}
		}

		if time.Now().After(deadline) {
			return errmsgs.WrapErrorf(err, errmsgs.WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceId, id, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// WaitForAlikafkaConsumerGroup polls DescribeAlikafkaConsumerGroup until the
// consumer group reaches the requested state or timeout (seconds) elapses.
//
// When status is Deleted, a not-found error ends the wait successfully. For any
// other status the wait succeeds once "InstanceId:ConsumerId" of the described
// object equals id. Describe errors other than not-found abort the wait.
func (s *AlikafkaService) WaitForAlikafkaConsumerGroup(id string, status Status, timeout int) error {
	deadline := time.Now().Add(time.Duration(timeout) * time.Second)
	for {
		object, err := s.DescribeAlikafkaConsumerGroup(id)
		if err != nil {
			if !errmsgs.NotFoundError(err) {
				return errmsgs.WrapError(err)
			}
			if status == Deleted {
				return nil
			}
		}

		current := object.InstanceId + ":" + object.ConsumerId
		if current == id && status != Deleted {
			return nil
		}

		if time.Now().After(deadline) {
			return errmsgs.WrapErrorf(err, errmsgs.WaitTimeoutMsg, id, GetFunc(1), timeout, current, id, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// KafkaTopicListRefreshFunc returns a resource.StateRefreshFunc that describes
// the topic identified by id on every call.
//
// Errors matching errmsgs.ResourceNotfound are tolerated; any other error is
// returned with a nil object and an empty state. In every non-failing case the
// reported state is "Creating".
func (s *AlikafkaService) KafkaTopicListRefreshFunc(id string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		object, err := s.DescribeAlikafkaTopic(id)
		if err != nil && !errmsgs.IsExpectedErrors(err, []string{errmsgs.ResourceNotfound}) {
			return nil, "", errmsgs.WrapError(err)
		}
		return object, "Creating", nil
	}
}
// KafkaTopicStatusRefreshFunc returns a resource.StateRefreshFunc that reports
// "Running" once the topic status has at least one offset table item, and
// "Creating" otherwise.
//
// Errors matching errmsgs.ResourceNotfound are tolerated; any other describe
// error is returned with a nil object and an empty state.
func (s *AlikafkaService) KafkaTopicStatusRefreshFunc(id string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		object, err := s.DescribeAlikafkaTopicStatus(id)
		if err != nil && !errmsgs.IsExpectedErrors(err, []string{errmsgs.ResourceNotfound}) {
			return nil, "", errmsgs.WrapError(err)
		}
		// len of a nil slice is 0, so this also covers the nil case.
		if len(object.OffsetTable.OffsetTableItem) == 0 {
			return object, "Creating", nil
		}
		return object, "Running", errmsgs.WrapError(err)
	}
}
// WaitForAlikafkaTopic polls DescribeAlikafkaTopic until the topic reaches the
// requested state or timeout (seconds) elapses.
//
// When status is Deleted, a not-found error ends the wait successfully. For any
// other status the wait succeeds once "InstanceId:Topic" of the described
// object equals id. Describe errors other than not-found abort the wait.
func (s *AlikafkaService) WaitForAlikafkaTopic(id string, status Status, timeout int) error {
	deadline := time.Now().Add(time.Duration(timeout) * time.Second)
	for {
		object, err := s.DescribeAlikafkaTopic(id)
		if err != nil {
			if !errmsgs.NotFoundError(err) {
				return errmsgs.WrapError(err)
			}
			if status == Deleted {
				return nil
			}
		}

		current := object.InstanceId + ":" + object.Topic
		if current == id && status != Deleted {
			return nil
		}

		if time.Now().After(deadline) {
			return errmsgs.WrapErrorf(err, errmsgs.WaitTimeoutMsg, id, GetFunc(1), timeout, current, id, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// WaitForAlikafkaSaslUser polls DescribeAlikafkaSaslUser until the SASL user
// reaches the requested state or timeout (seconds) elapses.
//
// When status is Deleted, a not-found error ends the wait successfully. For any
// other status the wait succeeds once "instanceId:Username" equals id. Describe
// errors other than not-found abort the wait.
//
// NOTE(review): id is parsed as a three-part ID but compared against a two-part
// string, so the equality may never hold — confirm the intended ID format.
func (s *AlikafkaService) WaitForAlikafkaSaslUser(id string, status Status, timeout int) error {
	deadline := time.Now().Add(time.Duration(timeout) * time.Second)

	parts, err := ParseResourceId(id, 3)
	if err != nil {
		return errmsgs.WrapError(err)
	}
	prefix := parts[0] + ":"

	for {
		object, err := s.DescribeAlikafkaSaslUser(id)
		if err != nil {
			if !errmsgs.NotFoundError(err) {
				return errmsgs.WrapError(err)
			}
			if status == Deleted {
				return nil
			}
		}

		current := prefix + object.Username
		if current == id && status != Deleted {
			return nil
		}

		if time.Now().After(deadline) {
			return errmsgs.WrapErrorf(err, errmsgs.WaitTimeoutMsg, id, GetFunc(1), timeout, current, id, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// WaitForAlikafkaSaslAcl polls DescribeAlikafkaSaslAcl until the ACL reaches
// the requested state or timeout (seconds) elapses.
//
// When status is Deleted, a not-found error ends the wait successfully. For any
// other status the wait succeeds once the six-part ID rebuilt from the
// described object (instance ID, username, resource type, resource name,
// pattern type, operation type) equals id. Describe errors other than
// not-found abort the wait. The timeout error reports only
// "instanceId:Username" as the observed value.
func (s *AlikafkaService) WaitForAlikafkaSaslAcl(id string, status Status, timeout int) error {
	deadline := time.Now().Add(time.Duration(timeout) * time.Second)

	parts, err := ParseResourceId(id, 6)
	if err != nil {
		return errmsgs.WrapError(err)
	}
	instanceId := parts[0]

	for {
		object, err := s.DescribeAlikafkaSaslAcl(id)
		if err != nil {
			if !errmsgs.NotFoundError(err) {
				return errmsgs.WrapError(err)
			}
			if status == Deleted {
				return nil
			}
		}

		userKey := instanceId + ":" + object.Username
		rebuilt := userKey + ":" + object.AclResourceType + ":" + object.AclResourceName + ":" + object.AclResourcePatternType + ":" + object.AclOperationType
		if rebuilt == id && status != Deleted {
			return nil
		}

		if time.Now().After(deadline) {
			return errmsgs.WrapErrorf(err, errmsgs.WaitTimeoutMsg, id, GetFunc(1), timeout, userKey, id, errmsgs.ProviderERROR)
		}
		time.Sleep(DefaultIntervalShort * time.Second)
	}
}
// DescribeTags lists the tags attached to resourceId via ListTagResources.
//
// resourceType is sent as the request's ResourceType. When resourceTags is
// non-empty, its entries are sent as tag filters; every value must be a string
// (a non-string value panics on the type assertion). Throttling errors
// (errmsgs.Throttling, errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") are
// retried via resource.Retry with a 5 minute budget. On failure tags is nil.
func (s *AlikafkaService) DescribeTags(resourceId string, resourceTags map[string]interface{}, resourceType TagResourceType) (tags []alikafka.TagResource, err error) {
	request := alikafka.CreateListTagResourcesRequest()
	s.client.InitRpcRequest(*request.RpcRequest)
	request.ResourceType = string(resourceType)
	request.ResourceId = &[]string{resourceId}

	if len(resourceTags) > 0 {
		var filters []alikafka.ListTagResourcesTag
		for key := range resourceTags {
			filters = append(filters, alikafka.ListTagResourcesTag{
				Key:   key,
				Value: resourceTags[key].(string),
			})
		}
		request.Tag = &filters
	}

	backoff := incrementalWait(3*time.Second, 5*time.Second)
	var raw interface{}
	err = resource.Retry(5*time.Minute, func() *resource.RetryError {
		raw, err = s.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
			return alikafkaClient.ListTagResources(request)
		})
		if err == nil {
			addDebug(request.GetActionName(), raw, request.RpcRequest, request)
			return nil
		}
		if errmsgs.IsExpectedErrors(err, []string{errmsgs.Throttling, errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
			backoff()
			return resource.RetryableError(err)
		}
		return resource.NonRetryableError(err)
	})

	response, typed := raw.(*alikafka.ListTagResourcesResponse)
	if err != nil {
		var errmsg string
		if typed {
			errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
		}
		return nil, errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, resourceId, request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
	}
	return response.TagResources.TagResource, nil
}
// setInstanceTags reconciles the "tags" attribute of d with the remote
// resource identified by d.Id().
//
// Nothing happens unless "tags" has changed. Otherwise the old and new tag
// maps are diffed with diffTags; tags to drop are removed with UntagResources
// first, then the new set is applied with TagResources. Both calls retry
// throttling errors (errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL") via
// resource.Retry with a 10 minute budget. The first failing call aborts the
// reconciliation and its wrapped error is returned.
func (s *AlikafkaService) setInstanceTags(d *schema.ResourceData, resourceType TagResourceType) error {
	if !d.HasChange("tags") {
		return nil
	}

	before, after := d.GetChange("tags")
	oldTags := before.(map[string]interface{})
	newTags := after.(map[string]interface{})
	create, remove := s.diffTags(s.tagsFromMap(oldTags), s.tagsFromMap(newTags))

	if len(remove) > 0 {
		var staleKeys []string
		for i := range remove {
			staleKeys = append(staleKeys, remove[i].Key)
		}

		request := alikafka.CreateUntagResourcesRequest()
		s.client.InitRpcRequest(*request.RpcRequest)
		request.ResourceId = &[]string{d.Id()}
		request.ResourceType = string(resourceType)
		request.TagKey = &staleKeys

		backoff := incrementalWait(2*time.Second, 1*time.Second)
		var (
			raw interface{}
			err error
		)
		err = resource.Retry(10*time.Minute, func() *resource.RetryError {
			raw, err = s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
				return client.UntagResources(request)
			})
			if err == nil {
				addDebug(request.GetActionName(), raw, request.RpcRequest, request)
				return nil
			}
			if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
				backoff()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		})

		response, typed := raw.(*alikafka.UntagResourcesResponse)
		if err != nil {
			var errmsg string
			if typed {
				errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
			}
			return errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, d.Id(), request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
		}
	}

	if len(create) > 0 {
		request := alikafka.CreateTagResourcesRequest()
		s.client.InitRpcRequest(*request.RpcRequest)
		request.ResourceId = &[]string{d.Id()}
		request.Tag = &create
		request.ResourceType = string(resourceType)

		backoff := incrementalWait(2*time.Second, 1*time.Second)
		var (
			raw interface{}
			err error
		)
		err = resource.Retry(10*time.Minute, func() *resource.RetryError {
			raw, err = s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
				return client.TagResources(request)
			})
			if err == nil {
				addDebug(request.GetActionName(), raw, request.RpcRequest, request)
				return nil
			}
			if errmsgs.IsExpectedErrors(err, []string{errmsgs.ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
				backoff()
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		})

		response, typed := raw.(*alikafka.TagResourcesResponse)
		if err != nil {
			var errmsg string
			if typed {
				errmsg = errmsgs.GetBaseResponseErrorMessage(response.BaseResponse)
			}
			return errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, d.Id(), request.GetActionName(), errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
		}
	}

	return nil
}
// tagsToMap converts tags into a key/value map, leaving out every tag that
// ignoreTag rejects. The result is never nil.
func (s *AlikafkaService) tagsToMap(tags []alikafka.TagResource) map[string]string {
	kept := make(map[string]string)
	for i := range tags {
		tag := tags[i]
		if s.ignoreTag(tag) {
			continue
		}
		kept[tag.TagKey] = tag.TagValue
	}
	return kept
}
// ignoreTag reports whether t should be hidden because its key starts with
// "aliyun", "acs:", "http://" or "https://". Every comparison and every hit is
// written to the debug log. A pattern-matching error is treated as "no match".
func (s *AlikafkaService) ignoreTag(t alikafka.TagResource) bool {
	for _, pattern := range [...]string{"^aliyun", "^acs:", "^http://", "^https://"} {
		log.Printf("[DEBUG] Matching prefix %v with %v\n", pattern, t.TagKey)
		if matched, _ := regexp.MatchString(pattern, t.TagKey); matched {
			log.Printf("[DEBUG] Found Alibaba Cloud specific t %s (val: %s), ignoring.\n", t.TagKey, t.TagValue)
			return true
		}
	}
	return false
}
// diffTags compares the previous and desired tag sets.
//
// The first result is the complete desired set, rebuilt from newTags and
// de-duplicated by key (unchanged tags are included). The second result lists
// the old tags whose key is absent from newTags or whose value differs there.
func (s *AlikafkaService) diffTags(oldTags, newTags []alikafka.TagResourcesTag) ([]alikafka.TagResourcesTag, []alikafka.TagResourcesTag) {
	desired := make(map[string]interface{})
	for i := range newTags {
		desired[newTags[i].Key] = newTags[i].Value
	}

	var stale []alikafka.TagResourcesTag
	for _, prev := range oldTags {
		if want, present := desired[prev.Key]; present && want == prev.Value {
			continue
		}
		stale = append(stale, prev)
	}

	return s.tagsFromMap(desired), stale
}
// tagsFromMap turns a key/value map into a slice of TagResourcesTag. Every
// value must be a string; a non-string value panics on the type assertion.
// The order of the result follows map iteration and is therefore unspecified.
func (s *AlikafkaService) tagsFromMap(m map[string]interface{}) []alikafka.TagResourcesTag {
	tags := make([]alikafka.TagResourcesTag, 0, len(m))
	for key := range m {
		tags = append(tags, alikafka.TagResourcesTag{Key: key, Value: m[key].(string)})
	}
	return tags
}