in processor/integrationprocessor/processor.go [123:183]
// startPipeline builds the processor chain described by pipeline and then
// starts it.
//
// The chain is assembled back to front. r.logs, r.metrics and r.traces are
// first reset to the configured next consumers, then the processors of the
// pipeline are visited in reverse order and each one is created with the
// previously built consumer as its downstream. When the loop finishes, those
// three fields point at the head of the chain. A processor is created only for
// the signals whose current consumer is non-nil.
//
// Every created processor is appended to r.components. Once the whole chain
// has been built, all entries of r.components are started in slice order,
// which means processors are started from the end of the pipeline towards its
// beginning.
//
// An error is returned when a referenced processor has no entry in
// config.Processors, when host yields no processor factory for its type, when
// its configuration cannot be converted, or when creating or starting a
// processor fails. No component is started if building the chain fails.
// Components that were created or started before a failure are left in
// r.components and are not shut down by this function.
func (r *integrationProcessor) startPipeline(ctx context.Context, host factoryGetter, config integrations.Config, pipelineID pipeline.ID, pipeline integrations.PipelineConfig) error {
	r.logs = r.nextLogsConsumer
	r.metrics = r.nextMetricsConsumer
	r.traces = r.nextTracesConsumer

	// labels[i] is the ID of r.components[preexisting+i]. It is used only to
	// produce a readable error if starting that component fails.
	preexisting := len(r.components)
	var labels []string

	// Work on a reversed copy so the caller's slice is left untouched.
	processors := slices.Clone(pipeline.Processors)
	slices.Reverse(processors)

	for i, id := range processors {
		rawConfig, found := config.Processors[id]
		if !found {
			return fmt.Errorf("processor %q not found", id)
		}
		factory, ok := host.GetFactory(component.KindProcessor, id.Type()).(processor.Factory)
		if !ok {
			return fmt.Errorf("could not find processor factory for %q", id.Type())
		}
		processorConfig, err := convertComponentConfig(factory.CreateDefaultConfig, rawConfig)
		if err != nil {
			return fmt.Errorf("could not compose processor config for %s: %w", id.String(), err)
		}

		// Each inner processor gets its own copy of the settings with an ID
		// derived from this processor's ID, the pipeline ID and the position
		// in the reversed processor list.
		params := processor.Settings(r.params)
		params.ID = component.NewIDWithName(factory.Type(), fmt.Sprintf("%s-%s-%d", r.params.ID, pipelineID, i))
		params.Logger = params.Logger.With(zap.String("name", params.ID.String()))
		label := params.ID.String()

		if r.logs != nil {
			logs, err := factory.CreateLogs(ctx, params, processorConfig, r.logs)
			if err != nil {
				return fmt.Errorf("failed to create logs processor %s: %w", params.ID, err)
			}
			r.logs = logs
			r.components = append(r.components, logs)
			labels = append(labels, label)
		}
		if r.metrics != nil {
			metrics, err := factory.CreateMetrics(ctx, params, processorConfig, r.metrics)
			if err != nil {
				return fmt.Errorf("failed to create metrics processor %s: %w", params.ID, err)
			}
			r.metrics = metrics
			r.components = append(r.components, metrics)
			labels = append(labels, label)
		}
		if r.traces != nil {
			traces, err := factory.CreateTraces(ctx, params, processorConfig, r.traces)
			if err != nil {
				return fmt.Errorf("failed to create traces processor %s: %w", params.ID, err)
			}
			r.traces = traces
			r.components = append(r.components, traces)
			labels = append(labels, label)
		}
	}

	for i, c := range r.components {
		if err := c.Start(ctx, host); err != nil {
			// Components that were already in r.components before this call
			// have no recorded ID; identify them by their concrete type.
			name := fmt.Sprintf("%T", c)
			if i >= preexisting {
				name = labels[i-preexisting]
			}
			return fmt.Errorf("failed to start component %q: %w", name, err)
		}
	}
	return nil
}