func()

in internal/testrunner/runners/pipeline/tester.go [233:308]


// checkElasticsearchLogs scans the Elasticsearch service logs produced since
// startTesting (truncated to whole seconds) and reports the warnings found.
//
// Every distinct WARN message is logged once through logger.Warnf. Warnings
// whose logger name contains "processor" (case-insensitive) are additionally
// collected and, if any exist, recorded as a failure in the returned result.
//
// It returns (nil, nil), meaning "nothing to report", when the stack status
// cannot be obtained, or when dumping logs is not implemented by the provider
// or the stack is unavailable. Any other dump or parsing failure is returned
// as an error. On success exactly one TestResult is returned.
func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.Time) ([]testrunner.TestResult, error) {
	startTime := time.Now()

	testingTime := startTesting.Truncate(time.Second)

	// The log check is best-effort: if the stack status cannot be retrieved,
	// skip the check instead of failing the test run.
	statusOptions := stack.Options{
		Profile: r.profile,
	}
	if _, err := r.provider.Status(ctx, statusOptions); err != nil {
		logger.Debugf("Not checking Elasticsearch logs: %s", err)
		return nil, nil
	}

	dumpOptions := stack.DumpOptions{
		Profile:  r.profile,
		Services: []string{"elasticsearch"},
		Since:    testingTime,
	}
	dump, err := r.provider.Dump(ctx, dumpOptions)
	if err != nil {
		// Providers that cannot dump logs, or an unavailable stack, are not
		// treated as failures; the check is skipped.
		var notImplementedErr *stack.ErrNotImplemented
		if errors.As(err, &notImplementedErr) || errors.Is(err, stack.ErrUnavailableStack) {
			logger.Debugf("Not checking Elasticsearch logs: %s", err)
			return nil, nil
		}
		return nil, fmt.Errorf("error at getting the logs of elasticsearch: %w", err)
	}

	// Only the elasticsearch service was requested, so exactly one entry is expected.
	if len(dump) != 1 || dump[0].ServiceName != "elasticsearch" {
		return nil, errors.New("expected elasticsearch logs in dump")
	}
	elasticsearchLogs := dump[0].Logs

	// seenWarnings is used as a set to report each distinct message only once.
	seenWarnings := make(map[string]struct{})
	var processorRelatedWarnings []string
	err = stack.ParseLogsFromReader(bytes.NewReader(elasticsearchLogs), stack.ParseLogsOptions{
		StartTime: testingTime,
	}, func(log stack.LogLine) error {
		if log.LogLevel != "WARN" {
			return nil
		}

		if _, exists := seenWarnings[log.Message]; exists {
			return nil
		}

		seenWarnings[log.Message] = struct{}{}
		logger.Warnf("elasticsearch warning: %s", log.Message)

		// trying to catch warnings only related to processors but this is best-effort
		if strings.Contains(strings.ToLower(log.Logger), "processor") {
			processorRelatedWarnings = append(processorRelatedWarnings, log.Message)
		}

		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("error at parsing logs of elasticsearch: %w", err)
	}

	tr := testrunner.TestResult{
		TestType:    TestType,
		Name:        fmt.Sprintf("(ingest pipeline warnings %s)", r.testCaseFile),
		Package:     r.testFolder.Package,
		DataStream:  r.testFolder.DataStream,
		TimeElapsed: time.Since(startTime),
	}

	// Only processor-related warnings turn the result into a failure.
	if totalProcessorWarnings := len(processorRelatedWarnings); totalProcessorWarnings > 0 {
		tr.FailureMsg = fmt.Sprintf("detected ingest pipeline warnings: %d", totalProcessorWarnings)
		tr.FailureDetails = strings.Join(processorRelatedWarnings, "\n")
	}

	return []testrunner.TestResult{tr}, nil
}