func()

in internal/pkg/checkin/bulk.go [163:291]


func (bc *Bulk) flush(ctx context.Context) error {
	start := time.Now()

	bc.mut.Lock()
	pending := bc.pending
	bc.pending = make(map[string]pendingT, len(pending))
	bc.mut.Unlock()

	if len(pending) == 0 {
		return nil
	}

	updates := make([]bulk.MultiOp, 0, len(pending))

	simpleCache := make(map[pendingT][]byte)

	nowTimestamp := start.UTC().Format(time.RFC3339)

	var err error
	var needRefresh bool
	for id, pendingData := range pending {

		// In the simple case, there are no fields and no seqNo.
		// When that is true, we can reuse an already generated
		// JSON body containing just the timestamp updates.
		var body []byte
		if pendingData.extra == nil {
			var ok bool
			body, ok = simpleCache[pendingData]
			if !ok {
				fields := bulk.UpdateFields{
					dl.FieldLastCheckin:        pendingData.ts,
					dl.FieldUpdatedAt:          nowTimestamp,
					dl.FieldLastCheckinStatus:  pendingData.status,
					dl.FieldLastCheckinMessage: pendingData.message,
					dl.FieldUnhealthyReason:    pendingData.unhealthyReason,
				}
				if body, err = fields.Marshal(); err != nil {
					return err
				}
				simpleCache[pendingData] = body
			}
		} else if pendingData.extra.deleteAudit {
			// Use a script instead of a partial doc to update if attributes need to be removed
			params, err := encodeParams(nowTimestamp, pendingData)
			if err != nil {
				return err
			}
			action := &estypes.UpdateAction{
				Script: &estypes.Script{
					Lang:    &scriptlanguage.Painless,
					Source:  &deleteAuditAttributesScript,
					Options: map[string]string{},
					Params:  params,
				},
			}
			body, err = json.Marshal(&action)
			if err != nil {
				return fmt.Errorf("could not marshall script action: %w", err)
			}
			if pendingData.extra.seqNo.IsSet() {
				needRefresh = true
			}
		} else {
			fields := bulk.UpdateFields{
				dl.FieldLastCheckin:        pendingData.ts,      // Set the checkin timestamp
				dl.FieldUpdatedAt:          nowTimestamp,        // Set "updated_at" to the current timestamp
				dl.FieldLastCheckinStatus:  pendingData.status,  // Set the pending status
				dl.FieldLastCheckinMessage: pendingData.message, // Set the status message
				dl.FieldUnhealthyReason:    pendingData.unhealthyReason,
			}

			// If the agent version is not empty it needs to be updated
			// Assuming the agent can by upgraded keeping the same id, but incrementing the version
			if pendingData.extra.ver != "" {
				fields[dl.FieldAgent] = map[string]interface{}{
					dl.FieldAgentVersion: pendingData.extra.ver,
				}
			}

			// Update local metadata if provided
			if pendingData.extra.meta != nil {
				// Surprise: The json encodeer compacts this raw JSON during
				// the encode process, so there my be unexpected memory overhead:
				// https://github.com/golang/go/blob/go1.16.3/src/encoding/json/encode.go#L499
				fields[dl.FieldLocalMetadata] = json.RawMessage(pendingData.extra.meta)
			}

			// Update components if provided
			if pendingData.extra.components != nil {
				fields[dl.FieldComponents] = json.RawMessage(pendingData.extra.components)
			}

			// If seqNo changed, set the field appropriately
			if pendingData.extra.seqNo.IsSet() {
				fields[dl.FieldActionSeqNo] = pendingData.extra.seqNo

				// Only refresh if seqNo changed; dropping metadata not important.
				needRefresh = true
			}

			if body, err = fields.Marshal(); err != nil {
				return err
			}
		}

		updates = append(updates, bulk.MultiOp{
			ID:    id,
			Body:  body,
			Index: dl.FleetAgents,
		})
	}

	var opts []bulk.Opt
	if needRefresh {
		opts = append(opts, bulk.WithRefresh())
	}

	_, err = bc.bulker.MUpdate(ctx, updates, opts...)

	zerolog.Ctx(ctx).Trace().
		Err(err).
		Dur("rtt", time.Since(start)).
		Int("cnt", len(updates)).
		Bool("refresh", needRefresh).
		Msg("Flush updates")

	return err
}