pkg/plugin/plugin.go (310 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package plugin import ( "context" "encoding/json" "errors" "fmt" "net/http" "strconv" "strings" "time" "cloud.google.com/go/trace/apiv1/tracepb" cloudtrace "github.com/GoogleCloudPlatform/cloud-trace-data-source-plugin/pkg/plugin/cloudtrace" "github.com/grafana/grafana-google-sdk-go/pkg/utils" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/data" ) // Make sure CloudTraceDatasource implements required interfaces var ( _ backend.QueryDataHandler = (*CloudTraceDatasource)(nil) _ backend.CheckHealthHandler = (*CloudTraceDatasource)(nil) _ instancemgmt.InstanceDisposer = (*CloudTraceDatasource)(nil) errMissingCredentials = errors.New("missing credentials") ) const ( privateKeyKey = "privateKey" gceAuthentication = "gce" jwtAuthentication = "jwt" ) // config is the fields parsed from the front end type config struct { AuthType string `json:"authenticationType"` ClientEmail string `json:"clientEmail"` DefaultProject string `json:"defaultProject"` TokenURI string `json:"tokenUri"` ServiceAccountToImpersonate string `json:"serviceAccountToImpersonate"` UsingImpersonation bool `json:"usingImpersonation"` } // toServiceAccountJSON creates the serviceAccountJSON bytes from the config fields func (c config) toServiceAccountJSON(privateKey string) ([]byte, error) { return json.Marshal(serviceAccountJSON{ Type: "service_account", ProjectID: c.DefaultProject, PrivateKey: privateKey, ClientEmail: c.ClientEmail, TokenURI: c.TokenURI, }) } // serviceAccountJSON is the expected structure of a GCP Service Account credentials file // We mainly want to be able to pull out ProjectID to use as a default type serviceAccountJSON struct { Type string `json:"type"` ProjectID string `json:"project_id"` PrivateKey string `json:"private_key"` ClientEmail string `json:"client_email"` TokenURI string `json:"token_uri"` } // NewCloudTraceDatasource creates a new datasource instance. func NewCloudTraceDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { var conf config if err := json.Unmarshal(settings.JSONData, &conf); err != nil { return nil, fmt.Errorf("unmarshal: %w", err) } if conf.AuthType == "" { conf.AuthType = jwtAuthentication } var client_err error var client *cloudtrace.Client if conf.AuthType == jwtAuthentication { privateKey, ok := settings.DecryptedSecureJSONData[privateKeyKey] if !ok || privateKey == "" { return nil, errMissingCredentials } serviceAccount, err := conf.toServiceAccountJSON(privateKey) if err != nil { return nil, fmt.Errorf("create credentials: %w", err) } if conf.UsingImpersonation { client, client_err = cloudtrace.NewClientWithImpersonation(context.TODO(), serviceAccount, conf.ServiceAccountToImpersonate) } else { client, client_err = cloudtrace.NewClient(context.TODO(), serviceAccount) } } else { if conf.UsingImpersonation { client, client_err = cloudtrace.NewClientWithImpersonation(context.TODO(), nil, conf.ServiceAccountToImpersonate) } else { client, client_err = cloudtrace.NewClientWithGCE(context.TODO()) } } if client_err != nil { return nil, client_err } return &CloudTraceDatasource{ client: client, }, nil } // CloudTraceDatasource is an example datasource which can respond to data queries, reports // its health and has streaming skills. type CloudTraceDatasource struct { client cloudtrace.API } // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance // created. As soon as datasource settings change detected by SDK old datasource instance will // be disposed and a new one will be created using NewSampleDatasource factory function. func (d *CloudTraceDatasource) Dispose() { if err := d.client.Close(); err != nil { log.DefaultLogger.Error("failed closing client", "error", err) } } // CallResource fetches some resource from GCP using the data source's credentials // // Currently only projects are fetched, other requests receive a 404 func (d *CloudTraceDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { // log.DefaultLogger.Info("CallResource called") var body []byte // Right now we only support calls to `gceDefaultProject` and `/projects` resource := req.Path if resource == "gceDefaultProject" { proj, err := utils.GCEDefaultProject(ctx, "") if err != nil { log.DefaultLogger.Warn("problem getting GCE default project", "error", err) } body, err = json.Marshal(proj) if err != nil { return sender.Send(&backend.CallResourceResponse{ Status: http.StatusInternalServerError, Body: []byte(`Unable to create response`), }) } } else if strings.ToLower(resource) != "projects" { return sender.Send(&backend.CallResourceResponse{ Status: http.StatusNotFound, Body: []byte(`No such path`), }) } else { projects, err := d.client.ListProjects(ctx) if err != nil { log.DefaultLogger.Warn("problem listing projects", "error", err) } body, err = json.Marshal(projects) if err != nil { return sender.Send(&backend.CallResourceResponse{ Status: http.StatusInternalServerError, Body: []byte(`Unable to create response`), }) } } return sender.Send(&backend.CallResourceResponse{ Status: http.StatusOK, Body: body, }) } // QueryData handles multiple queries and returns multiple responses. // req contains the queries []DataQuery (where each query contains RefID as a unique identifier). // The QueryDataResponse contains a map of RefID to the response for each query, and each response // contains Frames ([]*Frame). func (d *CloudTraceDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { // log.DefaultLogger.Info("QueryData called") // create response struct response := backend.NewQueryDataResponse() // loop over queries and execute them individually. for _, q := range req.Queries { res := d.query(ctx, req.PluginContext, q) // save the response in a hashmap // based on with RefID as identifier response.Responses[q.RefID] = res } return response, nil } // queryModel is the fields needed to query from Grafana type queryModel struct { TraceID string `json:"traceId"` QueryText string `json:"queryText"` QueryType string `json:"queryType"` ProjectID string `json:"projectId"` MaxDataPoints int `json:"MaxDataPoints"` } func (d *CloudTraceDatasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse { response := backend.DataResponse{} var q queryModel response.Error = json.Unmarshal(query.JSON, &q) if response.Error != nil { return response } if q.QueryType == "traceID" && strings.TrimSpace(q.TraceID) != "" { f, err := d.getTraceSpanFrame(ctx, q) if err != nil { response.Error = fmt.Errorf("trace query: %w", err) return response } response.Frames = append(response.Frames, f) } if q.QueryType == "" { f, err := d.getTracesTableFrame(ctx, q, query) if err != nil { response.Error = fmt.Errorf("filter query: %w", err) return response } response.Frames = append(response.Frames, f) } return response } func (d *CloudTraceDatasource) getTraceSpanFrame(ctx context.Context, q queryModel) (*data.Frame, error) { clientRequest := cloudtrace.TraceQuery{ ProjectID: q.ProjectID, TraceID: q.TraceID, } trace, err := d.client.GetTrace(ctx, &clientRequest) if err != nil { return nil, err } f := createTraceSpanFrame(trace) return f, nil } func createTraceSpanFrame(trace *tracepb.Trace) *data.Frame { // Create one frame for all trace/spans f := data.NewFrame(trace.GetTraceId()) f.Meta = &data.FrameMeta{} f.Meta.PreferredVisualization = data.VisTypeTrace // Create one set of fields for all trace/spans traceIDField := data.NewField("traceID", nil, []string{}) spanIDField := data.NewField("spanID", nil, []string{}) parentSpanIDField := data.NewField("parentSpanID", nil, []string{}) operationNameField := data.NewField("operationName", nil, []string{}) serviceNameField := data.NewField("serviceName", nil, []string{}) serviceTagsField := data.NewField("serviceTags", nil, []json.RawMessage{}) startTimeField := data.NewField("startTime", nil, []time.Time{}) durationField := data.NewField("duration", nil, []float64{}) tagsField := data.NewField("tags", nil, []json.RawMessage{}) // Add values to each field for each span for _, s := range trace.Spans { serviceTags, spanTags, err := cloudtrace.GetTags(s) if err != nil { log.DefaultLogger.Warn("failed getting span tags", "error", err) continue } tagsField.Append(spanTags) serviceTagsField.Append(serviceTags) traceIDField.Append(trace.GetTraceId()) spanIDField.Append(strconv.FormatUint(s.GetSpanId(), 10)) parentSpanIDField.Append(strconv.FormatUint(s.GetParentSpanId(), 10)) operationNameField.Append(cloudtrace.GetSpanOperationName(s)) serviceNameField.Append(cloudtrace.GetServiceName(s)) startTimeField.Append(s.GetStartTime().AsTime()) duration := float64(s.GetEndTime().AsTime().UnixMicro()-s.GetStartTime().AsTime().UnixMicro()) / 1000 durationField.Append(duration) } f.Fields = append(f.Fields, traceIDField, parentSpanIDField, spanIDField, serviceNameField, operationNameField, serviceTagsField, tagsField, startTimeField, durationField, ) return f } func (d *CloudTraceDatasource) getTracesTableFrame(ctx context.Context, q queryModel, dQuery backend.DataQuery) (*data.Frame, error) { filter, err := cloudtrace.GetListTracesFilter(q.QueryText) if err != nil { return nil, err } clientRequest := cloudtrace.TracesQuery{ ProjectID: q.ProjectID, Filter: filter, Limit: dQuery.MaxDataPoints, TimeRange: cloudtrace.TimeRange{ From: dQuery.TimeRange.From, To: dQuery.TimeRange.To, }, } traces, err := d.client.ListTraces(ctx, &clientRequest) if err != nil { return nil, err } f := createTracesTableFrame(traces) return f, nil } func createTracesTableFrame(traces []*tracepb.Trace) *data.Frame { // Create one frame for all traces f := data.NewFrame("traceTable") f.Meta = &data.FrameMeta{} f.Meta.PreferredVisualization = data.VisTypeTable // Create one set of fields for all traces tableTraceIDField := data.NewField("Trace ID", nil, []string{}) tableTraceNameField := data.NewField("Trace name", nil, []string{}) tableStartTimeField := data.NewField("Start time", nil, []time.Time{}) tableLatencyField := data.NewField("Latency", nil, []int64{}) tableLatencyField.Config = &data.FieldConfig{ Unit: "ms", } // Add values to each field for each trace for _, t := range traces { tableTraceIDField.Append(t.TraceId) spans := t.GetSpans() if len(spans) < 1 { log.DefaultLogger.Warn("failed getting trace spans", "traceID", t.TraceId) continue } rootSpan := spans[0] tableTraceNameField.Append(cloudtrace.GetTraceName(rootSpan)) tableStartTimeField.Append(rootSpan.GetStartTime().AsTime()) latency := rootSpan.GetEndTime().AsTime().UnixMilli() - rootSpan.GetStartTime().AsTime().UnixMilli() tableLatencyField.Append(latency) } f.Fields = append(f.Fields, tableTraceIDField, tableTraceNameField, tableStartTimeField, tableLatencyField, ) return f } // CheckHealth handles health checks sent from Grafana to the plugin. // The main use case for these health checks is the test button on the // datasource configuration page which allows users to verify that // a datasource is working as expected. func (d *CloudTraceDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { // log.DefaultLogger.Info("CheckHealth called") var status = backend.HealthStatusOk settings := req.PluginContext.DataSourceInstanceSettings var conf config if err := json.Unmarshal(settings.JSONData, &conf); err != nil { return nil, fmt.Errorf("unmarshal: %w", err) } if conf.DefaultProject == "" && conf.AuthType == gceAuthentication { proj, err := utils.GCEDefaultProject(ctx, "") if err != nil { return nil, fmt.Errorf("failed to get GCE default project: %w", err) } conf.DefaultProject = proj } if err := d.client.TestConnection(ctx, conf.DefaultProject); err != nil { return &backend.CheckHealthResult{ Status: backend.HealthStatusError, Message: fmt.Sprintf("failed to run test query: %s", err), }, nil } return &backend.CheckHealthResult{ Status: status, Message: fmt.Sprintf("Successfully queried traces from GCP project %s", conf.DefaultProject), }, nil }