recipes/beyla-service-graph/graphgen/internal/query.go (71 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"fmt"
"log/slog"
"strings"
"time"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)
const (
clientIpKey = "client_address"
clientKey = "client"
serverIpKey = "server_address"
serverKey = "server"
)
type QueryArgs struct {
QueryWindow time.Duration
Cluster string
Namespace string
}
func QueryPrometheus(
ctx context.Context,
client api.Client,
queryArgs QueryArgs,
) (*Graph, error) {
promApi := v1.NewAPI(client)
query := getQuery(queryArgs)
slog.InfoContext(ctx, "logging", "query", query)
res, warnings, err := promApi.Query(ctx, query, time.Now())
if err != nil {
return nil, err
}
if len(warnings) != 0 {
slog.WarnContext(ctx, "Warnings from promQL query", "warnings", warnings)
}
slog.InfoContext(ctx, "Got metrics", "metrics", res)
slog.InfoContext(ctx, "type", "type", res.Type())
vec, ok := res.(model.Vector)
if !ok {
return nil, fmt.Errorf("couldn't cast %v to vector", res)
}
graph := NewGraph()
for _, sample := range vec {
labels := sample.Metric
client := &Node{Ip: string(labels[clientIpKey]), Name: string(labels[clientKey])}
server := &Node{Ip: string(labels[serverIpKey]), Name: string(labels[serverKey])}
graph.AddEdge(client, server)
}
return graph, nil
}
func getQuery(queryArgs QueryArgs) string {
filters := addFilter(nil, "cluster", queryArgs.Cluster)
filters = addFilter(filters, "namespace", queryArgs.Namespace)
return fmt.Sprintf(
`sum by (
server, client_address, client, server_address
) (
rate(traces_service_graph_request_total{%s}[%s]) != 0
)`,
strings.Join(filters, ","),
queryArgs.QueryWindow,
)
}
func addFilter(filters []string, key, value string) []string {
if value == "" {
return filters
}
return append(filters, fmt.Sprintf(`%s="%s"`, key, value))
}