alicloud/service_alicloud_amqp_open.go (371 lines of code) (raw):
package alicloud
import (
"fmt"
"time"
"github.com/PaesslerAG/jsonpath"
"github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
)
type AmqpOpenService struct {
client *connectivity.AliyunClient
}
func (s *AmqpOpenService) DescribeAmqpVirtualHost(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "ListVirtualHosts"
parts, err := ParseResourceId(id, 2)
if err != nil {
err = WrapError(err)
return
}
request := map[string]interface{}{
"InstanceId": parts[0],
"MaxResults": 100,
}
idExist := false
for {
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcGet("amqp-open", "2019-12-12", action, request, nil)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Data.VirtualHosts", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Data.VirtualHosts", response)
}
if len(v.([]interface{})) < 1 {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
for _, v := range v.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["Name"]) == parts[1] {
idExist = true
return v.(map[string]interface{}), nil
}
}
if nextToken, ok := response["NextToken"].(string); ok && nextToken != "" {
request["NextToken"] = nextToken
} else {
break
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
return
}
func (s *AmqpOpenService) DescribeAmqpQueue(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "ListQueues"
parts, err := ParseResourceId(id, 3)
if err != nil {
err = WrapError(err)
return
}
request := map[string]interface{}{
"InstanceId": parts[0],
"VirtualHost": parts[1],
"MaxResults": 100,
}
idExist := false
for {
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcGet("amqp-open", "2019-12-12", action, request, nil)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Data.Queues", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Data.Queues", response)
}
if len(v.([]interface{})) < 1 {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
for _, v := range v.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["Name"]) == parts[2] {
idExist = true
return v.(map[string]interface{}), nil
}
}
if nextToken, ok := response["NextToken"].(string); ok && nextToken != "" {
request["NextToken"] = nextToken
} else {
break
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
return
}
func (s *AmqpOpenService) DescribeAmqpExchange(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "ListExchanges"
parts, err := ParseResourceId(id, 3)
if err != nil {
err = WrapError(err)
return
}
request := map[string]interface{}{
"InstanceId": parts[0],
"VirtualHost": parts[1],
"MaxResults": 100,
}
idExist := false
for {
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcGet("amqp-open", "2019-12-12", action, request, nil)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
if IsExpectedErrors(err, []string{"107"}) {
return nil, WrapErrorf(err, NotFoundMsg, AlibabaCloudSdkGoERROR)
}
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Data.Exchanges", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Data.Exchanges", response)
}
if len(v.([]interface{})) < 1 {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
for _, v := range v.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["Name"]) == parts[2] {
idExist = true
return v.(map[string]interface{}), nil
}
}
if nextToken, ok := response["NextToken"].(string); ok && nextToken != "" {
request["NextToken"] = nextToken
} else {
break
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
return
}
func (s *AmqpOpenService) DescribeAmqpInstance(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
client := s.client
action := "ListInstances"
request := map[string]interface{}{
"MaxResults": 100,
}
idExist := false
for {
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcGet("amqp-open", "2019-12-12", action, request, nil)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Data.Instances", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Data.Instances", response)
}
if len(v.([]interface{})) < 1 {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
for _, v := range v.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["InstanceId"]) == id {
idExist = true
return v.(map[string]interface{}), nil
}
}
if nextToken, ok := response["NextToken"].(string); ok && nextToken != "" {
request["NextToken"] = nextToken
} else {
break
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("Amqp", id), NotFoundWithResponse, response)
}
return
}
func (s *AmqpOpenService) AmqpInstanceStateRefreshFunc(id string, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeAmqpInstance(id)
if err != nil {
if NotFoundError(err) {
// Set this to nil as if we didn't find anything.
return nil, "", nil
}
return nil, "", WrapError(err)
}
for _, failState := range failStates {
if fmt.Sprint(object["Status"]) == failState {
return object, fmt.Sprint(object["Status"]), WrapError(Error(FailedToReachTargetStatus, fmt.Sprint(object["Status"])))
}
}
return object, fmt.Sprint(object["Status"]), nil
}
}
func (s *AmqpOpenService) DescribeAmqpBinding(id string) (object map[string]interface{}, err error) {
var response map[string]interface{}
action := "ListBindings"
client := s.client
parts, err := ParseResourceId(id, 4)
if err != nil {
return nil, WrapError(err)
}
request := map[string]interface{}{
"InstanceId": parts[0],
"VirtualHost": parts[1],
"MaxResults": PageSizeLarge,
}
idExist := false
for {
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcGet("amqp-open", "2019-12-12", action, request, nil)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
if IsExpectedErrors(err, []string{"ExchangeNotExist"}) {
return nil, WrapErrorf(err, NotFoundMsg, AlibabaCloudSdkGoERROR)
}
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
resp, err := jsonpath.Get("$.Data.Bindings", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Data.Bindings", response)
}
if v, ok := resp.([]interface{}); !ok || len(v) < 1 {
return object, WrapErrorf(NotFoundErr("Amqp:Binding", id), NotFoundWithResponse, response)
}
for _, v := range resp.([]interface{}) {
if fmt.Sprint(v.(map[string]interface{})["SourceExchange"]) == parts[2] && fmt.Sprint(v.(map[string]interface{})["DestinationName"]) == parts[3] {
idExist = true
return v.(map[string]interface{}), nil
}
}
if nextToken, ok := response["NextToken"].(string); ok && nextToken != "" {
request["NextToken"] = nextToken
} else {
break
}
}
if !idExist {
return object, WrapErrorf(NotFoundErr("Amqp:Binding", id), NotFoundWithResponse, response)
}
return
}
func (s *AmqpOpenService) DescribeAmqpStaticAccount(id string) (object map[string]interface{}, err error) {
client := s.client
parts, err := ParseResourceId(id, 2)
if err != nil {
return object, WrapError(err)
}
request := map[string]interface{}{}
request["InstanceId"] = parts[0]
var response map[string]interface{}
action := "ListAccounts"
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
resp, err := client.RpcPost("amqp-open", "2019-12-12", action, nil, request, true)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
response = resp
addDebug(action, response, request)
return nil
})
if err != nil {
return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR)
}
v, err := jsonpath.Get("$.Data", response)
if err != nil {
return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.Data", response)
}
data := v.(map[string]interface{})
val, ok := data[parts[0]]
if ok {
allData := val.([]interface{})
for _, i := range allData {
detail := i.(map[string]interface{})
if parts[1] == detail["accessKey"] {
return detail, nil
}
}
err = WrapErrorf(NotFoundErr("Amqp", id), NotFoundMsg, ProviderERROR)
return object, err
} else {
err = WrapErrorf(NotFoundErr("Amqp", id), NotFoundMsg, ProviderERROR)
return object, err
}
}
func (s *AmqpOpenService) AmqpStaticAccountStateRefreshFunc(d *schema.ResourceData, failStates []string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
object, err := s.DescribeAmqpStaticAccount(d.Id())
if err != nil {
if NotFoundError(err) {
return nil, "", nil
}
return nil, "", WrapError(err)
}
for _, failState := range failStates {
if fmt.Sprint(object[""]) == failState {
return object, fmt.Sprint(object[""]), WrapError(Error(FailedToReachTargetStatus, fmt.Sprint(object[""])))
}
}
return object, fmt.Sprint(object[""]), nil
}
}