internal/pkg/policy/policy_output.go (401 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package policy

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/rs/zerolog"
	"go.elastic.co/apm/v2"

	"github.com/elastic/elastic-agent-client/v7/pkg/client"

	"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/dl"
	"github.com/elastic/fleet-server/v7/internal/pkg/logger"
	"github.com/elastic/fleet-server/v7/internal/pkg/model"
	"github.com/elastic/fleet-server/v7/internal/pkg/smap"
)

// Output types that Prepare knows how to handle; any other value is rejected
// with an error.
const (
	OutputTypeElasticsearch       = "elasticsearch"
	OutputTypeRemoteElasticsearch = "remote_elasticsearch"
	OutputTypeLogstash            = "logstash"
	OutputTypeKafka               = "kafka"
)

var (
	// ErrNoOutputPerms indicates an elasticsearch output whose policy has no
	// permission (Role) section; API keys cannot be managed without it.
	ErrNoOutputPerms = errors.New("output permission sections not found")
	// ErrFailInjectAPIKey indicates the output name could not be found in the
	// policy's output map, so no API key can be injected into it.
	ErrFailInjectAPIKey = errors.New("fail inject api key")
)

// Output represents a single output section of an agent policy.
type Output struct {
	Name         string // key of this output in the policy's outputs map
	Type         string // one of the OutputType* constants above
	ServiceToken string
	Role         *RoleT // API key permissions; required for (remote) elasticsearch outputs
}

// Prepare prepares the output p to be sent to the elastic-agent
// The agent might be mutated for an elasticsearch output
func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap map[string]map[string]interface{}) error {
	span, ctx := apm.StartSpan(ctx, "prepareOutput", "process")
	defer span.End()
	span.Context.SetLabel("output_type", p.Type)
	zlog = zlog.With().
		Str(logger.AgentID, agent.Id).
		Str(logger.PolicyOutputName, p.Name).Logger()

	switch p.Type {
	case OutputTypeElasticsearch:
		zlog.Debug().Msg("preparing elasticsearch output")
		// For the local ES output the same bulker manages both the agent
		// record and the output API keys, and no remote config can change.
		if err := p.prepareElasticsearch(ctx, zlog, bulker, bulker, agent, outputMap, false); err != nil {
			return fmt.Errorf("failed to prepare elasticsearch output %q: %w", p.Name, err)
		}
	case OutputTypeRemoteElasticsearch:
		zlog.Debug().Msg("preparing remote elasticsearch output")
		newBulker, hasConfigChanged, err := bulker.CreateAndGetBulker(ctx, zlog, p.Name, outputMap)
		if err != nil {
			return err
		}
		// the outputBulker is different for remote ES, it is used to create/update Api keys in the remote ES client
		if err := p.prepareElasticsearch(ctx, zlog, bulker, newBulker, agent, outputMap, hasConfigChanged); err != nil {
			return fmt.Errorf("failed to prepare remote elasticsearch output %q: %w", p.Name, err)
		}
	case OutputTypeLogstash:
		zlog.Debug().Msg("preparing logstash output")
		zlog.Info().Msg("no actions required for logstash output preparation")
	case OutputTypeKafka:
		zlog.Debug().Msg("preparing kafka output")
		zlog.Info().Msg("no actions required for kafka output preparation")
	default:
		zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type)
		return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type)
	}
	return nil
}

// prepareElasticsearch manages the output API key for one (remote)
// elasticsearch output: it retires keys of outputs that were removed from the
// policy, creates a key when none exists (or the remote config changed),
// updates the key's permissions when the role hash changed, and persists the
// results both to the agent document in ES and to the in-memory agent.
//
// bulker writes to the local cluster (agent records / output health);
// outputBulker performs API key management on the cluster the output points
// at — the two are the same object for a plain elasticsearch output.
// hasConfigChanged forces a new key; it is only ever true for remote outputs.
func (p *Output) prepareElasticsearch(
	ctx context.Context,
	zlog zerolog.Logger,
	bulker bulk.Bulk,
	outputBulker bulk.Bulk,
	agent *model.Agent,
	outputMap map[string]map[string]interface{},
	hasConfigChanged bool) error {
	// The role is required to do api key management
	if p.Role == nil {
		zlog.Error().
			Msg("policy does not contain required output permission section")
		return ErrNoOutputPerms
	}

	if _, ok := outputMap[p.Name]; !ok {
		zlog.Error().Err(ErrFailInjectAPIKey).Msg("Unable to find output in map")
		return ErrFailInjectAPIKey
	}

	// Look up (or lazily create) the per-output state stored on the agent.
	output, foundOutput := agent.Outputs[p.Name]
	if !foundOutput {
		if agent.Outputs == nil {
			agent.Outputs = map[string]*model.PolicyOutput{}
		}
		zlog.Debug().Msgf("creating agent.Outputs[%s]", p.Name)
		output = &model.PolicyOutput{}
		agent.Outputs[p.Name] = output
	}

	// retire api key of removed remote output
	var toRetireAPIKeys *model.ToRetireAPIKeyIdsItems
	var removedOutputName string
	// find the first output that is removed - supposing one output can be removed at a time
	for agentOutputName, agentOutput := range agent.Outputs {
		found := false
		for outputMapKey := range outputMap {
			if agentOutputName == outputMapKey {
				found = true
				break
			}
		}
		if !found {
			zlog.Info().Str(logger.APIKeyID, agentOutput.APIKeyID).Str(logger.PolicyOutputName, agentOutputName).Msg("Output removed, will retire API key")
			toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{
				ID:        agentOutput.APIKeyID,
				RetiredAt: time.Now().UTC().Format(time.RFC3339),
				Output:    agentOutputName,
			}
			removedOutputName = agentOutputName
			break
		}
	}
	if toRetireAPIKeys != nil {
		// adding remote API key to new output toRetireAPIKeys
		fields := map[string]interface{}{
			dl.FieldPolicyOutputToRetireAPIKeyIDs: *toRetireAPIKeys,
		}
		// Using painless script to append the old keys to the history
		body, err := renderUpdatePainlessScript(p.Name, fields)
		if err != nil {
			return fmt.Errorf("could not update painless script: %w", err)
		}
		if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
			zlog.Error().Err(err).Msg("fail update agent record")
			return fmt.Errorf("fail update agent record: %w", err)
		}
		// remove output from agent doc
		body, err = json.Marshal(map[string]interface{}{
			"script": map[string]interface{}{
				"lang": "painless",
				"source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputName),
			},
		})
		if err != nil {
			return fmt.Errorf("could not create request body to update agent: %w", err)
		}
		if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
			zlog.Error().Err(err).Msg("fail update agent record")
			return fmt.Errorf("fail update agent record: %w", err)
		}
	}

	// Determine whether we need to generate an output ApiKey.
	// This is accomplished by comparing the sha2 hash stored in the corresponding
	// output in the agent record with the precalculated sha2 hash of the role.
	// Note: This will need to be updated when doing multi-cluster elasticsearch support
	// Currently, we assume all ES outputs are the same ES fleet-server is connected to.
	needNewKey := false
	needUpdateKey := false
	switch {
	case output.APIKey == "":
		zlog.Debug().Msg("must generate api key as default API key is not present")
		needNewKey = true
	case hasConfigChanged:
		zlog.Debug().Msg("must generate api key as remote output config changed")
		needNewKey = true
	case p.Role.Sha2 != output.PermissionsHash:
		// this is actually the OutputPermissionsHash for the default hash. The Agent
		// document on ES does not have OutputPermissionsHash for any other output
		// besides the default one. It seems to me error-prone to rely on the default
		// output permissions hash to generate new API keys for other outputs.
		zlog.Debug().Msg("must update api key as policy output permissions changed")
		needUpdateKey = true
	default:
		zlog.Debug().Msg("policy output permissions are the same")
	}

	if needUpdateKey {
		zlog.Debug().
			RawJSON("roles", p.Role.Raw).
			Str("oldHash", output.PermissionsHash).
			Str("newHash", p.Role.Sha2).
			Msg("Generating a new API key")

		// query current api key for roles so we don't lose permissions in the meantime
		currentRoles, err := fetchAPIKeyRoles(ctx, outputBulker, output.APIKeyID)
		if err != nil {
			zlog.Error().
				Str("apiKeyID", output.APIKeyID).
				Err(err).Msg("fail fetching roles for key")
			return err
		}
		// merge roles with p.Role
		newRoles, err := mergeRoles(zlog, currentRoles, p.Role)
		if err != nil {
			zlog.Error().
				Str("apiKeyID", output.APIKeyID).
				Err(err).Msg("fail merging roles for key")
			return err
		}
		// hash provided is only for merging request together and not persisted
		err = outputBulker.APIKeyUpdate(ctx, output.APIKeyID, newRoles.Sha2, newRoles.Raw)
		if err != nil {
			zlog.Error().Err(err).Msg("fail generate output key")
			zlog.Debug().RawJSON("roles", newRoles.Raw).Str("sha", newRoles.Sha2).Err(err).Msg("roles not updated")
			return err
		}
		output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
		zlog.Debug().
			Str("hash.sha256", p.Role.Sha2).
			Str("roles", string(p.Role.Raw)).
			Msg("Updating agent record to pick up most recent roles.")

		fields := map[string]interface{}{
			dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
		}

		// Using painless script to update permission hash for updated key
		body, err := renderUpdatePainlessScript(p.Name, fields)
		if err != nil {
			return err
		}

		if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
			zlog.Error().Err(err).Msg("fail update agent record")
			return err
		}
	} else if needNewKey {
		zlog.Debug().
			RawJSON("fleet.policy.roles", p.Role.Raw).
			Str("fleet.policy.default.oldHash", output.PermissionsHash).
			Str("fleet.policy.default.newHash", p.Role.Sha2).
			Msg("Generating a new API key")

		ctx := zlog.WithContext(ctx)
		outputAPIKey, err := generateOutputAPIKey(ctx, outputBulker, agent.Id, p.Name, p.Role.Raw)
		// reporting output health and not returning the error to keep fleet-server running
		if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch {
			if err != nil {
				doc := model.OutputHealth{
					Output:  p.Name,
					State:   client.UnitStateDegraded.String(),
					Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err),
				}
				zerolog.Ctx(ctx).Warn().Err(err).Str(logger.PolicyOutputName, p.Name).Msg(doc.Message)
				if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
					zlog.Error().Err(err).Str(logger.PolicyOutputName, p.Name).Msg("error writing output health")
				}
			}
			// replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch
			outputMap[p.Name][FieldOutputType] = OutputTypeElasticsearch
			// remove the service token from the agent policy sent to the agent
			delete(outputMap[p.Name], FieldOutputServiceToken)
			return nil
		} else if p.Type == OutputTypeRemoteElasticsearch {
			// Key creation succeeded against the remote cluster: record it healthy.
			doc := model.OutputHealth{
				Output:  p.Name,
				State:   client.UnitStateHealthy.String(),
				Message: "",
			}
			if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
				zlog.Error().Err(err).Msg("create output health")
			}
		}
		if err != nil {
			return fmt.Errorf("failed generate output API key: %w", err)
		}

		// When a new keys is generated we need to update the Agent record,
		// this will need to be updated when multiples remote Elasticsearch output
		// are supported.
		zlog.Info().
			Str("fleet.policy.role.hash.sha256", p.Role.Sha2).
			Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID).
			Msg("Updating agent record to pick up default output key.")

		fields := map[string]interface{}{
			dl.FieldPolicyOutputAPIKey:          outputAPIKey.Agent(),
			dl.FieldPolicyOutputAPIKeyID:        outputAPIKey.ID,
			dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
		}
		if !foundOutput {
			fields[dl.FiledType] = OutputTypeElasticsearch
		}
		if output.APIKeyID != "" {
			// Queue the previous key for invalidation now that it is replaced.
			fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{
				ID:        output.APIKeyID,
				RetiredAt: time.Now().UTC().Format(time.RFC3339),
				Output:    p.Name,
			}
		}

		// Using painless script to append the old keys to the history
		body, err := renderUpdatePainlessScript(p.Name, fields)
		if err != nil {
			return fmt.Errorf("could not update painless script: %w", err)
		}

		if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
			zlog.Error().Err(err).Msg("fail update agent record")
			return fmt.Errorf("fail update agent record: %w", err)
		}

		// Now that all is done, we can update the output on the agent variable
		// Right now it's more for consistency and to ensure the in-memory agent
		// data is correct and in sync with ES, so it can be safely used after
		// this method returns.
		output.Type = OutputTypeElasticsearch
		output.APIKey = outputAPIKey.Agent()
		output.APIKeyID = outputAPIKey.ID
		output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
	}

	if p.Type == OutputTypeRemoteElasticsearch {
		// replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch
		outputMap[p.Name][FieldOutputType] = OutputTypeElasticsearch
		// remove the service token from the agent policy sent to the agent
		delete(outputMap[p.Name], FieldOutputServiceToken)
	}

	// Always insert the `api_key` as part of the output block, this is required
	// because only fleet server knows the api key for the specific agent, if we don't
	// add it the agent will not receive the `api_key` and will not be able to connect
	// to Elasticsearch.
	//
	// We need to investigate allocation with the new LS output, we had optimization
	// in place to reduce number of agent policy allocation when sending the updated
	// agent policy to multiple agents.
	// See: https://github.com/elastic/fleet-server/issues/1301
	outputMap[p.Name]["api_key"] = output.APIKey

	return nil
}

// fetchAPIKeyRoles reads the API key identified by apiKeyID from the cluster b
// points at and returns its current role descriptors together with a stable
// hash of them (used to detect permission changes).
func fetchAPIKeyRoles(ctx context.Context, b bulk.Bulk, apiKeyID string) (*RoleT, error) {
	res, err := b.APIKeyRead(ctx, apiKeyID, true)
	if err != nil {
		return nil, err
	}

	roleMap, err := smap.Parse(res.RoleDescriptors)
	if err != nil {
		return nil, err
	}
	r := &RoleT{
		Raw: res.RoleDescriptors,
	}

	// Stable hash on permissions payload
	if r.Sha2, err = roleMap.Hash(); err != nil {
		return nil, err
	}

	return r, nil
}

// mergeRoles takes old and new role sets and merges them following these rules:
// - take all new roles
// - append all old roles
// to avoid name collisions every old entry has a `rdstale` suffix
// if rdstale suffix already exists it uses `{index}-rdstale` to avoid further collisions
// everything ending with `rdstale` is removed on ack.
// in case we have key `123` in both old and new result will be: {"123", "123-0-rdstale"} // in case old contains {"123", "123-0-rdstale"} and new contains {"123"} result is: {"123", "123-rdstale", "123-0-rdstale"} func mergeRoles(zlog zerolog.Logger, old, new *RoleT) (*RoleT, error) { if old == nil { return new, nil } if new == nil { return old, nil } oldMap, err := smap.Parse(old.Raw) if err != nil { return nil, err } if oldMap == nil { return new, nil } newMap, err := smap.Parse(new.Raw) if err != nil { return nil, err } if newMap == nil { return old, nil } destMap := smap.Map{} // copy all from new for k, v := range newMap { destMap[k] = v } findNewKey := func(m smap.Map, candidate string) string { if strings.HasSuffix(candidate, "-rdstale") { candidate = strings.TrimSuffix(candidate, "-rdstale") dashIdx := strings.LastIndex(candidate, "-") if dashIdx >= 0 { candidate = candidate[:dashIdx] } } // 1 should be enough, 100 is just to have some space for i := 0; i < 100; i++ { c := fmt.Sprintf("%s-%d-rdstale", candidate, i) if _, exists := m[c]; !exists { return c } } return "" } // copy old for k, v := range oldMap { newKey := findNewKey(destMap, k) if newKey == "" { zlog.Warn().Msg("Failed to find a key for role assignement.") zlog.Debug(). RawJSON("roles", new.Raw). Str("candidate", k). 
Msg("roles not included.") continue } destMap[newKey] = v } r := &RoleT{} if r.Sha2, err = destMap.Hash(); err != nil { return nil, err } if r.Raw, err = json.Marshal(destMap); err != nil { return nil, err } return r, nil } func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) { var source strings.Builder // prepare agent.elasticsearch_outputs[OUTPUT_NAME] source.WriteString(fmt.Sprintf(` if (ctx._source['outputs']==null) {ctx._source['outputs']=new HashMap();} if (ctx._source['outputs']['%s']==null) {ctx._source['outputs']['%s']=new HashMap();} `, outputName, outputName)) for field := range fields { if field == dl.FieldPolicyOutputToRetireAPIKeyIDs { // dl.FieldPolicyOutputToRetireAPIKeyIDs is a special case. // It's an array that gets deleted when the keys are invalidated. // Thus, append the old API key ID, create the field if necessary. source.WriteString(fmt.Sprintf(` if (ctx._source['outputs']['%s'].%s==null) {ctx._source['outputs']['%s'].%s=new ArrayList();} if (!ctx._source['outputs']['%s'].%s.contains(params.%s)) {ctx._source['outputs']['%s'].%s.add(params.%s);} `, outputName, field, outputName, field, outputName, field, field, outputName, field, field)) } else { // Update the other fields source.WriteString(fmt.Sprintf(` ctx._source['outputs']['%s'].%s=params.%s;`, outputName, field, field)) } } body, err := json.Marshal(map[string]interface{}{ "script": map[string]interface{}{ "lang": "painless", "source": source.String(), "params": fields, }, }) return body, err } func generateOutputAPIKey( ctx context.Context, bulk bulk.Bulk, agentID, outputName string, roles []byte) (*apikey.APIKey, error) { name := fmt.Sprintf("%s:%s", agentID, outputName) zerolog.Ctx(ctx).Info().Str(logger.AgentID, agentID).Msgf("generating output API key %s", name) return bulk.APIKeyCreate( ctx, name, "", roles, apikey.NewMetadata(agentID, outputName, apikey.TypeOutput), ) }