internal/pkg/checkin/bulk.go (262 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
// Package checkin handles agent check ins.
package checkin
import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/rs/zerolog"
estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/scriptlanguage"
)
const defaultFlushInterval = 10 * time.Second
//go:embed deleteAuditFieldsOnCheckin.painless
var deleteAuditAttributesScript string
type optionsT struct {
flushInterval time.Duration
}
type Opt func(*optionsT)
func WithFlushInterval(d time.Duration) Opt {
return func(opt *optionsT) {
opt.flushInterval = d
}
}
type extraT struct {
meta []byte
seqNo sqn.SeqNo
ver string
components []byte
deleteAudit bool
}
// Minimize the size of this structure.
// There will be 10's of thousands of items
// in the map at any point.
type pendingT struct {
ts string
status string
message string
extra *extraT
unhealthyReason *[]string
}
// Bulk will batch pending checkins and update elasticsearch at a set interval.
type Bulk struct {
opts optionsT
bulker bulk.Bulk
mut sync.Mutex
pending map[string]pendingT
ts string
unix int64
}
func NewBulk(bulker bulk.Bulk, opts ...Opt) *Bulk {
parsedOpts := parseOpts(opts...)
return &Bulk{
opts: parsedOpts,
bulker: bulker,
pending: make(map[string]pendingT),
}
}
func parseOpts(opts ...Opt) optionsT {
outOpts := optionsT{
flushInterval: defaultFlushInterval,
}
for _, f := range opts {
f(&outOpts)
}
return outOpts
}
// Generate and cache timestamp on seconds change.
// Avoid thousands of formats of an identical string.
func (bc *Bulk) timestamp() string {
// WARNING: Expects mutex locked.
now := time.Now()
if now.Unix() != bc.unix {
bc.unix = now.Unix()
bc.ts = now.UTC().Format(time.RFC3339)
}
return bc.ts
}
// CheckIn will add the agent (identified by id) to the pending set.
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string, unhealthyReason *[]string, deleteAudit bool) error {
// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
// updating the timestamp.
var extra *extraT
if meta != nil || seqno.IsSet() || newVer != "" || components != nil || deleteAudit {
extra = &extraT{
meta: meta,
seqNo: seqno,
ver: newVer,
components: components,
deleteAudit: deleteAudit,
}
}
bc.mut.Lock()
bc.pending[id] = pendingT{
ts: bc.timestamp(),
status: status,
message: message,
extra: extra,
unhealthyReason: unhealthyReason,
}
bc.mut.Unlock()
return nil
}
// Run starts the flush timer and exit only when the context is cancelled.
func (bc *Bulk) Run(ctx context.Context) error {
tick := time.NewTicker(bc.opts.flushInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
if err := bc.flush(ctx); err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'")
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// flush sends the minium data needed to update records in elasticsearch.
func (bc *Bulk) flush(ctx context.Context) error {
start := time.Now()
bc.mut.Lock()
pending := bc.pending
bc.pending = make(map[string]pendingT, len(pending))
bc.mut.Unlock()
if len(pending) == 0 {
return nil
}
updates := make([]bulk.MultiOp, 0, len(pending))
simpleCache := make(map[pendingT][]byte)
nowTimestamp := start.UTC().Format(time.RFC3339)
var err error
var needRefresh bool
for id, pendingData := range pending {
// In the simple case, there are no fields and no seqNo.
// When that is true, we can reuse an already generated
// JSON body containing just the timestamp updates.
var body []byte
if pendingData.extra == nil {
var ok bool
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckinMessage: pendingData.message,
dl.FieldUnhealthyReason: pendingData.unhealthyReason,
}
if body, err = fields.Marshal(); err != nil {
return err
}
simpleCache[pendingData] = body
}
} else if pendingData.extra.deleteAudit {
// Use a script instead of a partial doc to update if attributes need to be removed
params, err := encodeParams(nowTimestamp, pendingData)
if err != nil {
return err
}
action := &estypes.UpdateAction{
Script: &estypes.Script{
Lang: &scriptlanguage.Painless,
Source: &deleteAuditAttributesScript,
Options: map[string]string{},
Params: params,
},
}
body, err = json.Marshal(&action)
if err != nil {
return fmt.Errorf("could not marshall script action: %w", err)
}
if pendingData.extra.seqNo.IsSet() {
needRefresh = true
}
} else {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
dl.FieldUnhealthyReason: pendingData.unhealthyReason,
}
// If the agent version is not empty it needs to be updated
// Assuming the agent can by upgraded keeping the same id, but incrementing the version
if pendingData.extra.ver != "" {
fields[dl.FieldAgent] = map[string]interface{}{
dl.FieldAgentVersion: pendingData.extra.ver,
}
}
// Update local metadata if provided
if pendingData.extra.meta != nil {
// Surprise: The json encodeer compacts this raw JSON during
// the encode process, so there my be unexpected memory overhead:
// https://github.com/golang/go/blob/go1.16.3/src/encoding/json/encode.go#L499
fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
}
// Update components if provided
if pendingData.extra.components != nil {
fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components)
}
// If seqNo changed, set the field appropriately
if pendingData.extra.seqNo.IsSet() {
fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
// Only refresh if seqNo changed; dropping metadata not important.
needRefresh = true
}
if body, err = fields.Marshal(); err != nil {
return err
}
}
updates = append(updates, bulk.MultiOp{
ID: id,
Body: body,
Index: dl.FleetAgents,
})
}
var opts []bulk.Opt
if needRefresh {
opts = append(opts, bulk.WithRefresh())
}
_, err = bc.bulker.MUpdate(ctx, updates, opts...)
zerolog.Ctx(ctx).Trace().
Err(err).
Dur("rtt", time.Since(start)).
Int("cnt", len(updates)).
Bool("refresh", needRefresh).
Msg("Flush updates")
return err
}
func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error) {
var (
tsNow json.RawMessage
ts json.RawMessage
status json.RawMessage
message json.RawMessage
reason json.RawMessage
ver json.RawMessage
meta json.RawMessage
components json.RawMessage
isSet json.RawMessage
seqNo json.RawMessage
err error
)
tsNow, err = json.Marshal(now)
Err := errors.Join(err)
ts, err = json.Marshal(data.ts)
Err = errors.Join(Err, err)
status, err = json.Marshal(data.status)
Err = errors.Join(Err, err)
message, err = json.Marshal(data.message)
Err = errors.Join(Err, err)
reason, err = json.Marshal(data.unhealthyReason)
Err = errors.Join(Err, err)
ver, err = json.Marshal(data.extra.ver)
Err = errors.Join(Err, err)
isSet, err = json.Marshal(data.extra.seqNo.IsSet())
Err = errors.Join(Err, err)
seqNo, err = json.Marshal(data.extra.seqNo)
Err = errors.Join(Err, err)
if data.extra.meta != nil {
meta, err = json.Marshal(data.extra.meta)
Err = errors.Join(Err, err)
}
if data.extra.components != nil {
components, err = json.Marshal(data.extra.components)
Err = errors.Join(Err, err)
}
if Err != nil {
return nil, Err
}
return map[string]json.RawMessage{
"Now": tsNow,
"TS": ts,
"Status": status,
"Message": message,
"UnhealthyReason": reason,
"Ver": ver,
"Meta": meta,
"Components": components,
"SeqNoSet": isSet,
"SeqNo": seqNo,
}, nil
}