client_scheduled_sql.go (296 lines of code) (raw):

package sls import ( "encoding/json" "errors" "fmt" "io/ioutil" "net/url" ) type SqlType string type ResourcePool string type DataFormat string type JobType string type Status string type ScheduledSQLState string const ( STANDARD SqlType = "standard" SEARCH_QUERY SqlType = "searchQuery" ) const ( DEFAULT ResourcePool = "default" ENHANCED ResourcePool = "enhanced" ) const ( LOG_TO_LOG DataFormat = "log2log" LOG_TO_METRIC DataFormat = "log2metric" METRIC_TO_metric DataFormat = "metric2metric" ) const ( ALERT_JOB JobType = "Alert" REPORT_JOB JobType = "Report" ETL_JOB JobType = "ETL" INGESTION_JOB JobType = "Ingestion" REBUILD_INDEX_JOB JobType = "RebuildIndex" AUDIT_JOB_JOB JobType = "AuditJob" EXPORT_JOB JobType = "Export" SCHEDULED_SQL_JOB JobType = "ScheduledSQL" ) const ( ENABLED Status = "Enabled" DISABLED Status = "Disabled" ) const ( ScheduledSQL_RUNNING ScheduledSQLState = "RUNNING" ScheduledSQL_FAILED ScheduledSQLState = "FAILED" ScheduledSQL_SUCCEEDED ScheduledSQLState = "SUCCEEDED" ) type ScheduledSQL struct { Name string `json:"name"` DisplayName string `json:"displayName"` Description string `json:"description"` Status Status `json:"status"` ScheduleId string `json:"scheduleId"` Configuration *ScheduledSQLConfiguration `json:"configuration"` Schedule *Schedule `json:"schedule"` CreateTime int64 `json:"createTime,omitempty"` LastModifiedTime int64 `json:"lastModifiedTime,omitempty"` Type JobType `json:"type"` } type ScheduledSQLConfiguration struct { SourceLogStore string `json:"sourceLogstore"` DestProject string `json:"destProject"` DestEndpoint string `json:"destEndpoint"` DestLogStore string `json:"destLogstore"` Script string `json:"script"` SqlType SqlType `json:"sqlType"` ResourcePool ResourcePool `json:"resourcePool"` RoleArn string `json:"roleArn"` DestRoleArn string `json:"destRoleArn"` FromTimeExpr string `json:"fromTimeExpr"` ToTimeExpr string `json:"toTimeExpr"` MaxRunTimeInSeconds int32 `json:"maxRunTimeInSeconds"` MaxRetries int32 `json:"maxRetries"` FromTime int64 `json:"fromTime"` ToTime int64 `json:"toTime"` DataFormat DataFormat `json:"dataFormat"` Parameters *ScheduledSQLParameters `json:"parameters,omitempty"` } func NewScheduledSQLConfiguration() *ScheduledSQLConfiguration { return &ScheduledSQLConfiguration{ SqlType: STANDARD, ResourcePool: DEFAULT, FromTime: 0, ToTime: 0, DataFormat: LOG_TO_LOG, } } type ScheduledSQLParameters struct { TimeKey string `json:"timeKey,omitempty"` LabelKeys string `json:"labelKeys,omitempty"` MetricKeys string `json:"metricKeys,omitempty"` MetricName string `json:"metricName,omitempty"` HashLabels string `json:"hashLabels,omitempty"` AddLabels string `json:"addLabels,omitempty"` } func (c *Client) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { fromTime := scheduledsql.Configuration.FromTime toTime := scheduledsql.Configuration.ToTime timeRange := fromTime > 1451577600 && toTime > fromTime sustained := fromTime > 1451577600 && toTime == 0 if !timeRange && !sustained { return fmt.Errorf("invalid fromTime: %d toTime: %d, please ensure fromTime more than 1451577600", fromTime, toTime) } body, err := json.Marshal(scheduledsql) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := "/jobs" r, err := c.request(project, "POST", uri, h, body) if err != nil { return err } r.Body.Close() return nil } func (c *Client) DeleteScheduledSQL(project string, name string) error { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := "/jobs/" + name r, err := c.request(project, "DELETE", uri, h, nil) if err != nil { return err } r.Body.Close() return nil } func (c *Client) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { body, err := json.Marshal(scheduledsql) if err != nil { return NewClientError(err) } h := map[string]string{ "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/json", } uri := "/jobs/" + scheduledsql.Name r, err := c.request(project, "PUT", uri, h, body) if err != nil { return err } r.Body.Close() return nil } func (c *Client) GetScheduledSQL(project string, name string) (*ScheduledSQL, error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := "/jobs/" + name r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, err } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, readResponseError(err) } scheduledSQL := &ScheduledSQL{} if err = json.Unmarshal(buf, scheduledSQL); err != nil { err = NewClientError(err) } return scheduledSQL, err } func (c *Client) ListScheduledSQL(project, name, displayName string, offset, size int) (scheduledsqls []*ScheduledSQL, total, count int, error error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } v := url.Values{} v.Add("jobName", name) if displayName != "" { v.Add("displayName", displayName) } v.Add("jobType", "ScheduledSQL") v.Add("offset", fmt.Sprintf("%d", offset)) v.Add("size", fmt.Sprintf("%d", size)) uri := "/jobs?" + v.Encode() r, err := c.request(project, "GET", uri, h, nil) if err != nil { return nil, 0, 0, err } defer r.Body.Close() type ScheduledSqlList struct { Total int `json:"total"` Count int `json:"count"` Results []*ScheduledSQL `json:"results"` } buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, 0, 0, readResponseError(err) } scheduledSqlList := &ScheduledSqlList{} if err = json.Unmarshal(buf, scheduledSqlList); err != nil { err = NewClientError(err) } return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, err } type ScheduledSQLJobInstance struct { InstanceId string `json:"instanceId"` JobName string `json:"jobName,omitempty"` DisplayName string `json:"displayName,omitempty"` Description string `json:"description,omitempty"` JobScheduleId string `json:"jobScheduleId,omitempty"` CreateTimeInMillis int64 `json:"createTimeInMillis"` ScheduleTimeInMillis int64 `json:"scheduleTimeInMillis"` UpdateTimeInMillis int64 `json:"updateTimeInMillis"` State ScheduledSQLState `json:"state"` ErrorCode string `json:"errorCode"` ErrorMessage string `json:"errorMessage"` Summary string `json:"summary,omitempty"` } func (c *Client) GetScheduledSQLJobInstance(projectName, jobName, instanceId string, result bool) (*ScheduledSQLJobInstance, error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := fmt.Sprintf("/jobs/%s/jobinstances/%s?result=%t", jobName, instanceId, result) r, err := c.request(projectName, "GET", uri, h, nil) if err != nil { return nil, err } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, readResponseError(err) } instance := &ScheduledSQLJobInstance{} if err = json.Unmarshal(buf, instance); err != nil { err = NewClientError(err) } return instance, err } func (c *Client) ModifyScheduledSQLJobInstanceState(projectName, jobName, instanceId string, state ScheduledSQLState) error { if ScheduledSQL_RUNNING != state { return NewClientError(errors.New(fmt.Sprintf("Invalid state: %s, state must be RUNNING.", state))) } h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } uri := fmt.Sprintf("/jobs/%s/jobinstances/%s?state=%s", jobName, instanceId, state) r, err := c.request(projectName, "PUT", uri, h, nil) if err != nil { return err } r.Body.Close() return nil } type InstanceStatus struct { FromTime int64 ToTime int64 Offset int64 Size int64 State ScheduledSQLState } func (c *Client) ListScheduledSQLJobInstances(projectName, jobName string, status *InstanceStatus) (instances []*ScheduledSQLJobInstance, total, count int64, err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", } v := url.Values{} v.Add("jobType", "ScheduledSQL") v.Add("start", fmt.Sprintf("%d", status.FromTime)) v.Add("end", fmt.Sprintf("%d", status.ToTime)) v.Add("offset", fmt.Sprintf("%d", status.Offset)) v.Add("size", fmt.Sprintf("%d", status.Size)) if status.State != "" { v.Add("state", string(status.State)) } uri := fmt.Sprintf("/jobs/%s/jobinstances?%s", jobName, v.Encode()) r, err := c.request(projectName, "GET", uri, h, nil) if err != nil { return nil, 0, 0, err } defer r.Body.Close() type ScheduledSqlJobInstances struct { Total int64 `json:"total"` Count int64 `json:"count"` Results []*ScheduledSQLJobInstance `json:"results"` } buf, err := ioutil.ReadAll(r.Body) if err != nil { return nil, 0, 0, readResponseError(err) } jobInstances := &ScheduledSqlJobInstances{} if err = json.Unmarshal(buf, jobInstances); err != nil { err = NewClientError(err) } return jobInstances.Results, jobInstances.Total, jobInstances.Count, err }