in processor/integrationprocessor/processor.go [85:121]
func (r *integrationProcessor) Start(ctx context.Context, ch component.Host) error {
host, ok := ch.(factoryGetter)
if !ok {
return fmt.Errorf("integrationprocessor is not compatible with the provided component.Host")
}
integration, err := integrations.Find(ctx, r.params.Logger, host, r.config.Name)
if err != nil {
return fmt.Errorf("failed to find integration %q: %w", r.config.Name, err)
}
config, err := integration.Resolve(ctx, r.config.Parameters, []pipeline.ID{r.config.Pipeline})
if err != nil {
return fmt.Errorf("failed to build pipeline for integration %q: %w", r.config.Name, err)
}
// Confidence checks, this config should only have the requested pipeline.
if n := len(config.Pipelines); n != 1 {
return fmt.Errorf("exactly one pipeline expected, found %d", n)
}
pipeline, found := config.Pipelines[r.config.Pipeline]
if !found {
return fmt.Errorf("expected pipeline %q not found", r.config.Pipeline)
}
err = r.startPipeline(ctx, host, *config, r.config.Pipeline, pipeline)
if err != nil {
// Shutdown components that had been already started for cleanup.
if err := r.Shutdown(ctx); err != nil {
r.params.Logger.Warn("Failed to shutdown all components on error while starting",
zap.String("error", err.Error()))
}
return fmt.Errorf("failed to start pipeline %q: %w", r.config.Pipeline, err)
}
return nil
}