runner/utils.go (150 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package runner
import (
"container/list"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/metrics"
"github.com/Azure/kperf/runner/group"
"github.com/Azure/kperf/runner/localstore"
"k8s.io/klog/v2"
)
// renderErrorResponse renders error into types.HTTPError format.
func renderErrorResponse(w http.ResponseWriter, code int, err error) {
if err == nil {
panic("unexpected error")
}
w.WriteHeader(code)
data, _ := json.Marshal(types.HTTPError{
ErrorMessage: err.Error(),
})
_, _ = w.Write(data)
}
// buildNetListeners returns slice of net.Listeners.
func buildNetListeners(addrs []string) (_ []net.Listener, retErr error) {
res := make([]net.Listener, 0, len(addrs))
defer func() {
if retErr != nil {
for _, l := range res {
l.Close()
}
}
}()
for _, addr := range addrs {
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %w", addr, err)
}
res = append(res, lis)
}
return res, nil
}
// buildRunnerGroupSummary returns aggrecated summary from runner groups' report.
func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *types.RunnerMetricReport {
totalBytes := int64(0)
totalResp := 0
latenciesByURL := map[string]*list.List{}
errs := []types.ResponseError{}
errStats := map[string]int32{}
maxDuration := 0 * time.Second
for idx := range groups {
g := groups[idx]
pods, err := g.Pods(context.TODO())
if err != nil {
klog.V(2).ErrorS(err, "failed to list runners", "runner-group", g.Name())
continue
}
for _, pod := range pods {
data, err := readBlob(s, pod.Name)
if err != nil {
klog.V(2).ErrorS(err, "failed to read report", "runner", pod.Name)
continue
}
report := types.RunnerMetricReport{}
err = json.Unmarshal(data, &report)
if err != nil {
klog.V(2).ErrorS(err, "failed to unmarshal", "runner", pod.Name)
continue
}
// update totalReceivedBytes
totalBytes += report.TotalReceivedBytes
// update latencies
for u, l := range report.LatenciesByURL {
latencies, ok := latenciesByURL[u]
if !ok {
latenciesByURL[u] = list.New()
latencies = latenciesByURL[u]
}
for _, v := range l {
totalResp++
latencies.PushBack(v)
}
}
// update error stats
mergeErrorStat(errStats, report.ErrorStats)
errs = append(errs, report.Errors...)
report.Errors = nil
// update max duration
rDur, err := time.ParseDuration(report.Duration)
if err != nil {
klog.V(2).ErrorS(err, "failed to parse duration", "runner",
pod.Name, "duration", report.Duration)
}
if rDur > maxDuration {
maxDuration = rDur
}
}
}
percentileLatenciesByURL := map[string][][2]float64{}
latencies := make([]float64, 0, totalResp)
for u, l := range latenciesByURL {
lInSlice := listToSliceFloat64(l)
latencies = append(latencies, lInSlice...)
percentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(lInSlice)
}
return &types.RunnerMetricReport{
Total: totalResp,
Errors: errs,
ErrorStats: errStats,
Duration: maxDuration.String(),
TotalReceivedBytes: totalBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(latencies),
PercentileLatenciesByURL: percentileLatenciesByURL,
}
}
// listToSliceFloat64 converts list.List into []float64.
func listToSliceFloat64(l *list.List) []float64 {
res := make([]float64, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
res = append(res, e.Value.(float64))
}
return res
}
// mergeErrorStat merges two error stats.
func mergeErrorStat(s, d map[string]int32) {
for e, n := range d {
s[e] += n
}
}
// readBlob reads blob data from localstore.
func readBlob(s *localstore.Store, ref string) ([]byte, error) {
r, err := s.OpenReader(ref)
if err != nil {
return nil, err
}
defer r.Close()
return io.ReadAll(r)
}
// isLocalhost returns true if addr is local address.
func isLocalhost(addr string) (bool, error) {
h, p, err := net.SplitHostPort(addr)
if err != nil {
if !strings.Contains(err.Error(), "missing port in address") {
return false, fmt.Errorf("invalid address %s: %w", addr, err)
}
h = addr
}
if len(p) == 0 {
return false, fmt.Errorf("invalid host name format %s", addr)
}
if h == "localhost" {
h = "127.0.0.1"
}
ip := net.ParseIP(h)
return ip.IsLoopback(), nil
}