pkg/plugin/cloudlogging/client.go (267 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package cloudlogging import ( "context" "errors" "fmt" "math" "strings" "time" logging "cloud.google.com/go/logging/apiv2" "github.com/grafana/grafana-plugin-sdk-go/backend/log" "golang.org/x/oauth2" resourcemanager "google.golang.org/api/cloudresourcemanager/v1" "google.golang.org/api/impersonate" "google.golang.org/api/iterator" "google.golang.org/api/option" // Currently, LogEntry.ProtoPayload only supports two types // https://pkg.go.dev/cloud.google.com/go/logging/apiv2/loggingpb#LogEntry_ProtoPayload // Adding the _ import to get that proto message structure registered with the unmarshaller _ "google.golang.org/genproto/googleapis/appengine/logging/v1" _ "google.golang.org/genproto/googleapis/cloud/audit" "cloud.google.com/go/logging/apiv2/loggingpb" ) const testConnectionTimeout = time.Minute * 1 // API implements the methods we need to query logs and list projects from GCP type API interface { // ListLogs retrieves all logs matching some query filter up to the given limit ListLogs(context.Context, *Query) ([]*loggingpb.LogEntry, error) // TestConnection queries for any log from the given project TestConnection(ctx context.Context, projectID string) error // ListProjects returns the project IDs of all visible projects ListProjects(context.Context) ([]string, error) // ListProjectBuckets returns all log buckets of a project ListProjectBuckets(ctx context.Context, projectId string) ([]string, error) // ListProjectBucketViews returns all views of a log bucket ListProjectBucketViews(ctx context.Context, projectId string, bucketId string) ([]string, error) // Close closes the underlying connection to the GCP API Close() error } // Client wraps a GCP logging client to fetch logs, a resourcemanager client // to list projects, and a config client to get log bucket configurations type Client struct { lClient *logging.Client rClient *resourcemanager.ProjectsService configClient *logging.ConfigClient } // NewClient creates a new Client using jsonCreds for authentication func NewClient(ctx context.Context, jsonCreds []byte) (*Client, error) { client, err := logging.NewClient(ctx, option.WithCredentialsJSON(jsonCreds), option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } rClient, err := resourcemanager.NewService(ctx, option.WithCredentialsJSON(jsonCreds), option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } configClient, err := logging.NewConfigClient(ctx, option.WithCredentialsJSON(jsonCreds), option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } return &Client{ lClient: client, rClient: rClient.Projects, configClient: configClient, }, nil } // NewClient creates a new Clients using GCE metadata for authentication func NewClientWithGCE(ctx context.Context) (*Client, error) { client, err := logging.NewClient(ctx, option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } rClient, err := resourcemanager.NewService(ctx, option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } configClient, err := logging.NewConfigClient(ctx, option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } return &Client{ lClient: client, rClient: rClient.Projects, configClient: configClient, }, nil } // NewClient creates a new Clients using service account impersonation func NewClientWithImpersonation(ctx context.Context, jsonCreds []byte, impersonateSA string) (*Client, error) { var ts oauth2.TokenSource var err error if jsonCreds == nil { ts, err = impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{ TargetPrincipal: impersonateSA, Scopes: []string{"https://www.googleapis.com/auth/cloud-platform.read-only"}, }) } else { ts, err = impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{ TargetPrincipal: impersonateSA, Scopes: []string{"https://www.googleapis.com/auth/cloud-platform.read-only"}, }, option.WithCredentialsJSON(jsonCreds)) } if err != nil { return nil, err } client, err := logging.NewClient(ctx, option.WithTokenSource(ts), option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } rClient, err := resourcemanager.NewService(ctx, option.WithTokenSource(ts), option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } configClient, err := logging.NewConfigClient(ctx, option.WithTokenSource(ts), option.WithUserAgent("googlecloud-logging-datasource")) if err != nil { return nil, err } return &Client{ lClient: client, rClient: rClient.Projects, configClient: configClient, }, nil } // Close closes the underlying connection to the GCP API func (c *Client) Close() error { c.configClient.Close() return c.lClient.Close() } // Query is the information from a Grafana query needed to query GCP for logs type Query struct { ProjectID string BucketId string ViewId string Filter string Limit int64 TimeRange struct { From string To string } } // String is the query formatted for querying GCP // It is the query text, with the time range constraints appended func (q *Query) String() string { return fmt.Sprintf(`%s AND timestamp >= "%s" AND timestamp <= "%s"`, q.Filter, q.TimeRange.From, q.TimeRange.To, ) } // ListProjects returns the project IDs of all visible projects func (c *Client) ListProjects(ctx context.Context) ([]string, error) { response, err := c.rClient.List().Do() if err != nil { return nil, err } projectIDs := []string{} for _, p := range response.Projects { if p.LifecycleState == "DELETE_REQUESTED" || p.LifecycleState == "DELETE_IN_PROGRESS" { continue } projectIDs = append(projectIDs, p.ProjectId) } return projectIDs, nil } // ListProjectBucketsViews returns all views of a log bucket func (c *Client) ListProjectBucketViews(ctx context.Context, projectId string, bucketId string) ([]string, error) { views := []string{""} req := &loggingpb.ListViewsRequest{ // See https://pkg.go.dev/cloud.google.com/go/logging/apiv2/loggingpb#ListViewsRequest Parent: fmt.Sprintf("projects/%s/locations/%s", projectId, bucketId), } it := c.configClient.ListViews(ctx, req) for { resp, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, err } // See response format: https://cloud.google.com/logging/docs/reference/v2/rest/v2/billingAccounts.locations.buckets.views#LogView view := strings.Split(resp.Name, "/") // Append `my-view` for `projects/my-project/locations/global/buckets/my-bucket/views/my-view` views = append(views, view[len(view)-1]) } return views, nil } // ListProjectBuckets returns all log buckets of a project func (c *Client) ListProjectBuckets(ctx context.Context, projectId string) ([]string, error) { buckets := []string{""} req := &loggingpb.ListBucketsRequest{ // Request struct fields. Using '-' to get the full list // See https://pkg.go.dev/cloud.google.com/go/logging/apiv2/loggingpb#ListBucketsRequest Parent: fmt.Sprintf("projects/%s/locations/-", projectId), } it := c.configClient.ListBuckets(ctx, req) for { resp, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, err } // See response format: https://cloud.google.com/logging/docs/reference/v2/rest/v2/billingAccounts.locations.buckets#LogBucket bucket := strings.Split(resp.Name, "/") // Get `global/buckets/my-bucket` for `projects/my-project/locations/global/buckets/my-bucket` buckets = append(buckets, strings.Join(bucket[3:], "/")) } return buckets, nil } // TestConnection queries for any log from the given project func (c *Client) TestConnection(ctx context.Context, projectID string) error { start := time.Now() listCtx, cancel := context.WithTimeout(ctx, time.Duration(testConnectionTimeout)) defer func() { cancel() log.DefaultLogger.Debug("Finished testConnection", "duration", time.Since(start).String()) }() it := c.lClient.ListLogEntries(listCtx, &loggingpb.ListLogEntriesRequest{ ResourceNames: []string{legacyProjectResourceName(projectID)}, PageSize: 1, }) if listCtx.Err() != nil { return errors.New("list entries: timeout") } entry, err := it.Next() if err == iterator.Done { return errors.New("no entries") } if err == context.DeadlineExceeded { return errors.New("list entries: timeout") } if err != nil { return fmt.Errorf("list entries: %w", err) } if entry == nil { return errors.New("no entries") } return nil } // ListLogs retrieves all logs matching some query filter up to the given limit func (c *Client) ListLogs(ctx context.Context, q *Query) ([]*loggingpb.LogEntry, error) { // Never exceed the maximum page size pageSize := int32(math.Min(float64(q.Limit), 1000)) resourceName := []string{} if q.BucketId == "" { resourceName = append(resourceName, legacyProjectResourceName(q.ProjectID)) } else { resourceName = append(resourceName, projectResourceName(q.ProjectID, q.BucketId, q.ViewId)) } req := loggingpb.ListLogEntriesRequest{ ResourceNames: resourceName, Filter: q.String(), OrderBy: "timestamp desc", PageSize: pageSize, } start := time.Now() defer func() { log.DefaultLogger.Debug("Finished listing logs", "duration", time.Since(start).String()) }() it := c.lClient.ListLogEntries(ctx, &req) if it == nil { return nil, errors.New("nil response") } var i int64 entries := []*loggingpb.LogEntry{} for { resp, err := it.Next() if err == iterator.Done { break } if err != nil { log.DefaultLogger.Error("error getting page", "error", err) break } entries = append(entries, resp) i++ if i >= q.Limit { break } } return entries, nil } func legacyProjectResourceName(projectID string) string { return fmt.Sprintf("projects/%s", projectID) } func projectResourceName(projectId string, bucketId string, viewId string) string { if viewId != "" { return fmt.Sprintf("projects/%s/locations/%s/views/%s", projectId, bucketId, viewId) } else { // Use default `_AllLogs` view return fmt.Sprintf("projects/%s/locations/%s/views/_AllLogs", projectId, bucketId) } }