func()

in receiver/integrationreceiver/receiver.go [81:118]


func (r *integrationReceiver) Start(ctx context.Context, ch component.Host) error {
	host, ok := ch.(factoryGetter)
	if !ok {
		return fmt.Errorf("integrationreceiver is not compatible with the provided component.Host")
	}

	integration, err := integrations.Find(ctx, r.params.Logger, host, r.config.Name)
	if err != nil {
		return fmt.Errorf("failed to find integration %q: %w", r.config.Name, err)
	}

	config, err := integration.Resolve(ctx, r.config.Parameters, r.config.Pipelines)
	if err != nil {
		return fmt.Errorf("failed to build receiver pipelines for integration %q: %w", r.config.Name, err)
	}

	// Confidence check, we should have as many unique pipelines as requested.
	if found, expected := len(config.Pipelines), len(r.config.Pipelines); expected != found {
		return fmt.Errorf("unexpected number of pipelines configured, found %d, expected %d", found, expected)
	}

	for id, pipeline := range config.Pipelines {
		if !r.hasConsumerForPipelineType(id) {
			continue
		}
		err := r.startPipeline(ctx, host, *config, id, pipeline)
		if err != nil {
			// Shutdown components that had been already started for cleanup.
			if err := r.Shutdown(ctx); err != nil {
				r.params.Logger.Warn("Failed to shutdown all components on error while starting",
					zap.String("error", err.Error()))
			}
			return fmt.Errorf("failed to start pipeline %q: %w", id, err)
		}
	}

	return nil
}