in cmds/contest/server/server.go [138:324]
func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.Signal) error {
initFlags(cmd)
if err := flagSet.Parse(args); err != nil {
return err
}
logLevel, err := logger.ParseLogLevel(*flagLogLevel)
if err != nil {
return err
}
clk := clock.New()
ctx, cancel := xcontext.WithCancel(logrusctx.NewContext(logLevel, logging.DefaultOptions()...))
ctx, pause := xcontext.WithNotify(ctx, xcontext.ErrPaused)
log := ctx.Logger()
defer cancel()
pluginRegistry := pluginregistry.NewPluginRegistry(ctx)
if err := registerPlugins(pluginRegistry, pluginConfig); err != nil {
return fmt.Errorf("failed to register plugins: %w", err)
}
var storageInstances []storage.Storage
defer func() {
for i, s := range storageInstances {
if err := s.Close(); err != nil {
log.Errorf("Failed to close storage %d: %v", i, err)
}
}
}()
// primary storage initialization
if *flagDBURI != "" {
primaryDBURI := *flagDBURI
log.Infof("Using database URI for primary storage: %s", primaryDBURI)
s, err := rdbms.New(primaryDBURI)
if err != nil {
log.Fatalf("Could not initialize database: %v", err)
}
storageInstances = append(storageInstances, s)
if err := storage.SetStorage(s); err != nil {
log.Fatalf("Could not set storage: %v", err)
}
dbVerPrim, err := s.Version()
if err != nil {
log.Warnf("Could not determine storage version: %v", err)
} else {
log.Infof("Storage version: %d", dbVerPrim)
}
// replica storage initialization
// pointing to main database for now but can be used to point to replica
replicaDBURI := *flagDBURI
log.Infof("Using database URI for replica storage: %s", replicaDBURI)
r, err := rdbms.New(replicaDBURI)
if err != nil {
log.Fatalf("Could not initialize replica database: %v", err)
}
storageInstances = append(storageInstances, r)
if err := storage.SetAsyncStorage(r); err != nil {
log.Fatalf("Could not set replica storage: %v", err)
}
dbVerRepl, err := r.Version()
if err != nil {
log.Warnf("Could not determine storage version: %v", err)
} else {
log.Infof("Storage version: %d", dbVerRepl)
}
if dbVerPrim != dbVerRepl {
log.Fatalf("Primary and Replica DB Versions are different: %v and %v", dbVerPrim, dbVerRepl)
}
} else {
log.Warnf("Using in-memory storage")
if ms, err := memory.New(); err == nil {
storageInstances = append(storageInstances, ms)
if err := storage.SetStorage(ms); err != nil {
log.Fatalf("Could not set storage: %v", err)
}
if err := storage.SetAsyncStorage(ms); err != nil {
log.Fatalf("Could not set replica storage: %v", err)
}
} else {
log.Fatalf("Could not create storage: %v", err)
}
}
// set Locker engine
if *flagTargetLocker == "auto" {
if *flagDBURI != "" {
*flagTargetLocker = dblocker.Name
} else {
*flagTargetLocker = inmemory.Name
}
log.Infof("Locker engine set to auto, using %s", *flagTargetLocker)
}
switch *flagTargetLocker {
case inmemory.Name:
target.SetLocker(inmemory.New(clk))
case dblocker.Name:
if l, err := dblocker.New(*flagDBURI, dblocker.WithClock(clk)); err == nil {
target.SetLocker(l)
} else {
log.Fatalf("Failed to create locker %q: %v", *flagTargetLocker, err)
}
default:
log.Fatalf("Invalid target locker name %q", *flagTargetLocker)
}
// spawn JobManager
listener := httplistener.New(*flagListenAddr)
opts := []jobmanager.Option{
jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)),
}
if *flagServerID != "" {
opts = append(opts, jobmanager.APIOption(api.OptionServerID(*flagServerID)))
}
if *flagInstanceTag != "" {
opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag))
}
if *flagTargetLockDuration != 0 {
opts = append(opts, jobmanager.OptionTargetLockDuration(*flagTargetLockDuration))
}
jm, err := jobmanager.New(listener, pluginRegistry, opts...)
if err != nil {
log.Fatalf("%v", err)
}
pauseTimeout := *flagPauseTimeout
go func() {
intLevel := 0
// cancel immediately if pauseTimeout is zero
if *flagPauseTimeout == 0 {
intLevel = 1
}
for {
sig, ok := <-sigs
if !ok {
return
}
switch sig {
case syscall.SIGUSR1:
// Gentle shutdown: stop accepting requests, drain without asserting pause signal.
jm.StopAPI()
case syscall.SIGINT:
fallthrough
case syscall.SIGTERM:
// First signal - pause and drain, second - cancel.
jm.StopAPI()
if intLevel == 0 {
log.Infof("Signal %q, pausing jobs", sig)
pause()
if *flagPauseTimeout > 0 {
go func() {
select {
case <-ctx.Done():
case <-time.After(pauseTimeout):
log.Errorf("Timed out waiting for jobs to pause, canceling")
cancel()
}
}()
}
intLevel++
} else {
log.Infof("Signal %q, canceling", sig)
cancel()
}
}
}
}()
err = jm.Run(ctx, *flagResumeJobs)
target.SetLocker(nil)
_ = storage.SetStorage(nil)
_ = storage.SetAsyncStorage(nil)
log.Infof("Exiting, %v", err)
return err
}