func()

in pkg/controller/cmd/server.go [236:308]


// GetFlowGraph responds with the JSON encoding of the flow graph for a time
// window.
//
// The window is read from the optional "from" and "to" query parameters, each
// a Unix timestamp in seconds. "to" defaults to the current time and "from"
// defaults to 15 minutes before "to". The graph is built from the
// kubeskoop_flow_bytes increase over the window and then enriched with packet,
// packet-loss and TCP-retransmission increases over the same window.
//
// Responses:
//   - 200 with the graph JSON on success.
//   - 400 when a timestamp cannot be parsed or "from" is not earlier than "to".
//   - 500 when a Prometheus query, the graph construction or the JSON
//     marshalling fails.
func (s *Server) GetFlowGraph(ctx *gin.Context) {
	// respondError writes a JSON error body; callers must return right after,
	// otherwise the handler keeps running and writes a second response.
	respondError := func(code int, msg string) {
		ctx.AsciiJSON(code, map[string]string{"error": msg})
	}

	// parseUnix converts a query parameter holding Unix seconds to a time.Time.
	// ParseInt with bitSize 64 avoids the int truncation Atoi has on 32-bit
	// platforms.
	parseUnix := func(raw string) (time.Time, error) {
		sec, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return time.Time{}, err
		}
		return time.Unix(sec, 0), nil
	}

	ts := time.Now()
	if t := ctx.Query("to"); t != "" {
		parsed, err := parseUnix(t)
		if err != nil {
			respondError(http.StatusBadRequest, fmt.Sprintf("cannot convert timestamp: %v", err))
			return
		}
		ts = parsed
	}

	fs := ts.Add(-15 * time.Minute)
	if f := ctx.Query("from"); f != "" {
		parsed, err := parseUnix(f)
		if err != nil {
			respondError(http.StatusBadRequest, fmt.Sprintf("cannot convert timestamp: %v", err))
			return
		}
		fs = parsed
	}

	// r is the window length in seconds and is used as the range selector of
	// every query below. A zero or negative range is not a usable selector, so
	// reject it here as a client error instead of forwarding it to Prometheus.
	r := int(ts.Sub(fs).Seconds())
	if r <= 0 {
		respondError(http.StatusBadRequest, "invalid time range: from must be earlier than to")
		return
	}

	// queryVector evaluates "increase(<metric>[<r>s]) > 0" at ts and returns
	// the result as a vector. The type assertion is checked so an unexpected
	// result type becomes an error rather than a panic. Query warnings are
	// intentionally ignored, as before.
	queryVector := func(metric string) (model.Vector, error) {
		result, _, err := s.controller.QueryPrometheus(ctx, fmt.Sprintf("increase(%s[%ds]) > 0", metric, r), ts)
		if err != nil {
			return nil, err
		}
		vector, ok := result.(model.Vector)
		if !ok {
			return nil, fmt.Errorf("unexpected result type %T", result)
		}
		return vector, nil
	}

	vector, err := queryVector("kubeskoop_flow_bytes")
	if err != nil {
		respondError(http.StatusInternalServerError, fmt.Sprintf("error query flow metrics: %v", err))
		return
	}

	g, err := graph.FromVector(vector)
	if err != nil {
		respondError(http.StatusInternalServerError, fmt.Sprintf("error convert flow metrics to graph: %v", err))
		return
	}
	g.SetEdgeBytesFromVector(vector)

	vector, err = queryVector("kubeskoop_flow_packets")
	if err != nil {
		respondError(http.StatusInternalServerError, fmt.Sprintf("error query flow metrics: %v", err))
		return
	}
	g.SetEdgePacketsFromVector(vector)

	// The packet-loss and retransmission vectors are passed to
	// AddNodesFromVector before their edge values are set.
	vector, err = queryVector("kubeskoop_packetloss_total")
	if err != nil {
		respondError(http.StatusInternalServerError, fmt.Sprintf("error query flow metrics: %v", err))
		return
	}
	g.AddNodesFromVector(vector)
	g.SetEdgeDroppedFromVector(vector)

	vector, err = queryVector("kubeskoop_tcpretrans_total")
	if err != nil {
		respondError(http.StatusInternalServerError, fmt.Sprintf("error query flow metrics: %v", err))
		return
	}
	g.AddNodesFromVector(vector)
	g.SetEdgeRetransFromVector(vector)

	jstr, err := g.ToJSON()
	if err != nil {
		respondError(http.StatusInternalServerError, fmt.Sprintf("error marshalling to json: %v", err))
		return
	}

	ctx.Data(http.StatusOK, gin.MIMEJSON, jstr)
}