connectors/grafana-plugin/pkg/plugin/plugin.go (342 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package plugin import ( "bytes" "context" "encoding/base64" "encoding/json" "errors" "fmt" "io" "math" "net/http" "strconv" "strings" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/data" ) // Make sure IoTDBDatasource implements required interfaces. This is important to do // since otherwise we will only get a not implemented error response from plugin in // runtime. In this example datasource instance implements backend.QueryDataHandler, // backend.CheckHealthHandler, backend.StreamHandler interfaces. Plugin should not // implement all these interfaces - only those which are required for a particular task. // For example if plugin does not need streaming functionality then you are free to remove // methods that implement backend.StreamHandler. Implementing instancemgmt.InstanceDisposer // is useful to clean up resources used by previous datasource instance when a new datasource // instance created upon datasource settings changed. var ( _ backend.QueryDataHandler = (*IoTDBDataSource)(nil) _ backend.CheckHealthHandler = (*IoTDBDataSource)(nil) _ backend.CallResourceHandler = (*IoTDBDataSource)(nil) ) // ApacheIoTDBDatasource creates a new datasource instance. func ApacheIoTDBDatasource(ctx context.Context, d backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { var dm dataSourceModel if err := json.Unmarshal(d.JSONData, &dm); err != nil { return nil, err } ops, err := d.HTTPClientOptions(ctx) if err != nil { return nil, fmt.Errorf("http client options: %w", err) } httpClient, err := httpclient.New(ops) if err != nil { return nil, fmt.Errorf("new httpclient error: %w", err) } var authorization = "" if password, exists := d.DecryptedSecureJSONData["password"]; exists { authorization = "Basic " + base64.StdEncoding.EncodeToString([]byte(dm.Username+":"+password)) } return &IoTDBDataSource{CallResourceHandler: iotdbResourceHandler(authorization, httpClient), Username: dm.Username, Ulr: dm.Url, httpClient: httpClient}, nil } // SampleDatasource is an example datasource which can respond to data queries, reports // its health and has streaming skills. type IoTDBDataSource struct { backend.CallResourceHandler Username string Ulr string httpClient *http.Client } // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance // created. As soon as datasource settings change detected by SDK old datasource instance will // be disposed and a new one will be created using ApacheIoTDBDatasource factory function. func (d *IoTDBDataSource) Dispose() { // Clean up datasource instance resources. d.httpClient.CloseIdleConnections() } // QueryData handles multiple queries and returns multiple responses. // req contains the queries []DataQuery (where each query contains RefID as a unique identifier). // The QueryDataResponse contains a map of RefID to the response for each query, and each response // contains Frames ([]*Frame). func (d *IoTDBDataSource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { // create response struct response := backend.NewQueryDataResponse() // loop over queries and execute them individually. for _, q := range req.Queries { res := d.query(ctx, req.PluginContext, q) // save the response in a hashmap // based on with RefID as identifier response.Responses[q.RefID] = res } return response, nil } type dataSourceModel struct { Username string `json:"username"` Url string `json:"url"` } type groupBy struct { GroupByLevel string `json:"groupByLevel"` SamplingInterval string `json:"samplingInterval"` Step string `json:"step"` } type queryParam struct { Expression []string `json:"expression"` PrefixPath []string `json:"prefixPath"` StartTime int64 `json:"startTime"` EndTime int64 `json:"endTime"` Condition string `json:"condition"` Control string `json:"control"` SqlType string `json:"sqlType"` Paths []string `json:"paths"` AggregateFun string `json:"aggregateFun"` FillClauses string `json:"fillClauses"` GroupBy groupBy `json:"groupBy"` Hide bool `json:"hide"` } type QueryDataReq struct { Expression []string `json:"expression"` PrefixPath []string `json:"prefixPath"` StartTime int64 `json:"startTime"` EndTime int64 `json:"endTime"` Condition string `json:"condition"` Control string `json:"control"` } type QueryDataResponse struct { Expressions []string `json:"expressions"` Timestamps []int64 `json:"timestamps"` Values [][]interface{} `json:"values"` ColumnNames interface{} `json:"columnNames"` Code int32 `json:"code"` Message string `json:"message"` } type loginStatus struct { Code int `json:"code"` Message string `json:"message"` } func NewQueryDataReq(expression []string, prefixPath []string, startTime int64, endTime int64, condition string, control string) *QueryDataReq { return &QueryDataReq{Expression: expression, PrefixPath: prefixPath, StartTime: startTime, EndTime: endTime, Condition: condition, Control: control} } func verifyQuery(query backend.DataQuery) (qp *queryParam, errMsg string) { err := json.Unmarshal(query.JSON, &qp) if err != nil { return nil, "" } if qp.Hide { return nil, "none" } if qp.SqlType == "SQL: Drop-down List" { if len(qp.Paths) < 1 { return nil, "Input error, please select TIME-SERIES" } if qp.GroupBy.SamplingInterval != "" && qp.AggregateFun == "" { return nil, "Input error, please select FUNCTION when SAMPLING INTERVAL has a value" } } else if qp.SqlType == "SQL: Full Customized" { if len(qp.Expression) == 0 { return nil, "Input error, SELECT is required" } for i := 0; i < len(qp.Expression); i++ { if qp.Expression[i] == "" { return nil, "Input error, SELECT is required" } } if len(qp.PrefixPath) == 0 { return nil, "Input error, FROM is required" } for i := 0; i < len(qp.PrefixPath); i++ { if qp.PrefixPath[i] == "" { return nil, "Input error, FROM is required" } } } else { return nil, "none" } return qp, "" } func (d *IoTDBDataSource) query(cxt context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse { response := backend.DataResponse{} instanceSettings := pCtx.DataSourceInstanceSettings var authorization = "" if password, exists := instanceSettings.DecryptedSecureJSONData["password"]; exists { // Use the decrypted API key. authorization = "Basic " + base64.StdEncoding.EncodeToString([]byte(d.Username+":"+password)) } // Unmarshal the JSON into our queryModel. var qdReq QueryDataReq qp, msg := verifyQuery(query) if msg == "none" { return response } else if msg != "" { response.Error = errors.New(msg) return response } qp.StartTime = query.TimeRange.From.UnixNano() / 1000000 qp.EndTime = query.TimeRange.To.UnixNano() / 1000000 if qp.SqlType == "SQL: Drop-down List" { qp.Control = "" var expressions []string = qp.Paths[len(qp.Paths)-1:] var paths []string = qp.Paths[0 : len(qp.Paths)-1] path := "root." + strings.Join(paths, ".") var prefixPaths = []string{path} if qp.AggregateFun != "" { expressions[0] = qp.AggregateFun + "(" + expressions[0] + ")" } if qp.GroupBy.SamplingInterval != "" && qp.GroupBy.Step == "" { qp.Control += " group by([" + strconv.FormatInt(qp.StartTime, 10) + "," + strconv.FormatInt(qp.EndTime, 10) + ")," + qp.GroupBy.SamplingInterval + ")" } if qp.GroupBy.SamplingInterval != "" && qp.GroupBy.Step != "" { qp.Control += " group by([" + strconv.FormatInt(qp.StartTime, 10) + "," + strconv.FormatInt(qp.EndTime, 10) + ")," + qp.GroupBy.SamplingInterval + "," + qp.GroupBy.Step + ")" } if qp.GroupBy.GroupByLevel != "" { qp.Control += " " + qp.GroupBy.GroupByLevel } if qp.FillClauses != "" { qp.Control += " fill" + qp.FillClauses } qdReq = *NewQueryDataReq(expressions, prefixPaths, qp.StartTime, qp.EndTime, qp.Condition, qp.Control) } else if qp.SqlType == "SQL: Full Customized" { qdReq = *NewQueryDataReq(qp.Expression, qp.PrefixPath, qp.StartTime, qp.EndTime, qp.Condition, qp.Control) } else { return response } qpJson, _ := json.Marshal(qdReq) reader := bytes.NewReader(qpJson) var dataSourceUrl = DataSourceUrlHandler(d.Ulr) request, _ := http.NewRequest(http.MethodPost, dataSourceUrl+"/grafana/v1/query/expression", reader) request.Header.Set("Content-Type", "application/json") request.Header.Add("Authorization", authorization) rsp, _ := d.httpClient.Do(request) body, err := io.ReadAll(rsp.Body) if err != nil { response.Error = errors.New("Data source is not working properly") log.DefaultLogger.Error("Data source is not working properly", err) return response } var queryDataResp QueryDataResponse err = json.Unmarshal(body, &queryDataResp) if err != nil { response.Error = errors.New("Parsing JSON error") log.DefaultLogger.Error("Parsing JSON error", err) return response } defer rsp.Body.Close() if queryDataResp.Code > 0 { response.Error = errors.New(queryDataResp.Message) log.DefaultLogger.Error(queryDataResp.Message) return response } // create data frame response. frame := data.NewFrame("response") for i := 0; i < len(queryDataResp.Expressions); i++ { if queryDataResp.Timestamps != nil && len(queryDataResp.Timestamps) > 0 { times := make([]time.Time, len(queryDataResp.Timestamps)) for c := 0; c < len(queryDataResp.Timestamps); c++ { times[c] = time.Unix(0, queryDataResp.Timestamps[c]*1000000) } if queryDataResp.Values != nil && len(queryDataResp.Values) > 0 { values := recoverType(queryDataResp.Values[i]) frame.Fields = append(frame.Fields, data.NewField("time", nil, times), data.NewField(queryDataResp.Expressions[i], nil, values), ) } } else { if queryDataResp.Values != nil && len(queryDataResp.Values) > 0 { values := recoverType(queryDataResp.Values[i]) frame.Fields = append(frame.Fields, data.NewField(queryDataResp.Expressions[i], nil, values), ) } } } response.Frames = append(response.Frames, frame) return response } func recoverType(m []interface{}) interface{} { if len(m) > 0 { switch m[0].(type) { case float64: tmp := make([]float64, len(m)) for i := range m { if m[i] == nil { tmp[i] = math.NaN() } else { tmp[i] = m[i].(float64) } } return tmp case string: tmp := make([]string, len(m)) for i := range m { tmp[i] = m[i].(string) } return tmp case bool: tmp := make([]float64, len(m)) for i := range m { if m[i] == nil { tmp[i] = math.NaN() } else if m[i].(bool) { tmp[i] = 1 } else { tmp[i] = 0 } } return tmp default: tmp := make([]float64, len(m)) for i := range m { if m[i] == nil { tmp[i] = math.NaN() } else { tmp[i] = m[i].(float64) } } return tmp } } else { return make([]float64, 0) } } // Whether the last character of the URL for processing datasource configuration is "/" func DataSourceUrlHandler(url string) string { var lastCharacter = url[len(url)-1:] if lastCharacter == "/" { url = url[0 : len(url)-1] } return url } // CheckHealth handles health checks sent from Grafana to the plugin. // The main use case for these health checks is the test button on the // datasource configuration page which allows users to verify that // a datasource is working as expected. func (d *IoTDBDataSource) CheckHealth(_ context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { var status = backend.HealthStatusOk var message = "Data source is working" loginStatus, err := d.IoTDBLogin(req) if err != nil { log.DefaultLogger.Error("Parsing JSON error", err) status = backend.HealthStatusError message = fmt.Sprint("Failed to connect to iotdb service.", err.Error()) } else if loginStatus.Code != 200 { status = backend.HealthStatusError message = loginStatus.Message } return &backend.CheckHealthResult{ Status: status, Message: message, }, nil } func (d *IoTDBDataSource) IoTDBLogin(req *backend.CheckHealthRequest) (*loginStatus, error) { instanceSettings := req.PluginContext.DataSourceInstanceSettings var authorization = "" if password, exists := instanceSettings.DecryptedSecureJSONData["password"]; exists { authorization = "Basic " + base64.StdEncoding.EncodeToString([]byte(d.Username+":"+password)) } var dataSourceUrl = DataSourceUrlHandler(d.Ulr) request, err := http.NewRequest(http.MethodGet, dataSourceUrl+"/grafana/v1/login", nil) if err != nil { log.DefaultLogger.Error("Error creating NewRequest", err.Error()) return nil, err } request.Header.Add("Authorization", authorization) response, err := d.httpClient.Do(request) if err != nil { log.DefaultLogger.Error("Failed to connect to iotdb service", err.Error()) return nil, err } body, err := io.ReadAll(response.Body) if err != nil { log.DefaultLogger.Error("Failed to get iotdb service data", err.Error()) return nil, err } var loginStatus loginStatus err = json.Unmarshal(body, &loginStatus) if err != nil { log.DefaultLogger.Error("Parsing JSON error", err) return nil, err } defer response.Body.Close() return &loginStatus, nil }