in internal/pkg/file/es.go [140:205]
// GetChunkInfos runs a chunk-info search and converts every hit into a
// ChunkInfo, returned in the same order as the hits.
//
// The query is rendered from QueryChunkInfo, or from QueryChunkInfoWithSize
// when opt.IncludeSize is set, with baseID supplied as the FieldBaseID
// parameter. The search target is indexPattern formatted with "*".
//
// For each hit:
//   - the FieldBaseID field is mandatory;
//   - the FieldLast field is optional and treated as false when missing;
//   - the FieldSHA2 field is mandatory only when opt.RequireHash is set;
//   - the "size" field is mandatory only when opt.IncludeSize is set;
//   - the chunk position is parsed from the hit ID after removing the
//     "<base id>." prefix, and must be a valid integer.
//
// Any render, search, missing-field, or parse failure aborts the call and
// returns a nil slice together with the error.
func GetChunkInfos(ctx context.Context, bulker bulk.Bulk, indexPattern string, baseID string, opt GetChunkInfoOpt) ([]ChunkInfo, error) {
	span, ctx := apm.StartSpan(ctx, "getChunksInfo", "process")
	defer span.End()

	queryTpl := QueryChunkInfo
	if opt.IncludeSize {
		queryTpl = QueryChunkInfoWithSize
	}
	params := map[string]interface{}{FieldBaseID: baseID}
	query, err := queryTpl.Render(params)
	if err != nil {
		return nil, err
	}

	// The search span is ended explicitly (not deferred) so that it only
	// covers the search call and not the validation loop below.
	searchSpan, searchCtx := apm.StartSpan(ctx, "searchChunksInfo", "search")
	res, err := bulker.Search(searchCtx, fmt.Sprintf(indexPattern, "*"), query)
	searchSpan.End()
	if err != nil {
		return nil, err
	}

	hits := res.HitsT.Hits
	infos := make([]ChunkInfo, 0, len(hits))

	validateSpan, _ := apm.StartSpan(ctx, "validateChunksInfo", "validate")
	defer validateSpan.End()

	for _, hit := range hits {
		bid, ok := getResultsFieldString(hit.Fields, FieldBaseID)
		if !ok {
			return nil, fmt.Errorf("unable to retrieve %s field from chunk document", FieldBaseID)
		}

		// Files written by Kibana omit this field for all intermediate chunks
		// and only write last:true on the final chunk, so a missing value
		// means false.
		last, ok := getResultsFieldBool(hit.Fields, FieldLast)
		if !ok {
			last = false
		}

		// The hash is only an error when the caller asked for it.
		sha2, ok := getResultsFieldString(hit.Fields, FieldSHA2)
		if !ok && opt.RequireHash {
			return nil, fmt.Errorf("unable to retrieve %s field from chunk document", FieldSHA2)
		}

		// Likewise, a missing size only matters when sizes were requested.
		size, ok := getResultsFieldInt(hit.Fields, "size")
		if !ok && opt.IncludeSize {
			return nil, errors.New("unable to retrieve size from chunk document")
		}

		// What remains of the document ID after the "<base id>." prefix is
		// parsed as the chunk number.
		pos, err := strconv.Atoi(strings.TrimPrefix(hit.ID, bid+"."))
		if err != nil {
			return nil, fmt.Errorf("unable to parse chunk number from id %s: %w", hit.ID, err)
		}

		infos = append(infos, ChunkInfo{
			Pos:   pos,
			BID:   bid,
			Last:  last,
			SHA2:  sha2,
			Size:  size,
			Index: hit.Index,
			ID:    hit.ID,
		})
	}
	return infos, nil
}