// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package file

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"

	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
	"github.com/elastic/fleet-server/v7/internal/pkg/es"

	"go.elastic.co/apm/v2"
)

const (
	FieldBaseID   = "bid"
	FieldLast     = "last"
	FieldSHA2     = "sha2"
	FieldUploadID = "upload_id"
)

var (
	QueryChunkInfoWithSize = prepareChunkInfo(true)
	QueryChunkInfo         = prepareChunkInfo(false)
	QueryUploadID          = prepareFindMetaByUploadID()
)

// prepareChunkInfo builds a query template that fetches chunk fields other
// than the byte payload (data). When size is true, a script field is added
// that reports the length of the data payload without returning it.
func prepareChunkInfo(size bool) *dsl.Tmpl {
	tmpl := dsl.NewTmpl()
	root := dsl.NewRoot()
	root.Param("_source", false)
	root.Query().Term(FieldBaseID, tmpl.Bind(FieldBaseID), nil)
	root.Param("fields", []string{FieldSHA2, FieldLast, FieldBaseID})
	if size {
		root.Param("script_fields", map[string]interface{}{
			"size": map[string]interface{}{
				"script": map[string]interface{}{
					"lang":   "painless",
					"source": "params._source.data.length",
				},
			},
		})
	}
	root.Size(10000)
	tmpl.MustResolve(root)
	return tmpl
}

func prepareFindMetaByUploadID() *dsl.Tmpl {
	tmpl := dsl.NewTmpl()
	root := dsl.NewRoot()
	root.Query().Term(FieldUploadID, tmpl.Bind(FieldUploadID), nil)
	tmpl.MustResolve(root)
	return tmpl
}

// GetMetadata returns the raw search hits for the file metadata document(s)
// matching the given upload ID.
func GetMetadata(ctx context.Context, bulker bulk.Bulk, indexPattern string, uploadID string) ([]es.HitT, error) {
	span, ctx := apm.StartSpan(ctx, "getFileInfo", "search")
	defer span.End()
	query, err := QueryUploadID.Render(map[string]interface{}{
		FieldUploadID: uploadID,
	})
	if err != nil {
		return nil, err
	}

	res, err := bulker.Search(ctx, fmt.Sprintf(indexPattern, "*"), query)
	if err != nil {
		return nil, err
	}

	return res.HitsT.Hits, nil
}

// GetInfo retrieves a file's metadata document as an Info object.
func GetInfo(ctx context.Context, bulker bulk.Bulk, indexPattern string, uploadID string) (Info, error) {
	results, err := GetMetadata(ctx, bulker, indexPattern, uploadID)
	if err != nil {
		return Info{}, err
	}
	if len(results) == 0 {
		return Info{}, ErrInvalidID
	}
	if len(results) > 1 {
		return Info{}, fmt.Errorf("unable to identify unique upload record, got %d records, expected 1", len(results))
	}

	var fi MetaDoc
	if err := json.Unmarshal(results[0].Source, &fi); err != nil {
		return Info{}, fmt.Errorf("file meta doc parsing error: %w", err)
	}

	// calculate the number of chunks required to hold the file,
	// rounding up when the size is not an exact multiple of the chunk size
	cnt := fi.File.Size / fi.File.ChunkSize
	if fi.File.Size%fi.File.ChunkSize > 0 {
		cnt += 1
	}

	return Info{
		ID:        fi.UploadID,
		Source:    fi.Source,
		AgentID:   fi.AgentID,
		ActionID:  fi.ActionID,
		DocID:     results[0].ID,
		ChunkSize: fi.File.ChunkSize,
		Total:     fi.File.Size,
		Count:     int(cnt),
		Start:     fi.Start,
		Status:    Status(fi.File.Status),
	}, nil
}

// GetChunk retrieves a full chunk document, Data payload included.
// Chunk documents are addressed by ID as "<fileID>.<chunkNum>".
func GetChunk(ctx context.Context, bulker bulk.Bulk, indexPattern string, source string, fileID string, chunkNum int) (Chunk, error) {
	var chunk Chunk
	out, err := bulker.Read(ctx, fmt.Sprintf(indexPattern, source), fmt.Sprintf("%s.%d", fileID, chunkNum))
	if err != nil {
		return chunk, err
	}
	err = json.Unmarshal(out, &chunk)
	return chunk, err
}

type GetChunkInfoOpt struct {
	IncludeSize bool
	RequireHash bool
}
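// For reference, the query rendered from QueryChunkInfoWithSize by
// GetChunkInfos below resolves to roughly the following JSON. This is a
// sketch inferred from the template construction above; the exact key
// ordering and term-query shape depend on the dsl package:
//
//	{
//	  "_source": false,
//	  "query": { "term": { "bid": "<baseID>" } },
//	  "fields": ["sha2", "last", "bid"],
//	  "script_fields": {
//	    "size": { "script": { "lang": "painless", "source": "params._source.data.length" } }
//	  },
//	  "size": 10000
//	}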
// GetChunkInfos retrieves a subset of chunk document fields, specifically
// omitting the Data payload (bytes). The chunk's ordered index position (Pos)
// is also parsed from the document ID. It optionally includes the calculated
// field "size" (the length, in bytes, of the Data field), and optionally
// validates that a hash field is present on each chunk.
func GetChunkInfos(ctx context.Context, bulker bulk.Bulk, indexPattern string, baseID string, opt GetChunkInfoOpt) ([]ChunkInfo, error) {
	span, ctx := apm.StartSpan(ctx, "getChunksInfo", "process")
	defer span.End()
	tpl := QueryChunkInfo
	if opt.IncludeSize {
		tpl = QueryChunkInfoWithSize
	}

	query, err := tpl.Render(map[string]interface{}{
		FieldBaseID: baseID,
	})
	if err != nil {
		return nil, err
	}

	bSpan, bCtx := apm.StartSpan(ctx, "searchChunksInfo", "search")
	res, err := bulker.Search(bCtx, fmt.Sprintf(indexPattern, "*"), query)
	bSpan.End()
	if err != nil {
		return nil, err
	}

	chunks := make([]ChunkInfo, len(res.HitsT.Hits))
	var (
		bid  string
		last bool
		sha2 string
		size int
		ok   bool
	)
	vSpan, _ := apm.StartSpan(ctx, "validateChunksInfo", "validate")
	defer vSpan.End()
	for i, h := range res.HitsT.Hits {
		if bid, ok = getResultsFieldString(h.Fields, FieldBaseID); !ok {
			return nil, fmt.Errorf("unable to retrieve %s field from chunk document", FieldBaseID)
		}
		if last, ok = getResultsFieldBool(h.Fields, FieldLast); !ok {
			// Files written by Kibana omit this field on all intermediate
			// chunks and only write last:true on the final chunk, so a
			// missing field defaults to false.
			last = false
		}
		if sha2, ok = getResultsFieldString(h.Fields, FieldSHA2); opt.RequireHash && !ok {
			return nil, fmt.Errorf("unable to retrieve %s field from chunk document", FieldSHA2)
		}
		if size, ok = getResultsFieldInt(h.Fields, "size"); opt.IncludeSize && !ok {
			return nil, errors.New("unable to retrieve size from chunk document")
		}
		// chunk IDs have the form "<baseID>.<position>"; recover the position
		chunkid := strings.TrimPrefix(h.ID, bid+".")
		chunkNum, err := strconv.Atoi(chunkid)
		if err != nil {
			return nil, fmt.Errorf("unable to parse chunk number from id %s: %w", h.ID, err)
		}
		chunks[i] = ChunkInfo{
			Pos:   chunkNum,
			BID:   bid,
			Last:  last,
			SHA2:  sha2,
			Size:  size,
			Index: h.Index,
			ID:    h.ID,
		}
	}
	return chunks, nil
}

// convenience function for translating the elasticsearch "fields" response
// format of "fields": { "a": [value], "b": [value] }
func getResultField(fields map[string]interface{}, key string) (interface{}, bool) {
	array, ok := fields[key].([]interface{})
	if !ok {
		return nil, false
	}
	if len(array) < 1 {
		return nil, false
	}
	return array[0], true
}

func getResultsFieldString(fields map[string]interface{}, key string) (string, bool) {
	val, ok := getResultField(fields, key)
	if !ok {
		return "", false
	}
	str, ok := val.(string)
	return str, ok
}

func getResultsFieldBool(fields map[string]interface{}, key string) (bool, bool) {
	val, ok := getResultField(fields, key)
	if !ok {
		return false, false
	}
	b, ok := val.(bool)
	return b, ok
}

func getResultsFieldInt(fields map[string]interface{}, key string) (int, bool) {
	val, ok := getResultField(fields, key)
	if !ok {
		return 0, false
	}
	// numbers decoded from JSON into interface{} arrive as float64;
	// the int and int64 cases are defensive
	switch n := val.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case float64:
		return int(n), true
	default:
		return 0, false
	}
}
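// Usage sketch (hypothetical caller from outside this package; the index
// pattern strings and the use of Info.DocID as the chunk base ID are
// illustrative assumptions, not defined in this file):
//
//	info, err := file.GetInfo(ctx, bulker, ".fleet-files-%s", uploadID)
//	if err != nil {
//		return err
//	}
//	chunks, err := file.GetChunkInfos(ctx, bulker, ".fleet-filedata-%s", info.DocID,
//		file.GetChunkInfoOpt{IncludeSize: true, RequireHash: true})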