// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package stack

import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"
"github.com/elastic/elastic-package/internal/compose"
"github.com/elastic/elastic-package/internal/docker"
"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/serverless"
)
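
// Parameter names stored in the stack config, profile configuration keys
// read by the serverless provider, and their default values.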
const (
paramServerlessProjectID = "serverless_project_id"
paramServerlessProjectType = "serverless_project_type"
ParamServerlessFleetURL = "serverless_fleet_url"
ParamServerlessLocalStackVersion = "serverless_local_stack_version"
configRegion = "stack.serverless.region"
configProjectType = "stack.serverless.type"
configElasticCloudURL = "stack.elastic_cloud.host"
defaultRegion = "aws-us-east-1"
defaultProjectType = "observability"
)
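
// allowedProjectTypes lists the serverless project types that can be
// created with elastic-package.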
var allowedProjectTypes = []string{
"security",
"observability",
}
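
// serverlessProvider is the stack provider that runs the stack as an Elastic
// Cloud serverless project, with auxiliary local services (such as the
// Elastic Agent) managed through Docker Compose.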
type serverlessProvider struct {
profile *profile.Profile
client *serverless.Client
elasticsearchClient *elasticsearch.Client
kibanaClient *kibana.Client
}
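
// projectSettings holds the settings used to create a serverless project,
// derived from the command-line options and the selected profile.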
type projectSettings struct {
Name string
Region string
Type string
StackVersion string
LogstashEnabled bool
SelfMonitor bool
}
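
// createProject creates a serverless project, waits for its endpoints to be
// provisioned and for the project to be initialized and healthy, and stores
// the resulting stack configuration in the profile.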
func (sp *serverlessProvider) createProject(ctx context.Context, settings projectSettings, options Options, conf Config) (Config, error) {
project, err := sp.client.CreateProject(ctx, settings.Name, settings.Region, settings.Type)
if err != nil {
return Config{}, fmt.Errorf("failed to create %s project %s in %s: %w", settings.Type, settings.Name, settings.Region, err)
}
ctx, cancel := context.WithTimeout(ctx, time.Minute*30)
defer cancel()
if err := sp.client.EnsureEndpoints(ctx, project); err != nil {
return Config{}, fmt.Errorf("failed to ensure endpoints have been provisioned properly: %w", err)
}
var config Config
config.Provider = ProviderServerless
config.Parameters = map[string]string{
paramServerlessProjectID: project.ID,
paramServerlessProjectType: project.Type,
}
config.ElasticsearchHost = project.Endpoints.Elasticsearch
config.KibanaHost = project.Endpoints.Kibana
config.ElasticsearchUsername = project.Credentials.Username
config.ElasticsearchPassword = project.Credentials.Password
// Add the stack version set on the command line.
config.Parameters[ParamServerlessLocalStackVersion] = options.StackVersion
// Store the config now, so the project can be destroyed later even if
// initialization or any of the following requests fail.
err = storeConfig(sp.profile, config)
if err != nil {
return Config{}, fmt.Errorf("failed to store config: %w", err)
}
logger.Debug("Waiting for creation plan to be completed")
err = sp.client.EnsureProjectInitialized(ctx, project)
if err != nil {
return Config{}, fmt.Errorf("project not initialized: %w", err)
}
err = sp.createClients(project)
if err != nil {
return Config{}, err
}
config.Parameters[ParamServerlessFleetURL], err = project.DefaultFleetServerURL(ctx, sp.kibanaClient)
if err != nil {
return Config{}, fmt.Errorf("failed to get fleet URL: %w", err)
}
project.Endpoints.Fleet = config.Parameters[ParamServerlessFleetURL]
printUserConfig(options.Printer, config)
// Update the stored config with the latest values (e.g. the Fleet Server URL).
err = storeConfig(sp.profile, config)
if err != nil {
return Config{}, fmt.Errorf("failed to store config: %w", err)
}
err = project.EnsureHealthy(ctx, sp.elasticsearchClient, sp.kibanaClient)
if err != nil {
return Config{}, fmt.Errorf("not all services are healthy: %w", err)
}
if settings.LogstashEnabled {
err = addLogstashFleetOutput(ctx, sp.kibanaClient)
if err != nil {
return Config{}, err
}
}
return config, nil
}
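
// deleteProject deletes the given serverless project.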
func (sp *serverlessProvider) deleteProject(ctx context.Context, project *serverless.Project, options Options) error {
return sp.client.DeleteProject(ctx, project)
}
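
// currentProjectWithClientsAndFleetEndpoint returns the current project,
// initializes the Elasticsearch and Kibana clients for it, and resolves its
// Fleet Server URL.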
func (sp *serverlessProvider) currentProjectWithClientsAndFleetEndpoint(ctx context.Context, config Config) (*serverless.Project, error) {
project, err := sp.currentProject(ctx, config)
if err != nil {
return nil, err
}
err = sp.createClients(project)
if err != nil {
return nil, err
}
fleetURL, found := config.Parameters[ParamServerlessFleetURL]
if !found {
fleetURL, err = project.DefaultFleetServerURL(ctx, sp.kibanaClient)
if err != nil {
return nil, fmt.Errorf("failed to get fleet URL: %w", err)
}
}
project.Endpoints.Fleet = fleetURL
return project, nil
}
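
// currentProject returns the project referenced by the stored stack config,
// or serverless.ErrProjectNotExist if no project has been recorded.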
func (sp *serverlessProvider) currentProject(ctx context.Context, config Config) (*serverless.Project, error) {
projectID, found := config.Parameters[paramServerlessProjectID]
if !found {
return nil, serverless.ErrProjectNotExist
}
projectType, found := config.Parameters[paramServerlessProjectType]
if !found {
return nil, serverless.ErrProjectNotExist
}
project, err := sp.client.GetProject(ctx, projectType, projectID)
if errors.Is(err, serverless.ErrProjectNotExist) {
return nil, err
}
if err != nil {
return nil, fmt.Errorf("failed to get project: %w", err)
}
project.Credentials.Username = config.ElasticsearchUsername
project.Credentials.Password = config.ElasticsearchPassword
return project, nil
}
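
// createClients initializes the Elasticsearch and Kibana clients using the
// project endpoints and credentials.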
func (sp *serverlessProvider) createClients(project *serverless.Project) error {
var err error
sp.elasticsearchClient, err = NewElasticsearchClient(
elasticsearch.OptionWithAddress(project.Endpoints.Elasticsearch),
elasticsearch.OptionWithUsername(project.Credentials.Username),
elasticsearch.OptionWithPassword(project.Credentials.Password),
)
if err != nil {
return fmt.Errorf("failed to create elasticsearch client: %w", err)
}
sp.kibanaClient, err = NewKibanaClient(
kibana.Address(project.Endpoints.Kibana),
kibana.Username(project.Credentials.Username),
kibana.Password(project.Credentials.Password),
)
if err != nil {
return fmt.Errorf("failed to create kibana client: %w", err)
}
return nil
}
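
// getProjectSettings builds the project settings from the command-line
// options and the profile configuration.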
func getProjectSettings(options Options) (projectSettings, error) {
s := projectSettings{
Name: createProjectName(options),
Type: options.Profile.Config(configProjectType, defaultProjectType),
Region: options.Profile.Config(configRegion, defaultRegion),
StackVersion: options.StackVersion,
LogstashEnabled: options.Profile.Config(configLogstashEnabled, "false") == "true",
SelfMonitor: options.Profile.Config(configSelfMonitorEnabled, "false") == "true",
}
return s, nil
}
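
// createProjectName composes the project name from the profile name.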
func createProjectName(options Options) string {
return fmt.Sprintf("elastic-package-test-%s", options.Profile.ProfileName)
}
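
// newServerlessProvider creates a serverless provider for the given profile.
// If stack.elastic_cloud.host is set in the profile, it is used as the
// Elastic Cloud API address.
//
// A minimal usage sketch (error handling elided):
//
//	provider, _ := newServerlessProvider(profile)
//	_ = provider.BootUp(ctx, options)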
func newServerlessProvider(profile *profile.Profile) (*serverlessProvider, error) {
host := profile.Config(configElasticCloudURL, "")
options := []serverless.ClientOption{}
if host != "" {
options = append(options, serverless.WithAddress(host))
}
client, err := serverless.NewClient(options...)
if err != nil {
return nil, fmt.Errorf("can't create serverless provider: %w", err)
}
return &serverlessProvider{profile: profile, client: client}, nil
}
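
// BootUp creates the serverless project if it does not exist yet, creates a
// default agent policy for new projects, and starts the local services.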
func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error {
logger.Warn("Elastic Serverless provider is in technical preview")
config, err := LoadConfig(sp.profile)
if err != nil {
return fmt.Errorf("failed to load configuration: %w", err)
}
settings, err := getProjectSettings(options)
if err != nil {
return err
}
if !slices.Contains(allowedProjectTypes, settings.Type) {
return fmt.Errorf("serverless project type not supported: %s", settings.Type)
}
var project *serverless.Project
isNewProject := false
project, err = sp.currentProject(ctx, config)
switch err {
default:
return err
case serverless.ErrProjectNotExist:
logger.Infof("Creating %s project: %q", settings.Type, settings.Name)
config, err = sp.createProject(ctx, settings, options, config)
if err != nil {
return fmt.Errorf("failed to create deployment: %w", err)
}
outputID := ""
if settings.LogstashEnabled {
outputID = serverless.FleetLogstashOutput
}
logger.Infof("Creating agent policy")
_, err = createAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor)
if err != nil {
return fmt.Errorf("failed to create agent policy: %w", err)
}
isNewProject = true
// TODO: Ensuring a specific GeoIP database would make tests reproducible.
// Currently, GeoIP files are ignored when running pipeline tests.
case nil:
logger.Debugf("%s project already exists: %s", project.Type, project.Name)
printUserConfig(options.Printer, config)
}
logger.Infof("Starting local services")
err = sp.startLocalServices(ctx, options, config)
if err != nil {
return fmt.Errorf("failed to start local services: %w", err)
}
// Update the Logstash Fleet output with the SSL certificates created in
// startLocalServices. This is only needed when a new project has been
// created and Logstash is enabled.
if isNewProject && settings.LogstashEnabled {
err = updateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
if err != nil {
return err
}
}
return nil
}
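
// composeProjectName returns the Docker Compose project name derived from
// the profile.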
func (sp *serverlessProvider) composeProjectName() string {
return DockerComposeProjectName(sp.profile)
}
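
// localServicesComposeProject returns the compose project that manages the
// local services.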
func (sp *serverlessProvider) localServicesComposeProject() (*compose.Project, error) {
composeFile := sp.profile.Path(ProfileStackPath, ComposeFile)
return compose.NewProject(sp.composeProjectName(), composeFile)
}
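
// startLocalServices renders the compose resources for the local services,
// builds their images, and brings the services up.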
func (sp *serverlessProvider) startLocalServices(ctx context.Context, options Options, config Config) error {
err := applyLocalResources(sp.profile, options.StackVersion, config)
if err != nil {
return fmt.Errorf("could not initialize compose files for local services: %w", err)
}
project, err := sp.localServicesComposeProject()
if err != nil {
return fmt.Errorf("could not initialize local services compose project: %w", err)
}
opts := compose.CommandOptions{
ExtraArgs: []string{},
}
err = project.Build(ctx, opts)
if err != nil {
return fmt.Errorf("failed to build images for local services: %w", err)
}
if options.DaemonMode {
opts.ExtraArgs = append(opts.ExtraArgs, "-d")
}
if err := project.Up(ctx, opts); err != nil {
// At least starting on 8.6.0, fleet-server may be reconfigured or
// restarted after being healthy. If elastic-agent tries to enroll at
// this moment, it fails immediately, stopping and making `docker-compose up`
// fail too.
// As a workaround, give docker-compose another chance if only
// elastic-agent failed.
if onlyElasticAgentFailed(ctx, options) && !errors.Is(err, context.Canceled) {
fmt.Println("Elastic Agent failed to start, trying again.")
if err := project.Up(ctx, opts); err != nil {
return fmt.Errorf("failed to start local services: %w", err)
}
} else {
return fmt.Errorf("failed to start local services: %w", err)
}
}
return nil
}
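
// TearDown stops and removes the local services and deletes the serverless
// project, joining the errors of both steps.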
func (sp *serverlessProvider) TearDown(ctx context.Context, options Options) error {
config, err := LoadConfig(sp.profile)
if err != nil {
return fmt.Errorf("failed to load configuration: %w", err)
}
var errs error
err = sp.destroyLocalServices(ctx)
if err != nil {
logger.Errorf("failed to destroy local services: %v", err)
errs = fmt.Errorf("failed to destroy local services: %w", err)
}
project, err := sp.currentProject(ctx, config)
if err != nil {
return errors.Join(errs, fmt.Errorf("failed to find current project: %w", err))
}
logger.Debugf("Deleting project %q (%s)", project.Name, project.ID)
err = sp.deleteProject(ctx, project, options)
if err != nil {
logger.Errorf("failed to delete project: %v", err)
errs = errors.Join(errs, fmt.Errorf("failed to delete project: %w", err))
} else {
logger.Infof("Project %s (%s) deleted", project.Name, project.ID)
}
// TODO: if GeoIP database is specified, remove the geoip Bundle (if needed)
return errs
}
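
// destroyLocalServices takes the local services compose project down,
// removing its volumes and orphan containers.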
func (sp *serverlessProvider) destroyLocalServices(ctx context.Context) error {
project, err := sp.localServicesComposeProject()
if err != nil {
return fmt.Errorf("could not initialize local services compose project: %w", err)
}
opts := compose.CommandOptions{
// Remove associated volumes.
ExtraArgs: []string{"--volumes", "--remove-orphans"},
}
err = project.Down(ctx, opts)
if err != nil {
return fmt.Errorf("failed to destroy local services: %w", err)
}
return nil
}
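
// Update is not implemented for the serverless provider.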
func (sp *serverlessProvider) Update(ctx context.Context, options Options) error {
return errors.New("not implemented")
}
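
// Dump dumps logs for the requested services. The serverless provider only
// supports dumping logs of the elastic-agent service.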
func (sp *serverlessProvider) Dump(ctx context.Context, options DumpOptions) ([]DumpResult, error) {
for _, service := range options.Services {
if service != "elastic-agent" {
return nil, &ErrNotImplemented{
Operation: fmt.Sprintf("logs dump for service %s", service),
Provider: ProviderServerless,
}
}
}
return Dump(ctx, options)
}
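
// Status reports the status of the serverless project services along with
// the status of the local agent containers.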
func (sp *serverlessProvider) Status(ctx context.Context, options Options) ([]ServiceStatus, error) {
logger.Warn("Elastic Serverless provider is in technical preview")
config, err := LoadConfig(sp.profile)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %w", err)
}
project, err := sp.currentProjectWithClientsAndFleetEndpoint(ctx, config)
if errors.Is(err, serverless.ErrProjectNotExist) {
return nil, nil
}
if err != nil {
return nil, err
}
projectServiceStatus, err := project.Status(ctx, sp.elasticsearchClient, sp.kibanaClient)
if err != nil {
return nil, err
}
serverlessVersion := fmt.Sprintf("serverless (%s)", project.Type)
var serviceStatus []ServiceStatus
for service, status := range projectServiceStatus {
serviceStatus = append(serviceStatus, ServiceStatus{
Name: service,
Version: serverlessVersion,
Status: status,
})
}
agentStatus, err := sp.localAgentStatus()
if err != nil {
return nil, fmt.Errorf("failed to get local agent status: %w", err)
}
serviceStatus = append(serviceStatus, agentStatus...)
return serviceStatus, nil
}
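
// localAgentStatus collects the status of the containers in the local
// services compose project.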
func (sp *serverlessProvider) localAgentStatus() ([]ServiceStatus, error) {
var services []ServiceStatus
serviceStatusFunc := func(description docker.ContainerDescription) error {
service, err := newServiceStatus(&description)
if err != nil {
return err
}
services = append(services, *service)
return nil
}
err := runOnLocalServices(sp.composeProjectName(), serviceStatusFunc)
if err != nil {
return nil, err
}
return services, nil
}
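
// runOnLocalServices calls serviceFunc for each container of the given
// compose project, skipping the readiness helper services.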
func runOnLocalServices(project string, serviceFunc func(docker.ContainerDescription) error) error {
// Query docker directly to avoid loading environment variables (e.g. STACK_VERSION_VARIANT) and profiles.
containerIDs, err := docker.ContainerIDsWithLabel(projectLabelDockerCompose, project)
if err != nil {
return err
}
if len(containerIDs) == 0 {
return nil
}
containerDescriptions, err := docker.InspectContainers(containerIDs...)
if err != nil {
return err
}
for _, containerDescription := range containerDescriptions {
serviceName := containerDescription.Config.Labels.ComposeService
if strings.HasSuffix(serviceName, readyServicesSuffix) {
continue
}
err := serviceFunc(containerDescription)
if err != nil {
return err
}
}
return nil
}