testing/fleetservertest/server.go (418 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package fleetservertest import ( "context" "encoding/json" "fmt" "io" "net/http" "strconv" "sync" "sync/atomic" "github.com/gofrs/uuid/v5" "github.com/gorilla/mux" ) // Handlers holds the handlers for the fleet-server-api, // see https://petstore.swagger.io/?url=https://raw.githubusercontent.com/elastic/fleet-server/main/model/openapi.yml // for rendered OpenAPI definition. If any of the handlers are nil, a // http.StatusNotImplemented is returned for the route. // // There is no authentication for the moment. // Check NewServerWithHandlers for a ready to use server or use Handlers in // conjunction with NewServer. type Handlers struct { // AgentID is the ID of agent communicating with this fleet-server: // - on Enroll this ID is set and returned to the enrolling agent, // - on all other API calls using an agent ID, if the ID sent is different, // fleet-server will return a 404. AgentID string APIKey string EnrollmentToken string // logFn if set will be used to log every request. logFn func(format string, a ...any) // =============================== Handlers =============================== AckFn func( ctx context.Context, h *Handlers, agentID string, ackRequest AckRequest) (*AckResponse, *HTTPError) CheckinFn func( ctx context.Context, h *Handlers, id string, userAgent string, acceptEncoding string, checkinRequest CheckinRequest) (*CheckinResponse, *HTTPError) EnrollFn func( ctx context.Context, h *Handlers, userAgent string, enrollmentToken string, enrollRequest EnrollRequest) (*EnrollResponse, *HTTPError) ArtifactFn func( ctx context.Context, h *Handlers, artifactID string, sha2 string) *HTTPError StatusFn func( ctx context.Context, h *Handlers) (*StatusResponse, *HTTPError) UploadBeginFn func( ctx context.Context, h *Handlers, requestBody UploadBeginRequest) (*UploadBeginResponse, *HTTPError) UploadChunkFn func( ctx context.Context, h *Handlers, uploadID string, chunkNum int32, xChunkSHA2 string, body io.ReadCloser) *HTTPError UploadCompleteFn func( ctx context.Context, h *Handlers, uploadID string, uploadCompleteRequest UploadCompleteRequest) *HTTPError } type Route struct { Name string Method string Pattern string AuthKey string Handler http.Handler } // NewRouter creates a new *mux.Router for each route defined on handlers. // It'll synchronise the calls to the handlers. That way it's safe for any handler // implementation to access the Handlers properties. func NewRouter(handlers *Handlers) *mux.Router { // mu is the mutex used to allow any handler safely access the properties // of handlers. It's used by a middleware so the handler implementation // does not need to worry about race conditions. mu := &sync.Mutex{} router := mux.NewRouter().StrictSlash(true) for _, route := range handlers.Routes() { router. Methods(route.Method). Path(route.Pattern). Name(route.Name). Handler( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mu.Lock() defer mu.Unlock() ww := &statusResponseWriter{w: w} requestID := r.Header.Get("X-Request-Id") if requestID == "" { requestID = uuid.Must(uuid.NewV4()).String() } ww.Header().Set("X-Request-Id", requestID) handlers.logFn("[%s] STARTING - %s %s %s %s\n", requestID, r.Method, r.URL, r.Proto, r.RemoteAddr) route.Handler. ServeHTTP(ww, r) handlers.logFn("[%s] DONE %d - %s %s %s %s %d\n", requestID, ww.statusCode, r.Method, r.URL, r.Proto, r.RemoteAddr, ww.byteCount.Load()) })) } return router } // Routes returns all the api routes for the Handlers func (h *Handlers) Routes() []Route { return []Route{ { Name: "AgentAcks", Method: http.MethodPost, Pattern: PathAgentAcks, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.AgentAcks), }, { Name: "AgentCheckin", Method: http.MethodPost, Pattern: PathAgentCheckin, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.AgentCheckin), }, { Name: "AgentEnroll", Method: http.MethodPost, Pattern: PathAgentEnroll, AuthKey: h.EnrollmentToken, Handler: http.HandlerFunc(h.AgentEnroll), }, { Name: "Artifact", Method: http.MethodGet, Pattern: PathArtifact, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.Artifact), }, { Name: "Status", Method: http.MethodGet, Pattern: PathStatus, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.Status), }, { Name: "UploadBegin", Method: http.MethodPost, Pattern: PathUploadBegin, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.UploadBegin), }, { Name: "UploadChunk", Method: http.MethodPut, Pattern: PathUploadChunk, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.UploadChunk), }, { Name: "UploadComplete", Method: http.MethodPost, Pattern: PathUploadComplete, AuthKey: h.APIKey, Handler: http.HandlerFunc(h.UploadComplete), }, } } // AgentAcks - func (h *Handlers) AgentAcks(w http.ResponseWriter, r *http.Request) { if h.AckFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "agent acs Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } params := mux.Vars(r) agentID := params["id"] ackRequestParam := AckRequest{} d := json.NewDecoder(r.Body) if err := d.Decode(&ackRequestParam); err != nil { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: fmt.Sprintf("could not decode ack params: %v", err), }, w) return } result, err := h.AckFn(r.Context(), h, agentID, ackRequestParam) if err != nil { respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, result, w) } // AgentCheckin - func (h *Handlers) AgentCheckin(w http.ResponseWriter, r *http.Request) { if h.CheckinFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "checkin Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } params := mux.Vars(r) agentID := params["id"] userAgentParam := r.Header.Get("User-Agent") acceptEncodingParam := r.Header.Get("Accept-Encoding") checkinRequestParam := CheckinRequest{} d := json.NewDecoder(r.Body) if err := d.Decode(&checkinRequestParam); err != nil { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: fmt.Sprintf("cannot decode checking request: %v", err), }, w) return } result, err := h.CheckinFn( r.Context(), h, agentID, userAgentParam, acceptEncodingParam, checkinRequestParam) if err != nil { respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, result, w) } // AgentEnroll - func (h *Handlers) AgentEnroll(w http.ResponseWriter, r *http.Request) { if h.EnrollFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "agent checkin Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } userAgentParam := r.Header.Get("User-Agent") enrollRequestParam := EnrollRequest{} d := json.NewDecoder(r.Body) if err := d.Decode(&enrollRequestParam); err != nil { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: fmt.Sprintf("could not decode enroll request: %v", err), }, w) return } // we can ignore the error and let the handler implementation deal with it. enrollmentToken, _ := getAuthorization(r) result, err := h.EnrollFn( r.Context(), h, userAgentParam, enrollmentToken, enrollRequestParam) if err != nil { respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, result, w) } // Artifact - func (h *Handlers) Artifact(w http.ResponseWriter, r *http.Request) { if h.ArtifactFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "artifact Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } params := mux.Vars(r) agentID := params["id"] sha2Param := params["sha2"] err := h.ArtifactFn(r.Context(), h, agentID, sha2Param) if err != nil { respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, nil, w) } // Status - func (h *Handlers) Status(w http.ResponseWriter, r *http.Request) { if h.StatusFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "status Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } result, err := h.StatusFn(r.Context(), h) if err != nil { if result != nil { respondAsJSON(err.StatusCode, result, w) } respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, result, w) } // UploadBegin - Initiate a file upload process func (h *Handlers) UploadBegin(w http.ResponseWriter, r *http.Request) { if h.UploadBeginFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "upload begin Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } requestBodyParam := UploadBeginRequest{} d := json.NewDecoder(r.Body) if err := d.Decode(&requestBodyParam); err != nil { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: fmt.Sprintf("could not decode upliad begin request: %v", err), }, w) return } result, err := h.UploadBeginFn(r.Context(), h, requestBodyParam) if err != nil { respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, result, w) } // UploadChunk - Upload a section of file data func (h *Handlers) UploadChunk(w http.ResponseWriter, r *http.Request) { if h.UploadChunkFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "upload chunk Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } params := mux.Vars(r) uploadID := params["id"] chunkNumParam := params["chunkNum"] if chunkNumParam == "" { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: "chunkNum is empty", }, w) } chunkNum, err := strconv.ParseInt(chunkNumParam, 10, 32) if err != nil { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: fmt.Sprintf("could not parse chunkNumParam=%s: %v", chunkNumParam, err), }, w) } chunkSHA2 := r.Header.Get("X-Chunk-SHA2") // Currently fleet-server limits the size of each chunk to 4 MiB. body := http.MaxBytesReader(w, r.Body, 4194304 /*4 MiB*/) uerr := h.UploadChunkFn( r.Context(), h, uploadID, int32(chunkNum), chunkSHA2, body) if uerr != nil { respondAsJSON(uerr.StatusCode, uerr, w) return } respondAsJSON(http.StatusOK, nil, w) } // UploadComplete - Complete a file upload process func (h *Handlers) UploadComplete(w http.ResponseWriter, r *http.Request) { if h.UploadCompleteFn == nil { err := &HTTPError{StatusCode: http.StatusNotImplemented, Message: "upload complete Handlers not implemented"} respondAsJSON(err.StatusCode, err, w) return } params := mux.Vars(r) uploadID := params["id"] uploadCompleteRequestParam := UploadCompleteRequest{} d := json.NewDecoder(r.Body) if err := d.Decode(&uploadCompleteRequestParam); err != nil { respondAsJSON(http.StatusBadRequest, HTTPError{ StatusCode: http.StatusBadRequest, Message: fmt.Sprintf("could not decode upload complete request%v", err), }, w) return } err := h.UploadCompleteFn(r.Context(), h, uploadID, uploadCompleteRequestParam) if err != nil { respondAsJSON(err.StatusCode, err, w) return } respondAsJSON(http.StatusOK, `{"status": "ok"}`, w) } // respondAsJSON uses the json encoder to write an interface to the http response with an optional status code func respondAsJSON(status int, body interface{}, w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(status) if err := json.NewEncoder(w).Encode(body); err != nil { //nolint:forbidigo // it's to be used in tests fmt.Printf("could not write response body: %v\n", err) } } func updateLocalMetaAgentID(data []byte, agentID string) ([]byte, error) { if data == nil { return data, nil } var m map[string]interface{} err := json.Unmarshal(data, &m) if err != nil { return nil, err } if v, ok := m["elastic"]; ok { if sm, ok := v.(map[string]interface{}); ok { if v, ok = sm["agent"]; ok { if sm, ok = v.(map[string]interface{}); ok { if _, ok = sm["id"]; ok { sm["id"] = agentID data, err = json.Marshal(m) if err != nil { return nil, err } } } } } } return data, nil } // statusResponseWriter wraps a http.ResponseWriter to expose the status code // through statusResponseWriter.statusCode type statusResponseWriter struct { w http.ResponseWriter statusCode int byteCount atomic.Uint64 } func (s *statusResponseWriter) Header() http.Header { return s.w.Header() } func (s *statusResponseWriter) Write(bs []byte) (int, error) { n, err := s.w.Write(bs) s.byteCount.Add(uint64(n)) //nolint:gosec// output of Write is guaranteed to be non-negative return n, err } func (s *statusResponseWriter) WriteHeader(statusCode int) { s.statusCode = statusCode s.w.WriteHeader(statusCode) } func (s *statusResponseWriter) StatusCode() int { return s.statusCode }