internal/stack/environment.go (346 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package stack import ( "context" "errors" "fmt" "os" "strings" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/fleetserver" "github.com/elastic/elastic-package/internal/kibana" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/profile" ) type environmentProvider struct { kibana *kibana.Client elasticsearch *elasticsearch.Client } func newEnvironmentProvider(profile *profile.Profile) (*environmentProvider, error) { return &environmentProvider{}, nil } // BootUp configures the profile to use as stack the one indicated using environment variables. func (p *environmentProvider) BootUp(ctx context.Context, options Options) error { logger.Warn("Configuring an stack from environment variables is in technical preview") config := Config{ Provider: ProviderEnvironment, ElasticsearchAPIKey: os.Getenv(ElasticsearchAPIKeyEnv), ElasticsearchHost: os.Getenv(ElasticsearchHostEnv), ElasticsearchUsername: os.Getenv(ElasticsearchUsernameEnv), ElasticsearchPassword: os.Getenv(ElasticsearchPasswordEnv), KibanaHost: os.Getenv(KibanaHostEnv), CACertFile: os.Getenv(CACertificateEnv), Parameters: make(map[string]string), } if err := requiredEnv(config.ElasticsearchHost, ElasticsearchHostEnv); err != nil { return err } if err := requiredEnv(config.KibanaHost, KibanaHostEnv); err != nil { return err } err := p.initClients() if err != nil { return err } // TODO: Migrate from serverless variables. config.Parameters[ParamServerlessLocalStackVersion] = options.StackVersion config, err = p.setupFleet(ctx, config, options) if err != nil { return fmt.Errorf("failed to setup Fleet: %w", err) } // We need to store the config here to be able to clean up Fleet if something // fails later. err = storeConfig(options.Profile, config) if err != nil { return fmt.Errorf("failed to store config: %w", err) } logstashEnabled := options.Profile.Config(configLogstashEnabled, "false") == "true" if logstashEnabled { err := addLogstashFleetOutput(ctx, p.kibana) if err != nil { return fmt.Errorf("failed to create logstash output: %w", err) } config.OutputID = fleetLogstashOutput } else { internalHost := DockerInternalHost(config.ElasticsearchHost) if internalHost != config.ElasticsearchHost { err := addElasticsearchFleetOutput(ctx, p.kibana, internalHost) if err != nil { return fmt.Errorf("failed to create elasticsearch output: %w", err) } config.OutputID = fleetElasticsearchOutput } } // We need to store the config here to be able to clean up the logstash output if something // fails later. err = storeConfig(options.Profile, config) if err != nil { return fmt.Errorf("failed to store config: %w", err) } selfMonitor := options.Profile.Config(configSelfMonitorEnabled, "false") == "true" policy, err := createAgentPolicy(ctx, p.kibana, options.StackVersion, config.OutputID, selfMonitor) if err != nil { return fmt.Errorf("failed to create agent policy: %w", err) } if config.ElasticsearchAPIKey != "" { config.EnrollmentToken, err = p.kibana.GetEnrollmentTokenForPolicyID(ctx, policy.ID) if err != nil { return fmt.Errorf("failed to get an enrollment token for policy %s: %w", policy.Name, err) } } localServices := &localServicesManager{ profile: options.Profile, } err = localServices.start(ctx, options, config) if err != nil { return fmt.Errorf("failed to start local services: %w", err) } if logstashEnabled { err = updateLogstashFleetOutput(ctx, options.Profile, p.kibana) if err != nil { return fmt.Errorf("cannot configure fleet output: %w", err) } } err = storeConfig(options.Profile, config) if err != nil { return fmt.Errorf("failed to store config: %w", err) } return nil } func requiredEnv(value string, envVarName string) error { if value == "" { return fmt.Errorf("environment variable %s required", envVarName) } return nil } func (p *environmentProvider) initClients() error { kibana, err := NewKibanaClient() if err != nil { return fmt.Errorf("cannot create Kibana client: %w", err) } p.kibana = kibana elasticsearch, err := NewElasticsearchClient() if err != nil { return fmt.Errorf("cannot create Elasticsearch client: %w", err) } p.elasticsearch = elasticsearch return nil } func (p *environmentProvider) setupFleet(ctx context.Context, config Config, options Options) (Config, error) { const localFleetServerURL = "https://fleet-server:8220" fleetServerURL, err := p.kibana.DefaultFleetServerURL(ctx) if errors.Is(err, kibana.ErrFleetServerNotFound) || !isFleetServerReachable(ctx, fleetServerURL, config) { // We need to setup a local Fleet Server fleetServerURL = localFleetServerURL config.Parameters[paramFleetServerManaged] = "true" host := kibana.FleetServerHost{ ID: fleetServerHostID(options.Profile.ProfileName), URLs: []string{fleetServerURL}, IsDefault: true, Name: "elastic-package-managed-fleet-server", } err := p.kibana.AddFleetServerHost(ctx, host) if errors.Is(err, kibana.ErrConflict) { err = p.kibana.UpdateFleetServerHost(ctx, host) if err != nil { return config, fmt.Errorf("failed to update existing Fleet Server host (id: %s): %w", host.ID, err) } } if err != nil { return config, fmt.Errorf("failed to add Fleet Server host: %w", err) } _, err = createFleetServerPolicy(ctx, p.kibana, options.StackVersion, options.Profile.ProfileName) if err != nil { return config, fmt.Errorf("failed to create agent policy for Fleet Server: %w", err) } config.FleetServiceToken, err = p.kibana.CreateFleetServiceToken(ctx) if err != nil { return config, fmt.Errorf("failed to create service token for Fleet Server: %w", err) } } else if err != nil { return config, fmt.Errorf("failed to discover Fleet Server URL: %w", err) } config.Parameters[ParamServerlessFleetURL] = fleetServerURL return config, nil } func fleetServerHostID(namespace string) string { return "elastic-package-" + namespace } func isFleetServerReachable(ctx context.Context, address string, config Config) bool { client, err := fleetserver.NewClient(address, fleetserver.APIKey(config.ElasticsearchAPIKey)) if err != nil { return false } status, err := client.Status(ctx) return err == nil && strings.ToLower(status.Status) == "healthy" } // TearDown stops and/or removes a stack. func (p *environmentProvider) TearDown(ctx context.Context, options Options) error { localServices := &localServicesManager{ profile: options.Profile, } err := localServices.destroy(ctx) if err != nil { return fmt.Errorf("failed to destroy local services: %w", err) } kibanaClient, err := NewKibanaClientFromProfile(options.Profile) if err != nil { return fmt.Errorf("failed to create kibana client: %w", err) } err = forceUnenrollAgentsWithPolicy(ctx, kibanaClient) if err != nil { return fmt.Errorf("failed to remove agents associated to test policy: %w", err) } err = deleteAgentPolicy(ctx, kibanaClient) if err != nil { return fmt.Errorf("failed to delete agent policy: %v", err) } config, err := LoadConfig(options.Profile) if err != nil { return fmt.Errorf("failed to load configuration: %w", err) } if managed, found := config.Parameters[paramFleetServerManaged]; found && managed == "true" { err = forceUnenrollFleetServerWithPolicy(ctx, kibanaClient) if err != nil { return fmt.Errorf("failed to remove managed fleet servers: %w", err) } err = deleteFleetServerPolicy(ctx, kibanaClient) if err != nil { return fmt.Errorf("failed to delete fleet server policy: %w", err) } } if config.OutputID != "" { err := kibanaClient.RemoveFleetOutput(ctx, config.OutputID) if err != nil { return fmt.Errorf("failed to delete %s output: %s", config.OutputID, err) } } return nil } // Update updates resources associated to a stack. func (p *environmentProvider) Update(context.Context, Options) error { return fmt.Errorf("not implemented") } // Dump dumps data for debug purpouses. func (p *environmentProvider) Dump(ctx context.Context, options DumpOptions) ([]DumpResult, error) { for _, service := range options.Services { if service != "elastic-agent" { return nil, &ErrNotImplemented{ Operation: fmt.Sprintf("logs dump for service %s", service), Provider: ProviderServerless, } } } return Dump(ctx, options) } // Status obtains status information of the stack. func (p *environmentProvider) Status(ctx context.Context, options Options) ([]ServiceStatus, error) { status := []ServiceStatus{ p.elasticsearchStatus(ctx, options), p.kibanaStatus(ctx, options), } config, err := LoadConfig(options.Profile) if err != nil { return nil, fmt.Errorf("failed to load configuration: %w", err) } // If fleet is managed, it will be included in the local services status. fleetManaged := true if managed, ok := config.Parameters[paramFleetServerManaged]; !ok || managed != "true" { fleetManaged = false status = append(status, p.fleetStatus(ctx, options, config)) } localServices := &localServicesManager{ profile: options.Profile, } localStatus, err := localServices.status() if err != nil { return nil, fmt.Errorf("cannot obtain status of local services: %w", err) } if len(localStatus) == 0 { localStatus = []ServiceStatus{{ Name: "elastic-agent", Version: "unknown", Status: "missing", }} if fleetManaged { localStatus = append(localStatus, ServiceStatus{ Name: "fleet-server", Version: "unknown", Status: "missing", }) } } status = append(status, localStatus...) return status, nil } func (p *environmentProvider) elasticsearchStatus(ctx context.Context, options Options) ServiceStatus { status := ServiceStatus{ Name: "elasticsearch", Version: "unknown", } client, err := NewElasticsearchClientFromProfile(options.Profile) if err != nil { status.Status = "unknown: failed to create client: " + err.Error() return status } err = client.CheckHealth(ctx) if err != nil { status.Status = "unhealthy: " + err.Error() } else { status.Status = "healthy" } info, err := client.Info(ctx) if err != nil { status.Version = "unknown" } else if info.Version.BuildFlavor == "serverless" { status.Version = "serverless" } else { status.Version = info.Version.Number } return status } func (p *environmentProvider) kibanaStatus(ctx context.Context, options Options) ServiceStatus { status := ServiceStatus{ Name: "kibana", Version: "unknown", } client, err := NewKibanaClientFromProfile(options.Profile) if err != nil { status.Status = "unknown: failed to create client: " + err.Error() return status } err = client.CheckHealth(ctx) if err != nil { status.Status = "unhealthy: " + err.Error() } else { status.Status = "healthy" } versionInfo, err := client.Version() if err == nil { if versionInfo.BuildFlavor == "serverless" { status.Version = "serverless" } else { status.Version = versionInfo.Version() } } return status } func (p *environmentProvider) fleetStatus(ctx context.Context, options Options, config Config) ServiceStatus { status := ServiceStatus{ Name: "fleet-server", Version: "unknown", } address, ok := config.Parameters[ParamServerlessFleetURL] if !ok || address == "" { status.Status = "unknown address" return status } client, err := fleetserver.NewClient(address, fleetserver.APIKey(config.ElasticsearchAPIKey), fleetserver.CertificateAuthority(config.CACertFile), ) if err != nil { status.Status = "unknown: " + err.Error() } fleetServerStatus, err := client.Status(ctx) if err != nil { status.Status = "unknown: " + err.Error() } else if fleetServerStatus.Status != "" { status.Status = strings.ToLower(fleetServerStatus.Status) } if fleetServerStatus != nil { if version := fleetServerStatus.Version.Number; version != "" { status.Version = version } else { status.Version = "unknown" } } return status }