// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package uploader

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
	"github.com/elastic/fleet-server/v7/internal/pkg/es"
	"github.com/elastic/fleet-server/v7/internal/pkg/file"
	"github.com/elastic/fleet-server/v7/internal/pkg/file/cbor"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/rs/zerolog"
	"go.elastic.co/apm/v2"
)

const (
	// %s is replaced with the integration (source) name; for example,
	// fmt.Sprintf(UploadHeaderIndexPattern, "endpoint") yields
	// ".fleet-fileds-fromhost-meta-endpoint". Note that "fileds" is the
	// established Fleet index naming, not a typo here.
	UploadHeaderIndexPattern = ".fleet-fileds-fromhost-meta-%s"
	UploadDataIndexPattern   = ".fleet-fileds-fromhost-data-%s"
)

var (
	// MatchChunkByBID matches every chunk document sharing a file's base ID.
	MatchChunkByBID = prepareQueryChunkByBID()
	// MatchChunkByDocument matches a single chunk document by its _id.
	MatchChunkByDocument = prepareQueryChunkByDoc()
	// UpdateMetaDocByID selects a metadata document by _id and updates it via script.
	UpdateMetaDocByID = prepareUpdateMetaDoc()
)
func prepareQueryChunkByBID() *dsl.Tmpl {
	tmpl := dsl.NewTmpl()
	root := dsl.NewRoot()
	root.Query().Term(file.FieldBaseID, tmpl.Bind(file.FieldBaseID), nil)
	tmpl.MustResolve(root)
	return tmpl
}
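
// prepareQueryChunkByDoc builds a query template matching a single chunk
// document by its _id.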
func prepareQueryChunkByDoc() *dsl.Tmpl {
	tmpl := dsl.NewTmpl()
	root := dsl.NewRoot()
	root.Query().Term("_id", tmpl.Bind("_id"), nil)
	tmpl.MustResolve(root)
	return tmpl
}
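
// prepareUpdateMetaDoc builds the update-by-query body template used by
// UpdateFileDoc: a term query on _id plus a painless script taking status
// and hash parameters. Rendered, the body looks roughly like this (a sketch;
// the exact field order depends on the dsl package):
//
//	{
//	  "query": {"term": {"_id": "<fileID>"}},
//	  "script": {
//	    "lang": "painless",
//	    "source": "<script body>",
//	    "params": {"status": "<status>", "hash": "<hash>"}
//	  }
//	}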
func prepareUpdateMetaDoc() *dsl.Tmpl {
	tmpl := dsl.NewTmpl()
	root := dsl.NewRoot()
	root.Query().Term("_id", tmpl.Bind("_id"), nil)
	scr := root.Script()
	scr.Param("source", tmpl.Bind("source"))
	scr.Param("lang", "painless")
	prm := scr.Params()
	prm.Param("status", tmpl.Bind("status"))
	prm.Param("hash", tmpl.Bind("hash"))
	tmpl.MustResolve(root)
	return tmpl
}

/*
	Metadata Doc Operations
*/
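
// CreateFileDoc indexes a new file metadata document into the source's
// metadata index and returns the document ID reported by the bulker.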
func CreateFileDoc(ctx context.Context, bulker bulk.Bulk, doc []byte, source string, fileID string) (string, error) {
	span, ctx := apm.StartSpan(ctx, "createFileInfo", "create")
	defer span.End()

	return bulker.Create(ctx, fmt.Sprintf(UploadHeaderIndexPattern, source), fileID, doc, bulk.WithRefresh())
}
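
// UpdateFileDoc writes the upload status, and optionally the transit hash,
// onto an existing file metadata document via _update_by_query.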
func UpdateFileDoc(ctx context.Context, bulker bulk.Bulk, source string, fileID string, status file.Status, hash string) error {
	span, ctx := apm.StartSpan(ctx, "updateFileInfo", "update_by_query")
	defer span.End()

	client := bulker.Client()

	q, err := UpdateMetaDocByID.Render(map[string]interface{}{
		"_id":    fileID,
		"status": string(status),
		"hash":   hash,
		// The script always updates the status; the transit hash is written
		// only when a non-empty hash was provided.
		"source": "ctx._source.file.Status = params.status; if(params.hash != ''){ ctx._source.transithash = ['sha256':params.hash]; }",
	})
	if err != nil {
		return err
	}

	resp, err := client.UpdateByQuery([]string{fmt.Sprintf(UploadHeaderIndexPattern, source)}, client.UpdateByQuery.WithContext(ctx),
		func(req *esapi.UpdateByQueryRequest) {
			req.Body = bytes.NewReader(q)
		})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	type ByQueryResponse struct {
		Error es.ErrorT `json:"error"`
	}
	var response ByQueryResponse
	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return err
	}
	zerolog.Ctx(ctx).Trace().Int("status_code", resp.StatusCode).Interface("response", response).Msg("updated file metadata document")
	if response.Error.Type != "" {
		return fmt.Errorf("%s: %s caused by %s: %s", response.Error.Type, response.Error.Reason, response.Error.Cause.Type, response.Error.Cause.Reason)
	}
	return nil
}

/*
	Chunk Operations
*/
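
// IndexChunk stores a single CBOR-encoded file chunk in the source's data
// index. The chunk document ID is "<fileID>.<chunkNum>".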
func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.ChunkEncoder, source string, fileID string, chunkNum int) error {
	span, ctx := apm.StartSpan(ctx, "createChunk", "create")
	defer span.End()

	chunkDocID := fmt.Sprintf("%s.%d", fileID, chunkNum)
	resp, err := client.Create(fmt.Sprintf(UploadDataIndexPattern, source), chunkDocID, body, func(req *esapi.CreateRequest) {
		req.DocumentID = chunkDocID
		if req.Header == nil {
			req.Header = make(http.Header)
		}
		// The chunk body is CBOR, not JSON; the response is still JSON.
		req.Header.Set("Content-Type", "application/cbor")
		req.Header.Set("Accept", "application/json")
		req.Refresh = "true"
	}, client.Create.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	var response ChunkUploadResponse
	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return err
	}
	zerolog.Ctx(ctx).Trace().Int("status_code", resp.StatusCode).Interface("chunk-response", response).Msg("uploaded chunk")
	if response.Error.Type != "" {
		return fmt.Errorf("%s: %s caused by %s: %s", response.Error.Type, response.Error.Reason, response.Error.Cause.Type, response.Error.Cause.Reason)
	}
	return nil
}
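
// ChunkUploadResponse is the subset of the Elasticsearch create-document
// response that the uploader inspects.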
type ChunkUploadResponse struct {
	Index   string `json:"_index"`
	ID      string `json:"_id"`
	Result  string `json:"result"`
	Version int    `json:"_version"`
	Shards  struct {
		Total   int `json:"total"`
		Success int `json:"successful"`
		Failed  int `json:"failed"`
	} `json:"_shards"`
	Error es.ErrorT `json:"error"`
}
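
// DeleteChunk removes a single chunk document, identified by
// "<fileID>.<chunkNum>", via _delete_by_query.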
func DeleteChunk(ctx context.Context, bulker bulk.Bulk, source string, fileID string, chunkNum int) error {
	span, ctx := apm.StartSpan(ctx, "deleteChunk", "delete_by_query")
	defer span.End()

	q, err := MatchChunkByDocument.Render(map[string]interface{}{
		"_id": fmt.Sprintf("%s.%d", fileID, chunkNum),
	})
	if err != nil {
		return err
	}
	client := bulker.Client()
	resp, err := client.DeleteByQuery([]string{fmt.Sprintf(UploadDataIndexPattern, source)}, bytes.NewReader(q), client.DeleteByQuery.WithContext(ctx))
	if err != nil {
		return err
	}
	// Close the response body so the underlying connection can be reused.
	resp.Body.Close()
	return nil
}
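
// DeleteAllChunksForFile removes every chunk document whose base ID matches
// the given file's base ID.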
func DeleteAllChunksForFile(ctx context.Context, bulker bulk.Bulk, source string, baseID string) error {
	q, err := MatchChunkByBID.Render(map[string]interface{}{
		file.FieldBaseID: baseID,
	})
	if err != nil {
		return err
	}
	client := bulker.Client()
	resp, err := client.DeleteByQuery([]string{fmt.Sprintf(UploadDataIndexPattern, source)}, bytes.NewReader(q), client.DeleteByQuery.WithContext(ctx))
	if err != nil {
		return err
	}
	// Close the response body so the underlying connection can be reused.
	resp.Body.Close()
	return nil
}