in internal/pkg/bulk/opApiKey.go [112:267]
func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error {
	idsPerRole := make(map[string][]string)   // role hash -> IDs of the keys sharing that role
	roles := make(map[string]json.RawMessage) // role hash -> raw role descriptor payload
	rolePerID := make(map[string]string)      // API key ID -> role hash (last write wins)
	responses := make(map[int]int)            // response index -> HTTP status code
	idxToID := make(map[int32]string)         // queue item index -> API key ID
	IDToResponse := make(map[string]int)      // API key ID -> response index
	maxKeySize := 0                           // longest API key ID seen, used for batch sizing
	links := []apm.SpanLink{}                 // APM span links collected from queued items
	// merge ids
	for n := queue.head; n != nil; n = n.next {
		content := n.buf.Bytes()
		// each buffer holds two JSON documents: the meta map followed by
		// the update request; decode the meta map first to skip past it
		metaMap := make(map[string]interface{})
		dec := json.NewDecoder(bytes.NewReader(content))
		if err := dec.Decode(&metaMap); err != nil {
			zerolog.Ctx(ctx).Error().
				Err(err).
				Str("mod", kModBulk).
				Msg("Failed to unmarshal api key update meta map")
			return err
		}
		var req *apiKeyUpdateRequest
		if err := dec.Decode(&req); err != nil {
			zerolog.Ctx(ctx).Error().
				Err(err).
				Str("mod", kModBulk).
				Str("request", string(content)).
				Msg("Failed to unmarshal api key update request")
			return err
		}
		if _, tracked := roles[req.RolesHash]; !tracked {
			roles[req.RolesHash] = req.Roles
		}
		// last one wins: a policy change and an ack for the same key may
		// sit in the same queue, and the newest roles should apply
		rolePerID[req.ID] = req.RolesHash
		idxToID[n.idx] = req.ID
		if maxKeySize < len(req.ID) {
			maxKeySize = len(req.ID)
		}
		if n.spanLink != nil {
			links = append(links, *n.spanLink)
		}
	}
	if len(links) == 0 {
		links = nil
	}
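	// start the flush span, carrying links back to the spans that
	// enqueued the items collected above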
	span, ctx := apm.StartSpanOptions(ctx, "Flush: apiKeyUpdate", "apiKeyUpdate", apm.SpanOptions{
		Links: links,
	})
	defer span.End()
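	// invert rolePerID into idsPerRole so each distinct role payload is
	// marshaled once for all the key IDs that share it; deleting the
	// current key while ranging over a map is safe in Go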
	for id, roleHash := range rolePerID {
		delete(rolePerID, id)
		idsPerRole[roleHash] = append(idsPerRole[roleHash], id)
	}
	responseIdx := 0
	for hash, role := range roles {
		idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize)
		ids := idsPerRole[hash]
		if idsPerBatch <= 0 {
			zerolog.Ctx(ctx).Error().Str("error.message", "request too large").Msg("No API Key ID could fit request size for bulk update")
			zerolog.Ctx(ctx).Debug().
				RawJSON("role", role).
				Strs("ids", ids).
				Msg("IDs could not fit into a message")
			// idsPerRole for this specific role is no longer needed
			delete(idsPerRole, hash)
			continue
		}
		// split ids into ceil(len(ids)/idsPerBatch) batches so every
		// request stays within the computed size budget
		batches := int(math.Ceil(float64(len(ids)) / float64(idsPerBatch)))
		for batch := 0; batch < batches; batch++ {
			// guard against indexing out of range on the final batch
			to := (batch + 1) * idsPerBatch
			if to > len(ids) {
				to = len(ids)
			}
			// the IDs in this batch go into a single request; each ID is
			// mapped to a response index so the caller can be notified
			idsInBatch := ids[batch*idsPerBatch : to]
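			// esAPIKeyBulkUpdateRequest is defined elsewhere in this package;
			// its marshaled form is assumed to match the Elasticsearch bulk
			// update API keys body, e.g. {"ids": [...], "role_descriptors": {...}}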
			bulkReq := &esAPIKeyBulkUpdateRequest{
				IDs:   idsInBatch,
				Roles: role,
			}
			// safe to delete the key being iterated over; the role value is
			// still held by the loop variable for any remaining batches
			delete(roles, hash)
			payload, err := json.Marshal(bulkReq)
			if err != nil {
				return err
			}
			req := &esapi.SecurityBulkUpdateAPIKeysRequest{
				Body: bytes.NewReader(payload),
			}
			res, err := req.Do(ctx, b.es)
			if err != nil {
				zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch")
				return err
			}
			if res.Body != nil {
				defer res.Body.Close()
			}
			if res.IsError() {
				zerolog.Ctx(ctx).Error().Str("error.message", res.String()).Msg("Error in bulk API Key update response from Elasticsearch")
				return parseError(res, zerolog.Ctx(ctx))
			}
			zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.")
			responses[responseIdx] = res.StatusCode
			for _, id := range idsInBatch {
				IDToResponse[id] = responseIdx
			}
			responseIdx++
		}
		// idsPerRole for this specific role is no longer needed
		delete(idsPerRole, hash)
	}
	// WARNING: Once we start pushing items to the queue, the node
	// pointers are invalid. Do NOT return a non-nil value or failQueue
	// up the stack will fail.
	for n := queue.head; n != nil; n = n.next {
		// 'n' is invalid immediately on channel send
		responseIdx := IDToResponse[idxToID[n.idx]]
		res := responses[responseIdx]
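		// non-blocking send: each waiter's channel is expected to be
		// buffered, so the default branch should never be taken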
		select {
		case n.ch <- respT{
			err: nil,
			idx: n.idx,
			data: &BulkIndexerResponseItem{
				DocumentID: "",
				Status:     res,
			},
		}:
		default:
			panic("Unexpected blocked response channel on flushUpdateAPIKey")
		}
	}
	return nil
}
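
// (*Bulker).getIDsCountPerBatch, used above, is defined elsewhere in this
// package and is not shown here. A minimal sketch of the idea, under the
// assumption that it divides a fixed request-size budget by the per-ID cost;
// the helper name estimateIDsPerBatch, the budget parameter, and the overhead
// constants below are illustrative, not the actual implementation:
func estimateIDsPerBatch(rolesSize, maxKeySize, budget int) int {
	// fixed JSON framing around the ids array and role payload (assumed)
	framing := len(`{"ids":[],"role_descriptors":}`)
	// worst-case per-ID cost: the ID plus surrounding quotes and a comma (assumed)
	perID := maxKeySize + len(`"",`)
	avail := budget - rolesSize - framing
	if avail <= 0 {
		return 0 // not even one ID fits, mirroring the idsPerBatch <= 0 check above
	}
	return avail / perID
}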