in internal/processorexecutor.go [186:221]
// ExecuteMetricStatements runs the metrics processor built from yamlConfig
// over the metrics encoded in input and returns the processed metrics as JSON.
//
// yamlConfig is passed to p.parseConfig to obtain the processor configuration.
// input is decoded with pmetric.JSONUnmarshaler. The returned bytes are
// produced by pmetric.JSONMarshaler from whatever the processor forwarded to
// its downstream consumer; if the processor never invokes that consumer, an
// empty pmetric.Metrics is marshalled instead.
//
// The first error encountered at any step is returned as-is, with a nil
// result.
func (p *processorExecutor[C]) ExecuteMetricStatements(yamlConfig, input string) ([]byte, error) {
	config, err := p.parseConfig(yamlConfig)
	if err != nil {
		return nil, err
	}

	// Downstream sink: remembers the last batch the processor hands over so it
	// can be marshalled once processing has finished.
	transformedMetrics := pmetric.NewMetrics()
	metricsConsumer, err := consumer.NewMetrics(func(_ context.Context, md pmetric.Metrics) error {
		transformedMetrics = md
		return nil
	})
	if err != nil {
		return nil, err
	}

	metricsProcessor, err := p.factory.CreateMetrics(context.Background(), p.settings, config, metricsConsumer)
	if err != nil {
		return nil, err
	}

	metricsUnmarshaler := &pmetric.JSONUnmarshaler{}
	inputMetrics, err := metricsUnmarshaler.UnmarshalMetrics([]byte(input))
	if err != nil {
		return nil, err
	}

	if err := metricsProcessor.ConsumeMetrics(context.Background(), inputMetrics); err != nil {
		return nil, err
	}

	metricsMarshaler := pmetric.JSONMarshaler{}
	output, err := metricsMarshaler.MarshalMetrics(transformedMetrics)
	if err != nil {
		return nil, err
	}
	return output, nil
}