cmd/archiver/main.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
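
// The archiver is the Peloton daemon that periodically streams completed
// jobs to a Kafka topic and, unless running in stream-only mode, deletes
// them; it can also clean up old pod events.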
package main

import (
	"os"
	"time"

	archiverConfig "github.com/uber/peloton/pkg/archiver/config"
	"github.com/uber/peloton/pkg/archiver/engine"
	"github.com/uber/peloton/pkg/auth"
	"github.com/uber/peloton/pkg/common"
	"github.com/uber/peloton/pkg/common/buildversion"
	"github.com/uber/peloton/pkg/common/config"
	"github.com/uber/peloton/pkg/common/health"
	"github.com/uber/peloton/pkg/common/leader"
	"github.com/uber/peloton/pkg/common/logging"
	"github.com/uber/peloton/pkg/common/metrics"
	"github.com/uber/peloton/pkg/common/rpc"

	log "github.com/sirupsen/logrus"
	_ "go.uber.org/automaxprocs"
	"gopkg.in/alecthomas/kingpin.v2"
)
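
// Command-line flags. Most flags can also be set through the environment
// variable named in their help text and override the corresponding values
// parsed from the YAML config files.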
var (
	version string
	app     = kingpin.New(archiverConfig.PelotonArchiver, "Peloton Archiver")

	debug = app.Flag(
		"debug", "enable debug mode (print full json responses)").
		Short('d').
		Default("false").
		Envar("ENABLE_DEBUG_LOGGING").
		Bool()

	enableSentry = app.Flag(
		"enable-sentry", "enable logging hook up to sentry").
		Default("false").
		Envar("ENABLE_SENTRY_LOGGING").
		Bool()

	cfgFiles = app.Flag(
		"config",
		"YAML config files (can be provided multiple times to merge configs)").
		Short('c').
		Required().
		ExistingFiles()

	zkServers = app.Flag(
		"zk-server",
		"Zookeeper servers. Specify multiple times for multiple servers "+
			"(election.zk_servers override) (set $ELECTION_ZK_SERVERS to override)").
		Envar("ELECTION_ZK_SERVERS").
		Strings()

	httpPort = app.Flag(
		"http-port", "Archiver HTTP port (archiver.http_port override) "+
			"(set $PORT to override)").
		Envar("HTTP_PORT").
		Int()

	grpcPort = app.Flag(
		"grpc-port", "Archiver gRPC port (archiver.grpc_port override) "+
			"(set $PORT to override)").
		Envar("GRPC_PORT").
		Int()

	enableArchiver = app.Flag(
		"enable-archiver", "enable Archiver").
		Default("false").
		Envar("ENABLE_ARCHIVER").
		Bool()

	podEventsCleanup = app.Flag(
		"pod-events-cleanup", "enable Pod Events Cleanup").
		Default("false").
		Envar("POD_EVENTS_CLEANUP").
		Bool()

	streamOnlyMode = app.Flag(
		"stream-only-mode", "Archiver streams jobs without deleting them").
		Default("false").
		Envar("STREAM_ONLY_MODE").
		Bool()

	archiveInterval = app.Flag(
		"archive-interval",
		"Archive interval duration in h/m/s (archiver.archive_interval override) (set $ARCHIVE_INTERVAL to override)").
		Envar("ARCHIVE_INTERVAL").
		String()

	archiveAge = app.Flag(
		"archive-age",
		"Archive age duration in h/m/s (archiver.archive_age override) (set $ARCHIVE_AGE to override)").
		Envar("ARCHIVE_AGE").
		String()

	archiveStepSize = app.Flag(
		"archive-step-size",
		"Archive step size in h/m/s (archiver.archive_step_size override) (set $ARCHIVE_STEP_SIZE to override)").
		Envar("ARCHIVE_STEP_SIZE").
		String()

	kafkaTopic = app.Flag(
		"kafka-topic",
		"kafka topic used by archiver to stream completed jobs").
		Envar("KAFKA_TOPIC").
		String()

	authType = app.Flag(
		"auth-type",
		"Define the auth type used, default to NOOP").
		Default("NOOP").
		Envar("AUTH_TYPE").
		Enum("NOOP", "BASIC")

	authConfigFile = app.Flag(
		"auth-config-file",
		"config file for the auth feature, which is specific to the auth type used").
		Default("").
		Envar("AUTH_CONFIG_FILE").
		String()
)

func main() {
	var cfg archiverConfig.Config
	var err error

	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &log.JSONFormatter{},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)
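
	// --debug raises the log level from the default Info to Debug. The chosen
	// level is also passed to the LevelOverwriteHandler registered on the
	// debug mux below.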
	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	log.WithField("files", *cfgFiles).
		Info("Loading archiver config")
	if err = config.Parse(&cfg, *cfgFiles...); err != nil {
		log.WithError(err).
			Fatal("Cannot parse yaml config")
	}

	if *enableSentry {
		logging.ConfigureSentry(&cfg.SentryConfig)
	}
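
	// Command-line flags and environment variables take precedence over the
	// corresponding values parsed from the YAML config files.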
	if *httpPort != 0 {
		cfg.Archiver.HTTPPort = *httpPort
	}
	if *grpcPort != 0 {
		cfg.Archiver.GRPCPort = *grpcPort
	}
	if *enableArchiver {
		cfg.Archiver.Enable = *enableArchiver
	}
	if *streamOnlyMode {
		cfg.Archiver.StreamOnlyMode = *streamOnlyMode
	}
	if *podEventsCleanup {
		cfg.Archiver.PodEventsCleanup = *podEventsCleanup
	}
	if *archiveInterval != "" {
		cfg.Archiver.ArchiveInterval, err = time.ParseDuration(*archiveInterval)
		if err != nil {
			log.WithError(err).
				WithField("ARCHIVE_INTERVAL", *archiveInterval).
				Fatal("Cannot parse Archive Interval")
		}
	}
	if *archiveAge != "" {
		cfg.Archiver.ArchiveAge, err = time.ParseDuration(*archiveAge)
		if err != nil {
			log.WithError(err).
				WithField("ARCHIVE_AGE", *archiveAge).
				Fatal("Cannot parse Archive Age")
		}
	}
	if *archiveStepSize != "" {
		cfg.Archiver.ArchiveStepSize, err = time.ParseDuration(*archiveStepSize)
		if err != nil {
			log.WithError(err).
				WithField("ARCHIVE_STEP_SIZE", *archiveStepSize).
				Fatal("Cannot parse Archive Step Size")
		}
	}
	if *kafkaTopic != "" {
		cfg.Archiver.KafkaTopic = *kafkaTopic
	}

	// The zk-servers list is needed to create the Peloton client;
	// the archiver does not depend on leader election.
	if len(*zkServers) > 0 {
		cfg.Election.ZKServers = *zkServers
	}

	// Parse and set up Peloton auth.
	if len(*authType) != 0 {
		cfg.Auth.AuthType = auth.Type(*authType)
		cfg.Auth.Path = *authConfigFile
	}

	log.WithField("config", cfg).
		Info("Loaded Archiver configuration")
	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		archiverConfig.PelotonArchiver,
		metrics.TallyFlushInterval,
	)
	defer scopeCloser.Close()

	mux.HandleFunc(
		logging.LevelOverwrite,
		logging.LevelOverwriteHandler(initialLevel),
	)
	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	inbounds := rpc.NewInbounds(
		cfg.Archiver.HTTPPort,
		cfg.Archiver.GRPCPort,
		mux,
	)
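
	// Set up Zookeeper-based service discovery for locating the other
	// Peloton components the archiver talks to.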
	discovery, err := leader.NewZkServiceDiscovery(
		cfg.Election.ZKServers, cfg.Election.Root)
	if err != nil {
		log.WithError(err).
			Fatal("Could not create zk service discovery")
	}

	archiverEngine, err := engine.New(
		cfg,
		rootScope,
		mux,
		discovery,
		inbounds)
	if err != nil {
		log.WithError(err).
			WithField("zkservers", cfg.Election.ZKServers).
			WithField("zkroot", cfg.Election.Root).
			Fatal("Could not create archiver engine")
	}
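
	// Report health heartbeats and start the archiver engine. The final
	// select{} blocks the main goroutine so the process keeps running.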
	health.InitHeartbeat(rootScope, cfg.Health, nil)

	log.Info("Started archiver")
	if err := archiverEngine.Start(); err != nil {
		archiverEngine.Cleanup()
		log.WithError(err).Fatal("Archiver engine got a fatal error." +
			" Restarting.")
	}
	select {}
}