internal/pkg/bulk/opApiKey.go (211 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package bulk import ( "bytes" "context" "encoding/json" "math" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) const ( envelopeSize = 64 // 64B safeBuffer = 0.9 ) // The ApiKey API's are not yet bulk enabled. Stub the calls in the bulker // and limit parallel access to prevent many requests from overloading // the connection pool in the elastic search client. type apiKeyUpdateRequest struct { ID string `json:"id,omitempty"` Roles json.RawMessage `json:"role_descriptors,omitempty"` RolesHash string `json:"role_hash,omitempty"` } type esAPIKeyBulkUpdateRequest struct { IDs []string `json:"ids,omitempty"` Roles json.RawMessage `json:"role_descriptors,omitempty"` } func (b *Bulker) APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error) { span, ctx := apm.StartSpan(ctx, "authAPIKey", "auth") defer span.End() if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err } defer b.apikeyLimit.Release(1) for _, pt := range b.opts.policyTokens { if pt.TokenKey == key.Key { return &SecurityInfo{Enabled: true}, nil } } return key.Authenticate(ctx, b.Client()) } func (b *Bulker) APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error) { span, ctx := apm.StartSpan(ctx, "createAPIKey", "auth") defer span.End() if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err } defer b.apikeyLimit.Release(1) return apikey.Create(ctx, b.Client(), name, ttl, "false", roles, meta) } func (b *Bulker) APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error) { span, ctx := apm.StartSpan(ctx, "readAPIKey", "auth") defer span.End() if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err } defer b.apikeyLimit.Release(1) return apikey.Read(ctx, b.Client(), id, withOwner) } func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error { span, ctx := apm.StartSpan(ctx, "invalidateAPIKey", "auth") defer span.End() if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return err } defer b.apikeyLimit.Release(1) return apikey.Invalidate(ctx, b.Client(), ids...) } func (b *Bulker) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error { span, ctx := apm.StartSpan(ctx, "updateAPIKey", "auth") // NOTE: this is tracked as updateAPIKey/auth instead of update_api_key/bulker to be consistent with other auth actions that don't use a queue. span.Context.SetLabel("api_key_id", id) defer span.End() req := &apiKeyUpdateRequest{ ID: id, Roles: roles, RolesHash: outputPolicyHash, } body, err := json.Marshal(req) if err != nil { return err } _, err = b.waitBulkAction(ctx, ActionUpdateAPIKey, "", id, body) return err } // flushUpdateAPIKey takes an update API Key queue and groups request based on roles applied // It needs to group agent IDs per Role Hash in order to produce more efficient request containing a list of IDs for a change(update) // One thing to have in mind is that in a single queue there may be change and ack request with roles. in this case // Later occurrence wins overwriting policy change to reduced set of permissions. // Even if the order was incorrect we end up with just a bit broader permission set, never too strict, so agent does not // end up with fewer permissions than it needs func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { idsPerRole := make(map[string][]string) roles := make(map[string]json.RawMessage) rolePerID := make(map[string]string) responses := make(map[int]int) idxToID := make(map[int32]string) IDToResponse := make(map[string]int) maxKeySize := 0 links := []apm.SpanLink{} // merge ids for n := queue.head; n != nil; n = n.next { content := n.buf.Bytes() metaMap := make(map[string]interface{}) dec := json.NewDecoder(bytes.NewReader(content)) if err := dec.Decode(&metaMap); err != nil { zerolog.Ctx(ctx).Error(). Err(err). Str("mod", kModBulk). Msg("Failed to unmarshal api key update meta map") return err } var req *apiKeyUpdateRequest if err := dec.Decode(&req); err != nil { zerolog.Ctx(ctx).Error(). Err(err). Str("mod", kModBulk). Str("request", string(content)). Msg("Failed to unmarshal api key update request") return err } if _, tracked := roles[req.RolesHash]; !tracked { roles[req.RolesHash] = req.Roles } // last one wins, it may be policy change and ack are in the same queue rolePerID[req.ID] = req.RolesHash idxToID[n.idx] = req.ID if maxKeySize < len(req.ID) { maxKeySize = len(req.ID) } if n.spanLink != nil { links = append(links, *n.spanLink) } } if len(links) == 0 { links = nil } span, ctx := apm.StartSpanOptions(ctx, "Flush: apiKeyUpdate", "apiKeyUpdate", apm.SpanOptions{ Links: links, }) defer span.End() for id, roleHash := range rolePerID { delete(rolePerID, id) idsPerRole[roleHash] = append(idsPerRole[roleHash], id) } responseIdx := 0 for hash, role := range roles { idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize) ids := idsPerRole[hash] if idsPerBatch <= 0 { zerolog.Ctx(ctx).Error().Str("error.message", "request too large").Msg("No API Key ID could fit request size for bulk update") zerolog.Ctx(ctx).Debug(). RawJSON("role", role). Strs("ids", ids). Msg("IDs could not fit into a message") // idsPerRole for specific role no longer needed delete(idsPerRole, hash) continue } batches := int(math.Ceil(float64(len(ids)) / float64(idsPerBatch))) // batch ids into batches of meaningful size for batch := 0; batch < batches; batch++ { // guard against indexing out of range to := (batch + 1) * idsPerBatch if to > len(ids) { to = len(ids) } // handle ids in batch, we put them into single request // and assign response index to the id so we can notify caller idsInBatch := ids[batch*idsPerBatch : to] bulkReq := &esAPIKeyBulkUpdateRequest{ IDs: idsInBatch, Roles: role, } delete(roles, hash) payload, err := json.Marshal(bulkReq) if err != nil { return err } req := &esapi.SecurityBulkUpdateAPIKeysRequest{ Body: bytes.NewReader(payload), } res, err := req.Do(ctx, b.es) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") return err } if res.Body != nil { defer res.Body.Close() } if res.IsError() { zerolog.Ctx(ctx).Error().Str("error.message", res.String()).Msg("Error in bulk API Key update result to Elasticsearch") return parseError(res, zerolog.Ctx(ctx)) } zerolog.Ctx(ctx).Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") responses[responseIdx] = res.StatusCode for _, id := range idsInBatch { IDToResponse[id] = responseIdx } responseIdx++ } // idsPerRole for specific role no longer needed delete(idsPerRole, hash) } // WARNING: Once we start pushing items to // the queue, the node pointers are invalid. // Do NOT return a non-nil value or failQueue // up the stack will fail. for n := queue.head; n != nil; n = n.next { // 'n' is invalid immediately on channel send responseIdx := IDToResponse[idxToID[n.idx]] res := responses[responseIdx] select { case n.ch <- respT{ err: nil, idx: n.idx, data: &BulkIndexerResponseItem{ DocumentID: "", Status: res, }, }: default: panic("Unexpected blocked response channel on flushRead") } } return nil } func (b *Bulker) getIDsCountPerBatch(roleSize, maxKeySize int) int { spareSpace := b.opts.apikeyMaxReqSize - roleSize - envelopeSize if spareSpace > maxKeySize { return int(float64(spareSpace) * safeBuffer / float64(maxKeySize)) } return 0 }