connectors/grafana-plugin/pkg/plugin/plugin.go (342 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package plugin
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// Make sure IoTDBDatasource implements required interfaces. This is important to do
// since otherwise we will only get a not implemented error response from plugin in
// runtime. In this example datasource instance implements backend.QueryDataHandler,
// backend.CheckHealthHandler, backend.StreamHandler interfaces. Plugin should not
// implement all these interfaces - only those which are required for a particular task.
// For example if plugin does not need streaming functionality then you are free to remove
// methods that implement backend.StreamHandler. Implementing instancemgmt.InstanceDisposer
// is useful to clean up resources used by previous datasource instance when a new datasource
// instance created upon datasource settings changed.
var (
_ backend.QueryDataHandler = (*IoTDBDataSource)(nil)
_ backend.CheckHealthHandler = (*IoTDBDataSource)(nil)
_ backend.CallResourceHandler = (*IoTDBDataSource)(nil)
)
// ApacheIoTDBDatasource creates a new datasource instance.
func ApacheIoTDBDatasource(ctx context.Context, d backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
var dm dataSourceModel
if err := json.Unmarshal(d.JSONData, &dm); err != nil {
return nil, err
}
ops, err := d.HTTPClientOptions(ctx)
if err != nil {
return nil, fmt.Errorf("http client options: %w", err)
}
httpClient, err := httpclient.New(ops)
if err != nil {
return nil, fmt.Errorf("new httpclient error: %w", err)
}
var authorization = ""
if password, exists := d.DecryptedSecureJSONData["password"]; exists {
authorization = "Basic " + base64.StdEncoding.EncodeToString([]byte(dm.Username+":"+password))
}
return &IoTDBDataSource{CallResourceHandler: iotdbResourceHandler(authorization, httpClient), Username: dm.Username, Ulr: dm.Url, httpClient: httpClient}, nil
}
// SampleDatasource is an example datasource which can respond to data queries, reports
// its health and has streaming skills.
type IoTDBDataSource struct {
backend.CallResourceHandler
Username string
Ulr string
httpClient *http.Client
}
// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
// created. As soon as datasource settings change detected by SDK old datasource instance will
// be disposed and a new one will be created using ApacheIoTDBDatasource factory function.
func (d *IoTDBDataSource) Dispose() {
// Clean up datasource instance resources.
d.httpClient.CloseIdleConnections()
}
// QueryData handles multiple queries and returns multiple responses.
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier).
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
func (d *IoTDBDataSource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
// create response struct
response := backend.NewQueryDataResponse()
// loop over queries and execute them individually.
for _, q := range req.Queries {
res := d.query(ctx, req.PluginContext, q)
// save the response in a hashmap
// based on with RefID as identifier
response.Responses[q.RefID] = res
}
return response, nil
}
type dataSourceModel struct {
Username string `json:"username"`
Url string `json:"url"`
}
type groupBy struct {
GroupByLevel string `json:"groupByLevel"`
SamplingInterval string `json:"samplingInterval"`
Step string `json:"step"`
}
type queryParam struct {
Expression []string `json:"expression"`
PrefixPath []string `json:"prefixPath"`
StartTime int64 `json:"startTime"`
EndTime int64 `json:"endTime"`
Condition string `json:"condition"`
Control string `json:"control"`
SqlType string `json:"sqlType"`
Paths []string `json:"paths"`
AggregateFun string `json:"aggregateFun"`
FillClauses string `json:"fillClauses"`
GroupBy groupBy `json:"groupBy"`
Hide bool `json:"hide"`
}
type QueryDataReq struct {
Expression []string `json:"expression"`
PrefixPath []string `json:"prefixPath"`
StartTime int64 `json:"startTime"`
EndTime int64 `json:"endTime"`
Condition string `json:"condition"`
Control string `json:"control"`
}
type QueryDataResponse struct {
Expressions []string `json:"expressions"`
Timestamps []int64 `json:"timestamps"`
Values [][]interface{} `json:"values"`
ColumnNames interface{} `json:"columnNames"`
Code int32 `json:"code"`
Message string `json:"message"`
}
type loginStatus struct {
Code int `json:"code"`
Message string `json:"message"`
}
func NewQueryDataReq(expression []string, prefixPath []string, startTime int64, endTime int64, condition string, control string) *QueryDataReq {
return &QueryDataReq{Expression: expression, PrefixPath: prefixPath, StartTime: startTime, EndTime: endTime, Condition: condition, Control: control}
}
func verifyQuery(query backend.DataQuery) (qp *queryParam, errMsg string) {
err := json.Unmarshal(query.JSON, &qp)
if err != nil {
return nil, ""
}
if qp.Hide {
return nil, "none"
}
if qp.SqlType == "SQL: Drop-down List" {
if len(qp.Paths) < 1 {
return nil, "Input error, please select TIME-SERIES"
}
if qp.GroupBy.SamplingInterval != "" && qp.AggregateFun == "" {
return nil, "Input error, please select FUNCTION when SAMPLING INTERVAL has a value"
}
} else if qp.SqlType == "SQL: Full Customized" {
if len(qp.Expression) == 0 {
return nil, "Input error, SELECT is required"
}
for i := 0; i < len(qp.Expression); i++ {
if qp.Expression[i] == "" {
return nil, "Input error, SELECT is required"
}
}
if len(qp.PrefixPath) == 0 {
return nil, "Input error, FROM is required"
}
for i := 0; i < len(qp.PrefixPath); i++ {
if qp.PrefixPath[i] == "" {
return nil, "Input error, FROM is required"
}
}
} else {
return nil, "none"
}
return qp, ""
}
func (d *IoTDBDataSource) query(cxt context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
response := backend.DataResponse{}
instanceSettings := pCtx.DataSourceInstanceSettings
var authorization = ""
if password, exists := instanceSettings.DecryptedSecureJSONData["password"]; exists {
// Use the decrypted API key.
authorization = "Basic " + base64.StdEncoding.EncodeToString([]byte(d.Username+":"+password))
}
// Unmarshal the JSON into our queryModel.
var qdReq QueryDataReq
qp, msg := verifyQuery(query)
if msg == "none" {
return response
} else if msg != "" {
response.Error = errors.New(msg)
return response
}
qp.StartTime = query.TimeRange.From.UnixNano() / 1000000
qp.EndTime = query.TimeRange.To.UnixNano() / 1000000
if qp.SqlType == "SQL: Drop-down List" {
qp.Control = ""
var expressions []string = qp.Paths[len(qp.Paths)-1:]
var paths []string = qp.Paths[0 : len(qp.Paths)-1]
path := "root." + strings.Join(paths, ".")
var prefixPaths = []string{path}
if qp.AggregateFun != "" {
expressions[0] = qp.AggregateFun + "(" + expressions[0] + ")"
}
if qp.GroupBy.SamplingInterval != "" && qp.GroupBy.Step == "" {
qp.Control += " group by([" + strconv.FormatInt(qp.StartTime, 10) + "," + strconv.FormatInt(qp.EndTime, 10) + ")," + qp.GroupBy.SamplingInterval + ")"
}
if qp.GroupBy.SamplingInterval != "" && qp.GroupBy.Step != "" {
qp.Control += " group by([" + strconv.FormatInt(qp.StartTime, 10) + "," + strconv.FormatInt(qp.EndTime, 10) + ")," + qp.GroupBy.SamplingInterval + "," + qp.GroupBy.Step + ")"
}
if qp.GroupBy.GroupByLevel != "" {
qp.Control += " " + qp.GroupBy.GroupByLevel
}
if qp.FillClauses != "" {
qp.Control += " fill" + qp.FillClauses
}
qdReq = *NewQueryDataReq(expressions, prefixPaths, qp.StartTime, qp.EndTime, qp.Condition, qp.Control)
} else if qp.SqlType == "SQL: Full Customized" {
qdReq = *NewQueryDataReq(qp.Expression, qp.PrefixPath, qp.StartTime, qp.EndTime, qp.Condition, qp.Control)
} else {
return response
}
qpJson, _ := json.Marshal(qdReq)
reader := bytes.NewReader(qpJson)
var dataSourceUrl = DataSourceUrlHandler(d.Ulr)
request, _ := http.NewRequest(http.MethodPost, dataSourceUrl+"/grafana/v1/query/expression", reader)
request.Header.Set("Content-Type", "application/json")
request.Header.Add("Authorization", authorization)
rsp, _ := d.httpClient.Do(request)
body, err := io.ReadAll(rsp.Body)
if err != nil {
response.Error = errors.New("Data source is not working properly")
log.DefaultLogger.Error("Data source is not working properly", err)
return response
}
var queryDataResp QueryDataResponse
err = json.Unmarshal(body, &queryDataResp)
if err != nil {
response.Error = errors.New("Parsing JSON error")
log.DefaultLogger.Error("Parsing JSON error", err)
return response
}
defer rsp.Body.Close()
if queryDataResp.Code > 0 {
response.Error = errors.New(queryDataResp.Message)
log.DefaultLogger.Error(queryDataResp.Message)
return response
}
// create data frame response.
frame := data.NewFrame("response")
for i := 0; i < len(queryDataResp.Expressions); i++ {
if queryDataResp.Timestamps != nil && len(queryDataResp.Timestamps) > 0 {
times := make([]time.Time, len(queryDataResp.Timestamps))
for c := 0; c < len(queryDataResp.Timestamps); c++ {
times[c] = time.Unix(0, queryDataResp.Timestamps[c]*1000000)
}
if queryDataResp.Values != nil && len(queryDataResp.Values) > 0 {
values := recoverType(queryDataResp.Values[i])
frame.Fields = append(frame.Fields,
data.NewField("time", nil, times),
data.NewField(queryDataResp.Expressions[i], nil, values),
)
}
} else {
if queryDataResp.Values != nil && len(queryDataResp.Values) > 0 {
values := recoverType(queryDataResp.Values[i])
frame.Fields = append(frame.Fields,
data.NewField(queryDataResp.Expressions[i], nil, values),
)
}
}
}
response.Frames = append(response.Frames, frame)
return response
}
func recoverType(m []interface{}) interface{} {
if len(m) > 0 {
switch m[0].(type) {
case float64:
tmp := make([]float64, len(m))
for i := range m {
if m[i] == nil {
tmp[i] = math.NaN()
} else {
tmp[i] = m[i].(float64)
}
}
return tmp
case string:
tmp := make([]string, len(m))
for i := range m {
tmp[i] = m[i].(string)
}
return tmp
case bool:
tmp := make([]float64, len(m))
for i := range m {
if m[i] == nil {
tmp[i] = math.NaN()
} else if m[i].(bool) {
tmp[i] = 1
} else {
tmp[i] = 0
}
}
return tmp
default:
tmp := make([]float64, len(m))
for i := range m {
if m[i] == nil {
tmp[i] = math.NaN()
} else {
tmp[i] = m[i].(float64)
}
}
return tmp
}
} else {
return make([]float64, 0)
}
}
// Whether the last character of the URL for processing datasource configuration is "/"
func DataSourceUrlHandler(url string) string {
var lastCharacter = url[len(url)-1:]
if lastCharacter == "/" {
url = url[0 : len(url)-1]
}
return url
}
// CheckHealth handles health checks sent from Grafana to the plugin.
// The main use case for these health checks is the test button on the
// datasource configuration page which allows users to verify that
// a datasource is working as expected.
func (d *IoTDBDataSource) CheckHealth(_ context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
var status = backend.HealthStatusOk
var message = "Data source is working"
loginStatus, err := d.IoTDBLogin(req)
if err != nil {
log.DefaultLogger.Error("Parsing JSON error", err)
status = backend.HealthStatusError
message = fmt.Sprint("Failed to connect to iotdb service.", err.Error())
} else if loginStatus.Code != 200 {
status = backend.HealthStatusError
message = loginStatus.Message
}
return &backend.CheckHealthResult{
Status: status,
Message: message,
}, nil
}
func (d *IoTDBDataSource) IoTDBLogin(req *backend.CheckHealthRequest) (*loginStatus, error) {
instanceSettings := req.PluginContext.DataSourceInstanceSettings
var authorization = ""
if password, exists := instanceSettings.DecryptedSecureJSONData["password"]; exists {
authorization = "Basic " + base64.StdEncoding.EncodeToString([]byte(d.Username+":"+password))
}
var dataSourceUrl = DataSourceUrlHandler(d.Ulr)
request, err := http.NewRequest(http.MethodGet, dataSourceUrl+"/grafana/v1/login", nil)
if err != nil {
log.DefaultLogger.Error("Error creating NewRequest", err.Error())
return nil, err
}
request.Header.Add("Authorization", authorization)
response, err := d.httpClient.Do(request)
if err != nil {
log.DefaultLogger.Error("Failed to connect to iotdb service", err.Error())
return nil, err
}
body, err := io.ReadAll(response.Body)
if err != nil {
log.DefaultLogger.Error("Failed to get iotdb service data", err.Error())
return nil, err
}
var loginStatus loginStatus
err = json.Unmarshal(body, &loginStatus)
if err != nil {
log.DefaultLogger.Error("Parsing JSON error", err)
return nil, err
}
defer response.Body.Close()
return &loginStatus, nil
}