in internal/processorexecutor.go [112:147]
func (p *processorExecutor[C]) ExecuteLogStatements(yamlConfig, input string) ([]byte, error) {
config, err := p.parseConfig(yamlConfig)
if err != nil {
return nil, err
}
transformedLogs := plog.NewLogs()
logsConsumer, _ := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error {
transformedLogs = ld
return nil
})
logsProcessor, err := p.factory.CreateLogs(context.Background(), p.settings, config, logsConsumer)
if err != nil {
return nil, err
}
logsUnmarshaler := &plog.JSONUnmarshaler{}
inputLogs, err := logsUnmarshaler.UnmarshalLogs([]byte(input))
if err != nil {
return nil, err
}
err = logsProcessor.ConsumeLogs(context.Background(), inputLogs)
if err != nil {
return nil, err
}
logsMarshaler := plog.JSONMarshaler{}
json, err := logsMarshaler.MarshalLogs(transformedLogs)
if err != nil {
return nil, err
}
return json, nil
}