cmd/datasource-syncer/main.go (235 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "crypto/fips140" "crypto/tls" "crypto/x509" "errors" "flag" "fmt" "net/http" "os" "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" grafana "github.com/grafana/grafana-api-golang-client" "github.com/hashicorp/go-cleanhttp" "golang.org/x/mod/semver" "golang.org/x/oauth2" "golang.org/x/oauth2/google" ) var ( credentialsFile = flag.String("query.credentials-file", "", "JSON-encoded credentials (service account or refresh token). Can be left empty if default credentials have sufficient permission.") datasourceUIDList = flag.String("datasource-uids", "", "datasource-uids is a comma separated list of data source UIDs to update.") grafanaAPIToken = flag.String("grafana-api-token", "", "grafana-api-token used to access Grafana. Can be created using: https://grafana.com/docs/grafana/latest/administration/service-accounts/#create-a-service-account-in-grafana") grafanaEndpoint = flag.String("grafana-api-endpoint", "", "grafana-api-endpoint is the endpoint of the Grafana instance that contains the data sources to update.") projectID = flag.String("project-id", "", "Project ID of the Google Cloud Monitoring scoping project to query. Queries sent to this project will union results from all projects within the scope.") gcmEndpointOverride = flag.String("gcm-endpoint-override", "", "gcm-endpoint-override is the URL where queries should be sent to from Grafana. This should be left blank in almost all circumstances.") certFile = flag.String("tls-cert", "", "Path to the server TLS certificate.") keyFile = flag.String("tls-key", "", "Path to the server TLS key.") caFile = flag.String("tls-ca-cert", "", "Path to the server certificate authority") insecureSkipVerify = flag.Bool("insecure-skip-verify", false, "Skip TLS certificate verification") ) func main() { flag.Parse() logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr)) logger = log.With(logger, "ts", log.DefaultTimestampUTC) logger = log.With(logger, "caller", log.DefaultCaller) if !fips140.Enabled() { _ = logger.Log("msg", "FIPS mode not enabled") os.Exit(1) } if len(*datasourceUIDList) == 0 { //nolint:errcheck level.Error(logger).Log("msg", "--datasource-uid must be set") os.Exit(1) } if *grafanaAPIToken == "" { envToken := os.Getenv("GRAFANA_SERVICE_ACCOUNT_TOKEN") if envToken == "" { //nolint:errcheck level.Error(logger).Log("msg", "--grafana-api-token or the environment variable GRAFANA_SERVICE_ACCOUNT_TOKEN must be set") os.Exit(1) } grafanaAPIToken = &envToken } if *grafanaEndpoint == "" { //nolint:errcheck level.Error(logger).Log("msg", "--grafana-api-endpoint must be set") os.Exit(1) } if *projectID == "" { //nolint:errcheck level.Error(logger).Log("msg", "--project-id must be set") os.Exit(1) } client, err := getTLSClient(*certFile, *keyFile, *caFile, *insecureSkipVerify) if err != nil { //nolint:errcheck level.Error(logger).Log("msg", "couldn't create client", "err", err) os.Exit(1) } grafanaClient, err := grafana.New(*grafanaEndpoint, grafana.Config{ APIKey: *grafanaAPIToken, Client: client, }) if err != nil { //nolint:errcheck level.Error(logger).Log("msg", "couldn't create grafana client", "err", err) os.Exit(1) } token, err := getOAuth2Token(*credentialsFile) if err != nil { //nolint:errcheck level.Error(logger).Log("msg", "couldn't get Google OAuth2 token", "err", err) os.Exit(1) } dsSuccessfullyUpdated := []string{} dsErrors := []string{} datasourceUIDs := strings.Split(*datasourceUIDList, ",") for _, datasourceUID := range datasourceUIDs { datasourceUID = strings.TrimSpace(datasourceUID) if datasourceUID == "" { continue } dataSource, err := grafanaClient.DataSourceByUID(datasourceUID) if err != nil { dsErrors = append(dsErrors, datasourceUID) //nolint:errcheck level.Error(logger).Log("msg", fmt.Sprintf("error fetching data source config of data source uid: %s", datasourceUID), "err", err) continue } dataSource, err = buildUpdateDataSourceRequest(*dataSource, token) if err != nil { dsErrors = append(dsErrors, datasourceUID) //nolint:errcheck level.Error(logger).Log("msg", fmt.Sprintf("couldn't build data source update request for data source uid: %s", datasourceUID), "err", err) continue } err = grafanaClient.UpdateDataSourceByUID(dataSource) if err != nil { dsErrors = append(dsErrors, datasourceUID) //nolint:errcheck level.Error(logger).Log("msg", fmt.Sprintf("couldn't send update data source request to data source id: %s", datasourceUID), "err", err) continue } dsSuccessfullyUpdated = append(dsSuccessfullyUpdated, datasourceUID) } if len(dsSuccessfullyUpdated) != 0 { //nolint:errcheck level.Info(logger).Log("msg", fmt.Sprintf("Updated Grafana data source uids: %s", dsSuccessfullyUpdated)) } if len(dsErrors) != 0 { //nolint:errcheck level.Error(logger).Log("msg", fmt.Sprintf("Failed to update Grafana data source uids: %s", dsErrors)) os.Exit(1) } } // getOAuth2Token generates an OAuth token based if a JSON file is provided or it will use the default credentials. func getOAuth2Token(credentialsFile string) (string, error) { var err error var token oauth2.TokenSource if credentialsFile == "" { ctx := context.Background() token, err = google.DefaultTokenSource(ctx, "https://www.googleapis.com/auth/monitoring.read") if err != nil { return "", err } } else { jsonKey, err := os.ReadFile(credentialsFile) if err != nil { return "", fmt.Errorf("failed to read json key file: %v", err) } token, err = google.JWTAccessTokenSourceWithScope(jsonKey, "https://www.googleapis.com/auth/monitoring.read") if err != nil { return "", fmt.Errorf("could not generate token: %v", err) } } accessToken, err := token.Token() if err != nil { return "", err } return accessToken.AccessToken, nil } /* buildUpdateDataSourceRequest takes an existing data source config and adds or modifies the Authorization header and updates it to make Grafana compatible with GMP. For reference this is an example of a Grafana data source: "url": "https://monitoring.googleapis.com/v1/projects/gpe-test-1/location/global/prometheus/", "jsonData": { "httpHeaderName1": "X-Custom-Header" "httpHeaderName2": "Authorization", "httpMethod": "POST", "prometheusType": "Prometheus", "prometheusVersion": "2.40.0", }, "secureJsonFields": { "httpHeaderValue1": "secure value", "httpHeaderValue2": "secure value", } */ func buildUpdateDataSourceRequest(dataSource grafana.DataSource, token string) (*grafana.DataSource, error) { var ( minPrometheusVersion = "2.40.0" authorizationHeaderLabel = "Authorization" // httpHeader* are the prefixes that are used to store the names and values of custom headers. // check https://github.com/grafana/grafana/blob/148e1c1588e9f075b14b72eb87d5463ea5bbb253/pkg/services/datasources/models.go#L34C1-L34C1 for more info. httpHeaderName = "httpHeaderName" httpHeaderValue = "httpHeaderValue" ) if dataSource.Type != "prometheus" { return nil, errors.New("datasource type is not prometheus") } if *gcmEndpointOverride != "" { dataSource.URL = *gcmEndpointOverride } else { dataSource.URL = fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/", *projectID) } // Miscellaneous updates to make Grafana more compatible with GMP. jsonData := dataSource.JSONData if jsonData["queryTimeout"] == nil { jsonData["queryTimeout"] = "2m" } if jsonData["timeout"] == nil { jsonData["timeout"] = "120" } jsonData["httpMethod"] = http.MethodGet if jsonData["prometheusType"] == nil { jsonData["prometheusType"] = "Prometheus" } // Make sure prometheusVersion is set to 2.40.0 or higher. if jsonData["prometheusVersion"] == nil { jsonData["prometheusVersion"] = minPrometheusVersion } else { // semver.Compare needs a prefix of v. dsPrometheusVersion := fmt.Sprintf("v%s", jsonData["prometheusVersion"].(string)) if semver.Compare(dsPrometheusVersion, fmt.Sprintf("v%s", minPrometheusVersion)) < 0 { jsonData["prometheusVersion"] = minPrometheusVersion } } // Headers are named httpHeaderNameX. Where X is a digit that is based on the number of headers. // Try to find httpHeaderNameX equal to Authorization. Keep track of X so we know which header // to use for httpHeaderValueX. If it's not found create httpHeaderNameX : Authorization. x := 1 found := false for { authHeader := fmt.Sprintf("%s%d", httpHeaderName, x) value, ok := jsonData[authHeader] if !ok { break } if value == authorizationHeaderLabel { found = true break } x++ } if !found { authHeader := fmt.Sprintf("%s%d", httpHeaderName, x) jsonData[authHeader] = authorizationHeaderLabel } authHeaderValue := fmt.Sprintf("%s%d", httpHeaderValue, x) if dataSource.SecureJSONData == nil { dataSource.SecureJSONData = map[string]interface{}{} } // Add token to SecureJSONData e.g. httpHeaderValue1: Bearer 123. dataSource.SecureJSONData[authHeaderValue] = fmt.Sprintf("Bearer %s", token) return &dataSource, nil } func getTLSClient(certFile, keyFile, caFile string, insecureSkipVerify bool) (*http.Client, error) { if (certFile != "" || keyFile != "") && (certFile == "" || keyFile == "") { return nil, errors.New("--tls-cert and tls-key must both be set or unset") } if certFile == "" && keyFile == "" && caFile == "" && !insecureSkipVerify { return nil, nil } tlsConfig := &tls.Config{ InsecureSkipVerify: insecureSkipVerify, } if certFile != "" && keyFile != "" { cert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { return nil, fmt.Errorf("unable to load server cert and key: %w", err) } tlsConfig.Certificates = []tls.Certificate{cert} } if caFile != "" { caCert, err := os.ReadFile(caFile) if err != nil { return nil, fmt.Errorf("unable to read ca cert: %w", err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) tlsConfig.RootCAs = caCertPool } transport := http.DefaultTransport.(*http.Transport).Clone() transport.TLSClientConfig = tlsConfig client := cleanhttp.DefaultClient() client.Transport = transport return client, nil }