pkg/plugin/plugin.go (256 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package plugin
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/GoogleCloudPlatform/cloud-logging-data-source-plugin/pkg/plugin/cloudlogging"
"github.com/grafana/grafana-google-sdk-go/pkg/utils"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// Make sure CloudLoggingDatasource implements required interfaces
var (
_ backend.QueryDataHandler = (*CloudLoggingDatasource)(nil)
_ backend.CheckHealthHandler = (*CloudLoggingDatasource)(nil)
_ instancemgmt.InstanceDisposer = (*CloudLoggingDatasource)(nil)
errMissingCredentials = errors.New("missing credentials")
)
const (
privateKeyKey = "privateKey"
gceAuthentication = "gce"
jwtAuthentication = "jwt"
)
// config is the fields parsed from the front end
type config struct {
AuthType string `json:"authenticationType"`
ClientEmail string `json:"clientEmail"`
DefaultProject string `json:"defaultProject"`
TokenURI string `json:"tokenUri"`
ServiceAccountToImpersonate string `json:"serviceAccountToImpersonate"`
UsingImpersonation bool `json:"usingImpersonation"`
}
// toServiceAccountJSON creates the serviceAccountJSON bytes from the config fields
func (c config) toServiceAccountJSON(privateKey string) ([]byte, error) {
return json.Marshal(serviceAccountJSON{
Type: "service_account",
ProjectID: c.DefaultProject,
PrivateKey: privateKey,
ClientEmail: c.ClientEmail,
TokenURI: c.TokenURI,
})
}
// serviceAccountJSON is the expected structure of a GCP Service Account credentials file
// We mainly want to be able to pull out ProjectID to use as a default
type serviceAccountJSON struct {
Type string `json:"type"`
ProjectID string `json:"project_id"`
PrivateKey string `json:"private_key"`
ClientEmail string `json:"client_email"`
TokenURI string `json:"token_uri"`
}
// NewCloudLoggingDatasource creates a new datasource instance.
func NewCloudLoggingDatasource(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
var conf config
if err := json.Unmarshal(settings.JSONData, &conf); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
}
if conf.AuthType == "" {
conf.AuthType = jwtAuthentication
}
var client_err error
var client *cloudlogging.Client
if conf.AuthType == jwtAuthentication {
privateKey, ok := settings.DecryptedSecureJSONData[privateKeyKey]
if !ok || privateKey == "" {
return nil, errMissingCredentials
}
serviceAccount, err := conf.toServiceAccountJSON(privateKey)
if err != nil {
return nil, fmt.Errorf("create credentials: %w", err)
}
if conf.UsingImpersonation {
client, client_err = cloudlogging.NewClientWithImpersonation(context.TODO(), serviceAccount, conf.ServiceAccountToImpersonate)
} else {
client, client_err = cloudlogging.NewClient(context.TODO(), serviceAccount)
}
} else {
if conf.UsingImpersonation {
client, client_err = cloudlogging.NewClientWithImpersonation(context.TODO(), nil, conf.ServiceAccountToImpersonate)
} else {
client, client_err = cloudlogging.NewClientWithGCE(context.TODO())
}
}
if client_err != nil {
return nil, client_err
}
return &CloudLoggingDatasource{
client: client,
}, nil
}
// CloudLoggingDatasource is an example datasource which can respond to data queries, reports
// its health and has streaming skills.
type CloudLoggingDatasource struct {
client cloudlogging.API
}
// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
// created. As soon as datasource settings change detected by SDK old datasource instance will
// be disposed and a new one will be created using NewSampleDatasource factory function.
func (d *CloudLoggingDatasource) Dispose() {
if err := d.client.Close(); err != nil {
log.DefaultLogger.Error("failed closing client", "error", err)
}
}
// CallResource fetches some resource from GCP using the data source's credentials
// Currently limited resources are fetched, other requests receive a 404
func (d *CloudLoggingDatasource) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
// log.DefaultLogger.Info("CallResource called")
var body []byte
// Right now we only support calls to the following:
//`/gceDefaultProject`
//`/projects`
//`/logBuckets`
//`/logViews`
resource := strings.ToLower(req.Path)
if resource == "gcedefaultproject" {
proj, err := utils.GCEDefaultProject(ctx, "")
if err != nil {
log.DefaultLogger.Warn("problem getting GCE default project", "error", err)
}
body, err = json.Marshal(proj)
if err != nil {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusInternalServerError,
Body: []byte(`Unable to create response`),
})
}
} else if resource == "projects" {
projects, err := d.client.ListProjects(ctx)
if err != nil {
log.DefaultLogger.Warn("problem listing projects", "error", err)
}
body, err = json.Marshal(projects)
if err != nil {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusInternalServerError,
Body: []byte(`Unable to create response`),
})
}
} else if resource == "logbuckets" {
reqUrl, _ := url.Parse(req.URL)
params, _ := url.ParseQuery(reqUrl.RawQuery)
bucketNames, err := d.client.ListProjectBuckets(ctx, params.Get("ProjectId"))
if err != nil {
log.DefaultLogger.Warn("problem listing log buckets", "error", err)
}
body, err = json.Marshal(bucketNames)
if err != nil {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusInternalServerError,
Body: []byte(`Unable to create response`),
})
}
} else if resource == "logviews" {
reqUrl, _ := url.Parse(req.URL)
params, _ := url.ParseQuery(reqUrl.RawQuery)
views, err := d.client.ListProjectBucketViews(ctx, params.Get("ProjectId"), params.Get("BucketId"))
if err != nil {
log.DefaultLogger.Warn("problem listing log views", "error", err)
}
body, err = json.Marshal(views)
if err != nil {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusInternalServerError,
Body: []byte(`Unable to create response`),
})
}
} else {
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusNotFound,
Body: []byte(`No such path`),
})
}
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Body: body,
})
}
// QueryData handles multiple queries and returns multiple responses.
// req contains the queries []DataQuery (where each query contains RefID as a unique identifier).
// The QueryDataResponse contains a map of RefID to the response for each query, and each response
// contains Frames ([]*Frame).
func (d *CloudLoggingDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
// log.DefaultLogger.Info("QueryData called")
// create response struct
response := backend.NewQueryDataResponse()
// loop over queries and execute them individually.
for _, q := range req.Queries {
res := d.query(ctx, req.PluginContext, q)
// save the response in a hashmap
// based on with RefID as identifier
response.Responses[q.RefID] = res
}
return response, nil
}
// queryModel is the fields needed to query from Grafana
type queryModel struct {
QueryText string `json:"queryText,omitempty"`
Query string `json:"query,omitempty"`
ProjectID string `json:"projectId"`
BucketId string `json:"bucketId"`
ViewId string `json:"viewId"`
}
func (d *CloudLoggingDatasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
response := backend.DataResponse{}
var q queryModel
response.Error = json.Unmarshal(query.JSON, &q)
if response.Error != nil {
return response
}
var qstr string
if q.QueryText != "" {
qstr = q.QueryText
} else if q.Query != "" {
qstr = q.Query
}
clientRequest := cloudlogging.Query{
ProjectID: q.ProjectID,
BucketId: q.BucketId,
ViewId: q.ViewId,
Filter: qstr,
Limit: query.MaxDataPoints,
TimeRange: struct {
From string
To string
}{
From: query.TimeRange.From.Format(time.RFC3339),
To: query.TimeRange.To.Format(time.RFC3339),
},
}
logs, err := d.client.ListLogs(ctx, &clientRequest)
if err != nil {
response.Error = fmt.Errorf("query: %w", err)
return response
}
// create data frame response.
frames := []*data.Frame{}
for i := 0; i < len(logs); i++ {
body, err := cloudlogging.GetLogEntryMessage(logs[i])
if err != nil {
// some log messages might not have a payload
// log a warning here but continue
log.DefaultLogger.Warn("failed getting log message", "warning", err)
}
labels := cloudlogging.GetLogLabels(logs[i])
f := data.NewFrame(logs[i].GetInsertId())
timestamp := data.NewField("time", nil, []time.Time{logs[i].GetTimestamp().AsTime()})
content := data.NewField("content", labels, []string{body})
f.Fields = append(f.Fields, timestamp, content)
f.Meta = &data.FrameMeta{}
f.Meta.PreferredVisualization = data.VisTypeLogs
frames = append(frames, f)
}
// add the frames to the response.
for _, f := range frames {
response.Frames = append(response.Frames, f)
}
return response
}
// CheckHealth handles health checks sent from Grafana to the plugin.
// The main use case for these health checks is the test button on the
// datasource configuration page which allows users to verify that
// a datasource is working as expected.
func (d *CloudLoggingDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
// log.DefaultLogger.Info("CheckHealth called")
var status = backend.HealthStatusOk
settings := req.PluginContext.DataSourceInstanceSettings
var conf config
if err := json.Unmarshal(settings.JSONData, &conf); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
}
if conf.DefaultProject == "" && conf.AuthType == gceAuthentication {
proj, err := utils.GCEDefaultProject(ctx, "")
if err != nil {
return nil, fmt.Errorf("failed to get GCE default project: %w", err)
}
conf.DefaultProject = proj
}
if err := d.client.TestConnection(ctx, conf.DefaultProject); err != nil {
return &backend.CheckHealthResult{
Status: backend.HealthStatusError,
Message: fmt.Sprintf("failed to run test query: %s", err),
}, nil
}
return &backend.CheckHealthResult{
Status: status,
Message: fmt.Sprintf("Successfully queried logs from GCP project %s", conf.DefaultProject),
}, nil
}