// commands/single.go
package commands
import (
"context"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/network"
)
// RunSingleCommand implements the "run-single" CLI command: it runs a single
// runner configuration in the foreground, requesting and processing jobs
// until the build limit or wait timeout is reached.
type RunSingleCommand struct {
	common.RunnerConfig
	// network is the GitLab API client used to request and trace jobs.
	network common.Network
	WaitTimeout int `long:"wait-timeout" description:"How long to wait in seconds before receiving the first job"`
	// lastBuild records when the most recent build finished; used for the
	// wait-timeout exit condition.
	lastBuild time.Time
	// runForever is true when no build limit was given (MaxBuilds == 0).
	runForever bool
	MaxBuilds int `long:"max-builds" description:"How many builds to process before exiting"`
	// finished signals the main loop to stop; set by the signal handler and
	// by the exit conditions.
	finished atomic.Bool
	// interruptSignals may be pre-set (e.g. by tests); when nil a channel is
	// created by waitForInterrupts.
	interruptSignals chan os.Signal
	configOptions
	RunnerName string `short:"r" long:"runner" description:"Runner name from the config file to use instead of command-line arguments"`
	shutdownTimeout int `long:"shutdown-timeout" description:"Number of seconds after which the forceful shutdown operation will timeout and process will exit"`
}
// waitForInterrupts blocks until the process receives a termination signal,
// then coordinates shutdown.
//
// SIGQUIT requests a graceful stop (builds are allowed to finish; the loop
// keeps waiting for a stronger signal). os.Interrupt/SIGTERM proceed to the
// exit phase: the received signal is repeatedly pushed onto abortSignal so
// every listener observes it, and the process exits when either a second
// signal arrives, the shutdown timeout elapses, or doneSignal reports that
// the main loop finished cleanly.
func waitForInterrupts(
	finished *atomic.Bool,
	abortSignal chan os.Signal,
	doneSignal chan int,
	interruptSignals chan os.Signal,
	shutdownTimeout time.Duration,
) {
	if interruptSignals == nil {
		// signal.Notify does not block when delivering to the channel, so it
		// must be buffered — an unbuffered channel can silently drop signals
		// that arrive while this goroutine is not receiving.
		interruptSignals = make(chan os.Signal, 1)
	}
	signal.Notify(interruptSignals, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	interrupt := <-interruptSignals
	if finished != nil {
		finished.Store(true)
	}
	// request stop, but wait for force exit
	for interrupt == syscall.SIGQUIT {
		logrus.Warningln("Requested quit, waiting for builds to finish")
		interrupt = <-interruptSignals
	}
	logrus.Warningln("Requested exit:", interrupt)
	// Keep re-sending so every receiver of abortSignal gets the interrupt.
	go func() {
		for {
			abortSignal <- interrupt
		}
	}()
	select {
	case newSignal := <-interruptSignals:
		logrus.Fatalln("forced exit:", newSignal)
	case <-time.After(shutdownTimeout):
		logrus.Fatalln("shutdown timed out")
	case <-doneSignal:
	}
}
// postBuild performs per-build bookkeeping once a job has run: it stamps the
// time of the most recent build (feeding the wait-timeout exit check) and
// consumes one unit of the remaining build budget when a limit is in effect.
func (r *RunSingleCommand) postBuild() {
	r.lastBuild = time.Now()
	if remaining := r.MaxBuilds; remaining > 0 {
		r.MaxBuilds = remaining - 1
	}
}
// processBuild requests a single job from GitLab and, when one is available,
// runs it using the acquired executor data.
//
// Returns nil after a back-off sleep when the runner is reported unhealthy or
// no job is pending; otherwise returns any error from build construction,
// trace setup, or the build run itself.
func (r *RunSingleCommand) processBuild(data common.ExecutorData, abortSignal chan os.Signal) error {
	jobData, healthy := r.network.RequestJob(context.Background(), r.RunnerConfig, nil)
	if !healthy {
		logrus.Println("Runner is not healthy!")
		// Back off before the next poll, waking early if aborted.
		select {
		case <-time.After(common.NotHealthyCheckInterval * time.Second):
		case <-abortSignal:
		}
		return nil
	}
	if jobData == nil {
		// No job available: wait one check interval (or abort) and let the
		// caller's loop retry.
		select {
		case <-time.After(common.CheckInterval):
		case <-abortSignal:
		}
		return nil
	}
	config := common.NewConfig()
	newBuild, err := common.NewBuild(*jobData, &r.RunnerConfig, abortSignal, data)
	if err != nil {
		return err
	}
	jobCredentials := &common.JobCredentials{
		ID:    jobData.ID,
		Token: jobData.Token,
	}
	trace, err := r.network.ProcessJob(r.RunnerConfig, jobCredentials)
	if err != nil {
		return err
	}
	trace.SetDebugModeEnabled(newBuild.IsDebugModeEnabled())
	// Finalize the trace even when Run fails, logging any termination error.
	defer func() {
		logTerminationError(logrus.StandardLogger(), "Success", trace.Success())
	}()
	err = newBuild.Run(config, trace)
	r.postBuild()
	return err
}
// checkFinishedConditions flips the finished flag when either exit condition
// holds: the build budget is exhausted (and the runner is not configured to
// run forever), or no job has been received within the wait timeout.
func (r *RunSingleCommand) checkFinishedConditions() {
	buildLimitReached := r.MaxBuilds < 1 && !r.runForever
	if buildLimitReached {
		logrus.Println("This runner has processed its build limit, so now exiting")
		r.finished.Store(true)
	}

	idleSeconds := int(time.Since(r.lastBuild).Seconds())
	if r.WaitTimeout > 0 && idleSeconds > r.WaitTimeout {
		logrus.Println("This runner has not received a job in", r.WaitTimeout, "seconds, so now exiting")
		r.finished.Store(true)
	}
}
// HandleArgs resolves the effective runner configuration. When a runner name
// is given, the configuration is loaded from the config file by that name;
// otherwise the command-line arguments are used directly. The process is
// aborted if any required setting is missing.
func (r *RunSingleCommand) HandleArgs() {
	if r.RunnerName != "" {
		fileConfig := &configOptions{ConfigFile: r.ConfigFile}
		if err := fileConfig.loadConfig(); err != nil {
			logrus.Fatalf("Error loading config: %v", err)
		}
		runner, err := fileConfig.RunnerByName(r.RunnerName)
		if err != nil {
			logrus.Fatalf("Error loading runner by name: %v", err)
		}
		r.RunnerConfig = *runner
	}

	// URL, token and executor are mandatory regardless of the config source.
	switch {
	case r.URL == "":
		logrus.Fatalln("Missing URL")
	case r.Token == "":
		logrus.Fatalln("Missing Token")
	case r.Executor == "":
		logrus.Fatalln("Missing Executor")
	}
}
// Execute is the cli entry point for "run-single". It validates the
// configuration, starts the signal handler, then loops acquiring the executor
// and processing one job at a time until finished, finally shutting down a
// managed provider within the shutdown timeout.
func (r *RunSingleCommand) Execute(c *cli.Context) {
	r.HandleArgs()
	executorProvider := common.GetExecutorProvider(r.Executor)
	if executorProvider == nil {
		logrus.Fatalln("Unknown executor:", r.Executor)
	}
	// Providers that manage background resources need explicit Init/Shutdown.
	managedProvider, ok := executorProvider.(common.ManagedExecutorProvider)
	if ok {
		managedProvider.Init()
	}
	r.RunnerConfig.SystemIDState = common.NewSystemIDState()
	if err := r.RunnerConfig.SystemIDState.EnsureSystemID(); err != nil {
		logrus.WithError(err).Fatal("Failed to generate random system ID")
	}
	logrus.Println("Starting runner for", r.URL, "with token", r.ShortDescription(), "...")
	abortSignal := make(chan os.Signal)
	doneSignal := make(chan int, 1)
	// MaxBuilds == 0 means "no limit": keep running until interrupted.
	r.runForever = r.MaxBuilds == 0
	go waitForInterrupts(&r.finished, abortSignal, doneSignal, r.interruptSignals, r.getShutdownTimeout())
	r.lastBuild = time.Now()
	for !r.finished.Load() {
		data, err := executorProvider.Acquire(&r.RunnerConfig)
		if err != nil {
			logrus.Warningln("Executor update:", err)
		}
		pErr := r.processBuild(data, abortSignal)
		if pErr != nil {
			logrus.WithError(pErr).Error("Failed to process build")
		}
		r.checkFinishedConditions()
		executorProvider.Release(&r.RunnerConfig, data)
	}
	// Tell the signal handler the main loop finished so it stops waiting.
	doneSignal <- 0
	providerShutdownCtx, shutdownProvider := context.WithTimeout(context.Background(), r.getShutdownTimeout())
	defer shutdownProvider()
	if managedProvider != nil {
		managedProvider.Shutdown(providerShutdownCtx)
	}
}
// getShutdownTimeout returns the user-configured shutdown timeout, falling
// back to the project-wide default when the flag is unset or non-positive.
func (r *RunSingleCommand) getShutdownTimeout() time.Duration {
	if r.shutdownTimeout <= 0 {
		return common.DefaultShutdownTimeout
	}
	return time.Duration(r.shutdownTimeout) * time.Second
}
// init registers the "run-single" command with a production GitLab network
// client as its job API.
func init() {
	common.RegisterCommand2("run-single", "start single runner", &RunSingleCommand{
		network: network.NewGitLabClient(),
	})
}