in receiver/integrationreceiver/receiver.go [81:118]
// Start looks up the configured integration, resolves it into pipelines and
// starts every resolved pipeline this receiver has a consumer for.
//
// The provided host must also satisfy factoryGetter; if it does not, an error
// is returned before anything is started. The integration named by
// r.config.Name is located through integrations.Find and resolved with the
// configured parameters and pipelines. The number of resolved pipelines must
// equal the number of requested pipelines.
//
// If a pipeline fails to start, Shutdown is called to clean up; a failure of
// that Shutdown call is only logged as a warning, and the original start error
// is the one returned to the caller.
func (r *integrationReceiver) Start(ctx context.Context, ch component.Host) error {
	host, compatible := ch.(factoryGetter)
	if !compatible {
		return fmt.Errorf("integrationreceiver is not compatible with the provided component.Host")
	}

	found, findErr := integrations.Find(ctx, r.params.Logger, host, r.config.Name)
	if findErr != nil {
		return fmt.Errorf("failed to find integration %q: %w", r.config.Name, findErr)
	}

	resolved, resolveErr := found.Resolve(ctx, r.config.Parameters, r.config.Pipelines)
	if resolveErr != nil {
		return fmt.Errorf("failed to build receiver pipelines for integration %q: %w", r.config.Name, resolveErr)
	}

	// Sanity check: resolution must yield exactly one pipeline per requested one.
	got := len(resolved.Pipelines)
	want := len(r.config.Pipelines)
	if got != want {
		return fmt.Errorf("unexpected number of pipelines configured, found %d, expected %d", got, want)
	}

	for id, pipeline := range resolved.Pipelines {
		// Pipelines without a matching consumer are skipped.
		if !r.hasConsumerForPipelineType(id) {
			continue
		}

		startErr := r.startPipeline(ctx, host, *resolved, id, pipeline)
		if startErr == nil {
			continue
		}

		// Clean up whatever was already started. A cleanup failure is only
		// logged so that the start error is what the caller receives.
		if stopErr := r.Shutdown(ctx); stopErr != nil {
			r.params.Logger.Warn("Failed to shutdown all components on error while starting",
				zap.String("error", stopErr.Error()))
		}
		return fmt.Errorf("failed to start pipeline %q: %w", id, startErr)
	}

	return nil
}