in internal/pkg/checkin/bulk.go [163:291]
func (bc *Bulk) flush(ctx context.Context) error {
start := time.Now()
bc.mut.Lock()
pending := bc.pending
bc.pending = make(map[string]pendingT, len(pending))
bc.mut.Unlock()
if len(pending) == 0 {
return nil
}
updates := make([]bulk.MultiOp, 0, len(pending))
simpleCache := make(map[pendingT][]byte)
nowTimestamp := start.UTC().Format(time.RFC3339)
var err error
var needRefresh bool
for id, pendingData := range pending {
// In the simple case, there are no fields and no seqNo.
// When that is true, we can reuse an already generated
// JSON body containing just the timestamp updates.
var body []byte
if pendingData.extra == nil {
var ok bool
body, ok = simpleCache[pendingData]
if !ok {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts,
dl.FieldUpdatedAt: nowTimestamp,
dl.FieldLastCheckinStatus: pendingData.status,
dl.FieldLastCheckinMessage: pendingData.message,
dl.FieldUnhealthyReason: pendingData.unhealthyReason,
}
if body, err = fields.Marshal(); err != nil {
return err
}
simpleCache[pendingData] = body
}
} else if pendingData.extra.deleteAudit {
// Use a script instead of a partial doc to update if attributes need to be removed
params, err := encodeParams(nowTimestamp, pendingData)
if err != nil {
return err
}
action := &estypes.UpdateAction{
Script: &estypes.Script{
Lang: &scriptlanguage.Painless,
Source: &deleteAuditAttributesScript,
Options: map[string]string{},
Params: params,
},
}
body, err = json.Marshal(&action)
if err != nil {
return fmt.Errorf("could not marshall script action: %w", err)
}
if pendingData.extra.seqNo.IsSet() {
needRefresh = true
}
} else {
fields := bulk.UpdateFields{
dl.FieldLastCheckin: pendingData.ts, // Set the checkin timestamp
dl.FieldUpdatedAt: nowTimestamp, // Set "updated_at" to the current timestamp
dl.FieldLastCheckinStatus: pendingData.status, // Set the pending status
dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
dl.FieldUnhealthyReason: pendingData.unhealthyReason,
}
// If the agent version is not empty it needs to be updated
// Assuming the agent can by upgraded keeping the same id, but incrementing the version
if pendingData.extra.ver != "" {
fields[dl.FieldAgent] = map[string]interface{}{
dl.FieldAgentVersion: pendingData.extra.ver,
}
}
// Update local metadata if provided
if pendingData.extra.meta != nil {
// Surprise: The json encodeer compacts this raw JSON during
// the encode process, so there my be unexpected memory overhead:
// https://github.com/golang/go/blob/go1.16.3/src/encoding/json/encode.go#L499
fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
}
// Update components if provided
if pendingData.extra.components != nil {
fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components)
}
// If seqNo changed, set the field appropriately
if pendingData.extra.seqNo.IsSet() {
fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo
// Only refresh if seqNo changed; dropping metadata not important.
needRefresh = true
}
if body, err = fields.Marshal(); err != nil {
return err
}
}
updates = append(updates, bulk.MultiOp{
ID: id,
Body: body,
Index: dl.FleetAgents,
})
}
var opts []bulk.Opt
if needRefresh {
opts = append(opts, bulk.WithRefresh())
}
_, err = bc.bulker.MUpdate(ctx, updates, opts...)
zerolog.Ctx(ctx).Trace().
Err(err).
Dur("rtt", time.Since(start)).
Int("cnt", len(updates)).
Bool("refresh", needRefresh).
Msg("Flush updates")
return err
}