alicloud/service_alicloud_alikafka.go (1,037 lines of code) (raw):
package alicloud
import (
"fmt"
"log"
"regexp"
"time"
"github.com/PaesslerAG/jsonpath"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/aliyun/alibaba-cloud-sdk-go/services/alikafka"
"github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity"
)
type AlikafkaService struct {
client *connectivity.AliyunClient
}
func (s *AlikafkaService) DescribeAlikafkaInstance(instanceId string) (*alikafka.InstanceVO, error) {
alikafkaInstance := &alikafka.InstanceVO{}
instanceListReq := alikafka.CreateGetInstanceListRequest()
instanceListReq.RegionId = s.client.RegionId
wait := incrementalWait(2*time.Second, 1*time.Second)
var raw interface{}
var err error
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
return client.GetInstanceList(instanceListReq)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(instanceListReq.GetActionName(), raw, instanceListReq.RpcRequest, instanceListReq)
return nil
})
if err != nil {
return alikafkaInstance, WrapErrorf(err, DefaultErrorMsg, instanceId, instanceListReq.GetActionName(), AlibabaCloudSdkGoERROR)
}
instanceListResp, _ := raw.(*alikafka.GetInstanceListResponse)
addDebug(instanceListReq.GetActionName(), raw, instanceListReq.RpcRequest, instanceListReq)
for _, v := range instanceListResp.InstanceList.InstanceVO {
// ServiceStatus equals 10 means the instance is released, do not return the instance.
if v.InstanceId == instanceId && v.ServiceStatus != 10 {
return &v, nil
}
}
return alikafkaInstance, WrapErrorf(NotFoundErr("AlikafkaInstance", instanceId), NotFoundMsg, ProviderERROR)
}
func (s *AlikafkaService) DescribeAlikafkaInstanceByOrderId(orderId string, timeout int) (*alikafka.InstanceVO, error) {
alikafkaInstance := &alikafka.InstanceVO{}
instanceListReq := alikafka.CreateGetInstanceListRequest()
instanceListReq.RegionId = s.client.RegionId
instanceListReq.OrderId = orderId
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
wait := incrementalWait(2*time.Second, 1*time.Second)
var raw interface{}
var err error
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
return client.GetInstanceList(instanceListReq)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(instanceListReq.GetActionName(), raw, instanceListReq.RpcRequest, instanceListReq)
return nil
})
if err != nil {
return alikafkaInstance, WrapErrorf(err, DefaultErrorMsg, orderId, instanceListReq.GetActionName(), AlibabaCloudSdkGoERROR)
}
instanceListResp, _ := raw.(*alikafka.GetInstanceListResponse)
addDebug(instanceListReq.GetActionName(), raw, instanceListReq.RpcRequest, instanceListReq)
for _, v := range instanceListResp.InstanceList.InstanceVO {
return &v, nil
}
if time.Now().After(deadline) {
return alikafkaInstance, WrapErrorf(NotFoundErr("AlikafkaInstance", orderId), NotFoundMsg, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) DescribeAlikafkaConsumerGroup(id string) (*alikafka.ConsumerVO, error) {
alikafkaConsumerGroup := &alikafka.ConsumerVO{}
parts, err := ParseResourceId(id, 2)
if err != nil {
return alikafkaConsumerGroup, WrapError(err)
}
instanceId := parts[0]
consumerId := parts[1]
request := alikafka.CreateGetConsumerListRequest()
request.InstanceId = instanceId
request.RegionId = s.client.RegionId
wait := incrementalWait(2*time.Second, 1*time.Second)
var raw interface{}
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
return client.GetConsumerList(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
return alikafkaConsumerGroup, WrapErrorf(err, DefaultErrorMsg, id, request.GetActionName(), AlibabaCloudSdkGoERROR)
}
consumerListResp, _ := raw.(*alikafka.GetConsumerListResponse)
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
for _, v := range consumerListResp.ConsumerList.ConsumerVO {
if v.ConsumerId == consumerId {
return &v, nil
}
}
return alikafkaConsumerGroup, WrapErrorf(NotFoundErr("AlikafkaConsumerGroup", id), NotFoundMsg, ProviderERROR)
}
func (s *AlikafkaService) DescribeAlikafkaTopicStatus(id string) (*alikafka.TopicStatus, error) {
alikafkaTopicStatus := &alikafka.TopicStatus{}
parts, err := ParseResourceId(id, 2)
if err != nil {
return alikafkaTopicStatus, WrapError(err)
}
instanceId := parts[0]
topic := parts[1]
request := alikafka.CreateGetTopicStatusRequest()
request.InstanceId = instanceId
request.RegionId = s.client.RegionId
request.Topic = topic
wait := incrementalWait(3*time.Second, 5*time.Second)
var raw interface{}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
return alikafkaClient.GetTopicStatus(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
return alikafkaTopicStatus, WrapErrorf(err, DefaultErrorMsg, id, request.GetActionName(), AlibabaCloudSdkGoERROR)
}
topicStatusResp, _ := raw.(*alikafka.GetTopicStatusResponse)
if topicStatusResp.TopicStatus.OffsetTable.OffsetTableItem != nil {
return &topicStatusResp.TopicStatus, nil
}
return alikafkaTopicStatus, WrapErrorf(NotFoundErr("AlikafkaTopicStatus "+ResourceNotfound, id), ResourceNotfound)
}
func (s *AlikafkaService) DescribeAlikafkaTopic(id string) (*alikafka.TopicVO, error) {
alikafkaTopic := &alikafka.TopicVO{}
parts, err := ParseResourceId(id, 2)
if err != nil {
return alikafkaTopic, WrapError(err)
}
instanceId := parts[0]
topic := parts[1]
request := alikafka.CreateGetTopicListRequest()
request.InstanceId = instanceId
request.RegionId = s.client.RegionId
wait := incrementalWait(3*time.Second, 5*time.Second)
var raw interface{}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
return alikafkaClient.GetTopicList(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
return alikafkaTopic, WrapErrorf(err, DefaultErrorMsg, id, request.GetActionName(), AlibabaCloudSdkGoERROR)
}
topicListResp, _ := raw.(*alikafka.GetTopicListResponse)
for _, v := range topicListResp.TopicList.TopicVO {
if v.Topic == topic {
return &v, nil
}
}
return alikafkaTopic, WrapErrorf(NotFoundErr("AlikafkaTopic", id), NotFoundMsg, ProviderERROR)
}
func (s *AlikafkaService) DescribeAlikafkaSaslUser(id string) (*alikafka.SaslUserVO, error) {
alikafkaSaslUser := &alikafka.SaslUserVO{}
parts, err := ParseResourceId(id, 2)
if err != nil {
return alikafkaSaslUser, WrapError(err)
}
instanceId := parts[0]
username := parts[1]
request := alikafka.CreateDescribeSaslUsersRequest()
request.InstanceId = instanceId
request.RegionId = s.client.RegionId
wait := incrementalWait(3*time.Second, 5*time.Second)
var raw interface{}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
return alikafkaClient.DescribeSaslUsers(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
return alikafkaSaslUser, WrapErrorf(err, DefaultErrorMsg, id, request.GetActionName(), AlibabaCloudSdkGoERROR)
}
userListResp, _ := raw.(*alikafka.DescribeSaslUsersResponse)
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
for _, v := range userListResp.SaslUserList.SaslUserVO {
if v.Username == username {
return &v, nil
}
}
return alikafkaSaslUser, WrapErrorf(NotFoundErr("AlikafkaSaslUser", id), NotFoundMsg, ProviderERROR)
}
func (s *AlikafkaService) DescribeAlikafkaSaslAcl(id string) (*alikafka.KafkaAclVO, error) {
alikafkaSaslAcl := &alikafka.KafkaAclVO{}
parts, err := ParseResourceId(id, 6)
if err != nil {
return alikafkaSaslAcl, WrapError(err)
}
instanceId := parts[0]
username := parts[1]
aclResourceType := parts[2]
aclResourceName := parts[3]
aclResourcePatternType := parts[4]
aclOperationType := parts[5]
request := alikafka.CreateDescribeAclsRequest()
request.InstanceId = instanceId
request.RegionId = s.client.RegionId
request.Username = username
request.AclResourceType = aclResourceType
request.AclResourceName = aclResourceName
wait := incrementalWait(3*time.Second, 5*time.Second)
var raw interface{}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
return alikafkaClient.DescribeAcls(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
if IsExpectedErrors(err, []string{"BIZ_SUBSCRIPTION_NOT_FOUND", "BIZ_TOPIC_NOT_FOUND", "BIZ.INSTANCE.STATUS.ERROR"}) {
return alikafkaSaslAcl, WrapErrorf(err, NotFoundMsg, AlibabaCloudSdkGoERROR)
}
return alikafkaSaslAcl, WrapErrorf(err, DefaultErrorMsg, id, request.GetActionName(), AlibabaCloudSdkGoERROR)
}
aclListResp, _ := raw.(*alikafka.DescribeAclsResponse)
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
for _, v := range aclListResp.KafkaAclList.KafkaAclVO {
if v.Username == username && v.AclResourceType == aclResourceType && v.AclResourceName == aclResourceName && v.AclResourcePatternType == aclResourcePatternType && v.AclOperationType == aclOperationType {
return &v, nil
}
}
return alikafkaSaslAcl, WrapErrorf(NotFoundErr("AlikafkaSaslAcl", id), NotFoundMsg, ProviderERROR)
}
func (s *AlikafkaService) WaitForAlikafkaInstanceUpdated(id string, topicQuota int, diskSize int, ioMax int,
eipMax int, paidType int, specType string, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, err := s.DescribeAlikafkaInstance(id)
if err != nil {
return WrapError(err)
}
// Wait for all variables be equal.
if object.InstanceId == id && object.TopicNumLimit == topicQuota && object.DiskSize == diskSize && object.IoMax == ioMax && object.EipMax == eipMax && object.PaidType == paidType && object.SpecType == specType {
return nil
}
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceId, id, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) WaitForAlikafkaInstance(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, err := s.DescribeAlikafkaInstance(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
// Process wait for running.
if object.InstanceId == id && status == Running {
// ServiceStatus equals 5, means the server is in service.
if object.ServiceStatus == 5 {
return nil
}
} else if object.InstanceId == id {
// If target status is not deleted and found a instance, return.
if status != Deleted {
return nil
} else {
// ServiceStatus equals 10, means the server is in released.
if object.ServiceStatus == 10 {
return nil
}
}
}
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceId, id, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) WaitForAlikafkaConsumerGroup(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, err := s.DescribeAlikafkaConsumerGroup(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
if object.InstanceId+":"+object.ConsumerId == id && status != Deleted {
return nil
}
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceId+":"+object.ConsumerId, id, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) KafkaTopicStatusRefreshFunc(id string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeAlikafkaTopicStatus(id)
if err != nil {
if !IsExpectedErrors(err, []string{ResourceNotfound}) {
return nil, "", WrapError(err)
}
}
if object.OffsetTable.OffsetTableItem != nil && len(object.OffsetTable.OffsetTableItem) > 0 {
return object, "Running", WrapError(err)
}
return object, "Creating", nil
}
}
func (s *AlikafkaService) WaitForAlikafkaTopic(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
object, err := s.DescribeAlikafkaTopic(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
if object.InstanceId+":"+object.Topic == id && status != Deleted {
return nil
}
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, object.InstanceId+":"+object.Topic, id, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) WaitForAlikafkaSaslUser(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
parts, err := ParseResourceId(id, 2)
if err != nil {
return WrapError(err)
}
instanceId := parts[0]
for {
object, err := s.DescribeAlikafkaSaslUser(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
if instanceId+":"+object.Username == id && status != Deleted {
return nil
}
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, instanceId+":"+object.Username, id, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) WaitForAlikafkaSaslAcl(id string, status Status, timeout int) error {
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
parts, err := ParseResourceId(id, 6)
if err != nil {
return WrapError(err)
}
instanceId := parts[0]
for {
object, err := s.DescribeAlikafkaSaslAcl(id)
if err != nil {
if NotFoundError(err) {
if status == Deleted {
return nil
}
} else {
return WrapError(err)
}
}
if instanceId+":"+object.Username+":"+object.AclResourceType+":"+object.AclResourceName+":"+object.AclResourcePatternType+":"+object.AclOperationType == id && status != Deleted {
return nil
}
if time.Now().After(deadline) {
return WrapErrorf(err, WaitTimeoutMsg, id, GetFunc(1), timeout, instanceId+":"+object.Username, id, ProviderERROR)
}
time.Sleep(DefaultIntervalShort * time.Second)
}
}
func (s *AlikafkaService) DescribeTags(resourceId string, resourceTags map[string]interface{}, resourceType TagResourceType) (tags []alikafka.TagResource, err error) {
request := alikafka.CreateListTagResourcesRequest()
request.RegionId = s.client.RegionId
request.ResourceType = string(resourceType)
request.ResourceId = &[]string{resourceId}
if resourceTags != nil && len(resourceTags) > 0 {
var reqTags []alikafka.ListTagResourcesTag
for key, value := range resourceTags {
reqTags = append(reqTags, alikafka.ListTagResourcesTag{
Key: key,
Value: value.(string),
})
}
request.Tag = &reqTags
}
wait := incrementalWait(3*time.Second, 5*time.Second)
var raw interface{}
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
raw, err = s.client.WithAlikafkaClient(func(alikafkaClient *alikafka.Client) (interface{}, error) {
return alikafkaClient.ListTagResources(request)
})
if err != nil {
if IsExpectedErrors(err, []string{Throttling, ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
err = WrapErrorf(err, DefaultErrorMsg, resourceId, request.GetActionName(), AlibabaCloudSdkGoERROR)
return
}
response, _ := raw.(*alikafka.ListTagResourcesResponse)
return response.TagResources.TagResource, nil
}
func (s *AlikafkaService) setInstanceTags(d *schema.ResourceData, resourceType TagResourceType) error {
if d.HasChange("tags") {
oraw, nraw := d.GetChange("tags")
o := oraw.(map[string]interface{})
n := nraw.(map[string]interface{})
create, remove := s.diffTags(s.tagsFromMap(o), s.tagsFromMap(n))
if len(remove) > 0 {
var tagKey []string
for _, v := range remove {
tagKey = append(tagKey, v.Key)
}
request := alikafka.CreateUntagResourcesRequest()
request.ResourceId = &[]string{d.Id()}
request.ResourceType = string(resourceType)
request.TagKey = &tagKey
request.RegionId = s.client.RegionId
wait := incrementalWait(2*time.Second, 1*time.Second)
err := resource.Retry(10*time.Minute, func() *resource.RetryError {
raw, err := s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
return client.UntagResources(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), request.GetActionName(), AlibabaCloudSdkGoERROR)
}
}
if len(create) > 0 {
request := alikafka.CreateTagResourcesRequest()
request.ResourceId = &[]string{d.Id()}
request.Tag = &create
request.ResourceType = string(resourceType)
request.RegionId = s.client.RegionId
wait := incrementalWait(2*time.Second, 1*time.Second)
err := resource.Retry(10*time.Minute, func() *resource.RetryError {
raw, err := s.client.WithAlikafkaClient(func(client *alikafka.Client) (interface{}, error) {
return client.TagResources(request)
})
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(request.GetActionName(), raw, request.RpcRequest, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), request.GetActionName(), AlibabaCloudSdkGoERROR)
}
}
d.SetPartial("tags")
}
return nil
}
func (s *AlikafkaService) tagsToMap(tags []alikafka.TagResource) map[string]string {
result := make(map[string]string)
for _, t := range tags {
if !s.ignoreTag(t) {
result[t.TagKey] = t.TagValue
}
}
return result
}
func (s *AlikafkaService) ignoreTag(t alikafka.TagResource) bool {
filter := []string{"^aliyun", "^acs:", "^http://", "^https://"}
for _, v := range filter {
log.Printf("[DEBUG] Matching prefix %v with %v\n", v, t.TagKey)
ok, _ := regexp.MatchString(v, t.TagKey)
if ok {
log.Printf("[DEBUG] Found Alibaba Cloud specific t %s (val: %s), ignoring.\n", t.TagKey, t.TagValue)
return true
}
}
return false
}
func (s *AlikafkaService) tagVOTagsToMap(tags []alikafka.TagVO) map[string]string {
result := make(map[string]string)
for _, t := range tags {
if !s.tagVOIgnoreTag(t) {
result[t.Key] = t.Value
}
}
return result
}
func (s *AlikafkaService) tagVOIgnoreTag(t alikafka.TagVO) bool {
filter := []string{"^aliyun", "^acs:", "^http://", "^https://"}
for _, v := range filter {
log.Printf("[DEBUG] Matching prefix %v with %v\n", v, t.Key)
ok, _ := regexp.MatchString(v, t.Key)
if ok {
log.Printf("[DEBUG] Found Alibaba Cloud specific t %s (val: %s), ignoring.\n", t.Key, t.Value)
return true
}
}
return false
}
func (s *AlikafkaService) diffTags(oldTags, newTags []alikafka.TagResourcesTag) ([]alikafka.TagResourcesTag, []alikafka.TagResourcesTag) {
// First, we're creating everything we have
create := make(map[string]interface{})
for _, t := range newTags {
create[t.Key] = t.Value
}
// Build the list of what to remove
var remove []alikafka.TagResourcesTag
for _, t := range oldTags {
old, ok := create[t.Key]
if !ok || old != t.Value {
// Delete it!
remove = append(remove, t)
}
}
return s.tagsFromMap(create), remove
}
func (s *AlikafkaService) tagsFromMap(m map[string]interface{}) []alikafka.TagResourcesTag {
result := make([]alikafka.TagResourcesTag, 0, len(m))
for k, v := range m {
result = append(result, alikafka.TagResourcesTag{
Key: k,
Value: v.(string),
})
}
return result
}
func (s *AlikafkaService) GetAllowedIpList(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "GetAllowedIpList"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"InstanceId": id,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
v, err := jsonpath.Get("$", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$", response)
}
object = v.(map[string]interface{})
return object, nil
}
func (s *AlikafkaService) SetResourceTags(d *schema.ResourceData, resourceType string) error {
if d.HasChange("tags") {
added, removed := parsingTags(d)
client := s.client
removedTagKeys := make([]string, 0)
for _, v := range removed {
if !ignoredTags(v, "") {
removedTagKeys = append(removedTagKeys, v)
}
}
if len(removedTagKeys) > 0 {
action := "UntagResources"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": resourceType,
"ResourceId.1": d.Id(),
}
for i, key := range removedTagKeys {
request[fmt.Sprintf("TagKey.%d", i+1)] = key
}
wait := incrementalWait(2*time.Second, 1*time.Second)
err := resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err := client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if NeedRetry(err) || IsExpectedErrors(err, []string{"ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
if len(added) > 0 {
action := "TagResources"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"ResourceType": resourceType,
"ResourceId.1": d.Id(),
}
count := 1
for key, value := range added {
request[fmt.Sprintf("Tag.%d.Key", count)] = key
request[fmt.Sprintf("Tag.%d.Value", count)] = value
count++
}
wait := incrementalWait(2*time.Second, 1*time.Second)
err := resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err := client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if NeedRetry(err) || IsExpectedErrors(err, []string{"ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
addDebug(action, response, request)
return nil
})
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
}
d.SetPartial("tags")
}
return nil
}
func (s *AlikafkaService) DescribeAliKafkaConsumerGroup(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "GetConsumerList"
parts, err := ParseResourceId(id, 2)
if err != nil {
err = WrapError(err)
return
}
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"InstanceId": parts[0],
}
idExist := false
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.ConsumerList.ConsumerVO", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.ConsumerList.ConsumerVO", response)
}
if len(v.([]interface{})) < 1 {
return object, WrapErrorf(NotFoundErr("AliKafka", id), NotFoundWithResponse, response)
}
for _, v := range v.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["ConsumerId"]) == parts[1] {
idExist = true
return v.(map[string]interface{}), nil
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("AliKafka", id), NotFoundWithResponse, response)
}
return object, nil
}
func (s *AlikafkaService) DescribeAliKafkaSaslUser(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
action := "DescribeSaslUsers"
client := s.client
parts, err := ParseResourceId(id, 2)
if err != nil {
return object, WrapError(err)
}
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"InstanceId": parts[0],
}
idExist := false
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
if IsExpectedErrors(err, []string{"BIZ_INSTANCE_STATUS_ERROR", "BIZ.INSTANCE.STATUS.ERROR"}) {
return object, WrapErrorf(NotFoundErr("AliKafka:SaslUser", id), NotFoundWithResponse, response)
}
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.SaslUserList.SaslUserVO", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.SaslUserList.SaslUserVO", response)
}
if v, ok := resp.([]interface{}); !ok || len(v) < 1 {
return object, WrapErrorf(NotFoundErr("AliKafka:SaslUser", id), NotFoundWithResponse, response)
}
for _, v := range resp.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["Username"]) == parts[1] {
idExist = true
return v.(map[string]interface{}), nil
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("AliKafka:SaslUser", id), NotFoundWithResponse, response)
}
return object, nil
}
func (s *AlikafkaService) DescribeAliKafkaInstanceAllowedIpAttachment(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
action := "GetAllowedIpList"
client := s.client
parts, err := ParseResourceId(id, 4)
if err != nil {
err = WrapError(err)
return
}
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"InstanceId": parts[0],
}
idExist := false
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
if IsExpectedErrors(err, []string{"BIZ_INSTANCE_STATUS_ERROR", "BIZ.INSTANCE.STATUS.ERROR"}) {
return object, WrapErrorf(NotFoundErr("AliKafka:InstanceAllowedIpAttachment", id), NotFoundWithResponse, response)
}
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
var resp interface{}
allowedType := parts[1]
switch allowedType {
case "vpc":
resp, err = jsonpath.Get("$.AllowedList.VpcList", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.AllowedList.VpcList", response)
}
case "internet":
resp, err = jsonpath.Get("$.AllowedList.InternetList", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.AllowedList.InternetList", response)
}
}
if v, ok := resp.([]interface{}); !ok || len(v) < 1 {
return object, WrapErrorf(NotFoundErr("AliKafka:InstanceAllowedIpAttachment", id), NotFoundWithResponse, response)
}
for _, v := range resp.([]interface{}) {
ipList := v.(map[string]interface{})
if fmt.Sprint(ipList["PortRange"]) == parts[2] {
for _, ip := range ipList["AllowedIpList"].([]interface{}) {
if fmt.Sprint(ip) == parts[3] {
idExist = true
return v.(map[string]interface{}), nil
}
}
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("AliKafka:InstanceAllowedIpAttachment", id), NotFoundWithResponse, response)
}
return object, nil
}
func (s *AlikafkaService) DescribeAliKafkaInstance(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
action := "GetInstanceList"
client := s.client
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"InstanceId": []string{id},
}
idExist := false
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.InstanceList.InstanceVO", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.InstanceList.InstanceVO", response)
}
if v, ok := resp.([]interface{}); !ok || len(v) < 1 {
return object, WrapErrorf(NotFoundErr("AliKafka:Instance", id), NotFoundWithResponse, response)
}
for _, v := range resp.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["InstanceId"]) == id && fmt.Sprint(v.(map[string]interface{})["ServiceStatus"]) != "10" {
idExist = true
return v.(map[string]interface{}), nil
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("AliKafka:Instance", id), NotFoundWithResponse, response)
}
return object, nil
}
func (s *AlikafkaService) GetQuotaTip(instanceId string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "GetQuotaTip"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"InstanceId": instanceId,
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapError(err)
}
v, err := jsonpath.Get("$.QuotaData", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, instanceId, "$.QuotaData", response)
}
return v.(map[string]interface{}), nil
}
func (s *AlikafkaService) DescribeAliKafkaInstanceByOrderId(orderId string, timeout int) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "GetInstanceList"
request := map[string]interface{}{
"RegionId": s.client.RegionId,
"OrderId": orderId,
}
deadline := time.Now().Add(time.Duration(timeout) * time.Second)
for {
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(10*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, true)
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, orderId, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.InstanceList.InstanceVO", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, orderId, "$.InstanceList.InstanceVO", response)
}
for _, v := range v.([]interface{}) {
return v.(map[string]interface{}), nil
}
if time.Now().After(deadline) {
return object, WrapErrorf(NotFoundErr("AlikafkaInstance", orderId), NotFoundMsg, ProviderERROR)
}
}
}
func (s *AlikafkaService) AliKafkaInstanceStateRefreshFunc(id, attribute string, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeAliKafkaInstance(id)
if err != nil {
if NotFoundError(err) {
// Set this to nil as if we didn't find anything.
return nil, "", nil
}
return nil, "", WrapError(err)
}
for _, failState := range failStates {
if fmt.Sprint(object[attribute]) == failState {
return object, fmt.Sprint(object[attribute]), WrapError(Error(FailedToReachTargetStatus, fmt.Sprint(object[attribute])))
}
}
return object, fmt.Sprint(object[attribute]), nil
}
}
func (s *AlikafkaService) AliKafkaConsumerStateRefreshFunc(id, attribute string, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeAliKafkaConsumerGroup(id)
if err != nil {
if NotFoundError(err) {
// Set this to nil as if we didn't find anything.
return nil, "", nil
}
return nil, "", WrapError(err)
}
for _, failState := range failStates {
if fmt.Sprint(object[attribute]) == failState {
return object, fmt.Sprint(object[attribute]), WrapError(Error(FailedToReachTargetStatus, fmt.Sprint(object[attribute])))
}
}
return object, fmt.Sprint(object[attribute]), nil
}
}
func (s *AlikafkaService) AliKafkaTopicStateRefreshFunc(id, attribute string, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeAlikafkaTopic(id)
if err != nil {
if NotFoundError(err) {
// Set this to nil as if we didn't find anything.
return nil, "", nil
}
return nil, "", WrapError(err)
}
return object, "existing", nil
}
}