func()

in internal/pkg/bulk/opApiKey.go [112:267]


func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error {
	idsPerRole := make(map[string][]string)
	roles := make(map[string]json.RawMessage)
	rolePerID := make(map[string]string)
	responses := make(map[int]int)
	idxToID := make(map[int32]string)
	IDToResponse := make(map[string]int)
	maxKeySize := 0
	links := []apm.SpanLink{}

	// merge ids
	for n := queue.head; n != nil; n = n.next {
		content := n.buf.Bytes()
		metaMap := make(map[string]interface{})
		dec := json.NewDecoder(bytes.NewReader(content))
		if err := dec.Decode(&metaMap); err != nil {
			zerolog.Ctx(ctx).Error().
				Err(err).
				Str("mod", kModBulk).
				Msg("Failed to unmarshal api key update meta map")
			return err
		}

		var req *apiKeyUpdateRequest
		if err := dec.Decode(&req); err != nil {
			zerolog.Ctx(ctx).Error().
				Err(err).
				Str("mod", kModBulk).
				Str("request", string(content)).
				Msg("Failed to unmarshal api key update request")
			return err
		}

		if _, tracked := roles[req.RolesHash]; !tracked {
			roles[req.RolesHash] = req.Roles
		}

		// last one wins, it may be policy change and ack are in the same queue
		rolePerID[req.ID] = req.RolesHash
		idxToID[n.idx] = req.ID
		if maxKeySize < len(req.ID) {
			maxKeySize = len(req.ID)
		}
		if n.spanLink != nil {
			links = append(links, *n.spanLink)
		}
	}

	if len(links) == 0 {
		links = nil
	}
	span, ctx := apm.StartSpanOptions(ctx, "Flush: apiKeyUpdate", "apiKeyUpdate", apm.SpanOptions{
		Links: links,
	})
	defer span.End()

	for id, roleHash := range rolePerID {
		delete(rolePerID, id)
		idsPerRole[roleHash] = append(idsPerRole[roleHash], id)

	}

	responseIdx := 0
	for hash, role := range roles {
		idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize)
		ids := idsPerRole[hash]
		if idsPerBatch <= 0 {
			zerolog.Ctx(ctx).Error().Str("error.message", "request too large").Msg("No API Key ID could fit request size for bulk update")
			zerolog.Ctx(ctx).Debug().
				RawJSON("role", role).
				Strs("ids", ids).
				Msg("IDs could not fit into a message")

			// idsPerRole for specific role no longer needed
			delete(idsPerRole, hash)
			continue
		}

		batches := int(math.Ceil(float64(len(ids)) / float64(idsPerBatch)))

		// batch ids into batches of meaningful size
		for batch := 0; batch < batches; batch++ {
			// guard against indexing out of range
			to := (batch + 1) * idsPerBatch
			if to > len(ids) {
				to = len(ids)
			}

			// handle ids in batch, we put them into single request
			// and assign response index to the id so we can notify caller
			idsInBatch := ids[batch*idsPerBatch : to]
			bulkReq := &esAPIKeyBulkUpdateRequest{
				IDs:   idsInBatch,
				Roles: role,
			}
			delete(roles, hash)

			payload, err := json.Marshal(bulkReq)
			if err != nil {
				return err
			}

			req := &esapi.SecurityBulkUpdateAPIKeysRequest{
				Body: bytes.NewReader(payload),
			}

			res, err := req.Do(ctx, b.es)
			if err != nil {
				zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch")
				return err
			}
			if res.Body != nil {
				defer res.Body.Close()
			}
			if res.IsError() {
				zerolog.Ctx(ctx).Error().Str("error.message", res.String()).Msg("Error in bulk API Key update result to Elasticsearch")
				return parseError(res, zerolog.Ctx(ctx))
			}

			zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.")

			responses[responseIdx] = res.StatusCode
			for _, id := range idsInBatch {
				IDToResponse[id] = responseIdx
			}
			responseIdx++
		}

		// idsPerRole for specific role no longer needed
		delete(idsPerRole, hash)
	}

	// WARNING: Once we start pushing items to
	// the queue, the node pointers are invalid.
	// Do NOT return a non-nil value or failQueue
	// up the stack will fail.

	for n := queue.head; n != nil; n = n.next {
		// 'n' is invalid immediately on channel send
		responseIdx := IDToResponse[idxToID[n.idx]]
		res := responses[responseIdx]
		select {
		case n.ch <- respT{
			err: nil,
			idx: n.idx,
			data: &BulkIndexerResponseItem{
				DocumentID: "",
				Status:     res,
			},
		}:
		default:
			panic("Unexpected blocked response channel on flushRead")
		}
	}
	return nil
}