internal/pkg/file/uploader/upload.go (170 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package uploader

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/cache"
	"github.com/elastic/fleet-server/v7/internal/pkg/file"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/gofrs/uuid"
	"go.elastic.co/apm/v2"
)

var (
	ErrInvalidUploadID  = errors.New("active upload not found with this ID, it may be expired")
	ErrFileSizeTooLarge = errors.New("this file exceeds the maximum allowed file size")
	ErrMissingChunks    = errors.New("file data incomplete, not all chunks were uploaded")
	ErrHashMismatch     = errors.New("hash does not match")
	ErrUploadExpired    = errors.New("upload has expired")
	ErrUploadStopped    = errors.New("upload has stopped")
	ErrInvalidChunkNum  = errors.New("invalid chunk number")
	ErrPayloadRequired  = errors.New("upload start payload required")
	ErrFileSizeRequired = errors.New("file.size is required")
	ErrInvalidFileSize  = errors.New("invalid filesize")
	ErrFieldRequired    = errors.New("field required")
)

type Uploader struct {
	cache     cache.Cache // cache of file metadata doc info
	sizeLimit int64
	timeLimit time.Duration

	chunkClient *elasticsearch.Client
	bulker      bulk.Bulk
}

func New(chunkClient *elasticsearch.Client, bulker bulk.Bulk, cache cache.Cache, sizeLimit int64, timeLimit time.Duration) *Uploader {
	return &Uploader{
		chunkClient: chunkClient,
		bulker:      bulker,
		sizeLimit:   sizeLimit,
		timeLimit:   timeLimit,
		cache:       cache,
	}
}

// Start an upload operation
func (u *Uploader) Begin(ctx context.Context, namespaces []string, data JSDict) (file.Info, error) {
	vSpan, _ := apm.StartSpan(ctx, "validateFileInfo", "validate")
	if data == nil {
		vSpan.End()
		return file.Info{}, ErrPayloadRequired
	}

	/* Validation and Input parsing */

	// make sure all required fields are present and non-empty
	if err := validateUploadPayload(data); err != nil {
		vSpan.End()
		return file.Info{}, err
	}

	size, _ := data.Int64("file", "size")
	if size > u.sizeLimit {
		vSpan.End()
		return file.Info{}, ErrFileSizeTooLarge
	}

	uid, err := uuid.NewV4()
	vSpan.End()
	if err != nil {
		return file.Info{}, fmt.Errorf("unable to generate upload operation ID: %w", err)
	}
	id := uid.String()

	// grab required fields that were checked already in validation step
	agentID, _ := data.Str("agent_id")
	actionID, _ := data.Str("action_id")
	source, _ := data.Str("src")
	docID := fmt.Sprintf("%s.%s", actionID, agentID)

	info := file.Info{
		ID:         id,
		DocID:      docID,
		AgentID:    agentID,
		ActionID:   actionID,
		Namespaces: namespaces,
		ChunkSize:  file.MaxChunkSize,
		Source:     source,
		Total:      size,
		Status:     file.StatusAwaiting,
		Start:      time.Now(),
	}
	chunkCount := info.Total / info.ChunkSize
	if info.Total%info.ChunkSize > 0 {
		chunkCount += 1
	}
	info.Count = int(chunkCount)

	/* Enrich document with additional server-side fields */

	if err := data.Put(info.ChunkSize, "file", "ChunkSize"); err != nil {
		return file.Info{}, err
	}
	if err := data.Put(info.Status, "file", "Status"); err != nil {
		return file.Info{}, err
	}
	if err := data.Put(id, "upload_id"); err != nil {
		return file.Info{}, err
	}
	if err := data.Put(info.Start.UnixMilli(), "upload_start"); err != nil {
		return file.Info{}, err
	}
	if err := data.Put(info.Start.UnixMilli(), "@timestamp"); err != nil {
		return file.Info{}, err
	}
	if err := data.Put(info.Namespaces, "namespaces"); err != nil {
		return file.Info{}, err
	}

	/* Write to storage */

	doc, err := json.Marshal(data)
	if err != nil {
		return file.Info{}, err
	}
	_, err = CreateFileDoc(ctx, u.bulker, doc, source, docID)
	if err != nil {
		return file.Info{}, err
	}

	return info, nil
}

func (u *Uploader) Chunk(ctx context.Context, uplID string, chunkNum int, chunkHash string) (file.Info, file.ChunkInfo, error) {
	// find the upload, details, and status associated with the file upload
	info, err := u.GetUploadInfo(ctx, uplID)
	if err != nil {
		return file.Info{}, file.ChunkInfo{}, err
	}

	/* Verify Chunk upload can proceed */

	if info.Expired(u.timeLimit) {
		return file.Info{}, file.ChunkInfo{}, ErrUploadExpired
	}
	if !info.StatusCanUpload() {
		return file.Info{}, file.ChunkInfo{}, ErrUploadStopped
	}
	if chunkNum < 0 || chunkNum >= info.Count {
		return file.Info{}, file.ChunkInfo{}, ErrInvalidChunkNum
	}

	return info, file.ChunkInfo{
		Pos:  chunkNum,
		BID:  info.DocID,
		Last: chunkNum == info.Count-1,
		Size: int(info.ChunkSize),
		SHA2: chunkHash,
	}, nil
}

func validateUploadPayload(info JSDict) error {
	required := [][]string{
		{"file", "name"},
		{"file", "mime_type"},
		{"action_id"},
		{"agent_id"},
		{"src"},
	}
	for _, fields := range required {
		if value, ok := info.Str(fields...); !ok || strings.TrimSpace(value) == "" {
			return fmt.Errorf("%s is required: %w", strings.Join(fields, "."), ErrFieldRequired)
		}
	}

	if size, ok := info.Int64("file", "size"); !ok {
		return ErrFileSizeRequired
	} else if size <= 0 {
		return fmt.Errorf("file.size: %d: %w", size, ErrInvalidFileSize)
	}

	return nil
}

// GetUploadInfo searches for Upload Metadata document in local memory cache if available
// otherwise, fetches from elasticsearch and caches for next use
func (u *Uploader) GetUploadInfo(ctx context.Context, uploadID string) (file.Info, error) {
	span, ctx := apm.StartSpan(ctx, "getFileInfo", "process")
	defer span.End()

	// Fetch metadata doc, if not cached
	info, exist := u.cache.GetUpload(uploadID)
	if exist {
		return info, nil
	}

	// not found in cache, try fetching
	info, err := file.GetInfo(ctx, u.bulker, UploadHeaderIndexPattern, uploadID)
	if err != nil {
		return file.Info{}, fmt.Errorf("unable to retrieve upload info: %w", err)
	}
	u.cache.SetUpload(uploadID, info)
	return info, nil
}