client_job.go (533 lines of code) (raw):

package sls import ( "encoding/json" "errors" "fmt" "io/ioutil" "net/url" ) const ( DataSourceOSS DataSourceType = "AliyunOSS" DataSourceBSS DataSourceType = "AliyunBSS" DataSourceMaxCompute DataSourceType = "AliyunMaxCompute" DataSourceJDBC DataSourceType = "JDBC" DataSourceKafka DataSourceType = "Kafka" DataSourceCMS DataSourceType = "AliyunCloudMonitor" DataSourceGeneral DataSourceType = "General" DataSourceS3 DataSourceType = "S3" OSSDataFormatTypeLine OSSDataFormatType = "Line" OSSDataFormatTypeMultiline OSSDataFormatType = "Multiline" OSSDataFormatTypeJSON OSSDataFormatType = "JSON" OSSDataFormatTypeParquet OSSDataFormatType = "Parquet" OSSDataFormatTypeDelimitedText OSSDataFormatType = "DelimitedText" KafkaValueTypeText KafkaValueType = "Text" KafkaValueTypeJSON KafkaValueType = "JSON" KafkaPositionGroupOffsets KafkaPosition = "GROUP_OFFSETS" KafkaPositionEarliest KafkaPosition = "EARLIEST" KafkaPositionLatest KafkaPosition = "LATEST" KafkaPositionTimeStamp KafkaPosition = "TIMESTAMP" DataSinkLOG DataSinkType = "AliyunLOG" DataSinkOSS DataSinkType = "AliyunOSS" DataSinkADB DataSinkType = "AliyunADB" DataSinkTSDB DataSinkType = "AliyunTSDB" DataSinkODPS DataSinkType = "AliyunODPS" DataSinkGENERAL DataSinkType = "General" OSSContentDetailTypeParquet OSSContentType = "parquet" OSSContentDetailTypeORC OSSContentType = "orc" OSSContentDetailTypeCSV OSSContentType = "csv" OSSContentDetailTypeJSON OSSContentType = "json" OSSCompressionTypeNone OSSCompressionType = "none" OSSCompressionTypeZstd OSSCompressionType = "zstd" OSSCompressionTypeGzip OSSCompressionType = "gzip" OSSCompressionTypeSnappy OSSCompressionType = "snappy" ExportVersion2 ExportVersion = "v2.0" // new versions of OSSExport, ODPSExport Version property must use this field, otherwise the service may be unavailable or incomplete ) type ( BaseJob struct { Name string `json:"name"` DisplayName string `json:"displayName,omitempty"` Description string `json:"description,omitempty"` Type JobType `json:"type"` Recyclable bool `json:"recyclable"` CreateTime int64 `json:"createTime"` LastModifyTime int64 `json:"lastModifyTime"` } ScheduledJob struct { BaseJob Status string `json:"status"` Schedule *Schedule `json:"schedule"` ScheduleId string `json:"scheduleId"` } Ingestion struct { ScheduledJob IngestionConfiguration *IngestionConfiguration `json:"configuration"` } IngestionConfiguration struct { Version string `json:"version"` LogStore string `json:"logstore"` NumberOfInstance int32 `json:"numberOfInstance"` DataSource interface{} `json:"source"` } DataSourceType string DataSource struct { DataSourceType DataSourceType `json:"type"` } // >>> ingestion oss source OSSDataFormatType string AliyunOSSSource struct { DataSource Bucket string `json:"bucket"` Endpoint string `json:"endpoint"` RoleArn string `json:"roleARN"` Prefix string `json:"prefix,omitempty"` Pattern string `json:"pattern,omitempty"` CompressionCodec string `json:"compressionCodec,omitempty"` Encoding string `json:"encoding,omitempty"` Format interface{} `json:"format,omitempty"` RestoreObjectEnable bool `json:"restoreObjectEnable"` LastModifyTimeAsLogTime bool `json:"lastModifyTimeAsLogTime"` } OSSDataFormat struct { Type OSSDataFormatType `json:"type"` TimeFormat string `json:"timeFormat"` TimeZone string `json:"timeZone"` } LineFormat struct { OSSDataFormat TimePattern string `json:"timePattern"` } MultiLineFormat struct { LineFormat MaxLines int64 `json:"maxLines,omitempty"` Negate bool `json:"negate"` Match string `json:"match"` Pattern string `json:"pattern"` FlushPattern string `json:"flushPattern"` } StructureDataFormat struct { OSSDataFormat TimeField string `json:"timeField"` } JSONFormat struct { StructureDataFormat SkipInvalidRows bool `json:"skipInvalidRows"` } ParquetFormat struct { StructureDataFormat } DelimitedTextFormat struct { StructureDataFormat FieldNames []string `json:"fieldNames"` FieldDelimiter string `json:"fieldDelimiter"` QuoteChar string `json:"quoteChar"` EscapeChar string `json:"escapeChar"` SkipLeadingRows int64 `json:"skipLeadingRows"` MaxLines int64 `json:"maxLines"` FirstRowAsHeader bool `json:"firstRowAsHeader"` } // ingestion maxcompute source >>> AliyunMaxComputeSource struct { DataSource AccessKeyID string `json:"accessKeyID"` AccessKeySecret string `json:"accessKeySecret"` Endpoint string `json:"endpoint"` TunnelEndpoint string `json:"tunnelEndpoint,omitempty"` Project string `json:"project"` Table string `json:"table"` PartitionSpec string `json:"partitionSpec"` TimeField string `json:"timeField"` TimeFormat string `json:"timeFormat"` TimeZone string `json:"timeZone"` } // ingestion cloud monitor source AliyunCloudMonitorSource struct { DataSource AccessKeyID string `json:"accessKeyID"` AccessKeySecret string `json:"accessKeySecret"` StartTime int64 `json:"startTime"` Namespaces []string `json:"namespaces"` OutputType string `json:"outputType"` DelayTime int64 `json:"delayTime"` } // ingestion kafka source KafkaValueType string KafkaPosition string KafkaSource struct { DataSource Topics string `json:"topics"` BootStrapServers string `json:"bootstrapServers"` ValueType KafkaValueType `json:"valueType"` FromPosition KafkaPosition `json:"fromPosition"` FromTimeStamp int64 `json:"fromTimestamp"` ToTimeStamp int64 `json:"toTimestamp"` TimeField string `json:"timeField"` TimePattern string `json:"timePattern"` TimeFormat string `json:"timeFormat"` TimeZone string `json:"timeZone"` Communication string `json:"communication"` NameResolutions string `json:"nameResolutions"` AdditionalProps map[string]string `json:"additionalProps"` VpcId string `json:"vpcId"` } S3Source struct { DataSource AWSAccessKey string `json:"awsAccessKey"` AWSAccessKeySecret string `json:"awsAccessKeySecret"` AWSRegion string `json:"awsRegion"` Bucket string `json:"bucket"` Prefix string `json:"prefix,omitempty"` Pattern string `json:"pattern,omitempty"` CompressionCodec string `json:"compressionCodec,omitempty"` Encoding string `json:"encoding,omitempty"` Format interface{} `json:"format,omitempty"` } // ingestion JDBC source AliyunBssSource struct { DataSource RoleArn string `json:"roleARN"` HistoryMonth int64 `json:"historyMonth"` } // ingestion general source IngestionGeneralSource struct { DataSource Fields map[string]interface{} } Export struct { ScheduledJob ExportConfiguration *ExportConfiguration `json:"configuration"` } ExportVersion string ExportConfiguration struct { FromTime int64 `json:"fromTime"` ToTime int64 `json:"toTime"` LogStore string `json:"logstore"` Parameters map[string]string `json:"parameters"` RoleArn string `json:"roleArn"` Version ExportVersion `json:"version"` DataSink DataSink `json:"sink"` } DataSink interface { DataSinkType() DataSinkType } DataSinkType string OSSContentType string OSSCompressionType string AliyunOSSSink struct { Type DataSinkType `json:"type"` RoleArn string `json:"roleArn"` Bucket string `json:"bucket"` Prefix string `json:"prefix"` Suffix string `json:"suffix"` PathFormat string `json:"pathFormat"` PathFormatType string `json:"pathFormatType"` BufferSize int64 `json:"bufferSize"` BufferInterval int64 `json:"bufferInterval"` TimeZone string `json:"timeZone"` ContentType OSSContentType `json:"contentType"` CompressionType OSSCompressionType `json:"compressionType"` //CsvContentDetail, JsonContentDetail, ParquetContentDetail, OrcContentDetail // the old version ContentDetail is st ring(struct serialized string), and now we highly recommend you use the struct ContentDetail directly ContentDetail interface{} `json:"contentDetail"` } CsvContentDetail struct { ColumnNames []string `json:"columns"` Delimiter string `json:"delimiter"` Quote string `json:"quote"` Escape string `json:"escape"` Null string `json:"null"` Header bool `json:"header"` LineFeed string `json:"lineFeed"` } JsonContentDetail struct { EnableTag bool `json:"enableTag"` } ParquetContentDetail ColumnStorageContentDetail OrcContentDetail ColumnStorageContentDetail ColumnStorageContentDetail struct { Columns []Column `json:"columns"` } Column struct { Name string `json:"name"` Type string `json:"type"` } AliyunODPSSink struct { Type DataSinkType `json:"type"` OdpsRolearn string `json:"odpsRolearn"` OdpsEndpoint string `json:"odpsEndpoint"` OdpsTunnelEndpoint string `json:"odpsTunnelEndpoint"` OdpsProject string `json:"odpsProject"` OdpsTable string `json:"odpsTable"` TimeZone string `json:"timeZone"` PartitionTimeFormat string `json:"partitionTimeFormat"` Fields []string `json:"fields"` PartitionColumn []string `json:"partitionColumn"` OdpsAccessKeyId string `json:"odpsAccessKeyId"` OdpsAccessSecret string `json:"odpsAccessAecret"` } AliyunGeneralSink struct { Type DataSinkType `json:"type"` Fields map[string]interface{} `json:"fields"` } ) func (_ *AliyunOSSSink) DataSinkType() DataSinkType { return DataSinkOSS } func (_ *AliyunODPSSink) DataSinkType() DataSinkType { return DataSinkODPS } func (_ *AliyunGeneralSink) DataSinkType() DataSinkType { return DataSinkGENERAL } func (e *ExportConfiguration) UnmarshalJSON(data []byte) error { c := map[string]interface{}{} if err := json.Unmarshal(data, &c); err != nil { return err } sinkMap := c["sink"].(map[string]interface{}) t := sinkMap["type"].(string) var sink DataSink sinkBytes, err := json.Marshal(sinkMap) if err != nil { return err } else if string(DataSinkOSS) == t { sink = &AliyunOSSSink{} } else if string(DataSinkODPS) == t { sink = &AliyunODPSSink{} } else if string(DataSinkGENERAL) == t { sink = &AliyunGeneralSink{} } else { // TODO: other sinks to be implemented return errors.New(fmt.Sprintf("unsupported sink type:%s", t)) } if err = json.Unmarshal(sinkBytes, sink); err != nil { return err } parameters := map[string]string{} for k, v := range c["parameters"].(map[string]interface{}) { parameters[k] = v.(string) } *e = ExportConfiguration{ FromTime: int64(c["fromTime"].(float64)), ToTime: int64(c["toTime"].(float64)), LogStore: c["logstore"].(string), Parameters: parameters, RoleArn: c["roleArn"].(string), Version: ExportVersion(c["version"].(string)), DataSink: sink, } return nil } func (c *Client) CreateIngestion(project string, ingestion *Ingestion) error { body, err := json.Marshal(ingestion) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := "/jobs" r, err := c.request(project, "POST", uri, h, body) if err != nil { return err } r.Body.Close() return nil } func (c *Client) UpdateIngestion(project string, ingestion *Ingestion) error { body, err := json.Marshal(ingestion) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := "/jobs/" + ingestion.Name r, err := c.request(project, "PUT", uri, h, body) if err != nil { return err } r.Body.Close() return nil } func (c *Client) GetIngestion(project string, name string) (*Ingestion, error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := "/jobs/" + name r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, err } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, readResponseError(err) } ingestion := &Ingestion{} if err = json.Unmarshal(buf, ingestion); err != nil { err = NewClientError(err) } return ingestion, err } func (c *Client) ListIngestion(project, logstore, name, displayName string, offset, size int) (ingestions []*Ingestion, total, count int, error error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } v := url.Values{} v.Add("logstore", logstore) v.Add("jobName", name) if displayName != "" { v.Add("displayName", displayName) } v.Add("jobType", "Ingestion") v.Add("offset", fmt.Sprintf("%d", offset)) v.Add("size", fmt.Sprintf("%d", size)) uri := "/jobs?" + v.Encode() r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, 0, 0, err } defer r.Body.Close() type ingestionList struct { Total int `json:"total"` Count int `json:"count"` Results []*Ingestion `json:"results"` } buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, 0, 0, readResponseError(err) } is := &ingestionList{} if err = json.Unmarshal(buf, is); err != nil { err = NewClientError(err) } return is.Results, is.Total, is.Count, err } func (c *Client) DeleteIngestion(project string, name string) error { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := "/jobs/" + name r, err := c.request(project, "DELETE", uri, h, nil) if err != nil { return err } r.Body.Close() return nil } func (c *Client) CreateExport(project string, export *Export) error { body, err := json.Marshal(export) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := "/jobs" r, err := c.request(project, "POST", uri, h, body) if err != nil { return err } r.Body.Close() return nil } func (c *Client) UpdateExport(project string, export *Export) error { body, err := json.Marshal(export) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := "/jobs/" + export.Name r, err := c.request(project, "PUT", uri, h, body) if err != nil { return err } r.Body.Close() return nil } func (c *Client) GetExport(project, name string) (*Export, error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := "/jobs/" + name r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, err } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, readResponseError(err) } export := &Export{} if err = json.Unmarshal(buf, export); err != nil { err = NewClientError(err) } return export, err } func (c *Client) ListExport(project, logstore, name, displayName string, offset, size int) (exports []*Export, total, count int, error error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } v := url.Values{} v.Add("logstore", logstore) v.Add("jobName", name) if displayName != "" { v.Add("displayName", displayName) } v.Add("jobType", "Export") v.Add("offset", fmt.Sprintf("%d", offset)) v.Add("size", fmt.Sprintf("%d", size)) uri := "/jobs?" + v.Encode() r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, 0, 0, err } defer r.Body.Close() type exportList struct { Total int `json:"total"` Count int `json:"count"` Results []*Export `json:"results"` } buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, 0, 0, readResponseError(err) } el := &exportList{} if err = json.Unmarshal(buf, el); err != nil { err = NewClientError(err) } return el.Results, el.Total, el.Count, err } func (c *Client) DeleteExport(project string, name string) error { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := "/jobs/" + name r, err := c.request(project, "DELETE", uri, h, nil) if err != nil { return err } r.Body.Close() return nil } func (c *Client) RestartExport(project string, export *Export) error { body, err := json.Marshal(export) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := fmt.Sprintf("/jobs/%s?action=RESTART", export.Name) r, err := c.request(project, "PUT", uri, h, body) if err != nil { return err } r.Body.Close() return nil }