internal/pkg/bulk/engine.go (498 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package bulk
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/go-ucfg"
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"
"golang.org/x/sync/semaphore"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// Aliases re-exporting the API key types of the apikey package, so they can be
// referred to through this package (e.g. in the Bulk interface signatures).
type APIKey = apikey.APIKey
type SecurityInfo = apikey.SecurityInfo
type APIKeyMetadata = apikey.APIKeyMetadata
var (
	// ErrNoQuotes is returned by validateMeta when an index name or document
	// id contains a double-quote character.
	ErrNoQuotes = errors.New("quoted literal not supported")
)
// MultiOp describes one document operation passed to the multi-operation
// methods of Bulk (MCreate, MIndex, MUpdate, MDelete).
type MultiOp struct {
	// ID is the document id.
	ID string
	// Index is the target index.
	Index string
	// Body is the operation payload; presumably the JSON document or update
	// body, and unused for deletes — TODO confirm against the M* implementations.
	Body []byte
}
// Bulk is the API of the bulk engine; *Bulker implements it.
type Bulk interface {
	// Synchronous operations run in the bulk engine
	Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error)
	Read(ctx context.Context, index, id string, opts ...Opt) ([]byte, error)
	ReadRaw(ctx context.Context, index, id string, opts ...Opt) (*MgetResponseItem, error)
	Update(ctx context.Context, index, id string, body []byte, opts ...Opt) error
	Delete(ctx context.Context, index, id string, opts ...Opt) error
	Index(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error)
	Search(ctx context.Context, index string, body []byte, opts ...Opt) (*es.ResultT, error)

	// APM tracing helpers
	HasTracer() bool
	StartTransaction(name, transactionType string) *apm.Transaction
	StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction

	// Multi Operation APIs run in the bulk engine
	MCreate(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error)
	MIndex(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error)
	MUpdate(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error)
	MDelete(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error)

	// APIKey operations
	APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error)
	APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error)
	APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error)
	APIKeyInvalidate(ctx context.Context, ids ...string) error
	APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error

	// Accessor used to talk to Elasticsearch directly, bypassing the bulk engine
	Client() *elasticsearch.Client

	// Bulkers for remote Elasticsearch outputs, keyed by output name.
	CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error)
	GetBulker(outputName string) Bulk
	GetBulkerMap() map[string]Bulk
	CancelFn() context.CancelFunc
	RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool

	// ReadSecrets returns the values of the given secret ids, keyed by id.
	ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error)
}
// kModBulk is the value of the "mod" field attached to the bulk engine's log events.
const kModBulk = "bulk"
// Bulker is the bulk engine. Requests are submitted as *bulkT blocks on ch,
// collected into per-type queues by Run and flushed through es. A Bulker also
// keeps the bulkers it created for remote Elasticsearch outputs.
type Bulker struct {
	// es is the transport used for requests; Client() requires it to be a
	// *elasticsearch.Client.
	es esapi.Transport
	// ch carries blocks from dispatch to the Run loop.
	ch chan *bulkT
	// opts holds the engine options parsed in NewBulker.
	opts bulkOptT
	// blkPool recycles *bulkT blocks (see newBlk / freeBlk).
	blkPool sync.Pool
	// apikeyLimit is sized from apikeyMaxParallel; presumably bounds concurrent
	// API key requests (its users are outside this file) — TODO confirm.
	apikeyLimit *semaphore.Weighted
	// tracer is optional; when nil, flushes are not traced.
	tracer *apm.Tracer
	// cancelFn cancels this bulker's Run context; CreateAndGetBulker sets it on
	// the remote-output bulkers it creates.
	cancelFn context.CancelFunc
	// remoteOutputConfigMap is the last configuration seen per remote output.
	remoteOutputConfigMap map[string]map[string]interface{}
	// bulkerMap holds the remote-output bulkers keyed by output name.
	bulkerMap map[string]Bulk
	// remoteOutputMutex guards remoteOutputConfigMap and bulkerMap.
	remoteOutputMutex sync.RWMutex
}
// Default bulk engine settings.
const (
	// Maximum time a queued item waits before the queues are flushed.
	defaultFlushInterval = 5 * time.Second
	// Flush early once this many items are queued ...
	defaultFlushThresholdCnt = 1 << 15
	// ... or once this many bytes (10 MiB) are queued.
	defaultFlushThresholdSz = 10 * 1024 * 1024
	// Maximum number of flushes in flight at once.
	defaultMaxPending = 32
	// Small capacity to allow multiOp to spin fast
	defaultBlockQueueSz = 32
	defaultAPIKeyMaxParallel = 32
	// 100 MiB.
	defaultApikeyMaxReqSize = 100 * 1024 * 1024
	// Upper bound for acquiring a flush slot and for each flush.
	defaultFlushContextTimeout = time.Minute
)
// NewBulker builds a Bulker that sends requests over transport. tracer may be
// nil, in which case flushes are not traced. opts are parsed with
// parseBulkOpts. Queued requests are only processed while Run is executing.
func NewBulker(transport esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
	bopts := parseBulkOpts(opts...)
	return &Bulker{
		es:     transport,
		tracer: tracer,
		opts:   bopts,
		// Buffered hand-off channel between dispatch and Run.
		ch: make(chan *bulkT, bopts.blockQueueSz),
		blkPool: sync.Pool{New: func() interface{} {
			// Every pooled block owns a one-slot response channel.
			return &bulkT{ch: make(chan respT, 1)}
		}},
		apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)),
		// Remote ES output state starts empty.
		remoteOutputConfigMap: make(map[string]map[string]interface{}),
		bulkerMap:             make(map[string]Bulk),
	}
}
// GetBulker returns the bulker registered for the remote output outputName,
// or nil when there is none.
func (b *Bulker) GetBulker(outputName string) Bulk {
	b.remoteOutputMutex.RLock()
	found := b.bulkerMap[outputName]
	b.remoteOutputMutex.RUnlock()
	return found
}
// GetBulkerMap returns a snapshot of the remote output bulkers keyed by output
// name. The copy is taken under the read lock, so the caller may iterate or
// modify it freely.
func (b *Bulker) GetBulkerMap() map[string]Bulk {
	b.remoteOutputMutex.RLock()
	defer b.remoteOutputMutex.RUnlock()
	snapshot := make(map[string]Bulk, len(b.bulkerMap))
	for name, bulker := range b.bulkerMap {
		snapshot[name] = bulker
	}
	return snapshot
}
// CancelFn returns the function that cancels this bulker's Run context. It is
// nil unless assigned; CreateAndGetBulker assigns it on the remote-output
// bulkers it creates.
func (b *Bulker) CancelFn() context.CancelFunc {
	cancel := b.cancelFn
	return cancel
}
// updateBulkerMap registers newBulker as the bulker for outputName, replacing
// any previous entry. The map is modified under the remote-output write lock.
func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) {
	mu := &b.remoteOutputMutex
	mu.Lock()
	defer mu.Unlock()
	b.bulkerMap[outputName] = newBulker
}
// CreateAndGetBulker returns the bulker for the remote Elasticsearch output
// outputName, creating it when needed. Only outputMap[outputName] is used.
//
//   - No bulker yet: a new one is created and started.
//   - Bulker exists, config unchanged: the existing bulker is returned.
//   - Bulker exists, config changed: the existing bulker is cancelled and a
//     new one is created and started.
//
// The returned bool reports whether the output's configuration changed since
// the previous call. If the Elasticsearch client cannot be created, the error
// is returned and no bulker is left registered for the output; a cancelled
// bulker is removed, so a later call retries the creation.
//
// A new bulker runs in its own goroutine on a context derived from
// context.Background(). That context is cancelled on the next config change,
// or when the parent bulker's Run exits. ctx is currently unused.
func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) {
	hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName])

	b.remoteOutputMutex.RLock()
	bulker := b.bulkerMap[outputName]
	b.remoteOutputMutex.RUnlock()

	if bulker != nil && !hasConfigChanged {
		return bulker, false, nil
	}
	if bulker != nil {
		// The configuration changed: stop the outdated bulker before replacing it.
		if cancelFn := bulker.CancelFn(); cancelFn != nil {
			cancelFn()
		}
	}

	// Background context so the bulker outlives this call; it is cancelled when
	// the config changes or when the primary bulker's Run exits.
	bulkCtx, bulkCancel := context.WithCancel(context.Background())
	esClient, err := b.createRemoteEsClient(bulkCtx, outputName, outputMap)
	if err != nil {
		bulkCancel()
		if bulker != nil {
			// The old bulker was cancelled above, and the new config is already
			// recorded. Drop the old bulker so a later call creates a fresh one
			// instead of returning a stopped bulker.
			b.remoteOutputMutex.Lock()
			delete(b.bulkerMap, outputName)
			b.remoteOutputMutex.Unlock()
		}
		return nil, hasConfigChanged, err
	}

	// starting a new bulker to create/update API keys for remote ES output
	newBulker := NewBulker(esClient, b.tracer)
	newBulker.cancelFn = bulkCancel
	b.updateBulkerMap(outputName, newBulker)

	// One goroutine runs the bulker and logs how it ended. Nothing is sent over
	// a channel, so the goroutine cannot be left blocked after Run returns.
	go func() {
		zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulker started")
		runErr := newBulker.Run(bulkCtx)
		if bulkCtx.Err() != nil {
			zlog.Debug().Str(logger.PolicyOutputName, outputName).Msg("Bulk context done")
			return
		}
		zlog.Error().Err(runErr).Str(logger.PolicyOutputName, outputName).Msg("Bulker error")
	}()

	return newBulker, hasConfigChanged, nil
}
// newESClient is the client constructor used by createRemoteEsClient
// (es.NewClient by default). It is a variable, presumably so tests can
// substitute a fake — TODO confirm.
var newESClient = es.NewClient
// createRemoteEsClient builds an Elasticsearch client for the remote output
// outputName from its raw configuration in outputMap.
//
// The configuration is unpacked into config.Elasticsearch using
// config.DefaultOptions. It must contain at least one host and a service
// token. The client is created via newESClient with the "Remote-Fleet-Server"
// user agent and an instrumented round tripper; ctx is passed through to it.
// Configuration parse errors are wrapped with the output name.
func (b *Bulker) createRemoteEsClient(ctx context.Context, outputName string, outputMap map[string]map[string]interface{}) (*elasticsearch.Client, error) {
	esConfig, err := ucfg.NewFrom(outputMap[outputName], config.DefaultOptions...)
	if err != nil {
		return nil, fmt.Errorf("reading config of output %q: %w", outputName, err)
	}

	var esOutput config.Elasticsearch
	if err := esConfig.Unpack(&esOutput); err != nil {
		return nil, fmt.Errorf("unpacking elasticsearch config of output %q: %w", outputName, err)
	}
	if len(esOutput.Hosts) == 0 {
		return nil, fmt.Errorf("failed to get hosts from output: %v", outputName)
	}
	if esOutput.ServiceToken == "" {
		return nil, fmt.Errorf("failed to get service token from output: %v", outputName)
	}

	cfg := config.Config{
		Output: config.Output{
			Elasticsearch: esOutput,
		},
	}
	// Named client rather than es so the imported es package is not shadowed.
	client, err := newESClient(ctx, &cfg, false, elasticsearchOptions(true, b.opts.bi)...)
	if err != nil {
		return nil, err
	}
	return client, nil
}
// elasticsearchOptions returns the client options for remote output clients.
// It always includes a "Remote-Fleet-Server" user agent built from bi, and it
// adds es.InstrumentRoundTripper when instrumented is true.
func elasticsearchOptions(instrumented bool, bi build.Info) []es.ConfigOption {
	userAgent := es.WithUserAgent("Remote-Fleet-Server", bi)
	if !instrumented {
		return []es.ConfigOption{userAgent}
	}
	return []es.ConfigOption{userAgent, es.InstrumentRoundTripper()}
}
// Client returns the bulker's transport as a *elasticsearch.Client, for talking
// to Elasticsearch directly and bypassing the bulk queues. It panics if the
// transport given to NewBulker is of any other type.
func (b *Bulker) Client() *elasticsearch.Client {
	if esClient, isClient := b.es.(*elasticsearch.Client); isClient {
		return esClient
	}
	panic("Client is not an elastic search pointer")
}
// RemoteOutputConfigChanged reports whether newCfg differs (per
// reflect.DeepEqual) from the configuration last recorded for the remote
// output name. An output with no recorded configuration is never reported as
// changed. newCfg is not recorded, and zlog is currently unused.
func (b *Bulker) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
	b.remoteOutputMutex.RLock()
	defer b.remoteOutputMutex.RUnlock()

	previous := b.remoteOutputConfigMap[name]
	if previous == nil {
		// First configuration seen for this output: not a change.
		return false
	}
	return !reflect.DeepEqual(previous, newCfg)
}
// hasChangedAndUpdateRemoteOutputConfig records a shallow copy of newCfg as the
// current configuration of the remote output name. It reports whether the new
// configuration differs (per reflect.DeepEqual) from the previously recorded
// one. The first configuration recorded for an output is never a change.
//
// The comparison and the update happen under one write lock, so concurrent
// callers cannot both observe the same change. The comparison uses the copy
// that gets stored. As a result, a nil newCfg (recorded as an empty map)
// compares equal to a previously recorded empty or nil configuration instead
// of always being reported as changed.
func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
	newCfgCopy := make(map[string]interface{}, len(newCfg))
	for k, v := range newCfg {
		newCfgCopy[k] = v
	}

	b.remoteOutputMutex.Lock()
	curCfg := b.remoteOutputConfigMap[name]
	// when output config first added, not reporting change
	hasChanged := curCfg != nil && !reflect.DeepEqual(curCfg, newCfgCopy)
	b.remoteOutputConfigMap[name] = newCfgCopy
	b.remoteOutputMutex.Unlock()

	if hasChanged {
		zlog.Debug().Str("name", name).Msg("remote output configuration has changed")
	}
	return hasChanged
}
// ReadSecrets returns the values of the secrets in secretIds, keyed by id.
// Secrets are fetched one at a time with ReadSecret over the bulker's
// Elasticsearch client, because there is no bulk API to read them in a single
// request yet. On the first failure it returns nil and that error.
func (b *Bulker) ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error) {
	client := b.Client()
	secrets := make(map[string]string, len(secretIds))
	for _, secretID := range secretIds {
		value, err := ReadSecret(ctx, client, secretID)
		if err != nil {
			return nil, err
		}
		secrets[secretID] = value
	}
	return secrets, nil
}
// stopTimer stops t without ever blocking. If Stop reports the timer had
// already fired or been stopped, any value still pending on t.C is drained
// with a non-blocking receive. An empty channel is tolerated because Stop
// returning false does not guarantee a value is waiting, so afterwards the
// timer can be Reset safely.
func stopTimer(t *time.Timer) {
	if t.Stop() {
		return
	}
	select {
	case <-t.C:
	default:
	}
}
// blkToQueueType selects the Run queue for blk from its action and its refresh
// flag. Searches and fleet searches get their own queues. Reads and API key
// updates get dedicated queues, and reads are split by refresh. Every other
// action goes to the bulk queue, or to the refresh bulk queue when refresh is set.
func blkToQueueType(blk *bulkT) queueType {
	refresh := blk.flags.Has(flagRefresh)
	switch {
	case blk.action == ActionSearch:
		return kQueueSearch
	case blk.action == ActionFleetSearch:
		return kQueueFleetSearch
	case blk.action == ActionRead && refresh:
		return kQueueRefreshRead
	case blk.action == ActionRead:
		return kQueueRead
	case blk.action == ActionUpdateAPIKey:
		return kQueueAPIKeyUpdate
	case refresh:
		return kQueueRefreshBulk
	default:
		return kQueueBulk
	}
}
// Run is the bulk engine's main loop. It receives blocks from b.ch and
// prepends each one to the queue chosen by blkToQueueType. It flushes every
// queue with pending bytes when either:
//   - b.opts.flushInterval has elapsed since the first block was queued, or
//   - the queued item count reaches b.opts.flushThresholdCnt, or the queued
//     byte count reaches b.opts.flushThresholdSz.
//
// Run returns when ctx is done (returning ctx.Err()) or when flushQueue fails
// to start a flush (returning that error). Blocks still queued at that point
// are not flushed. On return, every remote-output bulker in b.bulkerMap that
// has a cancel function is cancelled.
func (b *Bulker) Run(ctx context.Context) error {
	// Registered first so remote bulkers are stopped however Run exits. Nil
	// cancel functions are skipped, as in CreateAndGetBulker.
	defer func() {
		b.remoteOutputMutex.RLock()
		defer b.remoteOutputMutex.RUnlock()
		for _, bulker := range b.bulkerMap {
			if cancelFn := bulker.CancelFn(); cancelFn != nil {
				cancelFn()
			}
		}
	}()

	var err error

	zerolog.Ctx(ctx).Info().Interface("opts", &b.opts).Msg("Run bulker with options")

	// Create timer in stopped state; it is armed when the first block is queued.
	timer := time.NewTimer(b.opts.flushInterval)
	stopTimer(timer)
	defer timer.Stop()

	// Bounds the number of flushes running at once.
	w := semaphore.NewWeighted(int64(b.opts.maxPending))

	var queues [kNumQueues]queueT
	for i := range queues {
		queues[i].ty = queueType(i)
	}

	// Totals across all queues since the last flush, compared to the thresholds.
	var itemCnt int
	var byteCnt int

	doFlush := func() error {
		for i := range queues {
			q := &queues[i]
			if q.pending > 0 {
				// Pass queue structure by value
				if err := b.flushQueue(ctx, w, *q); err != nil {
					return err
				}
				// Reset local queue stored in array
				q.cnt = 0
				q.head = nil
				q.pending = 0
			}
		}
		// Reset threshold counters
		itemCnt = 0
		byteCnt = 0
		return nil
	}

	for err == nil {
		select {
		case blk := <-b.ch:
			q := &queues[blkToQueueType(blk)]

			// Prepend block to head of target queue
			blk.next = q.head
			q.head = blk

			// Update pending count on target queue
			q.cnt++
			q.pending += blk.buf.Len()

			// Update threshold counters
			itemCnt++
			byteCnt += blk.buf.Len()

			// Start timer on first queued item
			if itemCnt == 1 {
				timer.Reset(b.opts.flushInterval)
			}

			// Threshold test, short circuit timer on pending count
			if itemCnt >= b.opts.flushThresholdCnt || byteCnt >= b.opts.flushThresholdSz {
				zerolog.Ctx(ctx).Trace().
					Str("mod", kModBulk).
					Int("itemCnt", itemCnt).
					Int("byteCnt", byteCnt).
					Msg("Flush on threshold")
				err = doFlush()
				stopTimer(timer)
			}
		case <-timer.C:
			zerolog.Ctx(ctx).Trace().
				Str("mod", kModBulk).
				Int("itemCnt", itemCnt).
				Int("byteCnt", byteCnt).
				Msg("Flush on timer")
			err = doFlush()
		case <-ctx.Done():
			err = ctx.Err()
		}
	}

	return err
}
// flushQueue starts an asynchronous flush of queue.
//
// It first acquires one unit of w, which bounds concurrent flushes, waiting at
// most defaultFlushContextTimeout. If that fails, the error is returned and
// nothing is flushed. Otherwise the flush runs in a new goroutine with its own
// defaultFlushContextTimeout deadline. The goroutine dispatches on queue.ty to
// flushRead, flushSearch, flushUpdateAPIKey or flushBulk. If the flush fails,
// each block in the queue receives the error via failQueue, and the error is
// reported to APM. The semaphore unit is released when the goroutine finishes.
//
// queue is passed by value so the caller can reset its own copy right away.
// When b.tracer is set, the flush runs inside an APM transaction. The
// transaction is attached to the context before flushCtx is derived, so the
// flush calls are part of it.
func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue queueT) error {
	start := time.Now()
	zerolog.Ctx(ctx).Trace().
		Str("mod", kModBulk).
		Int("cnt", queue.cnt).
		Int("szPending", queue.pending).
		Str("queue", queue.Type()).
		Msg("flushQueue Wait")

	acquireCtx, cancel := context.WithTimeout(ctx, defaultFlushContextTimeout)
	defer cancel()
	if err := w.Acquire(acquireCtx, 1); err != nil {
		zerolog.Ctx(ctx).Error().Err(err).Msg("flushQueue Wait error")
		return err
	}

	zerolog.Ctx(ctx).Trace().
		Str("mod", kModBulk).
		Int("cnt", queue.cnt).
		Dur("tdiff", time.Since(start)).
		Int("szPending", queue.pending).
		Str("queue", queue.Type()).
		Msg("flushQueue Acquired")

	go func() {
		start := time.Now()

		// Use a goroutine-local context instead of reassigning the captured ctx
		// parameter.
		tctx := ctx
		if b.tracer != nil {
			trans := b.tracer.StartTransaction(fmt.Sprintf("Flush queue %s", queue.Type()), "bulker")
			trans.Context.SetLabel("queue.size", queue.cnt)
			trans.Context.SetLabel("queue.pending", queue.pending)
			tctx = apm.ContextWithTransaction(tctx, trans)
			defer trans.End()
		}

		// deadline prevents bulker being blocked on flush; derived from tctx so
		// the APM transaction (if any) propagates into the flush calls.
		flushCtx, cancel := context.WithTimeout(tctx, defaultFlushContextTimeout)
		defer cancel()

		defer w.Release(1)

		var err error
		switch queue.ty {
		case kQueueRead, kQueueRefreshRead:
			err = b.flushRead(flushCtx, queue)
		case kQueueSearch, kQueueFleetSearch:
			err = b.flushSearch(flushCtx, queue)
		case kQueueAPIKeyUpdate:
			err = b.flushUpdateAPIKey(flushCtx, queue)
		default:
			err = b.flushBulk(flushCtx, queue)
		}

		if err != nil {
			failQueue(queue, err)
			apm.CaptureError(tctx, err).Send()
		}

		zerolog.Ctx(tctx).Trace().
			Err(err).
			Str("mod", kModBulk).
			Int("cnt", queue.cnt).
			Int("szPending", queue.pending).
			Str("queue", queue.Type()).
			Dur("rtt", time.Since(start)).
			Msg("flushQueue Done")
	}()

	return nil
}
// failQueue sends a response carrying err to every block in queue. Each send
// is non-blocking; a block whose response channel has no free slot indicates
// broken bookkeeping and causes a panic.
func failQueue(queue queueT, err error) {
	failure := respT{err: err}
	node := queue.head
	for node != nil {
		// Read the successor before sending: once the response is delivered,
		// the waiting caller may reuse the block, so node is invalid afterwards.
		successor := node.next
		select {
		case node.ch <- failure:
		default:
			panic("Unexpected blocked response channel on failQueue")
		}
		node = successor
	}
}
// parseOpts applies each Opt, in order, to a zero-valued optionsT and returns
// the result.
func (b *Bulker) parseOpts(opts ...Opt) optionsT {
	var parsed optionsT
	for i := range opts {
		opts[i](&parsed)
	}
	return parsed
}
// newBlk takes a block from the pool and prepares it for action. It copies
// the span link from opts and sets the refresh flag when opts.Refresh is true.
// Return the block with freeBlk when done.
func (b *Bulker) newBlk(action actionT, opts optionsT) *bulkT {
	blk := b.blkPool.Get().(*bulkT) //nolint:errcheck // we control what is placed in the pool
	blk.action, blk.spanLink = action, opts.spanLink
	if opts.Refresh {
		blk.flags.Set(flagRefresh)
	}
	return blk
}
// freeBlk resets blk and puts it back into the pool used by newBlk. The
// caller must not touch blk afterwards.
func (b *Bulker) freeBlk(blk *bulkT) {
	blk.reset()
	pool := &b.blkPool
	pool.Put(blk)
}
// validateIndex is the hook for index-name validation. It performs no checks
// yet and always returns nil.
func (b *Bulker) validateIndex(index string) error {
	// TODO: validate index names.
	return nil
}
// validateIndices runs validateIndex on each index in order and returns the
// first error encountered, or nil if all pass.
func (b *Bulker) validateIndices(indices []string) error {
	for n := 0; n < len(indices); n++ {
		if err := b.validateIndex(indices[n]); err != nil {
			return err
		}
	}
	return nil
}
// validateMeta returns ErrNoQuotes when the index name or the document id
// contains a double quote. Quotes in ids are legal, but they are unusual and
// disallowed here for now.
func (b *Bulker) validateMeta(index, id string) error {
	hasQuote := strings.ContainsRune(index, '"') || strings.ContainsRune(id, '"')
	if hasQuote {
		return ErrNoQuotes
	}
	return nil
}
// validateBody returns es.ErrInvalidBody unless body is syntactically valid JSON.
// TODO: Fail on non-escaped line feeds
func (b *Bulker) validateBody(body []byte) error {
	if json.Valid(body) {
		return nil
	}
	return es.ErrInvalidBody
}
// dispatch hands blk to the Run loop and waits for its response, in two steps
// that can each be aborted through ctx:
//  1. sending blk on b.ch, and
//  2. receiving the result on blk.ch.
//
// If ctx is done during either step, an error is logged and a respT carrying
// ctx.Err() is returned. Otherwise the response received on blk.ch is
// returned unchanged, and a trace event is logged.
func (b *Bulker) dispatch(ctx context.Context, blk *bulkT) respT {
	start := time.Now()

	// Appends the fields shared by every dispatch log event.
	withBlkFields := func(evt *zerolog.Event) *zerolog.Event {
		return evt.
			Str("mod", kModBulk).
			Str("action", blk.action.String()).
			Bool("refresh", blk.flags.Has(flagRefresh)).
			Dur("rtt", time.Since(start))
	}

	// Step 1: hand the block to the Run loop.
	select {
	case <-ctx.Done():
		withBlkFields(zerolog.Ctx(ctx).Error().Err(ctx.Err())).Msg("Dispatch abort queue")
		return respT{err: ctx.Err()}
	case b.ch <- blk:
	}

	// Step 2: wait for the flush to answer.
	select {
	case <-ctx.Done():
		withBlkFields(zerolog.Ctx(ctx).Error().Err(ctx.Err())).Msg("Dispatch abort response")
		return respT{err: ctx.Err()}
	case resp := <-blk.ch:
		withBlkFields(zerolog.Ctx(ctx).Trace().Err(resp.err)).Msg("Dispatch OK")
		return resp
	}
}