func()

in filebeat/beater/filebeat.go [261:534]


func (fb *Filebeat) Run(b *beat.Beat) error {
	var err error
	config := fb.config

	if !fb.moduleRegistry.Empty() {
		err = fb.loadModulesPipelines(b)
		if err != nil {
			return err
		}
	}

	waitFinished := newSignalWait()
	waitEvents := newSignalWait()

	// count active events for waiting on shutdown
	var reg *monitoring.Registry

	if b.Info.Monitoring.Namespace != nil {
		reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats")
		if reg == nil {
			reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats")
		}
	}
	wgEvents := &eventCounter{
		count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge
		added: monitoring.NewUint(reg, "filebeat.events.added"),
		done:  monitoring.NewUint(reg, "filebeat.events.done"),
	}
	finishedLogger := newFinishedLogger(wgEvents)

	registryMigrator := registrar.NewMigrator(config.Registry, fb.logger)
	if err := registryMigrator.Run(); err != nil {
		fb.logger.Errorf("Failed to migrate registry file: %+v", err)
		return err
	}

	// Use context, like normal people do, hooking up to the beat.done channel
	ctx, cn := context.WithCancel(context.Background())
	go func() {
		<-fb.done
		cn()
	}()

	stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry)
	if err != nil {
		fb.logger.Errorf("Failed to open state store: %+v", err)
		return err
	}
	defer stateStore.Close()

	// If notifier is set, configure the listener for output configuration
	// The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage
	// in order to allow it fully configure
	if stateStore.notifier != nil {
		b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
			outCfg := conf.Namespace{}
			if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
				fb.logger.Errorf("Failed to unpack the output config: %v", err)
				return nil
			}

			// Create a new config with the output configuration. Since r.Config is a pointer, a copy is required to
			// avoid concurrent map read and write.
			// See https://github.com/elastic/beats/issues/42815
			configCopy, err := conf.NewConfigFrom(outCfg.Config())
			if err != nil {
				fb.logger.Errorf("Failed to create a new config from the output config: %v", err)
				return nil
			}
			stateStore.notifier.Notify(configCopy)
			return nil
		})
	}

	err = filestream.ValidateInputIDs(config.Inputs, fb.logger.Named("input.filestream"))
	if err != nil {
		fb.logger.Errorf("invalid filestream configuration: %+v", err)
		return err
	}

	// Setup registrar to persist state
	registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout)
	if err != nil {
		fb.logger.Errorf("Could not init registrar: %v", err)
		return err
	}

	// Make sure all events that were published in
	registrarChannel := newRegistrarLogger(registrar)

	// setup event counting for startup and a global common ACKer, such that all events will be
	// routed to the reigstrar after they've been ACKed.
	// Events with Private==nil or the type of private != file.State are directly
	// forwarded to `finishedLogger`. Events from the `logs` input will first be forwarded
	// to the registrar via `registrarChannel`, which finally forwards the events to finishedLogger as well.
	// The finishedLogger decrements the counters in wgEvents after all events have been securely processed
	// by the registry.
	fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents)
	fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel))

	// Filebeat by default required infinite retry. Let's configure this for all
	// inputs by default.  Inputs (and InputController) can overwrite the sending
	// guarantees explicitly when connecting with the pipeline.
	fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)

	outDone := make(chan struct{}) // outDone closes down all active pipeline connections
	pipelineConnector := channel.NewOutletFactory(outDone).Create

	inputsLogger := fb.logger.Named("input")
	v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore)
	v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
	if err != nil {
		panic(err) // loader detected invalid state.
	}

	var inputTaskGroup unison.TaskGroup
	defer func() {
		_ = inputTaskGroup.Stop()
	}()

	// Store needs to be fully configured at this point
	if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
		fb.logger.Errorf("Failed to initialize the input managers: %v", err)
		return err
	}

	inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, compat.Combine(
		compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader),
		input.NewRunnerFactory(pipelineConnector, registrar, fb.done, fb.logger),
	))

	// Create a ES connection factory for dynamic modules pipeline loading
	var pipelineLoaderFactory fileset.PipelineLoaderFactory
	// The pipelineFactory needs a context to control the connections to ES,
	// when the pipelineFactory/ESClient are not needed any more the context
	// must be cancelled. This pipeline factory will be used by the moduleLoader
	// that is run by a crawler, whenever this crawler is stopped we also cancel
	// the context.
	pipelineFactoryCtx, cancelPipelineFactoryCtx := context.WithCancel(context.Background())
	defer cancelPipelineFactoryCtx()
	if b.Config.Output.Name() == "elasticsearch" {
		pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config())
	} else {
		fb.logger.Warn(pipelinesWarning)
	}
	moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
	crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger)
	if err != nil {
		fb.logger.Errorf("Could not init crawler: %v", err)
		return err
	}

	// The order of starting and stopping is important. Stopping is inverted to the starting order.
	// The current order is: registrar, publisher, spooler, crawler
	// That means, crawler is stopped first.

	// Start the registrar
	err = registrar.Start()
	if err != nil {
		return fmt.Errorf("Could not start registrar: %w", err) //nolint:staticcheck //Keep old behavior
	}

	// Stopping registrar will write last state
	defer registrar.Stop()

	// Stopping publisher (might potentially drop items)
	defer func() {
		// Closes first the registrar logger to make sure not more events arrive at the registrar
		// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
		registrarChannel.Close()
		close(outDone) // finally close all active connections to publisher pipeline
	}()

	// Wait for all events to be processed or timeout
	defer waitEvents.Wait()

	if config.OverwritePipelines {
		fb.logger.Debug("modules", "Existing Ingest pipelines will be updated")
	}

	err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
	if err != nil {
		crawler.Stop()
		cancelPipelineFactoryCtx()
		return fmt.Errorf("Failed to start crawler: %w", err) //nolint:staticcheck //Keep old behavior
	}

	// If run once, add crawler completion check as alternative to done signal
	if *once {
		runOnce := func() {
			fb.logger.Info("Running filebeat once. Waiting for completion ...")
			crawler.WaitForCompletion()
			fb.logger.Info("All data collection completed. Shutting down.")
		}
		waitFinished.Add(runOnce)
	}

	// Register reloadable list of inputs and modules
	inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline, fb.logger)
	b.Registry.MustRegisterInput(inputs)

	modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline, fb.logger)

	var adiscover *autodiscover.Autodiscover
	if fb.config.Autodiscover != nil {
		adiscover, err = autodiscover.NewAutodiscover(
			"filebeat",
			fb.pipeline,
			cfgfile.MultiplexedRunnerFactory(
				cfgfile.MatchHasField("module", moduleLoader),
				cfgfile.MatchDefault(inputLoader),
			),
			autodiscover.QueryConfig(),
			config.Autodiscover,
			b.Keystore,
			fb.logger,
		)
		if err != nil {
			return err
		}
	}
	adiscover.Start()

	// We start the manager when all the subsystem are initialized and ready to received events.
	if err := b.Manager.Start(); err != nil {
		return err
	}

	// Add done channel to wait for shutdown signal
	waitFinished.AddChan(fb.done)
	waitFinished.Wait()

	// Stop reloadable lists, autodiscover -> Stop crawler -> stop inputs -> stop harvesters
	// Note: waiting for crawlers to stop here in order to install wgEvents.Wait
	//       after all events have been enqueued for publishing. Otherwise wgEvents.Wait
	//       or publisher might panic due to concurrent updates.
	inputs.Stop()
	modules.Stop()
	adiscover.Stop()
	crawler.Stop()
	cancelPipelineFactoryCtx()

	timeout := fb.config.ShutdownTimeout
	// Checks if on shutdown it should wait for all events to be published
	waitPublished := fb.config.ShutdownTimeout > 0 || *once
	if waitPublished {
		// Wait for registrar to finish writing registry
		waitEvents.Add(withLog(wgEvents.Wait,
			"Continue shutdown: All enqueued events being published."))
		// Wait for either timeout or all events having been ACKed by outputs.
		if fb.config.ShutdownTimeout > 0 {
			fb.logger.Info("Shutdown output timer started. Waiting for max %v.", timeout)
			waitEvents.Add(withLog(waitDuration(timeout),
				"Continue shutdown: Time out waiting for events being published."))
		} else {
			waitEvents.AddChan(fb.done)
		}
	}

	// Stop the manager and stop the connection to any dependent services.
	// The Manager started to have a working implementation when
	// https://github.com/elastic/beats/pull/34416 was merged.
	// This is intended to enable TLS certificates reload on a long
	// running Beat.
	//
	// However calling b.Manager.Stop() here messes up the behavior of the
	// --once flag because it makes Filebeat exit early.
	// So if --once is passed, we don't call b.Manager.Stop().
	if !*once {
		b.Manager.Stop()
	}

	return nil
}