in cmd/archiver/main.go [138:290]
// main is the entry point of the archiver binary. It parses command-line
// flags, loads the YAML configuration files, layers flag overrides on top of
// the loaded values, sets up the metrics scope, HTTP handlers, inbounds and
// zk service discovery, and finally creates and starts the archiver engine.
//
// main does not return normally: after a successful Start it blocks forever,
// and every failure path ends in a Fatal log call.
func main() {
	var cfg archiverConfig.Config
	var err error

	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	// Emit JSON logs and tag every entry with the application name.
	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &log.JSONFormatter{},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)

	// Info by default, Debug when the debug flag is set. The same level is
	// handed to the level-overwrite handler registered further down.
	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	log.WithField("files", *cfgFiles).
		Info("Loading archiver config")
	if err = config.Parse(&cfg, *cfgFiles...); err != nil {
		log.WithError(err).
			Fatal("Cannot parse yaml config")
	}

	if *enableSentry {
		logging.ConfigureSentry(&cfg.SentryConfig)
	}

	// Flag overrides. A zero port leaves the configured port untouched.
	if *httpPort != 0 {
		cfg.Archiver.HTTPPort = *httpPort
	}
	if *grpcPort != 0 {
		cfg.Archiver.GRPCPort = *grpcPort
	}

	// Boolean flags can only switch a setting on; when a flag is false the
	// value loaded from the config files is kept.
	if *enableArchiver {
		cfg.Archiver.Enable = *enableArchiver
	}
	if *streamOnlyMode {
		cfg.Archiver.StreamOnlyMode = *streamOnlyMode
	}
	if *podEventsCleanup {
		cfg.Archiver.PodEventsCleanup = *podEventsCleanup
	}

	// Duration flags arrive as strings; an empty string keeps the configured
	// value, and an unparsable one is fatal.
	if *archiveInterval != "" {
		cfg.Archiver.ArchiveInterval, err = time.ParseDuration(*archiveInterval)
		if err != nil {
			log.WithError(err).
				WithField("ARCHIVE_INTERVAL", *archiveInterval).
				Fatal("Cannot parse Archive Interval")
		}
	}
	if *archiveAge != "" {
		cfg.Archiver.ArchiveAge, err = time.ParseDuration(*archiveAge)
		if err != nil {
			log.WithError(err).
				WithField("ARCHIVE_AGE", *archiveAge).
				Fatal("Cannot parse Archive Age")
		}
	}
	if *archiveStepSize != "" {
		cfg.Archiver.ArchiveStepSize, err = time.ParseDuration(*archiveStepSize)
		if err != nil {
			log.WithError(err).
				WithField("ARCHIVE_STEP_SIZE", *archiveStepSize).
				Fatal("Cannot parse Archive Step Size")
		}
	}

	if *kafkaTopic != "" {
		cfg.Archiver.KafkaTopic = *kafkaTopic
	}

	// zkservers list is needed to create peloton client.
	// Archiver does not depend on leader election
	if len(*zkServers) > 0 {
		cfg.Election.ZKServers = *zkServers
	}

	// Parse and setup peloton auth
	if len(*authType) != 0 {
		cfg.Auth.AuthType = auth.Type(*authType)
		cfg.Auth.Path = *authConfigFile
	}

	log.WithField("config", cfg).
		Info("Loaded Archiver configuration")

	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		archiverConfig.PelotonArchiver,
		metrics.TallyFlushInterval,
	)
	// This deferred Close only runs if main unwinds (for example on a
	// panic). The Fatal paths below are expected to exit the process via
	// os.Exit, which skips deferred calls, so each of them closes the scope
	// explicitly before logging.
	defer scopeCloser.Close()

	// Expose the log-level override and build-version endpoints on the mux
	// returned by the metrics setup.
	mux.HandleFunc(
		logging.LevelOverwrite,
		logging.LevelOverwriteHandler(initialLevel),
	)
	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	inbounds := rpc.NewInbounds(
		cfg.Archiver.HTTPPort,
		cfg.Archiver.GRPCPort,
		mux,
	)

	discovery, err := leader.NewZkServiceDiscovery(
		cfg.Election.ZKServers, cfg.Election.Root)
	if err != nil {
		scopeCloser.Close()
		log.WithError(err).
			Fatal("Could not create zk service discovery")
	}

	archiverEngine, err := engine.New(
		cfg,
		rootScope,
		mux,
		discovery,
		inbounds)
	if err != nil {
		scopeCloser.Close()
		log.WithError(err).
			WithField("zkservers", cfg.Election.ZKServers).
			WithField("zkroot", cfg.Election.Root).
			Fatal("Could not create archiver engine")
	}

	health.InitHeartbeat(rootScope, cfg.Health, nil)

	log.Info("Started archiver")

	if err = archiverEngine.Start(); err != nil {
		// Release engine resources first, then close the metrics scope,
		// before the Fatal call ends the process.
		archiverEngine.Cleanup()
		scopeCloser.Close()
		log.WithError(err).Fatal("Archiver engine got a fatal error." +
			" Restarting.")
	}

	// Block forever so the process stays alive after a successful Start.
	select {}
}