client_metric_agg.go (270 lines of code) (raw):
package sls
import (
"encoding/json"
"fmt"
"time"
)
const (
// MetricAggRulesSQL sql type
MetricAggRulesSQL = "sql"
// MetricAggRulesPromQL promql type
MetricAggRulesPromQL = "promql"
)
type MetricAggRules struct {
ID string
Name string
Desc string
SrcStore string
SrcAccessKeyID string // ETL_STS_DEFAULT
SrcAccessKeySecret string // acs:ram::${aliuid}:role/aliyunlogetlrole
DestEndpoint string // same region, inner endpoint; different region, public endpoint
DestProject string
DestStore string
DestAccessKeyID string // ETL_STS_DEFAULT
DestAccessKeySecret string // acs:ram::${aliuid}:role/aliyunlogetlrole
AggRules []MetricAggRuleItem
}
type MetricAggRuleItem struct {
Name string
QueryType string
Query string
TimeName string
MetricNames []string
LabelNames map[string]string
BeginUnixTime int64
EndUnixTime int64
Interval int64
DelaySeconds int64
}
func (c *Client) getScheduledSQLParams(aggRules []MetricAggRuleItem) (map[string]string, error) {
params := make(map[string]string)
params["sls.config.job_mode"] = `{"type":"ml","source":"ScheduledSQL"}`
var aggRuleJsons []interface{}
for _, aggRule := range aggRules {
aggRuleMap := make(map[string]interface{})
aggRuleMap["rule_name"] = aggRule.Name
advancedQueryMap := make(map[string]interface{})
advancedQueryMap["type"] = aggRule.QueryType
advancedQueryMap["query"] = aggRule.Query
advancedQueryMap["time_name"] = aggRule.TimeName
advancedQueryMap["metric_names"] = aggRule.MetricNames
advancedQueryMap["labels"] = aggRule.LabelNames
aggRuleMap["advanced_query"] = advancedQueryMap
scheduleControlMap := make(map[string]interface{})
scheduleControlMap["from_unixtime"] = aggRule.BeginUnixTime
scheduleControlMap["to_unixtime"] = aggRule.EndUnixTime
scheduleControlMap["granularity"] = aggRule.Interval
scheduleControlMap["delay"] = aggRule.DelaySeconds
aggRuleMap["schedule_control"] = scheduleControlMap
aggRuleJsons = append(aggRuleJsons, aggRuleMap)
}
scheduledSql := make(map[string]interface{})
scheduledSql["agg_rules"] = aggRuleJsons
scheduledSqlJson, err := json.Marshal(scheduledSql)
if err != nil {
return nil, err
}
params["config.ml.scheduled_sql"] = string(scheduledSqlJson)
return params, nil
}
func (c *Client) createMetricAggRulesConfig(aggRules *MetricAggRules) (*ETL, error) {
etl := new(ETL)
etl.Name = aggRules.ID
etl.DisplayName = aggRules.Name
etl.Description = aggRules.Desc
etl.Type = "ETL"
etl.Configuration.AccessKeyId = aggRules.SrcAccessKeyID
etl.Configuration.AccessKeySecret = aggRules.SrcAccessKeySecret
etl.Configuration.Script = ""
etl.Configuration.Logstore = aggRules.SrcStore
parameters, err := c.getScheduledSQLParams(aggRules.AggRules)
if err != nil {
return nil, err
}
etl.Configuration.Parameters = parameters
etl.Configuration.FromTime = time.Now().Unix()
var sink ETLSink
sink.Endpoint = aggRules.DestEndpoint
sink.Name = "sls-convert-metric"
sink.AccessKeyId = aggRules.DestAccessKeyID
sink.AccessKeySecret = aggRules.DestAccessKeySecret
sink.Project = aggRules.DestProject
sink.Logstore = aggRules.DestStore
etl.Configuration.ETLSinks = append(etl.Configuration.ETLSinks, sink)
etl.Schedule.Type = ScheduleTypeResident
return etl, nil
}
func (c *Client) CreateMetricAggRules(project string, aggRules *MetricAggRules) error {
etl, err := c.createMetricAggRulesConfig(aggRules)
if err != nil {
return err
}
if err := c.CreateETL(project, *etl); err != nil {
return err
}
return nil
}
func (c *Client) UpdateMetricAggRules(project string, aggRules *MetricAggRules) error {
etl, err := c.createMetricAggRulesConfig(aggRules)
if err != nil {
return err
}
if err := c.UpdateETL(project, *etl); err != nil {
return err
}
return nil
}
func (c *Client) castEtlToMetricAggRules(etl *ETL) (*MetricAggRules, error) {
aggRules := new(MetricAggRules)
aggRules.ID = etl.Name
aggRules.Name = etl.DisplayName
aggRules.Desc = etl.Description
aggRules.SrcAccessKeyID = etl.Configuration.AccessKeyId
aggRules.SrcAccessKeySecret = etl.Configuration.AccessKeySecret
aggRules.SrcStore = etl.Configuration.Logstore
scheduledSqlJson := etl.Configuration.Parameters["config.ml.scheduled_sql"]
aggRuleJson := make(map[string][]map[string]interface{})
err := json.Unmarshal([]byte(scheduledSqlJson), &aggRuleJson)
if err != nil {
return nil, err
}
aggRuleMaps := aggRuleJson["agg_rules"]
var aggRuleItems []MetricAggRuleItem
for _, aggRuleMap := range aggRuleMaps {
aggRuleItem := new(MetricAggRuleItem)
aggRuleItem.Name, err = castInterfaceToString(aggRuleMap, "rule_name")
if err != nil {
return nil, err
}
advancedQuery, err := castInterfaceToMap(aggRuleMap, "advanced_query")
if err != nil {
return nil, err
}
aggRuleItem.QueryType, err = castInterfaceToString(advancedQuery, "type")
if err != nil {
return nil, err
}
aggRuleItem.Query, err = castInterfaceToString(advancedQuery, "query")
if err != nil {
return nil, err
}
aggRuleItem.TimeName, err = castInterfaceToString(advancedQuery, "time_name")
if err != nil {
return nil, err
}
aggRuleItem.MetricNames, err = castInterfaceArrayToStringArray(advancedQuery, "metric_names")
if err != nil {
return nil, err
}
aggRuleItem.LabelNames, err = castInterfaceMapToStringMap(advancedQuery, "labels")
if err != nil {
return nil, err
}
scheduleControl, err := castInterfaceToMap(aggRuleMap, "schedule_control")
if err != nil {
return nil, err
}
aggRuleItem.BeginUnixTime, err = castInterfaceToInt(scheduleControl, "from_unixtime")
if err != nil {
return nil, err
}
aggRuleItem.EndUnixTime, err = castInterfaceToInt(scheduleControl, "to_unixtime")
if err != nil {
return nil, err
}
aggRuleItem.Interval, err = castInterfaceToInt(scheduleControl, "granularity")
if err != nil {
return nil, err
}
aggRuleItem.DelaySeconds, err = castInterfaceToInt(scheduleControl, "delay")
if err != nil {
return nil, err
}
aggRuleItems = append(aggRuleItems, *aggRuleItem)
}
aggRules.AggRules = aggRuleItems
for _, sink := range etl.Configuration.ETLSinks {
aggRules.DestEndpoint = sink.Endpoint
aggRules.DestAccessKeyID = sink.AccessKeyId
aggRules.DestAccessKeySecret = sink.AccessKeySecret
aggRules.DestProject = sink.Project
aggRules.DestStore = sink.Logstore
}
return aggRules, nil
}
func (c *Client) ListMetricAggRules(project string, offset int, size int) ([]*MetricAggRules, error) {
listEtl, err := c.ListETL(project, offset, size)
if err != nil {
return nil, err
}
etls := listEtl.Results
var aggRules []*MetricAggRules
for _, etl := range etls {
if _, ok := etl.Configuration.Parameters["config.ml.scheduled_sql"]; ok {
aggRule, err := c.castEtlToMetricAggRules(etl)
if err != nil {
return nil, err
}
aggRules = append(aggRules, aggRule)
}
}
return aggRules, nil
}
func (c *Client) GetMetricAggRules(project string, ruleID string) (*MetricAggRules, error) {
etl, err := c.GetETL(project, ruleID)
if err != nil {
return nil, err
}
aggRules, err := c.castEtlToMetricAggRules(etl)
if err != nil {
return nil, err
}
return aggRules, nil
}
func (c *Client) DeleteMetricAggRules(project string, ruleID string) error {
if err := c.DeleteETL(project, ruleID); err != nil {
return err
}
return nil
}
func castInterfaceArrayToStringArray(inter map[string]interface{}, key string) ([]string, error) {
t, ok := inter[key].([]interface{})
if !ok {
return nil, fmt.Errorf("castInterfaceArrayToStringArray is not ok, key: %s, value: %v\n", key, inter[key])
}
s := make([]string, len(t))
for i, v := range t {
s[i] = v.(string)
}
return s, nil
}
func castInterfaceMapToStringMap(inter map[string]interface{}, key string) (map[string]string, error) {
t, ok := inter[key].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("castInterfaceMapToStringMap is not ok, key: %s, value: %v\n", key, inter[key])
}
s := make(map[string]string, len(t))
for k, v := range t {
s[k] = v.(string)
}
return s, nil
}
func castInterfaceToInt(inter map[string]interface{}, key string) (int64, error) {
t, ok := inter[key].(float64)
if !ok {
return 0, fmt.Errorf("castInterfaceToInt is not ok, key: %s, value: %v\n", key, inter[key])
}
return int64(t), nil
}
func castInterfaceToString(inter map[string]interface{}, key string) (string, error) {
t, ok := inter[key].(string)
if !ok {
return "", fmt.Errorf("castInterfaceToString is not ok, key: %s, value: %v\n", key, inter[key])
}
return t, nil
}
func castInterfaceToMap(inter map[string]interface{}, key string) (map[string]interface{}, error) {
t, ok := inter[key].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("castInterfaceToMap is not ok, key: %s, value: %v\n", key, inter[key])
}
return t, nil
}