in pkg/controller/cmd/server.go [236:308]
func (s *Server) GetFlowGraph(ctx *gin.Context) {
var ts, fs time.Time
f := ctx.Query("from")
t := ctx.Query("to")
if t != "" {
ti, err := strconv.Atoi(t)
if err != nil {
ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("cannot convert timestamp: %v", err)})
}
ts = time.Unix(int64(ti), 0)
} else {
ts = time.Now()
}
if f != "" {
ti, err := strconv.Atoi(f)
if err != nil {
ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("cannot convert timestamp: %v", err)})
}
fs = time.Unix(int64(ti), 0)
} else {
fs = ts.Add(-15 * time.Minute)
}
r := int(ts.Sub(fs).Seconds())
result, _, err := s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_bytes[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector := result.(model.Vector)
g, err := graph.FromVector(vector)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error convert flow metrics to graph: %v", err)})
return
}
g.SetEdgeBytesFromVector(vector)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_flow_packets[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.SetEdgePacketsFromVector(vector)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_packetloss_total[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.AddNodesFromVector(vector)
g.SetEdgeDroppedFromVector(vector)
result, _, err = s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(kubeskoop_tcpretrans_total[%ds]) > 0", r), ts)
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
return
}
vector = result.(model.Vector)
g.AddNodesFromVector(vector)
g.SetEdgeRetransFromVector(vector)
jstr, err := g.ToJSON()
if err != nil {
ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error marshalling to json: %v", err)})
return
}
ctx.Data(http.StatusOK, gin.MIMEJSON, jstr)
}