in alicloud/resource_alicloud_alikafka_instance.go [487:954]
func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interface{}) error {
client := meta.(*connectivity.AliyunClient)
alikafkaService := AlikafkaService{client}
var err error
var response map[string]interface{}
d.Partial(true)
if err := alikafkaService.setInstanceTags(d, TagResourceInstance); err != nil {
return WrapError(err)
}
// Process change instance name.
if !d.IsNewResource() && d.HasChange("name") {
action := "ModifyInstanceName"
request := map[string]interface{}{
"RegionId": client.RegionId,
"InstanceId": d.Id(),
}
if v, ok := d.GetOk("name"); ok {
request["InstanceName"] = v
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
if fmt.Sprint(response["Success"]) == "false" {
return WrapError(fmt.Errorf("%s failed, response: %v", action, response))
}
d.SetPartial("name")
}
// Process paid type change, note only support change from post to pre pay.
if !d.IsNewResource() && d.HasChange("paid_type") {
o, n := d.GetChange("paid_type")
oldPaidType := o.(string)
newPaidType := n.(string)
oldPaidTypeInt := 1
newPaidTypeInt := 1
if oldPaidType == string(PrePaid) {
oldPaidTypeInt = 0
}
if newPaidType == string(PrePaid) {
newPaidTypeInt = 0
}
if oldPaidTypeInt == 1 && newPaidTypeInt == 0 {
action := "ConvertPostPayOrder"
request := map[string]interface{}{
"RegionId": client.RegionId,
"InstanceId": d.Id(),
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
stateConf := BuildStateConf([]string{}, []string{strconv.Itoa(newPaidTypeInt)}, d.Timeout(schema.TimeoutUpdate), 1*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "PaidType", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
} else {
return WrapError(errors.New("paid type only support change from post pay to pre pay"))
}
d.SetPartial("paid_type")
}
update := false
request := map[string]interface{}{
"InstanceId": d.Id(),
"RegionId": client.RegionId,
}
// updating topic_quota only by updating partition_num
if !d.IsNewResource() && d.HasChange("partition_num") {
update = true
}
request["PartitionNum"] = d.Get("partition_num")
if !d.IsNewResource() && d.HasChange("disk_size") {
update = true
}
request["DiskSize"] = d.Get("disk_size")
if !d.IsNewResource() && d.HasChange("io_max") {
update = true
if v, ok := d.GetOk("io_max"); ok {
request["IoMax"] = v
}
}
if !d.IsNewResource() && d.HasChange("io_max_spec") {
update = true
if v, ok := d.GetOk("io_max_spec"); ok {
request["IoMaxSpec"] = v
}
}
if !d.IsNewResource() && d.HasChange("spec_type") {
update = true
}
request["SpecType"] = d.Get("spec_type")
if !d.IsNewResource() && d.HasChange("deploy_type") {
update = true
}
if d.Get("deploy_type").(int) == 4 {
request["EipModel"] = true
} else {
request["EipModel"] = false
}
if !d.IsNewResource() && d.HasChange("eip_max") {
update = true
}
request["EipMax"] = d.Get("eip_max").(int)
if update {
action := "UpgradePostPayOrder"
if d.Get("paid_type").(string) == string(PrePaid) {
action = "UpgradePrePayOrder"
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(5*time.Minute, func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if NeedRetry(err) || IsExpectedErrors(err, []string{"ONS_SYSTEM_FLOW_CONTROL"}) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
stateConf := BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("disk_size"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "DiskSize", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
if d.HasChange("io_max") {
stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("io_max"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "IoMax", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
}
stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("eip_max"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "EipMax", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("spec_type"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "SpecType", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
stateConf = BuildStateConf([]string{}, []string{"5"}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
d.SetPartial("partition_num")
d.SetPartial("disk_size")
d.SetPartial("io_max")
d.SetPartial("io_max_spec")
d.SetPartial("spec_type")
d.SetPartial("eip_max")
}
if !d.IsNewResource() && d.HasChange("service_version") {
action := "UpgradeInstanceVersion"
request := map[string]interface{}{
"InstanceId": d.Id(),
"RegionId": client.RegionId,
}
if v, ok := d.GetOk("service_version"); ok {
request["TargetVersion"] = v
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
// means no need to update version
if IsExpectedErrors(err, []string{"ONS_INIT_ENV_ERROR"}) {
return nil
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
if fmt.Sprint(response["Success"]) == "false" {
return WrapError(fmt.Errorf("%s failed, response: %v", action, response))
}
// wait for upgrade task be invoke
time.Sleep(60 * time.Second)
// upgrade service may be last a long time
stateConf := BuildStateConf([]string{}, []string{"5"}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
d.SetPartial("service_version")
}
if !d.IsNewResource() && d.HasChange("config") {
action := "UpdateInstanceConfig"
request := map[string]interface{}{
"RegionId": client.RegionId,
"InstanceId": d.Id(),
}
if v, ok := d.GetOk("config"); ok {
request["Config"] = v
}
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, request, false)
if err != nil {
if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, request)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
if fmt.Sprint(response["Success"]) == "false" {
return WrapError(fmt.Errorf("%s failed, response: %v", action, response))
}
// wait for upgrade task be invoke
stateConf := BuildStateConf([]string{}, []string{"5"}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{}))
if _, err := stateConf.WaitForState(); err != nil {
return WrapErrorf(err, IdMsg, d.Id())
}
d.SetPartial("config")
}
update = false
changeResourceGroupReq := map[string]interface{}{
"RegionId": client.RegionId,
"ResourceId": d.Id(),
}
if !d.IsNewResource() && d.HasChange("resource_group_id") {
update = true
}
if v, ok := d.GetOk("resource_group_id"); ok {
changeResourceGroupReq["NewResourceGroupId"] = v
}
if update {
action := "ChangeResourceGroup"
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, changeResourceGroupReq, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, changeResourceGroupReq)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
d.SetPartial("resource_group_id")
}
update = false
enableAutoGroupCreationReq := map[string]interface{}{
"RegionId": client.RegionId,
"InstanceId": d.Id(),
}
if d.HasChange("enable_auto_group") {
update = true
if v, ok := d.GetOkExists("enable_auto_group"); ok {
enableAutoGroupCreationReq["Enable"] = v
}
}
if update {
action := "EnableAutoGroupCreation"
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, enableAutoGroupCreationReq, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, enableAutoGroupCreationReq)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
if fmt.Sprint(response["Success"]) == "false" {
return WrapError(fmt.Errorf("%s failed, response: %v", action, response))
}
d.SetPartial("enable_auto_group")
}
update = false
enableAutoTopicCreationReq := map[string]interface{}{
"RegionId": client.RegionId,
"InstanceId": d.Id(),
}
if d.HasChange("enable_auto_topic") {
update = true
}
if v, ok := d.GetOk("enable_auto_topic"); ok {
enableAutoTopicCreationReq["Operate"] = v
}
if update {
action := "EnableAutoTopicCreation"
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, enableAutoTopicCreationReq, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action, response, enableAutoTopicCreationReq)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
if fmt.Sprint(response["Success"]) == "false" {
return WrapError(fmt.Errorf("%s failed, response: %v", action, response))
}
d.SetPartial("enable_auto_topic")
}
update = false
updateTopicPartitionNumReq := map[string]interface{}{
"RegionId": client.RegionId,
"Operate": "updatePartition",
"UpdatePartition": true,
"InstanceId": d.Id(),
}
object, err := alikafkaService.DescribeAliKafkaInstance(d.Id())
if err != nil {
return WrapError(err)
}
defaultTopicPartitionNum, ok := d.GetOkExists("default_topic_partition_num")
if ok && fmt.Sprint(object["DefaultPartitionNum"]) != fmt.Sprint(defaultTopicPartitionNum) {
update = true
updateTopicPartitionNumReq["PartitionNum"] = defaultTopicPartitionNum
}
if update {
action := "EnableAutoTopicCreation"
wait := incrementalWait(3*time.Second, 3*time.Second)
err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError {
response, err = client.RpcPost("alikafka", "2019-09-16", action, nil, updateTopicPartitionNumReq, false)
if err != nil {
if NeedRetry(err) {
wait()
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}
return nil
})
addDebug(action+" updateTopicPartitionNum", response, updateTopicPartitionNumReq)
if err != nil {
return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR)
}
if fmt.Sprint(response["Success"]) == "false" {
return WrapError(fmt.Errorf("%s failed, response: %v", action, response))
}
d.SetPartial("default_topic_partition_num")
}
d.Partial(false)
return resourceAliCloudAlikafkaInstanceRead(d, meta)
}