in internal/testrunner/runners/pipeline/tester.go [233:308]
func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.Time) ([]testrunner.TestResult, error) {
startTime := time.Now()
testingTime := startTesting.Truncate(time.Second)
statusOptions := stack.Options{
Profile: r.profile,
}
_, err := r.provider.Status(ctx, statusOptions)
if err != nil {
logger.Debugf("Not checking Elasticsearch logs: %s", err)
return nil, nil
}
dumpOptions := stack.DumpOptions{
Profile: r.profile,
Services: []string{"elasticsearch"},
Since: testingTime,
}
dump, err := r.provider.Dump(ctx, dumpOptions)
var notImplementedErr *stack.ErrNotImplemented
if errors.As(err, ¬ImplementedErr) || errors.Is(err, stack.ErrUnavailableStack) {
logger.Debugf("Not checking Elasticsearch logs: %s", err)
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("error at getting the logs of elasticsearch: %w", err)
}
if len(dump) != 1 || dump[0].ServiceName != "elasticsearch" {
return nil, errors.New("expected elasticsearch logs in dump")
}
elasticsearchLogs := dump[0].Logs
seenWarnings := make(map[string]any)
var processorRelatedWarnings []string
err = stack.ParseLogsFromReader(bytes.NewReader(elasticsearchLogs), stack.ParseLogsOptions{
StartTime: testingTime,
}, func(log stack.LogLine) error {
if log.LogLevel != "WARN" {
return nil
}
if _, exists := seenWarnings[log.Message]; exists {
return nil
}
seenWarnings[log.Message] = struct{}{}
logger.Warnf("elasticsearch warning: %s", log.Message)
// trying to catch warnings only related to processors but this is best-effort
if strings.Contains(strings.ToLower(log.Logger), "processor") {
processorRelatedWarnings = append(processorRelatedWarnings, log.Message)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error at parsing logs of elasticseach: %w", err)
}
tr := testrunner.TestResult{
TestType: TestType,
Name: fmt.Sprintf("(ingest pipeline warnings %s)", r.testCaseFile),
Package: r.testFolder.Package,
DataStream: r.testFolder.DataStream,
TimeElapsed: time.Since(startTime),
}
if totalProcessorWarnings := len(processorRelatedWarnings); totalProcessorWarnings > 0 {
tr.FailureMsg = fmt.Sprintf("detected ingest pipeline warnings: %d", totalProcessorWarnings)
tr.FailureDetails = strings.Join(processorRelatedWarnings, "\n")
}
return []testrunner.TestResult{tr}, nil
}