func resourceAlibabacloudStackDtsSynchronizationJobCreate()

in alibabacloudstack/resource_apsarastack_dts_synchronization_job.go [231:385]


// resourceAlibabacloudStackDtsSynchronizationJobCreate creates a DTS
// synchronization job by issuing the ConfigureDtsJob action and then waits
// for the job to report the "Synchronizing" state.
//
// Required arguments are always sent as query parameters; optional arguments
// are sent only when they are set in the configuration. On success the
// resource ID is set from the DtsJobId field of the response and
// dts_instance_id is stored from its DtsInstanceId field.
//
// An error is returned when the DTS client cannot be constructed, when the
// request fails with a non-retryable error or the create timeout elapses,
// when the response body is not valid JSON or lacks a DtsJobId, or when the
// state wait fails.
func resourceAlibabacloudStackDtsSynchronizationJobCreate(d *schema.ResourceData, meta interface{}) error {
	client := meta.(*connectivity.AlibabacloudStackClient)
	var response map[string]interface{}
	action := "ConfigureDtsJob"
	request := client.NewCommonRequest("POST", "Dts", "2020-01-01", action, "")
	request.Headers["x-acs-caller-sdk-source"] = "Terraform" // required: identifies the calling source
	request.Headers["x-acs-content-type"] = "application/json"
	request.Headers["Content-type"] = "application/json"
	mergeMaps(request.QueryParams, map[string]string{
		"DbList":                          d.Get("db_list").(string),
		"DtsJobName":                      d.Get("dts_job_name").(string),
		"DataInitialization":              fmt.Sprintf("%t", d.Get("data_initialization").(bool)),
		"DataSynchronization":             fmt.Sprintf("%t", d.Get("data_synchronization").(bool)),
		"StructureInitialization":         fmt.Sprintf("%t", d.Get("structure_initialization").(bool)),
		"SynchronizationDirection":        d.Get("synchronization_direction").(string),
		"DestinationEndpointInstanceType": d.Get("destination_endpoint_instance_type").(string),
		"SourceEndpointInstanceType":      d.Get("source_endpoint_instance_type").(string),
		"JobType":                         "SYNC",
	})

	// Optional string arguments: schema attribute -> request parameter.
	// Each one is forwarded only when GetOk reports it as set.
	optionalStrings := []struct{ attr, param string }{
		{"dts_job_id", "DtsJobId"},
		{"dts_instance_id", "DtsInstanceId"},
		{"checkpoint", "Checkpoint"},
		{"delay_phone", "DelayPhone"},
		{"delay_rule_time", "DelayRuleTime"},
		{"destination_endpoint_database_name", "DestinationEndpointDataBaseName"},
		{"destination_endpoint_engine_name", "DestinationEndpointEngineName"},
		{"destination_endpoint_ip", "DestinationEndpointIP"},
		{"destination_endpoint_instance_id", "DestinationEndpointInstanceID"},
		{"destination_endpoint_oracle_sid", "DestinationEndpointOracleSID"},
		{"destination_endpoint_password", "DestinationEndpointPassword"},
		{"destination_endpoint_port", "DestinationEndpointPort"},
		{"destination_endpoint_region", "DestinationEndpointRegion"},
		{"destination_endpoint_user_name", "DestinationEndpointUserName"},
		{"error_phone", "ErrorPhone"},
		{"reserve", "Reserve"},
		{"source_endpoint_database_name", "SourceEndpointDatabaseName"},
		{"source_endpoint_engine_name", "SourceEndpointEngineName"},
		{"source_endpoint_ip", "SourceEndpointIP"},
		{"source_endpoint_instance_id", "SourceEndpointInstanceID"},
		{"source_endpoint_oracle_sid", "SourceEndpointOracleSID"},
		{"source_endpoint_owner_id", "SourceEndpointOwnerID"},
		{"source_endpoint_password", "SourceEndpointPassword"},
		{"source_endpoint_port", "SourceEndpointPort"},
		{"source_endpoint_region", "SourceEndpointRegion"},
		{"source_endpoint_role", "SourceEndpointRole"},
		{"source_endpoint_user_name", "SourceEndpointUserName"},
	}
	for _, o := range optionalStrings {
		if v, ok := d.GetOk(o.attr); ok {
			request.QueryParams[o.param] = v.(string)
		}
	}

	// Optional boolean arguments are read with GetOkExists rather than GetOk.
	optionalBools := []struct{ attr, param string }{
		{"delay_notice", "DelayNotice"},
		{"error_notice", "ErrorNotice"},
	}
	for _, o := range optionalBools {
		if v, ok := d.GetOkExists(o.attr); ok {
			request.QueryParams[o.param] = fmt.Sprintf("%t", v.(bool))
		}
	}

	wait := incrementalWait(3*time.Second, 3*time.Second)
	request.Domain = client.Config.Endpoints[connectivity.DTSCode]

	var (
		dtsClient *sts.Client
		err       error
	)
	if client.Config.SecurityToken == "" {
		dtsClient, err = sts.NewClientWithAccessKey(client.Config.RegionId, client.Config.AccessKey, client.Config.SecretKey)
	} else {
		dtsClient, err = sts.NewClientWithStsToken(client.Config.RegionId, client.Config.AccessKey, client.Config.SecretKey, client.Config.SecurityToken)
	}
	// The constructor error must be checked before dtsClient is touched;
	// otherwise a failed construction turns into a nil-pointer dereference.
	if err != nil {
		return fmt.Errorf("creating DTS client: %w", err)
	}
	dtsClient.Domain = client.Config.Endpoints[connectivity.DTSCode]

	err = resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError {
		raw, err := dtsClient.ProcessCommonRequest(request)
		addDebug(action, raw, request, request.QueryParams)
		if err != nil {
			if errmsgs.NeedRetry(err) {
				wait()
				return resource.RetryableError(err)
			}
			errmsg := ""
			if raw != nil {
				errmsg = errmsgs.GetBaseResponseErrorMessage(raw.BaseResponse)
			}
			err = errmsgs.WrapErrorf(err, errmsgs.RequestV1ErrorMsg, "alibabacloudstack_dts_synchronization_job", action, errmsgs.AlibabacloudStackSdkGoERROR, errmsg)
			return resource.NonRetryableError(err)
		}
		if err := json.Unmarshal(raw.GetHttpContentBytes(), &response); err != nil {
			return resource.NonRetryableError(err)
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Without this guard a missing DtsJobId would be formatted as the literal
	// string "<nil>" and stored as the resource ID.
	jobID, ok := response["DtsJobId"]
	if !ok || jobID == nil {
		return fmt.Errorf("%s response did not contain a DtsJobId", action)
	}
	d.SetId(fmt.Sprint(jobID))
	d.Set("dts_instance_id", response["DtsInstanceId"])

	dtsService := DtsService{client}
	// NOTE(review): this wait uses the Update timeout although it runs during
	// Create — confirm against the resource's declared timeouts whether
	// schema.TimeoutCreate was intended.
	stateConf := BuildStateConf([]string{}, []string{"Synchronizing"}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, dtsService.DtsSynchronizationJobStateRefreshFunc(d.Id(), []string{"InitializeFailed"}))
	if _, err := stateConf.WaitForState(); err != nil {
		return errmsgs.WrapErrorf(err, errmsgs.IdMsg, d.Id())
	}

	return nil
}