func Main()

in cmds/contest/server/server.go [138:324]


// Main is the entry point of the server. It parses args into the package
// flag set, builds a cancellable and pausable context, registers plugins,
// initializes the primary and replica (async) storage and the target locker,
// creates the JobManager on top of an HTTP listener and runs it until it
// returns.
//
// Signals received on sigs are handled as follows:
//   - SIGUSR1: stop the API only, without asserting the pause signal.
//   - SIGINT / SIGTERM: the first signal stops the API and pauses jobs (with
//     an optional pause timeout after which the context is canceled); any
//     further signal cancels the context. If the pause timeout flag is zero,
//     the very first signal cancels.
//
// Every setup failure is returned as an error rather than terminating the
// process, so the deferred teardown (global state reset, storage close,
// context cancel) always runs. On a normal run the returned error is whatever
// the JobManager's Run returned.
func Main(pluginConfig *PluginConfig, cmd string, args []string, sigs <-chan os.Signal) error {
	initFlags(cmd)
	if err := flagSet.Parse(args); err != nil {
		return err
	}

	logLevel, err := logger.ParseLogLevel(*flagLogLevel)
	if err != nil {
		return err
	}

	clk := clock.New()

	ctx, cancel := xcontext.WithCancel(logrusctx.NewContext(logLevel, logging.DefaultOptions()...))
	ctx, pause := xcontext.WithNotify(ctx, xcontext.ErrPaused)
	log := ctx.Logger()
	defer cancel()

	pluginRegistry := pluginregistry.NewPluginRegistry(ctx)
	if err := registerPlugins(pluginRegistry, pluginConfig); err != nil {
		return fmt.Errorf("failed to register plugins: %w", err)
	}

	var storageInstances []storage.Storage
	defer func() {
		// Detach the package-level locker and storages first so nothing keeps
		// pointing at an instance that is about to be closed. The Set*Storage
		// errors are deliberately ignored: this is best-effort teardown.
		target.SetLocker(nil)
		_ = storage.SetStorage(nil)
		_ = storage.SetAsyncStorage(nil)

		for i, s := range storageInstances {
			if err := s.Close(); err != nil {
				log.Errorf("Failed to close storage %d: %v", i, err)
			}
		}
	}()

	// primary storage initialization
	if *flagDBURI != "" {
		primaryDBURI := *flagDBURI
		log.Infof("Using database URI for primary storage: %s", primaryDBURI)
		s, err := rdbms.New(primaryDBURI)
		if err != nil {
			return fmt.Errorf("could not initialize database: %w", err)
		}
		storageInstances = append(storageInstances, s)
		if err := storage.SetStorage(s); err != nil {
			return fmt.Errorf("could not set storage: %w", err)
		}

		dbVerPrim, primVerErr := s.Version()
		if primVerErr != nil {
			log.Warnf("Could not determine storage version: %v", primVerErr)
		} else {
			log.Infof("Storage version: %d", dbVerPrim)
		}

		// replica storage initialization
		// pointing to main database for now but can be used to point to replica
		replicaDBURI := *flagDBURI
		log.Infof("Using database URI for replica storage: %s", replicaDBURI)
		r, err := rdbms.New(replicaDBURI)
		if err != nil {
			return fmt.Errorf("could not initialize replica database: %w", err)
		}
		storageInstances = append(storageInstances, r)
		if err := storage.SetAsyncStorage(r); err != nil {
			return fmt.Errorf("could not set replica storage: %w", err)
		}

		dbVerRepl, replVerErr := r.Version()
		if replVerErr != nil {
			log.Warnf("Could not determine storage version: %v", replVerErr)
		} else {
			log.Infof("Storage version: %d", dbVerRepl)
		}

		// Only compare versions that were actually determined; a failed lookup
		// has already been reported above and must not be mistaken for a
		// version mismatch.
		if primVerErr == nil && replVerErr == nil && dbVerPrim != dbVerRepl {
			return fmt.Errorf("primary and replica DB versions are different: %v and %v", dbVerPrim, dbVerRepl)
		}
	} else {
		log.Warnf("Using in-memory storage")
		ms, err := memory.New()
		if err != nil {
			return fmt.Errorf("could not create storage: %w", err)
		}
		storageInstances = append(storageInstances, ms)
		if err := storage.SetStorage(ms); err != nil {
			return fmt.Errorf("could not set storage: %w", err)
		}
		if err := storage.SetAsyncStorage(ms); err != nil {
			return fmt.Errorf("could not set replica storage: %w", err)
		}
	}

	// set Locker engine
	if *flagTargetLocker == "auto" {
		if *flagDBURI != "" {
			*flagTargetLocker = dblocker.Name
		} else {
			*flagTargetLocker = inmemory.Name
		}
		log.Infof("Locker engine set to auto, using %s", *flagTargetLocker)
	}
	switch *flagTargetLocker {
	case inmemory.Name:
		target.SetLocker(inmemory.New(clk))
	case dblocker.Name:
		l, err := dblocker.New(*flagDBURI, dblocker.WithClock(clk))
		if err != nil {
			return fmt.Errorf("failed to create locker %q: %w", *flagTargetLocker, err)
		}
		target.SetLocker(l)
	default:
		return fmt.Errorf("invalid target locker name %q", *flagTargetLocker)
	}

	// spawn JobManager
	listener := httplistener.New(*flagListenAddr)

	opts := []jobmanager.Option{
		jobmanager.APIOption(api.OptionEventTimeout(*flagProcessTimeout)),
	}
	if *flagServerID != "" {
		opts = append(opts, jobmanager.APIOption(api.OptionServerID(*flagServerID)))
	}
	if *flagInstanceTag != "" {
		opts = append(opts, jobmanager.OptionInstanceTag(*flagInstanceTag))
	}
	if *flagTargetLockDuration != 0 {
		opts = append(opts, jobmanager.OptionTargetLockDuration(*flagTargetLockDuration))
	}

	jm, err := jobmanager.New(listener, pluginRegistry, opts...)
	if err != nil {
		return fmt.Errorf("creating job manager: %w", err)
	}

	// Read the flag once; the signal goroutine only uses this copy.
	pauseTimeout := *flagPauseTimeout

	go func() {
		intLevel := 0
		// cancel immediately if pauseTimeout is zero
		if pauseTimeout == 0 {
			intLevel = 1
		}
		for {
			var sig os.Signal
			select {
			case <-ctx.Done():
				// Context canceled (including the deferred cancel when Main
				// returns): nothing left to do, so don't leak this goroutine
				// if the caller never closes sigs.
				return
			case s, ok := <-sigs:
				if !ok {
					return
				}
				sig = s
			}
			switch sig {
			case syscall.SIGUSR1:
				// Gentle shutdown: stop accepting requests, drain without asserting pause signal.
				jm.StopAPI()
			case syscall.SIGINT:
				fallthrough
			case syscall.SIGTERM:
				// First signal - pause and drain, second - cancel.
				jm.StopAPI()
				if intLevel == 0 {
					log.Infof("Signal %q, pausing jobs", sig)
					pause()
					if pauseTimeout > 0 {
						// Escalate to cancellation if pausing takes too long.
						go func() {
							select {
							case <-ctx.Done():
							case <-time.After(pauseTimeout):
								log.Errorf("Timed out waiting for jobs to pause, canceling")
								cancel()
							}
						}()
					}
					intLevel++
				} else {
					log.Infof("Signal %q, canceling", sig)
					cancel()
				}
			}
		}
	}()

	err = jm.Run(ctx, *flagResumeJobs)

	log.Infof("Exiting, %v", err)

	return err
}