in internal/testrunner/runners/system/tester.go [966:1324]
// prepareScenario prepares everything needed to run one system test case and
// returns the resulting scenarioTest.
//
// The work performed depends on the stage flags of the tester:
//   - r.runSetup: the service state directory is created first and, at the end,
//     the scenario state is written to r.serviceStateFilePath.
//   - r.runTearDown / r.runTestsOnly: the service state is read from
//     r.serviceStateFilePath and the enroll/current policies stored there are
//     reused instead of creating a new enroll policy.
//   - r.runTearDown: no test policy is created, the data stream is not added to
//     the policy, the agent log level and policy are not changed, and the
//     function returns before waiting for documents.
//
// In the regular flow it creates the enroll and test agent policies through the
// Kibana client, sets up the agent and the service, adds the package data stream
// to the test policy, assigns that policy to the enrolled agent, waits for
// documents in the data stream and fills the scenario with the documents,
// ignored fields, degraded docs, deprecation warnings and synthetic source mode.
//
// As a side effect it assigns r.deleteTestPolicyHandler,
// r.cleanTestScenarioHandler, r.resetAgentPolicyHandler and
// r.resetAgentLogLevelHandler; handlers assigned before a failure remain set
// when an error is returned.
//
// Parameters:
//   - ctx: context passed to every Kibana, agent and service call.
//   - config: test case configuration; it is reloaded with newConfig once the
//     service information is available.
//   - stackConfig: only its OutputID is read here, to select the data output of
//     the test policy.
//   - svcInfo: service information passed to the agent check and service setup;
//     it is replaced by the value returned from r.setupService.
//
// It returns a nil scenario together with an error when any step fails. A
// service that exited with a code greater than zero is reported as
// testrunner.ErrTestCaseFailed.
func (r *tester) prepareScenario(ctx context.Context, config *testConfig, stackConfig stack.Config, svcInfo servicedeployer.ServiceInfo) (*scenarioTest, error) {
serviceOptions := r.createServiceOptions(config.ServiceVariantName)
var err error
var serviceStateData ServiceState
// The setup stage needs the directory where the service state is stored.
if r.runSetup {
err = r.createServiceStateDir()
if err != nil {
return nil, fmt.Errorf("failed to create setup services dir: %w", err)
}
}
scenario := scenarioTest{}
// The tear-down and tests-only stages work with the state read from the service
// state file (the same path that is written at the end of this function when
// r.runSetup is set).
if r.runTearDown || r.runTestsOnly {
serviceStateData, err = readServiceStateData(r.serviceStateFilePath)
if err != nil {
return nil, fmt.Errorf("failed to read service setup data: %w", err)
}
}
serviceOptions.DeployIndependentAgent = r.runIndependentElasticAgent
// Use the policy template named in the test configuration; when it is not set,
// look it up from the configured input.
policyTemplateName := config.PolicyTemplate
if policyTemplateName == "" {
policyTemplateName, err = findPolicyTemplateForInput(*r.pkgManifest, *r.dataStreamManifest, config.Input)
if err != nil {
return nil, fmt.Errorf("failed to determine the associated policy_template: %w", err)
}
}
scenario.policyTemplateName = policyTemplateName
policyTemplate, err := selectPolicyTemplateByName(r.pkgManifest.PolicyTemplates, scenario.policyTemplateName)
if err != nil {
return nil, fmt.Errorf("failed to find the selected policy_template: %w", err)
}
// Configure package (single data stream) via Fleet APIs.
// testTime is used as a suffix of the policy names created below.
// NOTE(review): the layout ends with a literal "Z" although time.Now() is not
// converted to UTC here - confirm whether local time is intended.
testTime := time.Now().Format("20060102T15:04:05Z")
var policyToTest, policyCurrent, policyToEnroll *kibana.Policy
if r.runTearDown || r.runTestsOnly {
// Reuse the policies stored in the service state file.
policyCurrent = &serviceStateData.CurrentPolicy
policyToEnroll = &serviceStateData.EnrollPolicy
logger.Debugf("Got current policy from file: %q - %q", policyCurrent.Name, policyCurrent.ID)
} else {
// Create a specific Agent Policy for enrollment purposes.
// There are some issues when the stack is running for some time,
// agents cannot enroll with the default policy.
// This enroll policy must be created even if independent Elastic Agents are not used. Agents created
// in Kubernetes or Custom Agents require this enroll policy too (service deployer).
logger.Debug("creating enroll policy...")
policyEnroll := kibana.Policy{
Name: fmt.Sprintf("ep-test-system-enroll-%s-%s-%s-%s-%s", r.testFolder.Package, r.testFolder.DataStream, r.serviceVariant, r.configFileName, testTime),
Description: fmt.Sprintf("test policy created by elastic-package to enroll agent for data stream %s/%s", r.testFolder.Package, r.testFolder.DataStream),
Namespace: common.CreateTestRunID(),
}
policyToEnroll, err = r.kibanaClient.CreatePolicy(ctx, policyEnroll)
if err != nil {
// NOTE(review): this message is the same one used for the test policy below,
// although the policy being created here is the enroll policy.
return nil, fmt.Errorf("could not create test policy: %w", err)
}
}
r.deleteTestPolicyHandler = func(ctx context.Context) error {
// Ensure that the policyToEnroll policy gets deleted if the execution receives a signal
// before creating the test policy.
// This handler is going to be redefined after creating the test policy.
// In tests-only mode the enroll policy comes from the service state file and is kept.
if r.runTestsOnly {
return nil
}
if err := r.kibanaClient.DeletePolicy(ctx, policyToEnroll.ID); err != nil {
return fmt.Errorf("error cleaning up test policy: %w", err)
}
return nil
}
if r.runTearDown {
// Required to assign the policy stored in the service state file
// so the data stream related to this Agent Policy can be obtained (and deleted)
// in the cleanTestScenarioHandler handler.
policyToTest = policyCurrent
} else {
// Create a specific Agent Policy just for testing this test.
// This allows us to ensure that the Agent Policy used for testing is
// assigned to the agent with all the required changes (e.g. Package DataStream).
logger.Debug("creating test policy...")
policy := kibana.Policy{
Name: fmt.Sprintf("ep-test-system-%s-%s-%s-%s-%s", r.testFolder.Package, r.testFolder.DataStream, r.serviceVariant, r.configFileName, testTime),
Description: fmt.Sprintf("test policy created by elastic-package test system for data stream %s/%s", r.testFolder.Package, r.testFolder.DataStream),
Namespace: common.CreateTestRunID(),
}
// Assign the data_output_id to the agent policy to configure the output to logstash. The value is inferred from stack/_static/kibana.yml.tmpl
// TODO: Migrate from stack.logstash_enabled to the stack config.
if r.profile.Config("stack.logstash_enabled", "false") == "true" {
policy.DataOutputID = "fleet-logstash-output"
}
// An output ID set in the stack configuration takes precedence over the
// logstash output selected above.
if stackConfig.OutputID != "" {
policy.DataOutputID = stackConfig.OutputID
}
policyToTest, err = r.kibanaClient.CreatePolicy(ctx, policy)
if err != nil {
return nil, fmt.Errorf("could not create test policy: %w", err)
}
}
// Redefine the handler now that the test policy is known: it deletes the test
// policy and, unless running in tests-only mode, the enroll policy as well.
// NOTE(review): if deleting the test policy fails, the handler returns before
// trying to delete the enroll policy.
r.deleteTestPolicyHandler = func(ctx context.Context) error {
logger.Debug("deleting test policies...")
if err := r.kibanaClient.DeletePolicy(ctx, policyToTest.ID); err != nil {
return fmt.Errorf("error cleaning up test policy: %w", err)
}
if r.runTestsOnly {
return nil
}
if err := r.kibanaClient.DeletePolicy(ctx, policyToEnroll.ID); err != nil {
return fmt.Errorf("error cleaning up test policy: %w", err)
}
return nil
}
// policyToEnroll is used in both independent agents and agents created by servicedeployer (custom or kubernetes agents)
policy := policyToEnroll
if r.runTearDown || r.runTestsOnly {
// Required in order to be able to select the right agent in `checkEnrolledAgents` when
// using independent agents or custom/kubernetes agents, since policy data is set into the `agentInfo` variable.
policy = policyCurrent
}
agentDeployed, agentInfo, err := r.setupAgent(ctx, config, serviceStateData, policy)
if err != nil {
return nil, err
}
scenario.agent = agentDeployed
if agentDeployed != nil {
// The Elastic Agent created in `r.setupAgent` needs to be retrieved just after starting it, to ensure
// it can be removed and unenrolled if the service fails to start.
// This function must also be called after setting the service (r.setupService), since there are other
// deployers like custom agents or kubernetes deployer that create new Elastic Agents too that need to
// be retrieved too.
// The returned agent is discarded here; it is retrieved again after the service setup.
_, err := r.checkEnrolledAgents(ctx, agentInfo, svcInfo)
if err != nil {
return nil, fmt.Errorf("can't check enrolled agents: %w", err)
}
}
service, svcInfo, err := r.setupService(ctx, config, serviceOptions, svcInfo, agentInfo, agentDeployed, policy, serviceStateData)
// os.ErrNotExist is not treated as a failure: it is logged as the test having
// no service deployer and the function continues.
if errors.Is(err, os.ErrNotExist) {
logger.Debugf("No service deployer defined for this test")
} else if err != nil {
return nil, err
}
// Reload test config with ctx variable substitution.
config, err = newConfig(config.Path, svcInfo, serviceOptions.Variant)
if err != nil {
return nil, fmt.Errorf("unable to reload system test case configuration: %w", err)
}
// Store the time just before adding the Test Policy, this time will be used to check
// the agent logs from that time onwards to avoid possible previous errors present in logs.
scenario.startTestTime = time.Now()
logger.Debug("adding package data stream to test policy...")
// The data stream definition is built in every stage because the index template
// and data stream names below are derived from it; it is only sent to Kibana
// when not tearing down.
ds := createPackageDatastream(*policyToTest, *r.pkgManifest, policyTemplate, *r.dataStreamManifest, *config, policyToTest.Namespace)
if r.runTearDown {
logger.Debug("Skip adding data stream config to policy")
} else {
if err := r.kibanaClient.AddPackageDataStreamToPolicy(ctx, ds); err != nil {
return nil, fmt.Errorf("could not add data stream config to policy: %w", err)
}
}
scenario.kibanaDataStream = ds
// Delete old data
logger.Debug("deleting old data in data stream...")
// Input packages can set `data_stream.dataset` by convention to customize the dataset.
// NOTE(review): this assumes ds has at least one input with at least one stream;
// the indexing below panics otherwise - confirm createPackageDatastream guarantees it.
dataStreamDataset := ds.Inputs[0].Streams[0].DataStream.Dataset
if r.pkgManifest.Type == "input" {
// The second return value of GetValue is discarded: unless the value is a
// non-empty string, the dataset taken from the package data stream is kept.
v, _ := config.Vars.GetValue("data_stream.dataset")
if dataset, ok := v.(string); ok && dataset != "" {
dataStreamDataset = dataset
}
}
// Index template name: "<type>-<dataset>".
scenario.indexTemplateName = fmt.Sprintf(
"%s-%s",
ds.Inputs[0].Streams[0].DataStream.Type,
dataStreamDataset,
)
// Data stream name: "<type>-<dataset>-<namespace>".
scenario.dataStream = fmt.Sprintf(
"%s-%s",
scenario.indexTemplateName,
ds.Namespace,
)
// Handler that deletes the data stream used by this test scenario.
r.cleanTestScenarioHandler = func(ctx context.Context) error {
logger.Debugf("Deleting data stream for testing %s", scenario.dataStream)
err := r.deleteDataStream(ctx, scenario.dataStream)
if err != nil {
return fmt.Errorf("failed to delete data stream %s: %w", scenario.dataStream, err)
}
return nil
}
// While there could be created Elastic Agents within `setupService()` (custom agents and k8s agents),
// this "checkEnrolledAgents" call must be duplicated here after creating the service too. This will
// ensure to get the right Enrolled Elastic Agent too.
agent, err := r.checkEnrolledAgents(ctx, agentInfo, svcInfo)
if err != nil {
return nil, fmt.Errorf("can't check enrolled agents: %w", err)
}
// FIXME: running per stages does not work when multiple agents are created
var origPolicy kibana.Policy
if r.runTearDown {
origPolicy = serviceStateData.OrigPolicy
logger.Debugf("Got orig policy from file: %q - %q", origPolicy.Name, origPolicy.ID)
} else {
// Store previous agent policy assigned to the agent
origPolicy = kibana.Policy{
ID: agent.PolicyID,
Revision: agent.PolicyRevision,
}
}
// Handler that assigns the original policy back to the agent.
r.resetAgentPolicyHandler = func(ctx context.Context) error {
if r.runSetup {
// The same policy should be kept when system tests are
// triggered with the flag for running the setup stage (--setup).
return nil
}
// RunTestOnly step (--no-provision) should also reassign back the previous (original) policy
// even with independent Elastic Agents, since this step creates a new test policy each execution.
// Moreover, ensure there is no agent service deployer (deprecated) being used.
if scenario.agent != nil && r.runIndependentElasticAgent && !r.runTestsOnly {
return nil
}
logger.Debug("reassigning original policy back to agent...")
if err := r.kibanaClient.AssignPolicyToAgent(ctx, *agent, origPolicy); err != nil {
return fmt.Errorf("error reassigning original policy to agent: %w", err)
}
return nil
}
// Keep a reference to the agent as retrieved here; it is stored in the scenario
// state when running the setup stage.
origAgent := agent
origLogLevel := ""
if r.runTearDown {
// The log level to restore comes from the agent stored in the service state file.
logger.Debug("Skip assiging log level debug to agent")
origLogLevel = serviceStateData.Agent.LocalMetadata.Elastic.Agent.LogLevel
} else {
// Remember the current log level before switching the agent to debug.
logger.Debug("Set Debug log level to agent")
origLogLevel = agent.LocalMetadata.Elastic.Agent.LogLevel
err = r.kibanaClient.SetAgentLogLevel(ctx, agent.ID, "debug")
if err != nil {
return nil, fmt.Errorf("error setting log level debug for agent %s: %w", agent.ID, err)
}
}
// Handler that sets the original log level back on the agent.
r.resetAgentLogLevelHandler = func(ctx context.Context) error {
if r.runTestsOnly || r.runSetup {
return nil
}
// No need to reset agent log level when running independent Elastic Agents
// since the Elastic Agent is going to be removed/uninstalled.
// Moreover, ensure there is no agent service deployer (deprecated) being used.
if scenario.agent != nil && r.runIndependentElasticAgent {
return nil
}
logger.Debugf("reassigning original log level %q back to agent...", origLogLevel)
if err := r.kibanaClient.SetAgentLogLevel(ctx, agent.ID, origLogLevel); err != nil {
return fmt.Errorf("error reassigning original log level to agent: %w", err)
}
return nil
}
if r.runTearDown {
logger.Debug("Skip assigning package data stream to agent")
} else {
// Read the test policy back from Kibana after the data stream was added and
// assign that version to the agent.
policyWithDataStream, err := r.kibanaClient.GetPolicy(ctx, policyToTest.ID)
if err != nil {
return nil, fmt.Errorf("could not read the policy with data stream: %w", err)
}
logger.Debug("assigning package data stream to agent...")
if err := r.kibanaClient.AssignPolicyToAgent(ctx, *agent, *policyWithDataStream); err != nil {
return nil, fmt.Errorf("could not assign policy to agent: %w", err)
}
}
// Signal to the service that the agent is ready (policy is assigned).
if service != nil && config.ServiceNotifySignal != "" {
if err = service.Signal(ctx, config.ServiceNotifySignal); err != nil {
return nil, fmt.Errorf("failed to notify test service: %w", err)
}
}
// Tear-down does not ingest or validate documents, so the scenario is complete here.
if r.runTearDown {
return &scenario, nil
}
hits, waitErr := r.waitForDocs(ctx, config, scenario.dataStream)
// Before checking the "waitErr" error, it is necessary to check if the service has finished with error
// to report it as a test case failed.
if service != nil && config.Service != "" && !config.IgnoreServiceError {
exited, code, err := service.ExitCode(ctx, config.Service)
// servicedeployer.ErrNotSupported is tolerated; any other error aborts.
if err != nil && !errors.Is(err, servicedeployer.ErrNotSupported) {
return nil, err
}
if exited && code > 0 {
return nil, testrunner.ErrTestCaseFailed{Reason: fmt.Sprintf("the test service %s unexpectedly exited with code %d", config.Service, code)}
}
}
if waitErr != nil {
return nil, waitErr
}
// Get deprecation warnings after ensuring that there are ingested docs and thus the
// data stream exists.
scenario.deprecationWarnings, err = r.getDeprecationWarnings(ctx, scenario.dataStream)
if err != nil {
return nil, fmt.Errorf("failed to get deprecation warnings for data stream %s: %w", scenario.dataStream, err)
}
logger.Debugf("Found %d deprecation warnings for data stream %s", len(scenario.deprecationWarnings), scenario.dataStream)
logger.Debugf("Check whether or not synthetic source mode is enabled (data stream %s)...", scenario.dataStream)
scenario.syntheticEnabled, err = isSyntheticSourceModeEnabled(ctx, r.esAPI, scenario.dataStream)
if err != nil {
return nil, fmt.Errorf("failed to check if synthetic source mode is enabled for data stream %s: %w", scenario.dataStream, err)
}
logger.Debugf("Data stream %s has synthetic source mode enabled: %t", scenario.dataStream, scenario.syntheticEnabled)
// The documents are obtained from the hits taking the synthetic source mode into account.
scenario.docs = hits.getDocs(scenario.syntheticEnabled)
scenario.ignoredFields = hits.IgnoredFields
scenario.degradedDocs = hits.DegradedDocs
// The setup stage writes the scenario state to the service state file, which is
// the file read at the top of this function by the tear-down and tests-only stages.
if r.runSetup {
opts := scenarioStateOpts{
origPolicy: &origPolicy,
enrollPolicy: policyToEnroll,
currentPolicy: policyToTest,
config: config,
agent: *origAgent,
agentInfo: agentInfo,
svcInfo: svcInfo,
}
err = writeScenarioState(opts, r.serviceStateFilePath)
if err != nil {
return nil, err
}
}
return &scenario, nil
}