func (r *tester) prepareScenario()

in internal/testrunner/runners/system/tester.go [966:1324]


func (r *tester) prepareScenario(ctx context.Context, config *testConfig, stackConfig stack.Config, svcInfo servicedeployer.ServiceInfo) (*scenarioTest, error) {
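	// This function provisions everything the scenario needs: an enroll policy
	// and a test policy in Fleet, an enrolled Elastic Agent, the package data
	// stream under test and, optionally, the service emitting data. Cleanup
	// handlers registered along the way tear all of this down afterwards.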
	serviceOptions := r.createServiceOptions(config.ServiceVariantName)

	var err error
	var serviceStateData ServiceState
	if r.runSetup {
		err = r.createServiceStateDir()
		if err != nil {
			return nil, fmt.Errorf("failed to create setup services dir: %w", err)
		}
	}
	scenario := scenarioTest{}
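
	// --no-provision (runTestsOnly) and --tear-down reuse the scenario state
	// persisted to disk by a previous --setup run (see writeScenarioState below).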

	if r.runTearDown || r.runTestsOnly {
		serviceStateData, err = readServiceStateData(r.serviceStateFilePath)
		if err != nil {
			return nil, fmt.Errorf("failed to read service setup data: %w", err)
		}
	}

	serviceOptions.DeployIndependentAgent = r.runIndependentElasticAgent

	policyTemplateName := config.PolicyTemplate
	if policyTemplateName == "" {
		policyTemplateName, err = findPolicyTemplateForInput(*r.pkgManifest, *r.dataStreamManifest, config.Input)
		if err != nil {
			return nil, fmt.Errorf("failed to determine the associated policy_template: %w", err)
		}
	}
	scenario.policyTemplateName = policyTemplateName

	policyTemplate, err := selectPolicyTemplateByName(r.pkgManifest.PolicyTemplates, scenario.policyTemplateName)
	if err != nil {
		return nil, fmt.Errorf("failed to find the selected policy_template: %w", err)
	}

	// Configure package (single data stream) via Fleet APIs.
	testTime := time.Now().Format("20060102T15:04:05Z")
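	// Note: in this layout the trailing "Z" is a literal character, not a UTC
	// designator (Go only treats "Z" specially in zone forms such as "Z07:00"),
	// so the value is the local time with a literal Z suffix, e.g. "20250102T15:04:05Z".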
	var policyToTest, policyCurrent, policyToEnroll *kibana.Policy
	if r.runTearDown || r.runTestsOnly {
		policyCurrent = &serviceStateData.CurrentPolicy
		policyToEnroll = &serviceStateData.EnrollPolicy
		logger.Debugf("Got current policy from file: %q - %q", policyCurrent.Name, policyCurrent.ID)
	} else {
		// Create a dedicated Agent Policy for enrollment purposes.
		// When the stack has been running for some time, agents can fail to enroll
		// with the default policy, so a fresh one is created instead.
		// This enroll policy must be created even if independent Elastic Agents are not used: agents created
		// in Kubernetes or Custom Agents require this enroll policy too (service deployer).
		logger.Debug("creating enroll policy...")
		policyEnroll := kibana.Policy{
			Name:        fmt.Sprintf("ep-test-system-enroll-%s-%s-%s-%s-%s", r.testFolder.Package, r.testFolder.DataStream, r.serviceVariant, r.configFileName, testTime),
			Description: fmt.Sprintf("test policy created by elastic-package to enroll agent for data stream %s/%s", r.testFolder.Package, r.testFolder.DataStream),
			Namespace:   common.CreateTestRunID(),
		}

		policyToEnroll, err = r.kibanaClient.CreatePolicy(ctx, policyEnroll)
		if err != nil {
			return nil, fmt.Errorf("could not create test policy: %w", err)
		}
	}

	r.deleteTestPolicyHandler = func(ctx context.Context) error {
		// Ensure that the policyToEnroll policy gets deleted if the execution receives a signal
		// before the test policy is created.
		// This handler is redefined after the test policy is created.
		if r.runTestsOnly {
			return nil
		}
		if err := r.kibanaClient.DeletePolicy(ctx, policyToEnroll.ID); err != nil {
			return fmt.Errorf("error cleaning up test policy: %w", err)
		}
		return nil
	}

	if r.runTearDown {
		// Assign the policy stored in the service state file, so that the data stream
		// related to this Agent Policy can be obtained (and deleted) in the
		// cleanTestScenarioHandler handler
		policyToTest = policyCurrent
	} else {
		// Create a specific Agent Policy just for this test.
		// This allows us to ensure that the Agent Policy used for testing is
		// assigned to the agent with all the required changes (e.g. package data stream)
		logger.Debug("creating test policy...")
		policy := kibana.Policy{
			Name:        fmt.Sprintf("ep-test-system-%s-%s-%s-%s-%s", r.testFolder.Package, r.testFolder.DataStream, r.serviceVariant, r.configFileName, testTime),
			Description: fmt.Sprintf("test policy created by elastic-package test system for data stream %s/%s", r.testFolder.Package, r.testFolder.DataStream),
			Namespace:   common.CreateTestRunID(),
		}
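		// With hypothetical values (package "nginx", data stream "access", no
		// variant, config file "default"), the policy name would look like
		// "ep-test-system-nginx-access--default-20250102T15:04:05Z". The run ID
		// used as the namespace later becomes part of the data stream name,
		// keeping documents from concurrent runs apart.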
		// Assign the data_output_id to the agent policy to configure the output to Logstash. The value is inferred from stack/_static/kibana.yml.tmpl
		// TODO: Migrate from stack.logstash_enabled to the stack config.
		if r.profile.Config("stack.logstash_enabled", "false") == "true" {
			policy.DataOutputID = "fleet-logstash-output"
		}
		if stackConfig.OutputID != "" {
			policy.DataOutputID = stackConfig.OutputID
		}
		policyToTest, err = r.kibanaClient.CreatePolicy(ctx, policy)
		if err != nil {
			return nil, fmt.Errorf("could not create test policy: %w", err)
		}
	}

	r.deleteTestPolicyHandler = func(ctx context.Context) error {
		logger.Debug("deleting test policies...")
		if err := r.kibanaClient.DeletePolicy(ctx, policyToTest.ID); err != nil {
			return fmt.Errorf("error cleaning up test policy: %w", err)
		}
		if r.runTestsOnly {
			return nil
		}
		if err := r.kibanaClient.DeletePolicy(ctx, policyToEnroll.ID); err != nil {
			return fmt.Errorf("error cleaning up test policy: %w", err)
		}
		return nil
	}
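	// This redefinition deletes both the test policy and the enroll policy,
	// except in --no-provision runs, where the enroll policy comes from a
	// previous --setup run and is kept for reuse.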

	// policyToEnroll is used both by independent agents and by agents created by servicedeployer (custom or kubernetes agents)
	policy := policyToEnroll
	if r.runTearDown || r.runTestsOnly {
		// required in order to be able to select the right agent in `checkEnrolledAgents` when
		// using independent agents or custom/kubernetes agents, since policy data is set into the `agentInfo` variable
		policy = policyCurrent
	}

	agentDeployed, agentInfo, err := r.setupAgent(ctx, config, serviceStateData, policy)
	if err != nil {
		return nil, err
	}

	scenario.agent = agentDeployed

	if agentDeployed != nil {
		// The Elastic Agent created in `r.setupAgent` needs to be retrieved right after starting it,
		// to ensure it can be removed and unenrolled if the service fails to start.
		// This function must also be called after setting up the service (r.setupService), since other
		// deployers like custom agents or the kubernetes deployer create new Elastic Agents that need
		// to be retrieved as well.
		_, err := r.checkEnrolledAgents(ctx, agentInfo, svcInfo)
		if err != nil {
			return nil, fmt.Errorf("can't check enrolled agents: %w", err)
		}
	}

	service, svcInfo, err := r.setupService(ctx, config, serviceOptions, svcInfo, agentInfo, agentDeployed, policy, serviceStateData)
	if errors.Is(err, os.ErrNotExist) {
		logger.Debugf("No service deployer defined for this test")
	} else if err != nil {
		return nil, err
	}
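	// setupService reports os.ErrNotExist when the package defines no service
	// deployer; that is a supported configuration, so only other errors abort the run.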

	// Reload test config with ctx variable substitution.
	config, err = newConfig(config.Path, svcInfo, serviceOptions.Variant)
	if err != nil {
		return nil, fmt.Errorf("unable to reload system test case configuration: %w", err)
	}

	// Store the time just before adding the test policy. This time is used to check
	// the agent logs from that point onwards, avoiding errors already present in earlier logs.
	scenario.startTestTime = time.Now()

	logger.Debug("adding package data stream to test policy...")
	ds := createPackageDatastream(*policyToTest, *r.pkgManifest, policyTemplate, *r.dataStreamManifest, *config, policyToTest.Namespace)
	if r.runTearDown {
		logger.Debug("Skip adding data stream config to policy")
	} else {
		if err := r.kibanaClient.AddPackageDataStreamToPolicy(ctx, ds); err != nil {
			return nil, fmt.Errorf("could not add data stream config to policy: %w", err)
		}
	}
	scenario.kibanaDataStream = ds

	// Delete old data
	logger.Debug("deleting old data in data stream...")

	// Input packages can set `data_stream.dataset` by convention to customize the dataset.
	dataStreamDataset := ds.Inputs[0].Streams[0].DataStream.Dataset
	if r.pkgManifest.Type == "input" {
		v, _ := config.Vars.GetValue("data_stream.dataset")
		if dataset, ok := v.(string); ok && dataset != "" {
			dataStreamDataset = dataset
		}
	}
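	// In the system test config this override comes from the package-level vars,
	// e.g. (hypothetical values):
	//
	//   vars:
	//     data_stream.dataset: nginx.custom
	//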
	scenario.indexTemplateName = fmt.Sprintf(
		"%s-%s",
		ds.Inputs[0].Streams[0].DataStream.Type,
		dataStreamDataset,
	)
	scenario.dataStream = fmt.Sprintf(
		"%s-%s",
		scenario.indexTemplateName,
		ds.Namespace,
	)
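	// E.g. index template "logs-nginx.access" and data stream
	// "logs-nginx.access-<run-id>" (hypothetical values).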

	r.cleanTestScenarioHandler = func(ctx context.Context) error {
		logger.Debugf("Deleting data stream for testing %s", scenario.dataStream)
		err := r.deleteDataStream(ctx, scenario.dataStream)
		if err != nil {
			return fmt.Errorf("failed to delete data stream %s: %w", scenario.dataStream, err)
		}
		return nil
	}

	// While Elastic Agents could be created within `setupService()` (custom agents and k8s agents),
	// this "checkEnrolledAgents" call must be repeated here after creating the service, to
	// ensure that the right enrolled Elastic Agent is retrieved.
	agent, err := r.checkEnrolledAgents(ctx, agentInfo, svcInfo)
	if err != nil {
		return nil, fmt.Errorf("can't check enrolled agents: %w", err)
	}

	// FIXME: running per stage does not work when multiple agents are created
	var origPolicy kibana.Policy
	if r.runTearDown {
		origPolicy = serviceStateData.OrigPolicy
		logger.Debugf("Got orig policy from file: %q - %q", origPolicy.Name, origPolicy.ID)
	} else {
		// Store previous agent policy assigned to the agent
		origPolicy = kibana.Policy{
			ID:       agent.PolicyID,
			Revision: agent.PolicyRevision,
		}
	}

	r.resetAgentPolicyHandler = func(ctx context.Context) error {
		if r.runSetup {
			// The same policy should be kept just when system tests are
			// triggered with the flag for running the setup stage (--setup)
			return nil
		}

		// The runTestsOnly step (--no-provision) should also reassign the previous (original) policy
		// back, even with independent Elastic Agents, since this step creates a new test policy on each execution.
		// Moreover, ensure there is no agent service deployer (deprecated) being used
		if scenario.agent != nil && r.runIndependentElasticAgent && !r.runTestsOnly {
			return nil
		}

		logger.Debug("reassigning original policy back to agent...")
		if err := r.kibanaClient.AssignPolicyToAgent(ctx, *agent, origPolicy); err != nil {
			return fmt.Errorf("error reassigning original policy to agent: %w", err)
		}
		return nil
	}

	origAgent := agent
	origLogLevel := ""
	if r.runTearDown {
		logger.Debug("Skip assiging log level debug to agent")
		origLogLevel = serviceStateData.Agent.LocalMetadata.Elastic.Agent.LogLevel
	} else {
		logger.Debug("Set Debug log level to agent")
		origLogLevel = agent.LocalMetadata.Elastic.Agent.LogLevel
		err = r.kibanaClient.SetAgentLogLevel(ctx, agent.ID, "debug")
		if err != nil {
			return nil, fmt.Errorf("error setting log level debug for agent %s: %w", agent.ID, err)
		}
	}
	r.resetAgentLogLevelHandler = func(ctx context.Context) error {
		if r.runTestsOnly || r.runSetup {
			return nil
		}

		// No need to reset the agent log level when running independent Elastic Agents,
		// since the Elastic Agent is going to be removed/uninstalled.
		// Moreover, ensure there is no agent service deployer (deprecated) being used
		if scenario.agent != nil && r.runIndependentElasticAgent {
			return nil
		}

		logger.Debugf("reassigning original log level %q back to agent...", origLogLevel)

		if err := r.kibanaClient.SetAgentLogLevel(ctx, agent.ID, origLogLevel); err != nil {
			return fmt.Errorf("error reassigning original log level to agent: %w", err)
		}
		return nil
	}

	if r.runTearDown {
		logger.Debug("Skip assigning package data stream to agent")
	} else {
		policyWithDataStream, err := r.kibanaClient.GetPolicy(ctx, policyToTest.ID)
		if err != nil {
			return nil, fmt.Errorf("could not read the policy with data stream: %w", err)
		}

		logger.Debug("assigning package data stream to agent...")
		if err := r.kibanaClient.AssignPolicyToAgent(ctx, *agent, *policyWithDataStream); err != nil {
			return nil, fmt.Errorf("could not assign policy to agent: %w", err)
		}
	}

	// Signal to the service that the agent is ready (policy is assigned).
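	// In the test config this signal is typically declared as, e.g.
	// (assumed key name): service_notify_signal: SIGHUP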
	if service != nil && config.ServiceNotifySignal != "" {
		if err = service.Signal(ctx, config.ServiceNotifySignal); err != nil {
			return nil, fmt.Errorf("failed to notify test service: %w", err)
		}
	}

	if r.runTearDown {
		return &scenario, nil
	}

	hits, waitErr := r.waitForDocs(ctx, config, scenario.dataStream)

	// Before checking the "waitErr" error, it is necessary to check whether the service
	// has finished with an error, so it can be reported as a failed test case
	if service != nil && config.Service != "" && !config.IgnoreServiceError {
		exited, code, err := service.ExitCode(ctx, config.Service)
		if err != nil && !errors.Is(err, servicedeployer.ErrNotSupported) {
			return nil, err
		}
		if exited && code > 0 {
			return nil, testrunner.ErrTestCaseFailed{Reason: fmt.Sprintf("the test service %s unexpectedly exited with code %d", config.Service, code)}
		}
	}
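	// A non-zero exit code from the service is reported as a failed test case
	// (ErrTestCaseFailed) rather than an infrastructure error, unless the test
	// config opts out via IgnoreServiceError.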

	if waitErr != nil {
		return nil, waitErr
	}

	// Get deprecation warnings after ensuring that there are ingested docs and thus the
	// data stream exists.
	scenario.deprecationWarnings, err = r.getDeprecationWarnings(ctx, scenario.dataStream)
	if err != nil {
		return nil, fmt.Errorf("failed to get deprecation warnings for data stream %s: %w", scenario.dataStream, err)
	}
	logger.Debugf("Found %d deprecation warnings for data stream %s", len(scenario.deprecationWarnings), scenario.dataStream)

	logger.Debugf("Check whether or not synthetic source mode is enabled (data stream %s)...", scenario.dataStream)
	scenario.syntheticEnabled, err = isSyntheticSourceModeEnabled(ctx, r.esAPI, scenario.dataStream)
	if err != nil {
		return nil, fmt.Errorf("failed to check if synthetic source mode is enabled for data stream %s: %w", scenario.dataStream, err)
	}
	logger.Debugf("Data stream %s has synthetic source mode enabled: %t", scenario.dataStream, scenario.syntheticEnabled)

	scenario.docs = hits.getDocs(scenario.syntheticEnabled)
	scenario.ignoredFields = hits.IgnoredFields
	scenario.degradedDocs = hits.DegradedDocs

	if r.runSetup {
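		// Persist the scenario state so that later --no-provision and --tear-down
		// runs can read it back through readServiceStateData at the top of this function.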
		opts := scenarioStateOpts{
			origPolicy:    &origPolicy,
			enrollPolicy:  policyToEnroll,
			currentPolicy: policyToTest,
			config:        config,
			agent:         *origAgent,
			agentInfo:     agentInfo,
			svcInfo:       svcInfo,
		}
		err = writeScenarioState(opts, r.serviceStateFilePath)
		if err != nil {
			return nil, err
		}
	}

	return &scenario, nil
}