clusterloader2/pkg/prometheus/clients/aks_managed.go (82 lines of code) (raw):
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prom
import (
"context"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"k8s.io/klog/v2"
)
const (
	// tokenRequestContext is the scope requested when acquiring the access
	// token that authorizes queries.
	tokenRequestContext = "https://prometheus.monitor.azure.com/.default"
	// azureQueryEndpointEnv is the name of the environment variable that
	// holds the query endpoint.
	azureQueryEndpointEnv = "AZURE_QUERY_ENDPOINT"
)
// aksManagedPrometheusClient is a client for querying Azure Managed Prometheus in an AKS managed environment.
// It contains the URI for the Azure Managed Prometheus query endpoint and necessary headers for authentication.
// Overview: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/prometheus-metrics-overview.
// Query API: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/prometheus-api-promql.
type aksManagedPrometheusClient struct {
	// uri is the URL that Query sends its GET requests to.
	uri string
	// headers are added to every request issued by Query.
	headers map[string]string
}

// Query sends the given query to the configured endpoint as an HTTP GET,
// passing query and queryTime (formatted as RFC 3339) as the "query" and
// "time" URL parameters, and returns the raw response body.
//
// Each call is bounded by a 10-second timeout. Failures while building the
// request, executing it, or reading the response are returned wrapped with
// the step that failed. A status code outside the 2xx range yields an error;
// in that case the response body is still returned alongside the error so
// callers can inspect what the server sent.
func (c *aksManagedPrometheusClient) Query(query string, queryTime time.Time) ([]byte, error) {
	const requestTimeout = 10 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.uri, nil)
	if err != nil {
		return nil, fmt.Errorf("creating Azure Managed Prometheus query request: %w", err)
	}

	// Start from the parameters already present on the configured URI so
	// they are preserved next to "query" and "time".
	params := req.URL.Query()
	params.Add("query", query)
	params.Add("time", queryTime.Format(time.RFC3339))
	req.URL.RawQuery = params.Encode()

	for name, value := range c.headers {
		req.Header.Add(name, value)
	}

	// The shared default client is used instead of allocating a new client
	// per call; the per-request context above provides the timeout.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("executing Azure Managed Prometheus query: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading Azure Managed Prometheus query response: %w", err)
	}

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		// The body is returned together with the error on purpose.
		return body, fmt.Errorf("response failed with status code %d", resp.StatusCode)
	}
	return body, nil
}
// getPromQLToken asks credential for an access token scoped to
// tokenRequestContext and returns the token string. The request is bounded by
// a 10-second timeout; any error from the credential is returned unchanged.
func getPromQLToken(credential *azidentity.DefaultAzureCredential) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	opts := policy.TokenRequestOptions{Scopes: []string{tokenRequestContext}}
	accessToken, err := credential.GetToken(ctx, opts)
	if err != nil {
		return "", err
	}
	return accessToken.Token, nil
}
// NewAKSManagedPrometheusClient returns an instance of aksManagedPrometheusClient
// which is configured to query the Azure Managed Prometheus endpoint.
//
// The endpoint is read from the AZURE_QUERY_ENDPOINT environment variable and
// "/api/v1/query" is appended to it. An access token is obtained with a
// credential created by azidentity.NewDefaultAzureCredential and is sent as a
// bearer token in the Authorization header of every query.
//
// An error is returned when the environment variable is unset or empty, when
// the credential cannot be created, or when the token request fails.
//
// NOTE(review): the token is fetched once here and reused for the lifetime of
// the client; presumably it expires eventually, so long-running callers may
// need a refresh mechanism — confirm against the token lifetime.
func NewAKSManagedPrometheusClient() (Client, error) {
	endpoint := os.Getenv(azureQueryEndpointEnv)
	if endpoint == "" {
		return nil, fmt.Errorf("environment variable %s is not set", azureQueryEndpointEnv)
	}

	credential, err := azidentity.NewDefaultAzureCredential(&azidentity.DefaultAzureCredentialOptions{})
	if err != nil {
		return nil, fmt.Errorf("creating default Azure credential: %w", err)
	}

	promQLToken, err := getPromQLToken(credential)
	if err != nil {
		klog.Errorf("Failed to get Azure Prometheus token: %v", err)
		return nil, fmt.Errorf("acquiring Azure Managed Prometheus access token: %w", err)
	}

	return &aksManagedPrometheusClient{
		uri: fmt.Sprintf("%s/api/v1/query", endpoint),
		headers: map[string]string{
			"Authorization": "Bearer " + promQLToken,
		},
	}, nil
}
// Compile-time check that *aksManagedPrometheusClient implements Client.
var _ Client = (*aksManagedPrometheusClient)(nil)