internal/pkg/dl/migration.go (186 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package dl
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/rs/zerolog"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
)
// migrationFn performs one versioned data migration (for example
// migrateTov7_15) against the cluster reachable through the given bulker,
// returning the first error it encounters.
type migrationFn func(context.Context, bulk.Bulk) error

// migrationBodyFn builds a single UpdateByQuery request for a migration. It
// returns, in order: the migration name, the target index, the JSON request
// body, and an error if the body could not be built.
type migrationBodyFn func() (string, string, []byte, error)

// migrationResponse is the subset of an Elasticsearch UpdateByQuery response
// that the migration code decodes and logs.
type migrationResponse struct {
	Took             int  `json:"took"`
	TimedOut         bool `json:"timed_out"`
	Total            int  `json:"total"`
	Updated          int  `json:"updated"`
	Deleted          int  `json:"deleted"`
	Batches          int  `json:"batches"`
	VersionConflicts int  `json:"version_conflicts"`
	Noops            int  `json:"noops"`
	Retries          struct {
		Bulk   int `json:"bulk"`
		Search int `json:"search"`
	} `json:"retries"`
	// Failures keeps each reported failure as raw JSON; they are only logged.
	Failures []json.RawMessage `json:"failures"`
}
var (
	// timeNow is the clock used when stamping migration data (e.g. API key
	// retirement times). Tests may swap it for a fixed clock.
	timeNow = time.Now
)
// Migrate runs every registered data migration in order and stops at the
// first one that fails, returning its error. Each migration is responsible
// for detecting whether there is anything to do and is a no-op otherwise.
func Migrate(ctx context.Context, bulker bulk.Bulk) error {
	// WARNING: No new migrations should be added here. We need to implement
	// a mechanism to perform migrations with standalone mode.
	// See https://github.com/elastic/fleet-server/pull/2359.
	migrations := []migrationFn{
		migrateTov7_15,
		migrateToV8_5,
	}

	for i := range migrations {
		if err := migrations[i](ctx, bulker); err != nil {
			return err
		}
	}
	return nil
}
// migrate repeatedly builds a request with fn and applies it, until a pass
// reports zero version conflicts. It returns the total number of documents
// updated across all passes. A build or apply error aborts immediately and
// returns the count accumulated so far together with the wrapped error.
func migrate(ctx context.Context, bulker bulk.Bulk, fn migrationBodyFn) (int, error) {
	total := 0

	for {
		migrationName, targetIndex, payload, prepErr := fn()
		if prepErr != nil {
			return total, fmt.Errorf("failed to prepare request for migration %s: %w",
				migrationName, prepErr)
		}

		result, applyErr := applyMigration(ctx, migrationName, targetIndex, bulker, payload)
		if applyErr != nil {
			zerolog.Ctx(ctx).Err(applyErr).
				Bytes("http.request.body.content", payload).
				Msgf("migration %s failed", migrationName)
			return total, fmt.Errorf("failed to apply migration %q: %w",
				migrationName, applyErr)
		}

		total += result.Updated

		// Conflicting documents are skipped by the request, so run another
		// pass until one completes without any conflicts.
		if result.VersionConflicts == 0 {
			return total, nil
		}
	}
}
// applyMigration sends body as an UpdateByQuery request against index and
// returns the decoded Elasticsearch response.
//
// The request is issued with refresh=true and conflicts=proceed, so version
// conflicts are reported in the response (VersionConflicts) instead of
// failing the request; the caller decides whether to run another pass.
//
// A 404 status (index not created yet) means there is nothing to migrate and
// yields a zero response with a nil error. Any other error status is returned
// as an error that includes the response text. Per-document failures reported
// by Elasticsearch are logged but do not make the call fail.
func applyMigration(ctx context.Context, name string, index string, bulker bulk.Bulk, body []byte) (migrationResponse, error) {
	start := time.Now()

	client := bulker.Client()

	reader := bytes.NewReader(body)

	opts := []func(*esapi.UpdateByQueryRequest){
		client.UpdateByQuery.WithBody(reader),
		client.UpdateByQuery.WithContext(ctx),
		client.UpdateByQuery.WithRefresh(true),
		client.UpdateByQuery.WithConflicts("proceed"),
	}

	res, err := client.UpdateByQuery([]string{index}, opts...)
	if err != nil {
		return migrationResponse{}, err
	}
	// The response body must be closed on every path (success, 404 and error
	// status) so the underlying HTTP connection is released.
	defer res.Body.Close()

	if res.IsError() {
		if res.StatusCode == http.StatusNotFound {
			// Ignore index not created yet; nothing to upgrade
			return migrationResponse{}, nil
		}

		return migrationResponse{}, fmt.Errorf("migrate %s UpdateByQuery failed: %s",
			name, res.String())
	}

	var resp migrationResponse
	if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
		return migrationResponse{}, fmt.Errorf("decode UpdateByQuery response: %w", err)
	}

	zerolog.Ctx(ctx).Info().
		Str("fleet.migration.name", name).
		Int("fleet.migration.es.took", resp.Took).
		Bool("fleet.migration.es.timed_out", resp.TimedOut).
		Int("fleet.migration.updated", resp.Updated).
		Int("fleet.migration.deleted", resp.Deleted).
		Int("fleet.migration.batches", resp.Batches).
		Int("fleet.migration.version_conflicts", resp.VersionConflicts).
		Int("fleet.migration.noops", resp.Noops).
		Int("fleet.migration.retries.bulk", resp.Retries.Bulk).
		Int("fleet.migration.retries.search", resp.Retries.Search).
		Dur("fleet.migration.total.duration", time.Since(start)).
		Int("fleet.migration.total.count", resp.Total).
		Msgf("migration %s done", name)

	// Failures are surfaced in the logs only; the migration is still
	// considered applied.
	for _, fail := range resp.Failures {
		zerolog.Ctx(ctx).Error().RawJSON("failure", fail).Msgf("failed applying %s migration", name)
	}

	return resp, nil
}
// ============================== V7.15 migration ==============================

// migrateTov7_15 runs the AgentMetadata migration (see migrateAgentMetadata)
// and wraps any failure with the target version.
func migrateTov7_15(ctx context.Context, bulker bulk.Bulk) error {
	zerolog.Ctx(ctx).Debug().Msg("applying migration to v7.15")

	if _, err := migrate(ctx, bulker, migrateAgentMetadata); err != nil {
		return fmt.Errorf("v7.15.0 data migration failed: %w", err)
	}
	return nil
}
// migrateAgentMetadata builds the UpdateByQuery request that backfills
// agent.id on agent documents that lack it, setting it to the document _id.
//
// Background: FleetServer 7.15 added a new *AgentMetadata field to the Agent
// record. New enrollments in 7.15 and later populate it, but the change was
// not backported to 7.14. The security team relies on this field existing in
// 7.16, so this migration supports upgrades from 7.14.
//
// Running it in the background is currently safe, with some concern about
// conflicts; that risk exists anyway because N Fleet Servers can run in
// parallel. The update only ever happens once, so in the 99.9% case it is a
// noop.
//
// It returns the migration name, the target index (FleetAgents), the JSON
// request body, and an error if the body cannot be marshalled.
func migrateAgentMetadata() (string, string, []byte, error) {
	const (
		migrationName = "AgentMetadata"
		script        = "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;"
	)

	root := dsl.NewRoot()
	root.Query().Bool().MustNot().Exists("agent.id")
	root.Param("script", script)

	payload, err := root.MarshalJSON()
	if err != nil {
		return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err)
	}
	return migrationName, FleetAgents, payload, nil
}
// ============================== V8.5.0 migration =============================
// https://github.com/elastic/fleet-server/issues/1672

// migrateToV8_5 runs the AgentOutputs migration (see migrateAgentOutputs),
// which moves each agent's deprecated default API key fields into the new
// per-output "outputs" structure, and wraps any failure with the version.
func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error {
	zerolog.Ctx(ctx).Debug().Msg("applying migration to v8.5.0")

	if _, err := migrate(ctx, bulker, migrateAgentOutputs); err != nil {
		return fmt.Errorf("v8.5.0 data migration failed: %w", err)
	}

	// NOTE(michel-laterman): We use lazy output migration now that the
	// coordinator is removed. Migration completes once an agent checks in
	// and an API key associated with an output is empty. The agent will
	// subscribe to the policy monitor with revision_idx=0 to force a policy
	// change to occur.
	return nil
}
// migrateAgentOutputs performs the necessary changes on the Agent documents
// to introduce the `Outputs` field.
//
// FleetServer 8.5.0 introduces a new field to the Agent document, Outputs, to
// store the outputs credentials and data. The DefaultAPIKey, DefaultAPIKeyID,
// DefaultAPIKeyHistory and PolicyOutputPermissionsHash are now deprecated in
// favour of the new `Outputs` fields, which maps the output name to its data.
// This change fixes https://github.com/elastic/fleet-server/issues/1672.
//
// The change is backward compatible as the deprecated fields are just set to
// their zero value and an older version of FleetServer can repopulate them.
// However, reverting FleetServer to an older version might reintroduce the
// very issue this change fixes.
//
// It returns the migration name ("AgentOutputs"), the target index
// (FleetAgents), the JSON body of an UpdateByQuery request, and an error if
// that body cannot be marshalled. timeNow is evaluated on every call, so each
// invocation stamps a fresh retirement time.
func migrateAgentOutputs() (string, string, []byte, error) {
	const (
		migrationName        = "AgentOutputs"
		fieldOutputs         = "outputs"
		fieldDefaultAPIKeyID = "default_api_key_id" //nolint:gosec // this is not a credential
		fieldRetiredAt       = "retiredAt"
	)

	// Select only agents that still carry the deprecated default_api_key_id.
	// The script sets that field to null, so migrated agents should not match
	// again (Elasticsearch's exists query does not match null values).
	query := dsl.NewRoot()
	query.Query().Bool().Must().Exists(fieldDefaultAPIKeyID)

	// Script params: the retirement timestamp (UTC, RFC 3339) recorded for the
	// agent's current API key; the script reads it as params.retiredAt.
	fields := map[string]interface{}{fieldRetiredAt: timeNow().UTC().Format(time.RFC3339)}

	// The Painless script, in order: creates outputs.default with type
	// "elasticsearch"; seeds its to_retire_api_key_ids from
	// default_api_key_history when that is non-empty; appends the current
	// default_api_key_id with the retirement timestamp; blanks api_key and
	// api_key_id so a new key gets generated; copies
	// policy_output_permissions_hash into permissions_hash; and finally nulls
	// the deprecated top-level fields. The script text is sent verbatim.
	painless := `
// set up the new fields
ctx._source['` + fieldOutputs + `']=new HashMap();
ctx._source['` + fieldOutputs + `']['default']=new HashMap();
ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=new ArrayList();
// copy 'default_api_key_history' to new 'outputs' field
ctx._source['` + fieldOutputs + `']['default'].type="elasticsearch";
if (ctx._source.default_api_key_history != null && ctx._source.default_api_key_history.length > 0) {
ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=ctx._source.default_api_key_history;
}
Map map = new HashMap();
map.put("retired_at", params.` + fieldRetiredAt + `);
map.put("id", ctx._source.default_api_key_id);
// Make current API key empty, so fleet-server will generate a new one
// Add current API jey to be retired
if (ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids != null) {
ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids.add(map);
}
ctx._source['` + fieldOutputs + `']['default'].api_key="";
ctx._source['` + fieldOutputs + `']['default'].api_key_id="";
ctx._source['` + fieldOutputs + `']['default'].permissions_hash=ctx._source.policy_output_permissions_hash;
// Erase deprecated fields
ctx._source.default_api_key_history=null;
ctx._source.default_api_key=null;
ctx._source.default_api_key_id=null;
ctx._source.policy_output_permissions_hash=null;
`
	query.Param("script", map[string]interface{}{
		"lang":   "painless",
		"source": painless,
		"params": fields,
	})

	body, err := query.MarshalJSON()
	if err != nil {
		return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err)
	}
	return migrationName, FleetAgents, body, nil
}