in receiver/integrationreceiver/receiver.go [134:289]
// startPipeline builds and starts the components of a single integration
// pipeline: the processors listed in the pipeline configuration and the
// receiver that feeds them.
//
// Processors are created from last to first, so each one wraps the consumer
// that follows it, ending in the next consumers held by r. Components are only
// created for the signals for which r has a next consumer. The receiver is
// created last, on top of the resulting chain; a signal for which the receiver
// factory returns otelpipeline.ErrSignalNotSupported is skipped.
//
// If the receiver could not be created for any signal, the components created
// so far are shut down and nil is returned. Otherwise the components are
// started in creation order (receiver last) and recorded in r.components. If a
// component fails to start, r.Shutdown is called and the start error is
// returned.
func (r *integrationReceiver) startPipeline(ctx context.Context, host factoryGetter, config integrations.Config, pipelineID pipeline.ID, pipeline integrations.PipelineConfig) error {
	// Head of the consumer chain for each signal. It starts at the consumers
	// following this receiver and is replaced by each processor created below.
	consumerChain := struct {
		logs    consumer.Logs
		metrics consumer.Metrics
		traces  consumer.Traces
	}{
		logs:    r.nextLogsConsumer,
		metrics: r.nextMetricsConsumer,
		traces:  r.nextTracesConsumer,
	}

	if pipeline.Receiver == nil {
		return errors.New("no receiver in pipeline configuration")
	}
	receiverConfig, found := config.Receivers[*pipeline.Receiver]
	if !found {
		return fmt.Errorf("receiver %q not found", pipeline.Receiver)
	}
	receiverFactory, ok := host.GetFactory(component.KindReceiver, pipeline.Receiver.Type()).(receiver.Factory)
	if !ok {
		return fmt.Errorf("could not find receiver factory for %q", pipeline.Receiver.Type())
	}
	preparedConfig, err := convertComponentConfig(receiverFactory.CreateDefaultConfig, receiverConfig)
	if err != nil {
		return fmt.Errorf("could not compose receiver config for %s: %w", pipeline.Receiver.String(), err)
	}

	var components []component.Component

	// Walk the processors backwards: each processor needs the consumer that
	// comes after it, so the last one in the pipeline is created first.
	processors := slices.Clone(pipeline.Processors)
	slices.Reverse(processors)
	for i, id := range processors {
		processorConfig, found := config.Processors[id]
		if !found {
			return fmt.Errorf("processor %q not found", id)
		}
		factory, ok := host.GetFactory(component.KindProcessor, id.Type()).(processor.Factory)
		if !ok {
			return fmt.Errorf("could not find processor factory for %q", id.Type())
		}
		preparedProcessorConfig, err := convertComponentConfig(factory.CreateDefaultConfig, processorConfig)
		if err != nil {
			return fmt.Errorf("could not compose processor config for %s: %w", id.String(), err)
		}

		settings := processor.Settings{
			TelemetrySettings: r.params.TelemetrySettings,
			BuildInfo:         r.params.BuildInfo,
		}
		settings.ID = component.NewIDWithName(factory.Type(), fmt.Sprintf("%s-%s-%d", r.params.ID, pipelineID, i))
		settings.Logger = settings.Logger.With(zap.String("name", settings.ID.String()))

		if consumerChain.logs != nil {
			logs, err := factory.CreateLogs(ctx, settings, preparedProcessorConfig, consumerChain.logs)
			if err != nil {
				return fmt.Errorf("failed to create logs processor %s: %w", settings.ID, err)
			}
			consumerChain.logs = logs
			components = append(components, logs)
		}
		if consumerChain.metrics != nil {
			metrics, err := factory.CreateMetrics(ctx, settings, preparedProcessorConfig, consumerChain.metrics)
			if err != nil {
				return fmt.Errorf("failed to create metrics processor %s: %w", settings.ID, err)
			}
			consumerChain.metrics = metrics
			components = append(components, metrics)
		}
		if consumerChain.traces != nil {
			traces, err := factory.CreateTraces(ctx, settings, preparedProcessorConfig, consumerChain.traces)
			if err != nil {
				return fmt.Errorf("failed to create traces processor %s: %w", settings.ID, err)
			}
			consumerChain.traces = traces
			components = append(components, traces)
		}
	}

	params := r.params
	params.ID = component.NewIDWithName(receiverFactory.Type(), fmt.Sprintf("%s-receiver", pipelineID))
	params.Logger = params.Logger.With(zap.String("name", params.ID.String()))

	receiversCreated := 0
	// trackReceiver records the outcome of creating the receiver for one
	// signal: it keeps the created component, tolerates an unsupported signal,
	// and turns any other error into a failure of the whole pipeline.
	trackReceiver := func(signal string, created component.Component, err error) error {
		switch {
		case err == nil:
			components = append(components, created)
			receiversCreated++
		case errors.Is(err, otelpipeline.ErrSignalNotSupported):
			r.params.Logger.Debug("receiver does not support "+signal+" telemetry type",
				zap.String("integration", r.params.ID.String()),
				zap.String("receiver", params.ID.String()))
		default:
			return fmt.Errorf("failed to create %s receiver %s: %w", signal, params.ID, err)
		}
		return nil
	}

	if consumerChain.logs != nil {
		logs, err := receiverFactory.CreateLogs(ctx, params, preparedConfig, consumerChain.logs)
		if err := trackReceiver("logs", logs, err); err != nil {
			return err
		}
	}
	if consumerChain.metrics != nil {
		metrics, err := receiverFactory.CreateMetrics(ctx, params, preparedConfig, consumerChain.metrics)
		if err := trackReceiver("metrics", metrics, err); err != nil {
			return err
		}
	}
	if consumerChain.traces != nil {
		traces, err := receiverFactory.CreateTraces(ctx, params, preparedConfig, consumerChain.traces)
		if err := trackReceiver("traces", traces, err); err != nil {
			return err
		}
	}

	// If no receiver has been created the rest of the pipeline won't be used, so don't keep it.
	if receiversCreated == 0 {
		// Shutting down created components out of kindness, because they haven't been started yet.
		if err := shutdownComponents(ctx, components); err != nil {
			r.params.Logger.Error("failed to cleanup processors after receiver was not created",
				zap.String("integration", r.params.ID.String()),
				zap.String("receiver", params.ID.String()),
				zap.Error(err),
			)
		}
		return nil
	}

	for _, comp := range components {
		if err := comp.Start(ctx, host); err != nil {
			if shutdownErr := r.Shutdown(ctx); shutdownErr != nil {
				r.params.Logger.Error("failed to cleanup processors after a component failed to start",
					zap.String("integration", r.params.ID.String()),
					zap.String("receiver", params.ID.String()),
					zap.Error(shutdownErr),
				)
			}
			// NOTE(review): comp is a component.Component interface value, so %q
			// formats the underlying value rather than a component name unless
			// the concrete type implements fmt.Stringer — confirm this is the
			// intended message.
			return fmt.Errorf("failed to start component %q: %w", comp, err)
		}
		// Only started components are tracked.
		r.components = append(r.components, comp)
	}
	return nil
}