func()

in processor/integrationprocessor/processor.go [123:183]


// startPipeline builds the chain of processors listed in the given pipeline
// configuration and then starts every component held in r.components.
//
// The chain is assembled back to front: r.logs, r.metrics and r.traces are
// first reset to the next consumers of this processor, and each processor is
// then created with the current consumer as its downstream, replacing it as the
// new head of the chain. Signals whose consumer is nil are skipped.
//
// Parameters:
//   - ctx is passed to the factory create calls and to Start.
//   - host resolves processor factories and is handed to each component's Start.
//   - config supplies the per-processor configuration, keyed by component ID.
//   - pipelineID is only used to build the IDs of the created processors.
//   - pipeline lists the processors to instantiate, in processing order.
//
// It returns the first error encountered. Components created before a failure
// remain in r.components; they are neither started nor shut down here.
func (r *integrationProcessor) startPipeline(ctx context.Context, host factoryGetter, config integrations.Config, pipelineID pipeline.ID, pipeline integrations.PipelineConfig) error {
	r.logs = r.nextLogsConsumer
	r.metrics = r.nextMetricsConsumer
	r.traces = r.nextTracesConsumer

	// Iterate over a reversed copy so that the configured slice is not mutated
	// and each processor can be wired to the one that follows it.
	processors := slices.Clone(pipeline.Processors)
	slices.Reverse(processors)
	for i, id := range processors {
		rawConfig, found := config.Processors[id]
		if !found {
			return fmt.Errorf("processor %q not found", id)
		}

		factory, ok := host.GetFactory(component.KindProcessor, id.Type()).(processor.Factory)
		if !ok {
			return fmt.Errorf("could not find processor factory for %q", id.Type())
		}

		// Named distinctly from the config parameter, which is read again on
		// the next iteration.
		processorConfig, err := convertComponentConfig(factory.CreateDefaultConfig, rawConfig)
		if err != nil {
			return fmt.Errorf("could not compose processor config for %s: %w", id.String(), err)
		}

		// i is the position in the reversed list, so the last configured
		// processor gets index 0.
		params := processor.Settings(r.params)
		params.ID = component.NewIDWithName(factory.Type(), fmt.Sprintf("%s-%s-%d", r.params.ID, pipelineID, i))
		params.Logger = params.Logger.With(zap.String("name", params.ID.String()))
		if r.logs != nil {
			logs, err := factory.CreateLogs(ctx, params, processorConfig, r.logs)
			if err != nil {
				return fmt.Errorf("failed to create logs processor %s: %w", params.ID, err)
			}
			r.logs = logs
			r.components = append(r.components, logs)
		}
		if r.metrics != nil {
			metrics, err := factory.CreateMetrics(ctx, params, processorConfig, r.metrics)
			if err != nil {
				return fmt.Errorf("failed to create metrics processor %s: %w", params.ID, err)
			}
			r.metrics = metrics
			r.components = append(r.components, metrics)
		}
		if r.traces != nil {
			traces, err := factory.CreateTraces(ctx, params, processorConfig, r.traces)
			if err != nil {
				return fmt.Errorf("failed to create traces processor %s: %w", params.ID, err)
			}
			r.traces = traces
			r.components = append(r.components, traces)
		}
	}

	// Components were appended downstream-first, so they are started in that
	// same order.
	for _, c := range r.components {
		if err := c.Start(ctx, host); err != nil {
			// %T reports the concrete component type; %q is meant for strings
			// and produces unreadable output for arbitrary component values.
			return fmt.Errorf("failed to start component %T: %w", c, err)
		}
	}

	return nil
}