// internal/pkg/api/handleUpload.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package api import ( "context" "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "github.com/rs/zerolog" "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/file" "github.com/elastic/fleet-server/v7/internal/pkg/file/cbor" "github.com/elastic/fleet-server/v7/internal/pkg/file/uploader" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/go-elasticsearch/v8" ) const ( // TODO: move to a config maxFileSize = 104857600 // 100 MiB maxUploadTimer = 24 * time.Hour ) var ( ErrTransitHashRequired = errors.New("transit hash required") ErrAgentIDMissing = errors.New("required field agent_id is missing") ErrFileInfoBodyRequired = fmt.Errorf("file info body is required") ) // FIXME Should we use the structs in openapi.gen.go instead of the generic ones? 
Will need to rework the uploader if we do type UploadT struct { bulker bulk.Bulk chunkClient *elasticsearch.Client cache cache.Cache uploader *uploader.Uploader authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes authAPIKey func(*http.Request, bulk.Bulk, cache.Cache) (*apikey.APIKey, error) // as above } func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT { return &UploadT{ chunkClient: chunkClient, bulker: bulker, cache: cache, uploader: uploader.New(chunkClient, bulker, cache, maxFileSize, maxUploadTimer), authAgent: authAgent, authAPIKey: authAPIKey, } } func (ut *UploadT) validateUploadBeginRequest(ctx context.Context, reader io.Reader) (uploader.JSDict, string, error) { span, _ := apm.StartSpan(ctx, "validateRequest", "validate") defer span.End() payload, err := uploader.ReadDict(reader) if err != nil { if errors.Is(err, io.EOF) { return nil, "", fmt.Errorf("%w: %w", ErrFileInfoBodyRequired, err) } return nil, "", &BadRequestErr{msg: "unable to decode upload begin request", nextErr: err} } // check API key matches payload agent ID agentID, ok := payload.Str("agent_id") if !ok || agentID == "" { return nil, "", ErrAgentIDMissing } return payload, agentID, nil } func (ut *UploadT) handleUploadBegin(_ zerolog.Logger, w http.ResponseWriter, r *http.Request) error { // decode early to match agentID in the payload payload, agentID, err := ut.validateUploadBeginRequest(r.Context(), r.Body) if err != nil { return err } agent, err := ut.authAgent(r, &agentID, ut.bulker, ut.cache) if err != nil { return err } // validate payload, enrich with additional fields, and write metadata doc to ES info, err := ut.uploader.Begin(r.Context(), agent.Namespaces, payload) if err != nil { return err } span, _ := apm.StartSpan(r.Context(), "response", "write") defer span.End() // prepare and write response resp := UploadBeginAPIResponse{ ChunkSize: 
info.ChunkSize, UploadId: info.ID, } out, err := json.Marshal(resp) if err != nil { return err } w.Header().Set("Content-Type", "application/json") _, err = w.Write(out) if err != nil { return err } return nil } func (ut *UploadT) handleUploadChunk(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string, chunkID int, chunkHash string) error { // chunkHash is checked by router upinfo, chunkInfo, err := ut.uploader.Chunk(r.Context(), uplID, chunkID, chunkHash) if err != nil { return err } // prevent over-sized chunks data := http.MaxBytesReader(w, r.Body, file.MaxChunkSize) // compute hash as we stream it hash := sha256.New() copier := io.TeeReader(data, hash) ce := cbor.NewChunkWriter(copier, chunkInfo.Last, chunkInfo.BID, chunkInfo.SHA2, upinfo.ChunkSize) if err := uploader.IndexChunk(r.Context(), ut.chunkClient, ce, upinfo.Source, chunkInfo.BID, chunkInfo.Pos); err != nil { return err } span, ctx := apm.StartSpan(r.Context(), "validateIndexChunk", "validate") hashsum := hex.EncodeToString(hash.Sum(nil)) if !strings.EqualFold(chunkHash, hashsum) { // delete document, since we wrote it, but the hash was invalid // context scoped to allow this operation to finish even if client disconnects if err := uploader.DeleteChunk(ctx, ut.bulker, upinfo.Source, chunkInfo.BID, chunkInfo.Pos); err != nil { zlog.Warn().Err(err). Str("source", upinfo.Source). Str("fileID", chunkInfo.BID). Int("chunkNum", chunkInfo.Pos). 
Msg("a chunk hash mismatch occurred, and fleet server was unable to remove the invalid chunk") } span.End() return uploader.ErrHashMismatch } span.End() span, _ = apm.StartSpan(r.Context(), "response", "write") defer span.End() w.WriteHeader(http.StatusOK) return nil } func (ut *UploadT) validateUploadCompleteRequest(r *http.Request, id string) (string, error) { span, ctx := apm.StartSpan(r.Context(), "validateRequest", "validate") defer span.End() info, err := ut.uploader.GetUploadInfo(ctx, id) if err != nil { return "", err } // need to auth that it matches the ID in the initial // doc, but that means we had to doc-lookup early if _, err := ut.authAgent(r, &info.AgentID, ut.bulker, ut.cache); err != nil { return "", fmt.Errorf("error authenticating for upload finalization: %w", err) } var req UploadCompleteRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { return "", &BadRequestErr{msg: "unable to decode upload complete request"} } hash := strings.TrimSpace(req.Transithash.Sha256) if hash == "" { return "", ErrTransitHashRequired } return hash, nil } func (ut *UploadT) handleUploadComplete(_ zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string) error { hash, err := ut.validateUploadCompleteRequest(r, uplID) if err != nil { return err } _, err = ut.uploader.Complete(r.Context(), uplID, hash) if err != nil { return err } span, _ := apm.StartSpan(r.Context(), "response", "write") defer span.End() _, err = w.Write([]byte(`{"status":"ok"}`)) if err != nil { return err } return nil }