x-pack/metricbeat/module/gcp/carbon/carbon.go (241 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package carbon
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil" //nolint:typecheck // civil is used for type casting
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/gcp"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
// metricsetName is the name used to register this metricset with the gcp
// module; it is also used as the logger name.
const metricsetName = "carbon"
// init registers the MetricSet with the central registry as soon as the program
// starts, under the gcp module name and the metricsetName of this package.
// The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
mb.Registry.MustAddMetricSet(gcp.ModuleName, metricsetName, New)
}
// MetricSet holds the configuration and logger of the gcp carbon metricset.
// It must implement the mb.MetricSet interface, which is achieved by embedding
// mb.BaseMetricSet: it implements all of the required mb.MetricSet
// interface methods except for Fetch, which is defined on *MetricSet in this file.
type MetricSet struct {
mb.BaseMetricSet
// config is the metricset configuration unpacked by New.
config config
// logger is the metricset logger, created in New with metricsetName as its name.
logger *logp.Logger
}
// config is the user-facing configuration of the carbon metricset. It is
// filled by New through UnpackConfig and checked by Validate.
type config struct {
// Period is the collection period; Validate rejects values below 24 hours.
Period time.Duration `config:"period" validate:"required"`
// ProjectID is the project used to create the BigQuery client in Fetch; it is
// also reported as cloud.project.id on every event.
ProjectID string `config:"project_id" validate:"required"`
// CredentialsFilePath is handed to option.WithCredentialsFile in Fetch.
// At least one of CredentialsFilePath and CredentialsJSON must be set;
// Fetch rejects configurations that set both.
CredentialsFilePath string `config:"credentials_file_path"`
// CredentialsJSON is handed to option.WithCredentialsJSON in Fetch.
CredentialsJSON string `config:"credentials_json"`
// DatasetID is the BigQuery dataset in which getTable looks for the table.
DatasetID string `config:"dataset_id" validate:"required"`
// TableName is the table to query; when empty, Fetch falls back to
// "carbon_footprint".
TableName string `config:"table_name"`
}
// Validate checks the unpacked configuration for consistency. It returns an
// error when:
//   - neither credentials_file_path nor credentials_json is set,
//   - both credentials_file_path and credentials_json are set,
//   - the collection period is shorter than 24 hours.
func (c config) Validate() error {
	hasFile := c.CredentialsFilePath != ""
	hasJSON := c.CredentialsJSON != ""

	switch {
	case !hasFile && !hasJSON:
		return errors.New("no credentials_file_path or credentials_json specified")
	case hasFile && hasJSON:
		// Fetch refuses this combination on every run; rejecting it here
		// reports the mistake once, when the configuration is loaded.
		return errors.New("both credentials_file_path and credentials_json specified, you must use only one of them")
	}

	if c.Period < 24*time.Hour {
		return fmt.Errorf("collection period for carbon footprint metricset %s cannot be less than 24 hours", c.Period)
	}

	return nil
}
// New creates a new instance of the MetricSet. It emits the beta warning,
// creates the metricset logger and unpacks the module configuration into the
// MetricSet. It returns an error when the configuration cannot be unpacked
// (which includes failures reported by config.Validate).
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Beta("The gcp '%s' metricset is beta.", metricsetName)

	m := &MetricSet{
		BaseMetricSet: base,
		logger:        logp.NewLogger(metricsetName),
	}

	if err := base.Module().UnpackConfig(&m.config); err != nil {
		return nil, fmt.Errorf("unpack carbon footprint config failed: %w", err)
	}

	// Log the settings field by field instead of dumping the struct:
	// credentials_json carries the raw credentials later passed to
	// option.WithCredentialsJSON and must never end up in the logs.
	m.Logger().Debugf(
		"metricset config: period=%s project_id=%q dataset_id=%q table_name=%q credentials_file_path=%q credentials_json_set=%t",
		m.config.Period,
		m.config.ProjectID,
		m.config.DatasetID,
		m.config.TableName,
		m.config.CredentialsFilePath,
		m.config.CredentialsJSON != "",
	)

	return m, nil
}
// Fetch gathers the carbon footprint rows of the latest expected report month
// from BigQuery and publishes one event per row through reporter.
//
// It creates a BigQuery client for the configured project with the configured
// credentials, resolves the configured dataset/table (table_name defaults to
// "carbon_footprint"), runs the query and reports the resulting events.
//
// An error is returned when the credentials configuration is invalid, the
// client cannot be created, the table cannot be found or the query fails.
func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) (err error) {
	// Month of the latest report expected to be available (see getReportMonth).
	month := getReportMonth(time.Now())

	var opt []option.ClientOption
	switch {
	case m.config.CredentialsFilePath != "" && m.config.CredentialsJSON != "":
		return errors.New("both credentials_file_path and credentials_json specified, you must use only one of them")
	case m.config.CredentialsFilePath != "":
		opt = []option.ClientOption{option.WithCredentialsFile(m.config.CredentialsFilePath)}
	case m.config.CredentialsJSON != "":
		opt = []option.ClientOption{option.WithCredentialsJSON([]byte(m.config.CredentialsJSON))}
	default:
		return errors.New("no credentials_file_path or credentials_json specified")
	}

	client, err := bigquery.NewClient(ctx, m.config.ProjectID, opt...)
	if err != nil {
		return fmt.Errorf("error creating bigquery client: %w", err)
	}
	defer client.Close()

	// The default is stored back into the config, so the warning is only
	// emitted on the first Fetch.
	if m.config.TableName == "" {
		m.logger.Warn("table_name is not set in config, \"carbon_footprint\" will be used by default.")
		m.config.TableName = "carbon_footprint"
	}

	table, err := getTable(ctx, client, m.config.DatasetID, m.config.TableName)
	if err != nil {
		return fmt.Errorf("getTables failed: %w", err)
	}

	events, err := m.queryBigQuery(ctx, client, table, month)
	if err != nil {
		return fmt.Errorf("queryBigQuery failed: %w", err)
	}

	m.Logger().Debugf("Total %d of events are created for carbon footprint", len(events))
	for _, event := range events {
		reporter.Event(event)
	}

	return nil
}
// getReportMonth returns the usage month of the latest expected report as a
// "YYYY-MM-01" string. New reports are expected on the 15th of each month, so
// for any day before the 15th the previous month is returned; from the 15th
// onwards the month of now is returned.
func getReportMonth(now time.Time) string {
	reportDate := now
	if now.Day() < 15 {
		// Days 1-14 always exist in the previous month, so AddDate cannot
		// normalize into a different month here.
		reportDate = now.AddDate(0, -1, 0)
	}

	year, month, _ := reportDate.Date()

	return fmt.Sprintf("%04d-%02d-01", year, int(month))
}
// tableMeta describes the BigQuery table resolved by getTable.
type tableMeta struct {
// tableFullID is the table identifier in "project.dataset.table" form, as
// assembled by getTable; it is the table name placed in the query.
tableFullID string
// location is the Location from the dataset metadata; queryBigQuery sets it
// as the query location.
location string
}
// getTable looks up the table named tableName inside the dataset datasetID of
// the client's project and returns its fully qualified ID together with the
// dataset location.
//
// Dataset metadata is requested only for the dataset that matches datasetID,
// so unrelated datasets neither cost an extra API call nor can fail the lookup.
//
// An error is returned when listing datasets or tables fails, when the dataset
// metadata cannot be read, or when no matching table exists.
func getTable(ctx context.Context, client *bigquery.Client, datasetID string, tableName string) (*tableMeta, error) {
	datasets := client.Datasets(ctx)
	for {
		dataset, err := datasets.Next()
		if errors.Is(err, iterator.Done) {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("listing datasets: %w", err)
		}

		// Only the configured dataset is of interest.
		if dataset.DatasetID != datasetID {
			continue
		}

		meta, err := client.Dataset(dataset.DatasetID).Metadata(ctx)
		if err != nil {
			return nil, fmt.Errorf("reading metadata of dataset '%s': %w", datasetID, err)
		}

		tables := dataset.Tables(ctx)
		for {
			table, err := tables.Next()
			if errors.Is(err, iterator.Done) {
				break
			}
			if err != nil {
				return nil, fmt.Errorf("listing tables of dataset '%s': %w", datasetID, err)
			}

			if table.TableID == tableName {
				return &tableMeta{
					tableFullID: table.ProjectID + "." + table.DatasetID + "." + table.TableID,
					location:    meta.Location,
				}, nil
			}
		}
	}

	return nil, fmt.Errorf("could not find table '%s'", tableName)
}
// queryBigQuery runs the carbon footprint query for month against the table
// described by tableMeta and converts every returned row into an event.
//
// The query is executed in the dataset's location, the job is awaited, and the
// result rows are read one by one. Rows that do not carry exactly the 12
// columns selected by generateQuery are skipped.
//
// On any failure the error is wrapped with the failing step, logged, and
// returned together with a nil slice; no partial results are handed out.
func (m *MetricSet) queryBigQuery(ctx context.Context, client *bigquery.Client, tableMeta *tableMeta, month string) ([]mb.Event, error) {
	// Number of columns selected by generateQuery.
	const expectedColumns = 12

	// fail gives every error path the same treatment: wrap, log, return.
	fail := func(msg string, err error) ([]mb.Event, error) {
		err = fmt.Errorf("%s: %w", msg, err)
		m.logger.Error(err)
		return nil, err
	}

	query := generateQuery(tableMeta.tableFullID, month)
	m.logger.Debug("bigquery query = ", query)

	q := client.Query(query)
	// Location must match that of the dataset(s) referenced in the query.
	q.Location = tableMeta.location

	job, err := q.Run(ctx)
	if err != nil {
		return fail("bigquery Run failed", err)
	}

	status, err := job.Wait(ctx)
	if err != nil {
		return fail("bigquery Wait failed", err)
	}
	// Wait can succeed while the job itself finished with an error.
	if err := status.Err(); err != nil {
		return fail("bigquery status error", err)
	}

	it, err := job.Read(ctx)
	if err != nil {
		return fail("bigquery Read failed", err)
	}

	var events []mb.Event
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if errors.Is(err, iterator.Done) {
			break
		}
		if err != nil {
			return fail("bigquery RowIterator Next failed", err)
		}

		// createEvents indexes the row by position, so only rows with the
		// expected shape are converted.
		if len(row) == expectedColumns {
			events = append(events, createEvents(row, m.config.ProjectID))
		}
	}

	return events, nil
}
// createEvents converts one query result row into a metricbeat event.
//
// rowItems is addressed by position and must hold the columns in the order
// selected by generateQuery (indices 0 through 11). projectID is published as
// cloud.project.id. The event ID is derived from the row by generateEventID.
//
// NOTE(review): per generateQuery, rowItems[1] is project.number and
// rowItems[2] is project.id, yet they are published as project_id and
// project_name / cloud.project.name respectively — confirm this mapping is
// intended.
func createEvents(rowItems []bigquery.Value, projectID string) mb.Event {
	metricFields := mapstr.M{
		"project_id":                rowItems[1],
		"project_name":              rowItems[2],
		"service_id":                rowItems[4],
		"service_description":       rowItems[5],
		"region":                    rowItems[6],
		"footprint.scope1":          rowItems[7],
		"footprint.scope2.location": rowItems[8],
		"footprint.scope2.market":   rowItems[9],
		"footprint.scope3":          rowItems[10],
		"footprint.offsets":         rowItems[11],
	}

	rootFields := mapstr.M{
		"cloud.provider":     "gcp",
		"cloud.project.id":   projectID,
		"cloud.project.name": rowItems[2],
		"cloud.account.id":   rowItems[3],
	}

	return mb.Event{
		MetricSetFields: metricFields,
		RootFields:      rootFields,
		ID:              generateEventID(rowItems),
	}
}
// generateEventID derives a deterministic event ID from the identifying
// columns of a result row, so that fetching the same report month repeatedly
// yields the same IDs instead of duplicate documents.
//
// The ID is the first 20 hex characters of the SHA-256 hash over, in the column
// order of generateQuery: usage_month (0), project.number (1), project.id (2),
// billing_account_id (3), service.description (5) and location.region (6).
// The region is part of the key because rows that differ only by region would
// otherwise share one ID. NULL columns contribute an empty string instead of
// triggering a failed type assertion.
//
// rowItems must have at least 7 elements.
func generateEventID(rowItems []bigquery.Value) string {
	key := eventIDPart(rowItems[0]) +
		eventIDPart(rowItems[1]) +
		eventIDPart(rowItems[2]) +
		eventIDPart(rowItems[3]) +
		eventIDPart(rowItems[5]) +
		eventIDPart(rowItems[6])

	sum := sha256.Sum256([]byte(key))

	// 10 bytes encode to the 20 hex characters used as ID.
	return hex.EncodeToString(sum[:10])
}

// eventIDPart renders a single column value for use in the event ID key.
func eventIDPart(v bigquery.Value) string {
	switch val := v.(type) {
	case nil:
		// NULL column.
		return ""
	case string:
		return val
	case civil.Date:
		return val.String()
	default:
		return fmt.Sprint(val)
	}
}
// generateQuery returns the query used by the BigQuery client to retrieve the
// carbon footprint rows of the given usage month from tableName.
//
// The order of the selected columns matters: createEvents and generateEventID
// address the result row by position.
func generateQuery(tableName, month string) string {
	// The first verb receives the quoted table identifier, the second one
	// the usage month.
	const queryTemplate = `
SELECT
usage_month,
project.number,
project.id,
billing_account_id,
service.id,
service.description,
location.region,
carbon_footprint_kgCO2e.scope1,
carbon_footprint_kgCO2e.scope2.location_based,
carbon_footprint_kgCO2e.scope2.market_based,
carbon_footprint_kgCO2e.scope3,
carbon_offsets_kgCO2e
FROM %s
WHERE project.id IS NOT NULL
AND usage_month = '%s'
ORDER BY usage_month ASC;`

	// The table name is user provided, so it may contain special characters.
	// To allow any character in the table identifier, the quoted identifier
	// format is used. See https://github.com/elastic/beats/issues/26855
	// NOTE: backticks cannot be written inside a Go raw string literal, which
	// is why the quoting is done here and not in the template.
	quotedTable := "`" + tableName + "`"

	return fmt.Sprintf(queryTemplate, quotedTable, month)
}