internal/agentdeployer/kubernetes.go (206 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package agentdeployer
import (
"bytes"
"context"
_ "embed"
"encoding/base64"
"fmt"
"os"
"strings"
"text/template"
"time"
"github.com/elastic/elastic-package/internal/install"
"github.com/elastic/elastic-package/internal/kind"
"github.com/elastic/elastic-package/internal/kubectl"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/stack"
)
// KubernetesAgentDeployer is responsible for deploying resources in the Kubernetes cluster.
type KubernetesAgentDeployer struct {
profile *profile.Profile
stackVersion string
policyName string
dataStream string
agentRunID string
runSetup bool
runTestsOnly bool
runTearDown bool
}
type KubernetesAgentDeployerOptions struct {
Profile *profile.Profile
StackVersion string
PolicyName string
DataStream string
RunSetup bool
RunTestsOnly bool
RunTearDown bool
}
type kubernetesDeployedAgent struct {
agentInfo AgentInfo
profile *profile.Profile
stackVersion string
agentName string
}
func (s kubernetesDeployedAgent) TearDown(ctx context.Context) error {
elasticAgentManagedYaml, err := getElasticAgentYAML(ctx, s.profile, s.agentInfo, s.stackVersion, s.agentName)
if err != nil {
return fmt.Errorf("can't retrieve Kubernetes file for Elastic Agent: %w", err)
}
err = kubectl.DeleteStdin(ctx, elasticAgentManagedYaml)
if err != nil {
return fmt.Errorf("can't uninstall Kubernetes Elastic Agent resources: %w", err)
}
return nil
}
func (s kubernetesDeployedAgent) ExitCode(ctx context.Context) (bool, int, error) {
return false, -1, ErrNotSupported
}
func (s kubernetesDeployedAgent) Info() AgentInfo {
return s.agentInfo
}
func (s *kubernetesDeployedAgent) SetInfo(info AgentInfo) {
s.agentInfo = info
}
// Logs returns the logs from the agent starting at the given time
func (s *kubernetesDeployedAgent) Logs(ctx context.Context, t time.Time) ([]byte, error) {
return nil, nil
}
var _ DeployedAgent = new(kubernetesDeployedAgent)
// NewKubernetesAgentDeployer function creates a new instance of KubernetesAgentDeployer.
func NewKubernetesAgentDeployer(opts KubernetesAgentDeployerOptions) (*KubernetesAgentDeployer, error) {
return &KubernetesAgentDeployer{
profile: opts.Profile,
stackVersion: opts.StackVersion,
policyName: opts.PolicyName,
dataStream: opts.DataStream,
runSetup: opts.RunSetup,
runTestsOnly: opts.RunTestsOnly,
runTearDown: opts.RunTearDown,
}, nil
}
// SetUp function links the kind container with elastic-package-stack network, installs Elastic-Agent and optionally
// custom YAML definitions.
func (ksd *KubernetesAgentDeployer) SetUp(ctx context.Context, agentInfo AgentInfo) (DeployedAgent, error) {
ksd.agentRunID = agentInfo.Test.RunID
err := kind.VerifyContext(ctx)
if err != nil {
return nil, fmt.Errorf("kind context verification failed: %w", err)
}
if ksd.runTearDown || ksd.runTestsOnly {
logger.Debug("Skip connect kind to Elastic stack network")
} else {
err = kind.ConnectToElasticStackNetwork(ksd.profile)
if err != nil {
return nil, fmt.Errorf("can't connect control plane to Elastic stack network: %w", err)
}
}
agentName := ksd.agentName()
if ksd.runTearDown || ksd.runTestsOnly {
logger.Debug("Skip install Elastic Agent in cluster")
} else {
err = installElasticAgentInCluster(ctx, ksd.profile, agentInfo, ksd.stackVersion, agentName)
if err != nil {
return nil, fmt.Errorf("can't install Elastic-Agent in the Kubernetes cluster: %w", err)
}
}
agentInfo.Name = kind.ControlPlaneContainerName
agentInfo.Hostname = kind.ControlPlaneContainerName
// kind-control-plane is the name of the kind host where Pod is running since we use hostNetwork setting
// to deploy Agent Pod. Because of this, hostname inside pod will be equal to the name of the k8s host.
agentInfo.Agent.Host.NamePrefix = "kind-control-plane"
return &kubernetesDeployedAgent{
agentInfo: agentInfo,
profile: ksd.profile,
stackVersion: ksd.stackVersion,
agentName: agentName,
}, nil
}
func (ksd *KubernetesAgentDeployer) agentName() string {
name := "elastic-agent"
if ksd.dataStream != "" && ksd.dataStream != "." {
name = fmt.Sprintf("%s-%s", name, strings.ReplaceAll(ksd.dataStream, "_", "-"))
}
if ksd.agentRunID != "" {
name = fmt.Sprintf("%s-%s", name, ksd.agentRunID)
}
return name
}
var _ AgentDeployer = new(KubernetesAgentDeployer)
func installElasticAgentInCluster(ctx context.Context, profile *profile.Profile, agentInfo AgentInfo, stackVersion, agentName string) error {
logger.Debug("install Elastic Agent in the Kubernetes cluster")
elasticAgentManagedYaml, err := getElasticAgentYAML(ctx, profile, agentInfo, stackVersion, agentName)
if err != nil {
return fmt.Errorf("can't retrieve Kubernetes file for Elastic Agent: %w", err)
}
err = kubectl.ApplyStdin(ctx, elasticAgentManagedYaml)
if err != nil {
return fmt.Errorf("can't install Elastic-Agent in Kubernetes cluster: %w", err)
}
// DEBUG DaemonSet is not ready: kube-system/elastic-agent. 0 out of 1 expected pods have been scheduled
return nil
}
//go:embed _static/elastic-agent-managed.yaml.tmpl
var elasticAgentManagedYamlTmpl string
func getElasticAgentYAML(ctx context.Context, profile *profile.Profile, agentInfo AgentInfo, stackVersion, agentName string) ([]byte, error) {
logger.Debugf("Prepare YAML definition for Elastic Agent running in stack v%s", stackVersion)
config, err := stack.LoadConfig(profile)
if err != nil {
return nil, fmt.Errorf("failed to load config from profile: %w", err)
}
fleetURL := "https://fleet-server:8220"
kibanaURL := "https://kibana:5601"
if config.Provider != stack.ProviderCompose {
kibanaURL = config.KibanaHost
}
if url, ok := config.Parameters[stack.ParamServerlessFleetURL]; ok {
fleetURL = url
}
if version, ok := config.Parameters[stack.ParamServerlessLocalStackVersion]; ok {
stackVersion = version
}
enrollmentToken := ""
if config.ElasticsearchAPIKey != "" {
// TODO: Review if this is the correct place to get the enrollment token.
kibanaClient, err := stack.NewKibanaClientFromProfile(profile)
if err != nil {
return nil, fmt.Errorf("failed to create kibana client: %w", err)
}
enrollmentToken, err = kibanaClient.GetEnrollmentTokenForPolicyID(ctx, agentInfo.Policy.ID)
if err != nil {
return nil, fmt.Errorf("failed to get enrollment token for policy %q: %w", agentInfo.Policy.Name, err)
}
}
appConfig, err := install.Configuration(install.OptionWithStackVersion(stackVersion))
if err != nil {
return nil, fmt.Errorf("can't read application configuration: %w", err)
}
caCert, err := readCACertBase64(profile)
if err != nil {
return nil, fmt.Errorf("can't read certificate authority file: %w", err)
}
tmpl := template.Must(template.New("elastic-agent.yml").Parse(elasticAgentManagedYamlTmpl))
var elasticAgentYaml bytes.Buffer
err = tmpl.Execute(&elasticAgentYaml, map[string]string{
"fleetURL": fleetURL,
"kibanaURL": kibanaURL,
"username": config.ElasticsearchUsername,
"password": config.ElasticsearchPassword,
"enrollmentToken": enrollmentToken,
"caCertPem": caCert,
"elasticAgentImage": appConfig.StackImageRefs().ElasticAgent,
"elasticAgentTokenPolicyName": getTokenPolicyName(stackVersion, agentInfo.Policy.Name),
"agentName": agentName,
})
if err != nil {
return nil, fmt.Errorf("can't generate elastic agent manifest: %w", err)
}
return elasticAgentYaml.Bytes(), nil
}
func readCACertBase64(profile *profile.Profile) (string, error) {
caCertPath, err := stack.FindCACertificate(profile)
if err != nil {
return "", fmt.Errorf("can't locate CA certificate: %w", err)
}
d, err := os.ReadFile(caCertPath)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(d), nil
}
// getTokenPolicyName function returns the policy name for the >= 8.x Elastic stacks. The agent's policy
// is predefined in the Kibana configuration file. The logic is not present in older stacks and it uses
// the default policy in Kibana (empty string).
func getTokenPolicyName(stackVersion, policyName string) string {
if strings.HasPrefix(stackVersion, "7.") {
return ""
}
return policyName
}