internal/beater/auth/apikey.go (145 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package auth import ( "bytes" "context" "encoding/base64" "errors" "fmt" "net/http" "time" "github.com/cespare/xxhash/v2" "github.com/elastic/go-freelru" es "github.com/elastic/apm-server/internal/elasticsearch" ) const ( // Application is a constant mapped to the "application" field for the Elasticsearch security API // This identifies privileges and keys created for APM Application es.AppName = "apm" // ResourceInternal is only valid for first authorization of a request. // The API Key needs to grant privileges to additional resources for successful processing of requests. ResourceInternal = es.Resource("-") ) var ( // PrivilegeAgentConfigRead identifies the Elasticsearch API Key privilege // required for authorizing agent config queries. PrivilegeAgentConfigRead = es.NewPrivilege("agentConfig", "config_agent:read") // PrivilegeEventWrite identifies the Elasticsearch API Key privilege required // for authorizing event ingestion. PrivilegeEventWrite = es.NewPrivilege("event", "event:write") // PrivilegeSourcemapWrite identifies the Elasticsearch API Key privilege // required for authorizing source map uploads. PrivilegeSourcemapWrite = es.NewPrivilege("sourcemap", "sourcemap:write") ) // AllPrivilegeActions returns all Elasticsearch privilege actions used by APM Server. func AllPrivilegeActions() []es.PrivilegeAction { return []es.PrivilegeAction{ PrivilegeAgentConfigRead.Action, PrivilegeEventWrite.Action, PrivilegeSourcemapWrite.Action, } } type apikeyAuth struct { esClient *es.Client cache *privilegesCache } type apikeyAuthorizer struct { permissions es.Permissions } func newApikeyAuth(client *es.Client, cache *privilegesCache) *apikeyAuth { return &apikeyAuth{client, cache} } func (a *apikeyAuth) authenticate(ctx context.Context, credentials string) (*APIKeyAuthenticationDetails, *apikeyAuthorizer, error) { decoded, err := base64.StdEncoding.DecodeString(credentials) if err != nil { return nil, nil, fmt.Errorf("%w: improperly encoded ApiKey credentials: expected base64(ID:APIKey): %s", ErrAuthFailed, err) } colon := bytes.IndexByte(decoded, ':') if colon == -1 { return nil, nil, fmt.Errorf("%w: improperly formatted ApiKey credentials: expected base64(ID:APIKey)", ErrAuthFailed) } id := string(decoded[:colon]) // Check that the user has any privileges for the internal resource. response, err := a.hasPrivileges(ctx, id, credentials, ResourceInternal) if err != nil { return nil, nil, err } permissions := response.Application[Application][ResourceInternal] haveAny := false for _, havePermission := range permissions { if havePermission { haveAny = true break } } if !haveAny { return nil, nil, ErrAuthFailed } details := &APIKeyAuthenticationDetails{ID: id, Username: response.Username} return details, &apikeyAuthorizer{permissions}, nil } func (a *apikeyAuth) hasPrivileges(ctx context.Context, id, credentials string, resource es.Resource) (*es.HasPrivilegesResponse, error) { cacheKey := id + "_" + string(resource) if response, ok := a.cache.get(cacheKey); ok { if response == nil { return nil, ErrAuthFailed } return response, nil } if a.cache.isFull() { return nil, errors.New( "api_key limit reached, check your logs for failed authorization attempts " + "or consider increasing config option `apm-server.api_key.limit`", ) } request := es.HasPrivilegesRequest{ Applications: []es.Application{{ Name: Application, // it is important to query all privilege actions because they are cached by api key+resources // querying a.anyOfPrivileges would result in an incomplete cache entry Privileges: AllPrivilegeActions(), Resources: []es.Resource{resource}, }}, } info, err := es.HasPrivileges(ctx, a.esClient, request, credentials) if err != nil { var eserr *es.Error if errors.As(err, &eserr) && eserr.StatusCode == http.StatusUnauthorized { // Cache authorization failures to avoid hitting Elasticsearch every time. a.cache.add(cacheKey, nil) return nil, ErrAuthFailed } return nil, err } a.cache.add(cacheKey, &info) return &info, nil } // Authorize checks if the configured API Key is authorized for the given action and resource. // // An API Key is considered to be authorized when the API Key has the configured privileges // for the requested resource. Permissions are fetched from Elasticsearch and then cached in // a global cache. func (a *apikeyAuthorizer) Authorize(ctx context.Context, action Action, _ Resource) error { // TODO if resource is non-zero, map to different application resources in the privilege queries. // // For now, having any valid "apm" application API Key grants access to any agent and service. // In the future, it should be possible to have API Keys that can be restricted to a set of agent // and service names. var apikeyPrivilegeAction es.PrivilegeAction switch action { case ActionAgentConfig: apikeyPrivilegeAction = PrivilegeAgentConfigRead.Action case ActionEventIngest: apikeyPrivilegeAction = PrivilegeEventWrite.Action case ActionSourcemapUpload: apikeyPrivilegeAction = PrivilegeSourcemapWrite.Action default: return fmt.Errorf("unknown action %q", action) } if a.permissions[apikeyPrivilegeAction] { return nil } return fmt.Errorf("%w: API Key not permitted action %q", ErrUnauthorized, apikeyPrivilegeAction) } type privilegesCache struct { cache *freelru.ShardedLRU[string, *es.HasPrivilegesResponse] size int } func hashStringXXHASH(s string) uint32 { return uint32(xxhash.Sum64String(s)) } func newPrivilegesCache(expiration time.Duration, size int) (*privilegesCache, error) { cacheSize := size if cacheSize < 1 { cacheSize = 8192 } lru, err := freelru.NewSharded[string, *es.HasPrivilegesResponse](uint32(cacheSize), hashStringXXHASH) if err != nil { return nil, err } lru.SetLifetime(expiration) return &privilegesCache{cache: lru, size: size}, nil } func (c *privilegesCache) isFull() bool { return c.cache.Len() >= c.size } func (c *privilegesCache) get(id string) (*es.HasPrivilegesResponse, bool) { return c.cache.Get(id) } func (c *privilegesCache) add(id string, privileges *es.HasPrivilegesResponse) { c.cache.Add(id, privileges) }