alicloud/resource_alicloud_log_oss_export.go (408 lines of code) (raw):

package alicloud import ( "encoding/json" "fmt" "log" "time" "github.com/hashicorp/terraform-plugin-sdk/helper/validation" sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/aliyun/terraform-provider-alicloud/alicloud/connectivity" "github.com/hashicorp/terraform-plugin-sdk/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/helper/schema" ) func resourceAlicloudLogOssExport() *schema.Resource { return &schema.Resource{ Create: resourceAlicloudLogOssExportCreate, Read: resourceAlicloudLogOssExportRead, Update: resourceAlicloudLogOssExportUpdate, Delete: resourceAlicloudLogOssExportDelete, Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, }, Timeouts: &schema.ResourceTimeout{ Create: schema.DefaultTimeout(1 * time.Minute), Delete: schema.DefaultTimeout(1 * time.Minute), Update: schema.DefaultTimeout(1 * time.Minute), }, Schema: map[string]*schema.Schema{ "project_name": { Type: schema.TypeString, Required: true, ForceNew: true, }, "logstore_name": { Type: schema.TypeString, Required: true, ForceNew: true, }, "export_name": { Type: schema.TypeString, Required: true, ForceNew: true, }, "display_name": { Type: schema.TypeString, Optional: true, }, "from_time": { Type: schema.TypeInt, Optional: true, }, "bucket": { Type: schema.TypeString, Required: true, }, "prefix": { Type: schema.TypeString, Optional: true, }, "suffix": { Type: schema.TypeString, Optional: true, }, "buffer_interval": { Type: schema.TypeInt, Required: true, }, "buffer_size": { Type: schema.TypeInt, Required: true, }, "role_arn": { Type: schema.TypeString, Optional: true, }, "log_read_role_arn": { Type: schema.TypeString, Optional: true, }, "compress_type": { Type: schema.TypeString, Optional: true, ValidateFunc: validation.StringInSlice([]string{"none", "zstd", "gzip", "snappy"}, false), Computed: true, }, "path_format": { Type: schema.TypeString, Required: true, }, "time_zone": { Type: schema.TypeString, Required: true, }, "content_type": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice([]string{"json", "parquet", "csv", "orc"}, false), }, "json_enable_tag": { Type: schema.TypeBool, Optional: true, }, "csv_config_delimiter": { Type: schema.TypeString, Optional: true, }, "csv_config_header": { Type: schema.TypeBool, Optional: true, }, "csv_config_linefeed": { Type: schema.TypeString, Optional: true, }, "csv_config_columns": { Type: schema.TypeList, Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, }, "csv_config_null": { Type: schema.TypeString, Optional: true, }, "csv_config_quote": { Type: schema.TypeString, Optional: true, }, "csv_config_escape": { Type: schema.TypeString, Optional: true, }, "config_columns": { Type: schema.TypeSet, Optional: true, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "name": { Type: schema.TypeString, Required: true, }, "type": { Type: schema.TypeString, Required: true, }, }, }, }, }, } } func resourceAlicloudLogOssExportCreate(d *schema.ResourceData, meta interface{}) error { client := meta.(*connectivity.AliyunClient) var requestInfo *sls.Client projectName := d.Get("project_name").(string) logstoreName := d.Get("logstore_name").(string) exportName := d.Get("export_name").(string) wait := incrementalWait(3*time.Second, 3*time.Second) if err := resource.Retry(d.Timeout(schema.TimeoutCreate), func() *resource.RetryError { raw, err := client.WithLogClient(func(slsClient *sls.Client) (interface{}, error) { return nil, slsClient.CreateExport(projectName, buildOSSExport(d)) }) if err != nil { if IsExpectedErrors(err, []string{LogClientTimeout}) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } addDebug("CreateOSSExport", raw, requestInfo, map[string]string{ "project_name": projectName, "logstore_name": logstoreName, "export_name": exportName, }) return nil }); err != nil { return WrapErrorf(err, DefaultErrorMsg, "alicloud_log_oss_export", "CreateLogOssExport", AliyunLogGoSdkERROR) } d.SetId(fmt.Sprintf("%s%s%s%s%s", projectName, COLON_SEPARATED, logstoreName, COLON_SEPARATED, exportName)) return resourceAlicloudLogOssExportRead(d, meta) } func resourceAlicloudLogOssExportRead(d *schema.ResourceData, meta interface{}) error { client := meta.(*connectivity.AliyunClient) logService := LogService{client} parts, err := ParseResourceId(d.Id(), 3) if err != nil { return WrapError(err) } ossExport, err := logService.DescribeLogOssExport(d.Id()) if err != nil { if NotFoundError(err) { log.Printf("[DEBUG] Resource alicloud_log_oss_export LogService.DescribeLogExport Failed!!! %s", err) d.SetId("") return nil } return WrapError(err) } ossDataSink := ossExport.ExportConfiguration.DataSink.(*sls.AliyunOSSSink) d.Set("project_name", parts[0]) d.Set("logstore_name", parts[1]) d.Set("export_name", parts[2]) d.Set("display_name", ossExport.DisplayName) d.Set("from_time", ossExport.ExportConfiguration.FromTime) d.Set("log_read_role_arn", ossExport.ExportConfiguration.RoleArn) d.Set("bucket", ossDataSink.Bucket) d.Set("prefix", ossDataSink.Prefix) d.Set("suffix", ossDataSink.Suffix) d.Set("buffer_interval", ossDataSink.BufferInterval) d.Set("buffer_size", ossDataSink.BufferSize) d.Set("time_zone", ossDataSink.TimeZone) d.Set("role_arn", ossDataSink.RoleArn) d.Set("compress_type", ossDataSink.CompressionType) d.Set("path_format", ossDataSink.PathFormat) d.Set("content_type", ossDataSink.ContentType) if ossDataSink.ContentType == "json" { detail := new(sls.JsonContentDetail) contentDetailBytes, _ := json.Marshal(ossDataSink.ContentDetail) json.Unmarshal(contentDetailBytes, detail) d.Set("json_enable_tag", detail.EnableTag) } else if ossDataSink.ContentType == "csv" { detail := new(sls.CsvContentDetail) contentDetailBytes, _ := json.Marshal(ossDataSink.ContentDetail) json.Unmarshal(contentDetailBytes, detail) d.Set("csv_config_delimiter", detail.Delimiter) d.Set("csv_config_header", detail.Header) d.Set("csv_config_linefeed", detail.LineFeed) d.Set("csv_config_columns", detail.ColumnNames) d.Set("csv_config_null", detail.Null) d.Set("csv_config_quote", detail.Quote) } else if ossDataSink.ContentType == "parquet" || ossDataSink.ContentType == "orc" { var config []map[string]interface{} contentDetailBytes, _ := json.Marshal(ossDataSink.ContentDetail) detail := new(sls.ParquetContentDetail) json.Unmarshal(contentDetailBytes, detail) for _, column := range detail.Columns { tempMap := map[string]interface{}{ "name": column.Name, "type": column.Type, } config = append(config, tempMap) } d.Set("config_columns", config) } return nil } func resourceAlicloudLogOssExportUpdate(d *schema.ResourceData, meta interface{}) error { client := meta.(*connectivity.AliyunClient) parts, err := ParseResourceId(d.Id(), 3) if err != nil { return WrapError(err) } wait := incrementalWait(3*time.Second, 3*time.Second) if err := resource.Retry(d.Timeout(schema.TimeoutUpdate), func() *resource.RetryError { _, err := client.WithLogClient(func(slsClient *sls.Client) (interface{}, error) { return nil, slsClient.RestartExport(parts[0], buildOSSExport(d)) }) if err != nil { if IsExpectedErrors(err, []string{LogClientTimeout}) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } return nil }); err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), "UpdateLogOssExport", AliyunLogGoSdkERROR) } return resourceAlicloudLogOssExportRead(d, meta) } func resourceAlicloudLogOssExportDelete(d *schema.ResourceData, meta interface{}) error { client := meta.(*connectivity.AliyunClient) logService := LogService{client} parts, err := ParseResourceId(d.Id(), 3) if err != nil { return WrapError(err) } var requestInfo *sls.Client wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(d.Timeout(schema.TimeoutDelete), func() *resource.RetryError { raw, err := client.WithLogClient(func(slsClient *sls.Client) (interface{}, error) { return nil, slsClient.DeleteExport(parts[0], parts[2]) }) if err != nil { if IsExpectedErrors(err, []string{LogClientTimeout}) { wait() return resource.RetryableError(err) } return resource.NonRetryableError(err) } if debugOn() { addDebug("DeleteLogOssExport", raw, requestInfo, map[string]interface{}{ "project_name": parts[0], "logstore_name": parts[1], "export_name": parts[2], }) } return nil }) if err != nil { if IsExpectedErrors(err, []string{"JobNotExist"}) { return nil } return WrapErrorf(err, DefaultErrorMsg, "alicloud_log_oss_export", "DeleteLogOssExport", AliyunLogGoSdkERROR) } return WrapError(logService.WaitForLogOssExport(d.Id(), Deleted, DefaultTimeout)) } func buildOSSExport(d *schema.ResourceData) *sls.Export { contentType := d.Get("content_type").(string) ossExportConfig := &sls.AliyunOSSSink{ Type: sls.DataSinkOSS, Bucket: d.Get("bucket").(string), PathFormat: d.Get("path_format").(string), PathFormatType: "time", BufferSize: int64(d.Get("buffer_size").(int)), BufferInterval: int64(d.Get("buffer_interval").(int)), TimeZone: d.Get("time_zone").(string), ContentType: sls.OSSContentType(contentType), } roleArn := "" if v, ok := d.GetOk("role_arn"); ok { roleArn = v.(string) } ossExportConfig.RoleArn = roleArn if v, ok := d.GetOk("prefix"); ok { ossExportConfig.Prefix = v.(string) } if v, ok := d.GetOk("suffix"); ok { ossExportConfig.Suffix = v.(string) } if v, ok := d.GetOk("compress_type"); ok { ossExportConfig.CompressionType = sls.OSSCompressionType(v.(string)) } if contentType == "json" { enableTag := false if v, ok := d.GetOk("json_enable_tag"); ok { enableTag = v.(bool) } ossExportConfig.ContentDetail = sls.JsonContentDetail{EnableTag: enableTag} } else if contentType == "parquet" || contentType == "orc" { detail := sls.ParquetContentDetail{} if configColumns, ok := d.GetOk("config_columns"); ok { for _, f := range configColumns.(*schema.Set).List() { v := f.(map[string]interface{}) config := sls.Column{ Name: v["name"].(string), Type: v["type"].(string), } detail.Columns = append(detail.Columns, config) } } ossExportConfig.ContentDetail = detail } else if contentType == "csv" { detail := sls.CsvContentDetail{} if v, ok := d.GetOk("csv_config_delimiter"); ok { detail.Delimiter = v.(string) } if v, ok := d.GetOk("csv_config_header"); ok { detail.Header = v.(bool) } if v, ok := d.GetOk("csv_config_linefeed"); ok { detail.LineFeed = v.(string) } if v, ok := d.GetOk("csv_config_null"); ok { detail.Null = v.(string) } if v, ok := d.GetOk("csv_config_quote"); ok { detail.Quote = v.(string) } if v, ok := d.GetOk("csv_config_escape"); ok { detail.Escape = v.(string) } columns := []string{} if v, ok := d.GetOk("csv_config_columns"); ok { for _, v := range v.([]interface{}) { columns = append(columns, v.(string)) } } detail.ColumnNames = columns ossExportConfig.ContentDetail = detail } fromTime := int64(0) if v, ok := d.GetOk("from_time"); ok { fromTime = int64(v.(int)) } logReadRoleArn := roleArn if v, ok := d.GetOk("log_read_role_arn"); ok { logReadRoleArn = v.(string) } return &sls.Export{ ScheduledJob: sls.ScheduledJob{ BaseJob: sls.BaseJob{ Name: d.Get("export_name").(string), DisplayName: d.Get("display_name").(string), Description: "", Type: sls.EXPORT_JOB, }, Schedule: &sls.Schedule{ Type: "Resident", }, }, ExportConfiguration: &sls.ExportConfiguration{ FromTime: fromTime, LogStore: d.Get("logstore_name").(string), Parameters: map[string]string{}, RoleArn: logReadRoleArn, Version: sls.ExportVersion2, DataSink: ossExportConfig, }, } }