func()

in receiver/integrationreceiver/receiver.go [134:289]


// startPipeline creates and starts the components of a single integration
// pipeline and registers the started components in r.components.
//
// The receiver and processors referenced by pipeline are looked up in config,
// and their factories are obtained from host. Processors are created in
// reverse order of pipeline.Processors so that each one wraps the consumer
// that follows it; the end of the chain is the receiver's own next consumers
// (r.nextLogsConsumer, r.nextMetricsConsumer, r.nextTracesConsumer). A signal
// whose next consumer is nil is skipped entirely.
//
// The pipeline receiver is then created for every signal that still has a
// consumer. A receiver factory reporting otelpipeline.ErrSignalNotSupported
// for a signal is not an error: the signal is logged at debug level and
// skipped. If the receiver could not be created for any signal, the already
// created processors are shut down and nil is returned.
//
// Components are started in creation order (processors from last to first,
// then receivers). If one of them fails to start, r.Shutdown is invoked and
// the start error is returned.
func (r *integrationReceiver) startPipeline(ctx context.Context, host factoryGetter, config integrations.Config, pipelineID pipeline.ID, pipeline integrations.PipelineConfig) error {
	// consumerChain tracks, per signal, the consumer the next created
	// component has to deliver to. It starts at the consumers downstream of
	// this receiver and is moved one step upstream for every processor created.
	consumerChain := struct {
		logs    consumer.Logs
		metrics consumer.Metrics
		traces  consumer.Traces
	}{
		logs:    r.nextLogsConsumer,
		metrics: r.nextMetricsConsumer,
		traces:  r.nextTracesConsumer,
	}

	if pipeline.Receiver == nil {
		return errors.New("no receiver in pipeline configuration")
	}

	receiverConfig, found := config.Receivers[*pipeline.Receiver]
	if !found {
		return fmt.Errorf("receiver %q not found", pipeline.Receiver)
	}

	receiverFactory, ok := host.GetFactory(component.KindReceiver, pipeline.Receiver.Type()).(receiver.Factory)
	if !ok {
		return fmt.Errorf("could not find receiver factory for %q", pipeline.Receiver.Type())
	}

	preparedConfig, err := convertComponentConfig(receiverFactory.CreateDefaultConfig, receiverConfig)
	if err != nil {
		return fmt.Errorf("could not compose receiver config for %s: %w", pipeline.Receiver.String(), err)
	}

	var components []component.Component

	// Work on a reversed copy: the last processor of the pipeline must be
	// created first because it is the one that feeds the final consumers.
	processors := slices.Clone(pipeline.Processors)
	slices.Reverse(processors)
	for i, id := range processors {
		processorConfig, found := config.Processors[id]
		if !found {
			return fmt.Errorf("processor %q not found", id)
		}

		factory, ok := host.GetFactory(component.KindProcessor, id.Type()).(processor.Factory)
		if !ok {
			return fmt.Errorf("could not find processor factory for %q", id.Type())
		}

		// Deliberately not named "config" to avoid shadowing the integration
		// configuration parameter inside the loop.
		preparedProcessorConfig, err := convertComponentConfig(factory.CreateDefaultConfig, processorConfig)
		if err != nil {
			return fmt.Errorf("could not compose processor config for %s: %w", id.String(), err)
		}

		params := processor.Settings{
			TelemetrySettings: r.params.TelemetrySettings,
			BuildInfo:         r.params.BuildInfo,
		}
		// The index (in reversed order) is part of the name so that each
		// processor instance of the pipeline gets a distinct ID.
		params.ID = component.NewIDWithName(factory.Type(), fmt.Sprintf("%s-%s-%d", r.params.ID, pipelineID, i))
		params.Logger = params.Logger.With(zap.String("name", params.ID.String()))
		if consumerChain.logs != nil {
			logs, err := factory.CreateLogs(ctx, params, preparedProcessorConfig, consumerChain.logs)
			if err != nil {
				return fmt.Errorf("failed to create logs processor %s: %w", params.ID, err)
			}
			consumerChain.logs = logs
			components = append(components, logs)
		}
		if consumerChain.metrics != nil {
			metrics, err := factory.CreateMetrics(ctx, params, preparedProcessorConfig, consumerChain.metrics)
			if err != nil {
				return fmt.Errorf("failed to create metrics processor %s: %w", params.ID, err)
			}
			consumerChain.metrics = metrics
			components = append(components, metrics)
		}
		if consumerChain.traces != nil {
			traces, err := factory.CreateTraces(ctx, params, preparedProcessorConfig, consumerChain.traces)
			if err != nil {
				return fmt.Errorf("failed to create traces processor %s: %w", params.ID, err)
			}
			consumerChain.traces = traces
			components = append(components, traces)
		}
	}

	params := r.params
	params.ID = component.NewIDWithName(receiverFactory.Type(), fmt.Sprintf("%s-receiver", pipelineID))
	params.Logger = params.Logger.With(zap.String("name", params.ID.String()))

	// Counts the signals for which the receiver was actually created; an
	// unsupported signal is tolerated, any other creation error is fatal.
	receiversCreated := 0
	if consumerChain.logs != nil {
		logs, err := receiverFactory.CreateLogs(ctx, params, preparedConfig, consumerChain.logs)
		switch {
		case err == nil:
			components = append(components, logs)
			receiversCreated++
		case errors.Is(err, otelpipeline.ErrSignalNotSupported):
			r.params.Logger.Debug("receiver does not support logs telemetry type",
				zap.String("integration", r.params.ID.String()),
				zap.String("receiver", params.ID.String()))
		default:
			return fmt.Errorf("failed to create logs receiver %s: %w", params.ID, err)
		}
	}
	if consumerChain.metrics != nil {
		metrics, err := receiverFactory.CreateMetrics(ctx, params, preparedConfig, consumerChain.metrics)
		switch {
		case err == nil:
			components = append(components, metrics)
			receiversCreated++
		case errors.Is(err, otelpipeline.ErrSignalNotSupported):
			r.params.Logger.Debug("receiver does not support metrics telemetry type",
				zap.String("integration", r.params.ID.String()),
				zap.String("receiver", params.ID.String()))
		default:
			return fmt.Errorf("failed to create metrics receiver %s: %w", params.ID, err)
		}
	}
	if consumerChain.traces != nil {
		traces, err := receiverFactory.CreateTraces(ctx, params, preparedConfig, consumerChain.traces)
		switch {
		case err == nil:
			components = append(components, traces)
			receiversCreated++
		case errors.Is(err, otelpipeline.ErrSignalNotSupported):
			r.params.Logger.Debug("receiver does not support traces telemetry type",
				zap.String("integration", r.params.ID.String()),
				zap.String("receiver", params.ID.String()))
		default:
			return fmt.Errorf("failed to create traces receiver %s: %w", params.ID, err)
		}
	}

	// If no receiver has been created the rest of the pipeline won't be used, so don't keep it.
	if receiversCreated == 0 {
		// Shutting down created components out of kindness, because they haven't been started yet.
		if err := shutdownComponents(ctx, components); err != nil {
			// Best-effort cleanup: report the failure, including its cause,
			// but do not fail the pipeline because of it.
			r.params.Logger.Error("failed to cleanup processors after receiver was not created",
				zap.String("integration", r.params.ID.String()),
				zap.String("receiver", params.ID.String()),
				zap.Error(err),
			)
		}
		return nil
	}

	// Only components that started successfully are recorded in r.components.
	for _, c := range components {
		err := c.Start(ctx, host)
		if err != nil {
			if err := r.Shutdown(ctx); err != nil {
				r.params.Logger.Error("failed to cleanup processors after a component failed to start",
					zap.String("integration", r.params.ID.String()),
					zap.String("receiver", params.ID.String()),
					zap.Error(err),
				)
			}
			return fmt.Errorf("failed to start component %q: %w", c, err)
		}
		r.components = append(r.components, c)
	}

	return nil
}