internal/processorexecutor.go (172 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package internal import ( "context" "strings" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type processorExecutor[T any] struct { factory processor.Factory settings processor.Settings telemetrySettings component.TelemetrySettings observedLogs *ObservedLogs } func newProcessorExecutor[C any](factory processor.Factory) *processorExecutor[C] { observedLogger, observedLogs := NewLogObserver(zap.DebugLevel, zap.NewDevelopmentEncoderConfig()) logger, _ := zap.NewDevelopmentConfig().Build(zap.WrapCore(func(z zapcore.Core) zapcore.Core { return observedLogger })) telemetrySettings := componenttest.NewNopTelemetrySettings() telemetrySettings.Logger = logger settings := processor.Settings{ ID: component.MustNewIDWithName(factory.Type().String(), "ottl_playground"), TelemetrySettings: telemetrySettings, } return &processorExecutor[C]{ factory: factory, telemetrySettings: telemetrySettings, settings: settings, observedLogs: observedLogs, } } func (p *processorExecutor[C]) parseConfig(yamlConfig string) (*C, error) { deserializedYaml, err := confmap.NewRetrievedFromYAML([]byte(yamlConfig)) if err != nil { return nil, err } deserializedConf, err := deserializedYaml.AsConf() if err != nil { return nil, err } configMap := make(map[string]any) for k, v := range deserializedConf.ToStringMap() { configMap[k] = escapeDollarSigns(v) } defaultConfig := p.factory.CreateDefaultConfig().(*C) err = confmap.NewFromStringMap(configMap).Unmarshal(&defaultConfig) if err != nil { return nil, err } return defaultConfig, nil } func escapeDollarSigns(val any) any { switch v := val.(type) { case string: return strings.ReplaceAll(v, "$$", "$") case []any: escapedVals := make([]any, len(v)) for i, x := range v { escapedVals[i] = escapeDollarSigns(x) } return escapedVals case map[string]any: escapedMap := make(map[string]any, len(v)) for k, x := range v { escapedMap[k] = escapeDollarSigns(x) } return escapedMap default: return val } } func (p *processorExecutor[C]) ExecuteLogStatements(yamlConfig, input string) ([]byte, error) { config, err := p.parseConfig(yamlConfig) if err != nil { return nil, err } transformedLogs := plog.NewLogs() logsConsumer, _ := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { transformedLogs = ld return nil }) logsProcessor, err := p.factory.CreateLogs(context.Background(), p.settings, config, logsConsumer) if err != nil { return nil, err } logsUnmarshaler := &plog.JSONUnmarshaler{} inputLogs, err := logsUnmarshaler.UnmarshalLogs([]byte(input)) if err != nil { return nil, err } err = logsProcessor.ConsumeLogs(context.Background(), inputLogs) if err != nil { return nil, err } logsMarshaler := plog.JSONMarshaler{} json, err := logsMarshaler.MarshalLogs(transformedLogs) if err != nil { return nil, err } return json, nil } func (p *processorExecutor[C]) ExecuteTraceStatements(yamlConfig, input string) ([]byte, error) { config, err := p.parseConfig(yamlConfig) if err != nil { return nil, err } transformedTraces := ptrace.NewTraces() tracesConsumer, _ := consumer.NewTraces(func(_ context.Context, ld ptrace.Traces) error { transformedTraces = ld return nil }) tracesProcessor, err := p.factory.CreateTraces(context.Background(), p.settings, config, tracesConsumer) if err != nil { return nil, err } tracesUnmarshaler := &ptrace.JSONUnmarshaler{} inputTraces, err := tracesUnmarshaler.UnmarshalTraces([]byte(input)) if err != nil { return nil, err } err = tracesProcessor.ConsumeTraces(context.Background(), inputTraces) if err != nil { return nil, err } tracesMarshaler := ptrace.JSONMarshaler{} json, err := tracesMarshaler.MarshalTraces(transformedTraces) if err != nil { return nil, err } return json, nil } func (p *processorExecutor[C]) ExecuteMetricStatements(yamlConfig, input string) ([]byte, error) { config, err := p.parseConfig(yamlConfig) if err != nil { return nil, err } transformedMetrics := pmetric.NewMetrics() metricsConsumer, _ := consumer.NewMetrics(func(_ context.Context, ld pmetric.Metrics) error { transformedMetrics = ld return nil }) metricsProcessor, err := p.factory.CreateMetrics(context.Background(), p.settings, config, metricsConsumer) if err != nil { return nil, err } tracesUnmarshaler := &pmetric.JSONUnmarshaler{} inputMetrics, err := tracesUnmarshaler.UnmarshalMetrics([]byte(input)) if err != nil { return nil, err } err = metricsProcessor.ConsumeMetrics(context.Background(), inputMetrics) if err != nil { return nil, err } metricsMarshaler := pmetric.JSONMarshaler{} json, err := metricsMarshaler.MarshalMetrics(transformedMetrics) if err != nil { return nil, err } return json, nil } func (p *processorExecutor[C]) ObservedLogs() *ObservedLogs { return p.observedLogs }