func()

in processor/integrationprocessor/processor.go [85:121]


// Start resolves the configured integration into a single pipeline and starts
// that pipeline's components.
//
// The provided host must implement factoryGetter; otherwise an error is
// returned before anything is started. The integration named r.config.Name is
// looked up and resolved with r.config.Parameters for the one pipeline
// identified by r.config.Pipeline. If starting the pipeline fails, Shutdown is
// invoked for cleanup; a failure of that shutdown is only logged, and the
// original start error is the one returned to the caller.
func (r *integrationProcessor) Start(ctx context.Context, ch component.Host) error {
	host, ok := ch.(factoryGetter)
	if !ok {
		return fmt.Errorf("integrationprocessor is not compatible with the provided component.Host")
	}

	integration, err := integrations.Find(ctx, r.params.Logger, host, r.config.Name)
	if err != nil {
		return fmt.Errorf("failed to find integration %q: %w", r.config.Name, err)
	}

	config, err := integration.Resolve(ctx, r.config.Parameters, []pipeline.ID{r.config.Pipeline})
	if err != nil {
		return fmt.Errorf("failed to build pipeline for integration %q: %w", r.config.Name, err)
	}

	// Confidence checks, this config should only have the requested pipeline.
	if n := len(config.Pipelines); n != 1 {
		return fmt.Errorf("exactly one pipeline expected, found %d", n)
	}
	// Named so that it does not shadow the imported pipeline package.
	resolvedPipeline, found := config.Pipelines[r.config.Pipeline]
	if !found {
		return fmt.Errorf("expected pipeline %q not found", r.config.Pipeline)
	}

	if err := r.startPipeline(ctx, host, *config, r.config.Pipeline, resolvedPipeline); err != nil {
		// Shutdown components that had been already started for cleanup. The
		// shutdown error is logged rather than returned so that the caller
		// sees the root cause, i.e. the start failure.
		if shutdownErr := r.Shutdown(ctx); shutdownErr != nil {
			r.params.Logger.Warn("Failed to shutdown all components on error while starting",
				zap.Error(shutdownErr))
		}
		return fmt.Errorf("failed to start pipeline %q: %w", r.config.Pipeline, err)
	}

	return nil
}