pkg/controller/cmd/server.go (355 lines of code) (raw):
package cmd
import (
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"
"time"
"github.com/alibaba/kubeskoop/pkg/controller/ipcache"
"github.com/alibaba/kubeskoop/pkg/controller/k8s"
"github.com/alibaba/kubeskoop/pkg/controller/graph"
"github.com/alibaba/kubeskoop/pkg/controller/rpc"
"github.com/alibaba/kubeskoop/pkg/controller/service"
exporter "github.com/alibaba/kubeskoop/pkg/exporter/cmd"
skoopContext "github.com/alibaba/kubeskoop/pkg/skoop/context"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"google.golang.org/grpc"
)
// Fallback listening ports, applied when the configured port is 0.
const (
// defaultAgentPort is the gRPC port RunAgentServer listens on by default.
defaultAgentPort = 10263
// defaultHTTPPort is the HTTP port RunHTTPServer listens on by default.
defaultHTTPPort = 10264
)
// Server bundles the controller's gRPC (agent-facing) and HTTP endpoints.
type Server struct {
// config supplies AgentPort and HTTPPort, which Run passes to the servers.
config ServerConfig
// controller backs every HTTP handler and is also registered on the gRPC
// server as the controller register service.
controller service.ControllerService
// ipCacheService is registered on the gRPC server as the IP cache service.
ipCacheService *ipcache.Service
}
// NewServer assembles a Server from config: it creates the controller service
// with the shared k8s client and an IP cache service on top of the pod and
// node informers. The process exits via log.Fatalf if the controller service
// cannot be created.
func NewServer(config *Config) *Server {
	controllerService, err := service.NewControllerService(k8s.Client, &config.Controller)
	if err != nil {
		log.Fatalf("error create controller service: %v", err)
	}

	srv := &Server{config: config.Server}
	srv.controller = controllerService
	srv.ipCacheService = ipcache.NewService(k8s.PodInformer, k8s.NodeInformer)
	return srv
}
// Run starts the informers, the gRPC agent server and the HTTP server, then
// blocks until SIGTERM or an interrupt (Ctrl-C) is received. On shutdown it
// closes the shared done channel and waits for both servers to return, so
// their cleanup (e.g. stopping the gRPC server) has actually run before Run
// hands control back to the caller.
func (s *Server) Run() {
	done := make(chan struct{})
	k8s.StartInformer(done)

	agentStopped := make(chan struct{})
	go func() {
		defer close(agentStopped)
		s.RunAgentServer(s.config.AgentPort, done)
	}()

	httpStopped := make(chan struct{})
	go func() {
		defer close(httpStopped)
		s.RunHTTPServer(s.config.HTTPPort, done)
	}()

	// Buffered so a signal delivered before the receive below is not dropped.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
	defer signal.Stop(signals)
	<-signals

	close(done)
	// Without waiting here Run would return immediately and the process could
	// exit before either server had a chance to react to done.
	<-agentStopped
	<-httpStopped
}
// RunAgentServer exposes the controller register service and the IP cache
// service over gRPC on 0.0.0.0:port (defaultAgentPort when port is 0). It
// blocks until done is closed and then stops the gRPC server. Failing to
// listen or to serve terminates the process via log.Fatalf.
func (s *Server) RunAgentServer(port int, done <-chan struct{}) {
	if port == 0 {
		port = defaultAgentPort
	}

	// Allow incoming messages of up to 102 MiB.
	srv := grpc.NewServer(grpc.MaxRecvMsgSize(102 * 1024 * 1024))
	rpc.RegisterControllerRegisterServiceServer(srv, s.controller)
	rpc.RegisterIPCacheServiceServer(srv, s.ipCacheService)

	addr := fmt.Sprintf("0.0.0.0:%d", port)
	lis, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		log.Fatalf("err listen on %d: %v", port, listenErr)
	}

	go func() {
		if serveErr := srv.Serve(lis); serveErr != nil {
			log.Fatalf("grpc serve err: %v", serveErr)
		}
	}()

	<-done
	srv.Stop()
}
// RunHTTPServer registers the REST routes on a gin engine and serves them on
// 0.0.0.0:port (defaultHTTPPort when port is 0). It blocks until done is
// closed and then closes the HTTP server so the listener is released instead
// of being left running in a background goroutine. A failure to start or
// serve terminates the process via log.Fatalf.
func (s *Server) RunHTTPServer(port int, done <-chan struct{}) {
	if port == 0 {
		port = defaultHTTPPort
	}

	r := gin.New()
	r.Use(gin.Logger())
	r.Use(gin.Recovery())

	r.POST("/diagnose", s.CommitDiagnoseTask)
	r.GET("/diagnoses", s.ListDiagnoseTasks)
	r.POST("/capture", s.CommitCaptureTask)
	r.GET("/captures", s.ListCaptureTasks)
	r.GET("/capture/:task_id/download", s.DownloadCaptureFile)
	r.POST("/pingmesh", s.PingMesh)
	r.GET("/pods", s.ListPods)
	r.GET("/nodes", s.ListNodes)
	r.GET("/namespaces", s.ListNamespaces)
	r.GET("/flow", s.GetFlowGraph)
	r.GET("/events", s.GetEvent)
	r.GET("/config", s.GetExporterConfig)
	r.PUT("/config", s.UpdateExporterConfig)

	// An explicit http.Server (rather than r.Run) gives us a handle to close
	// the listener when done fires.
	srv := &http.Server{
		Addr:    fmt.Sprintf("0.0.0.0:%d", port),
		Handler: r,
	}

	go func() {
		// ListenAndServe always returns a non-nil error; ErrServerClosed is
		// the expected outcome of the Close call below and is not a failure.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("error run http server: %v", err)
		}
	}()

	<-done
	if err := srv.Close(); err != nil {
		log.Printf("error close http server: %v", err)
	}
}
// CommitDiagnoseTask reads a diagnose task config from the JSON request body
// and submits it to the controller. It replies with {"task_id": "<id>"} on
// success, or with status 400 and an "error" message when the body cannot be
// bound or the controller rejects the task.
func (s *Server) CommitDiagnoseTask(ctx *gin.Context) {
	var cfg skoopContext.TaskConfig
	bindErr := ctx.ShouldBindJSON(&cfg)
	if bindErr != nil {
		msg := fmt.Sprintf("error get task config from request: %v", bindErr)
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
		return
	}

	id, diagErr := s.controller.Diagnose(ctx, &cfg)
	if diagErr != nil {
		msg := fmt.Sprintf("error commit diagnose task: %v", diagErr)
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
		return
	}

	body := map[string]string{"task_id": fmt.Sprintf("%d", id)}
	ctx.AsciiJSON(http.StatusOK, body)
}
// ListDiagnoseTasks replies with the controller's list of diagnose tasks as
// JSON, or with status 400 and an "error" message when the list cannot be
// obtained.
func (s *Server) ListDiagnoseTasks(ctx *gin.Context) {
	list, listErr := s.controller.DiagnoseList(ctx)
	if listErr == nil {
		ctx.AsciiJSON(http.StatusOK, list)
		return
	}

	msg := fmt.Sprintf("error list diagnose task: %v", listErr)
	ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
}
// CommitCaptureTask reads capture arguments from the JSON request body and
// submits them to the controller. It replies with {"task_id": "<id>"} on
// success, or with status 400 and an "error" message when the body cannot be
// bound or the controller rejects the task.
func (s *Server) CommitCaptureTask(ctx *gin.Context) {
	var args service.CaptureArgs
	bindErr := ctx.ShouldBindJSON(&args)
	if bindErr != nil {
		msg := fmt.Sprintf("error get task config from request: %v", bindErr)
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
		return
	}

	id, capErr := s.controller.Capture(ctx, &args)
	if capErr != nil {
		msg := fmt.Sprintf("error commit capture task: %v", capErr)
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
		return
	}

	body := map[string]string{"task_id": fmt.Sprintf("%d", id)}
	ctx.AsciiJSON(http.StatusOK, body)
}
// ListCaptureTasks replies with the controller's list of capture tasks as
// JSON, or with status 400 and an "error" message when the list cannot be
// obtained.
func (s *Server) ListCaptureTasks(ctx *gin.Context) {
	tasks, err := s.controller.CaptureList(ctx)
	if err != nil {
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("error list capture task: %v", err)})
		// Stop here: falling through would append a second JSON document
		// (the task list) to the error response that was just written.
		return
	}
	ctx.AsciiJSON(http.StatusOK, tasks)
}
// DownloadCaptureFile streams the capture file of the task identified by the
// ":task_id" path parameter as an attachment. It replies with status 400 and
// an "error" message when the task id is not an integer or the controller
// cannot provide the file. The returned file handle is always closed.
func (s *Server) DownloadCaptureFile(ctx *gin.Context) {
	id, err := strconv.Atoi(ctx.Param("task_id"))
	if err != nil {
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("error get task id from request: %v", err)})
		return
	}

	name, fl, fd, err := s.controller.DownloadCaptureFile(ctx, id)
	if err != nil {
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("error download capture file: %v", err)})
		return
	}
	defer fd.Close()

	// Quote the filename so names containing spaces or separators still form
	// a valid Content-Disposition header.
	ctx.Header("Content-Disposition", fmt.Sprintf("attachment; filename=%q", name))
	// "application/text/plain" is not a well-formed media type; use the
	// generic binary type for downloads.
	ctx.Header("Content-Type", "application/octet-stream")
	// NOTE(review): "Accept-Length" is not a standard HTTP header; it is kept
	// unchanged in case existing clients read it — confirm before replacing
	// it with Content-Length.
	ctx.Header("Accept-Length", fmt.Sprintf("%d", fl))

	if _, err = io.Copy(ctx.Writer, fd); err != nil {
		if ctx.Writer.Written() {
			// Part of the file is already on the wire: the status line cannot
			// be changed and appending a JSON error would corrupt the
			// download, so only record the failure.
			log.Printf("error transmiss capture file: %v", err)
			ctx.Abort()
			return
		}
		// Nothing has been sent yet, so a regular error response is still
		// possible; drop the attachment-specific headers first.
		ctx.Header("Content-Disposition", "")
		ctx.Header("Accept-Length", "")
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("error transmiss capture file: %v", err)})
		return
	}
	ctx.Status(http.StatusOK)
}
// ListPods replies with the controller's pod list as JSON, or with status 400
// and an "error" message when the list cannot be obtained.
func (s *Server) ListPods(ctx *gin.Context) {
	list, listErr := s.controller.PodList(ctx)
	if listErr == nil {
		ctx.AsciiJSON(http.StatusOK, list)
		return
	}

	msg := fmt.Sprintf("error list pods: %v", listErr)
	ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
}
// ListNodes replies with the controller's node list as JSON, or with status
// 400 and an "error" message when the list cannot be obtained.
func (s *Server) ListNodes(ctx *gin.Context) {
	list, listErr := s.controller.NodeList(ctx)
	if listErr == nil {
		ctx.AsciiJSON(http.StatusOK, list)
		return
	}

	msg := fmt.Sprintf("error list nodes: %v", listErr)
	ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
}
// ListNamespaces replies with the controller's namespace list as JSON, or
// with status 400 and an "error" message when the list cannot be obtained.
func (s *Server) ListNamespaces(ctx *gin.Context) {
	list, listErr := s.controller.NamespaceList(ctx)
	if listErr == nil {
		ctx.AsciiJSON(http.StatusOK, list)
		return
	}

	msg := fmt.Sprintf("error list namespaces: %v", listErr)
	ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
}
// PingMesh reads ping-mesh arguments from the JSON request body, runs the
// ping mesh through the controller and replies with its result as JSON. A
// body that cannot be bound or a controller failure yields status 400 with an
// "error" message.
func (s *Server) PingMesh(ctx *gin.Context) {
	var args service.PingMeshArgs
	bindErr := ctx.ShouldBindJSON(&args)
	if bindErr != nil {
		msg := fmt.Sprintf("error get task config from request: %v", bindErr)
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
		return
	}

	res, pingErr := s.controller.PingMesh(ctx, &args)
	if pingErr != nil {
		msg := fmt.Sprintf("error do pingmesh: %v", pingErr)
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
		return
	}

	ctx.JSON(http.StatusOK, res)
}
// GetFlowGraph builds a flow graph from Prometheus metrics for the window
// [from, to] and replies with it as JSON.
//
// Query parameters (Unix timestamps in seconds):
//   - to:   end of the window, defaults to the current time.
//   - from: start of the window, defaults to 15 minutes before "to".
//
// A malformed timestamp or a window that is not positive yields status 400;
// query, conversion and marshalling failures yield status 500.
func (s *Server) GetFlowGraph(ctx *gin.Context) {
	to := time.Now()
	if raw := ctx.Query("to"); raw != "" {
		parsed, err := parseFlowTimestamp(raw)
		if err != nil {
			ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("cannot convert timestamp: %v", err)})
			return
		}
		to = parsed
	}

	from := to.Add(-15 * time.Minute)
	if raw := ctx.Query("from"); raw != "" {
		parsed, err := parseFlowTimestamp(raw)
		if err != nil {
			ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("cannot convert timestamp: %v", err)})
			return
		}
		from = parsed
	}

	// The window becomes the PromQL range selector "[<n>s]", which must be
	// positive; reject bad input up front instead of failing in the query.
	window := int(to.Sub(from).Seconds())
	if window <= 0 {
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": "invalid time range: from must be earlier than to"})
		return
	}

	// The byte counters define the initial graph.
	bytesVec, err := s.queryFlowVector(ctx, "kubeskoop_flow_bytes", window, to)
	if err != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
		return
	}
	g, err := graph.FromVector(bytesVec)
	if err != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error convert flow metrics to graph: %v", err)})
		return
	}
	g.SetEdgeBytesFromVector(bytesVec)

	packetsVec, err := s.queryFlowVector(ctx, "kubeskoop_flow_packets", window, to)
	if err != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
		return
	}
	g.SetEdgePacketsFromVector(packetsVec)

	// Nodes are added from the packet-loss and retransmission vectors before
	// their edge values are set.
	lossVec, err := s.queryFlowVector(ctx, "kubeskoop_packetloss_total", window, to)
	if err != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
		return
	}
	g.AddNodesFromVector(lossVec)
	g.SetEdgeDroppedFromVector(lossVec)

	retransVec, err := s.queryFlowVector(ctx, "kubeskoop_tcpretrans_total", window, to)
	if err != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query flow metrics: %v", err)})
		return
	}
	g.AddNodesFromVector(retransVec)
	g.SetEdgeRetransFromVector(retransVec)

	jstr, err := g.ToJSON()
	if err != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error marshalling to json: %v", err)})
		return
	}
	ctx.Data(http.StatusOK, gin.MIMEJSON, jstr)
}

// parseFlowTimestamp converts a decimal Unix timestamp in seconds to a time.
func parseFlowTimestamp(raw string) (time.Time, error) {
	sec, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return time.Time{}, err
	}
	return time.Unix(sec, 0), nil
}

// queryFlowVector evaluates "increase(<metric>[<window>s]) > 0" at the given
// instant and returns the result as a vector, or an error if the query fails
// or the result is of a different type.
func (s *Server) queryFlowVector(ctx *gin.Context, metric string, window int, at time.Time) (model.Vector, error) {
	query := fmt.Sprintf("increase(%s[%ds]) > 0", metric, window)
	result, _, err := s.controller.QueryPrometheus(ctx, query, at)
	if err != nil {
		return nil, err
	}
	vector, ok := result.(model.Vector)
	if !ok {
		// A bare type assertion would panic here; report it as an error.
		return nil, fmt.Errorf("unexpected result type %T, want vector", result)
	}
	return vector, nil
}
// GetEvent queries events from the controller and replies with them as JSON,
// sorted by timestamp in descending order.
//
// Query parameters:
//   - start, end: Unix timestamps in seconds; default to 10 minutes ago and
//     the current time respectively.
//   - limit: maximum number of events passed to the query, default 100.
//   - nodes, namespaces, pods, types: comma-separated values used as filters
//     on "instance", "namespace", "pod" and "type".
//
// A malformed start, end or limit yields status 400; a failing query yields
// status 500.
func (s *Server) GetEvent(ctx *gin.Context) {
	var (
		startTime, endTime time.Time
		limitCnt           int
	)

	rawStart := ctx.Query("start")
	if rawStart == "" {
		startTime = time.Now().Add(-10 * time.Minute)
	} else {
		sec, convErr := strconv.Atoi(rawStart)
		if convErr != nil {
			ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("start time format error: %v", convErr)})
			return
		}
		startTime = time.Unix(int64(sec), 0)
	}

	rawEnd := ctx.Query("end")
	if rawEnd == "" {
		endTime = time.Now()
	} else {
		sec, convErr := strconv.Atoi(rawEnd)
		if convErr != nil {
			ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("end time format error: %v", convErr)})
			return
		}
		endTime = time.Unix(int64(sec), 0)
	}

	rawLimit := ctx.Query("limit")
	if rawLimit == "" {
		limitCnt = 100
	} else {
		n, convErr := strconv.Atoi(rawLimit)
		if convErr != nil {
			ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("limit time format error: %v", convErr)})
			return
		}
		limitCnt = n
	}

	// Each non-empty query parameter becomes a filter under its label name.
	filters := map[string][]string{}
	for _, f := range []struct{ label, param string }{
		{"instance", "nodes"},
		{"namespace", "namespaces"},
		{"pod", "pods"},
		{"type", "types"},
	} {
		if value := ctx.Query(f.param); value != "" {
			filters[f.label] = strings.Split(value, ",")
		}
	}

	evts, queryErr := s.controller.QueryRangeEvent(ctx, startTime, endTime, filters, limitCnt)
	if queryErr != nil {
		ctx.AsciiJSON(http.StatusInternalServerError, map[string]string{"error": fmt.Sprintf("error query event: %v", queryErr)})
		return
	}

	// Newest events first.
	sort.Slice(evts, func(i, j int) bool {
		return evts[i].Timestamp > evts[j].Timestamp
	})
	ctx.AsciiJSON(http.StatusOK, evts)
}
// GetExporterConfig replies with the exporter config obtained from the
// controller as JSON, or with status 400 and an "error" message when it
// cannot be retrieved.
func (s *Server) GetExporterConfig(ctx *gin.Context) {
	current, getErr := s.controller.GetExporterConfig(ctx)
	if getErr == nil {
		ctx.AsciiJSON(http.StatusOK, current)
		return
	}

	msg := fmt.Sprintf("error get exporter config: %v", getErr)
	ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": msg})
}
// UpdateExporterConfig reads an exporter config from the JSON request body
// and hands it to the controller. It replies with status 200 on success, or
// with status 400 and an "error" message when the body cannot be decoded or
// the update fails.
func (s *Server) UpdateExporterConfig(ctx *gin.Context) {
	var cfg *exporter.InspServerConfig
	// ShouldBindJSON, as used by the other handlers, only reports the error.
	// BindJSON would additionally abort the request and write a 400 header on
	// its own before the JSON error response below is produced.
	if err := ctx.ShouldBindJSON(&cfg); err != nil {
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("error unmarshal config: %v", err)})
		return
	}

	if err := s.controller.UpdateExporterConfig(ctx, cfg); err != nil {
		ctx.AsciiJSON(http.StatusBadRequest, map[string]string{"error": fmt.Sprintf("error update exporter config: %v", err)})
		return
	}
	ctx.Status(http.StatusOK)
}