in internal/command/command.go [169:435]
// New spawns the executable named by nameAndArgs[0] with the remaining elements as its
// arguments and returns a Command wrapping the started process.
//
// The given options are applied to an internal configuration that controls the working
// directory, additional environment variables, standard streams, subprocess logging, cgroup
// placement and the names used for metrics and tracing.
//
// New panics if ctx has no Done() channel or if nameAndArgs is empty. It returns ctx.Err()
// without spawning anything if ctx is already canceled, and an error if the arguments fail
// checkNullArgv, if any of the pipes/buffers/cgroup setup fails, or if the process cannot be
// started.
//
// On success the process runs in its own process group. Once ctx is done, a background
// goroutine tears down the command's standard streams, sends SIGTERM to the process group and
// waits for the command, unless the command has already been waited on.
//
// Known limitation: if adding the already-started process to a cgroup fails, New returns an
// error while the child process keeps running.
// Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/6228
func New(ctx context.Context, logger log.Logger, nameAndArgs []string, opts ...Option) (_ *Command, returnedErr error) {
	if ctx.Done() == nil {
		panic("command spawned with context without Done() channel")
	}

	// Don't launch the command if the context is already canceled. This matches
	// Go's own CommandContext's behavior which doesn't start a command if the
	// context is already canceled. We can't use it currently due to it sending a
	// SIGKILL to the command when the context is canceled during execution. This is not
	// fine as we don't have proper logic to recover from git crashes by for example
	// cleaning stale reference locks.
	//
	// Without this, racy behavior will emerge as the command execution will race with the
	// context cancellation. This really only helps with cases when the context is already
	// canceled before calling New. Raciness still ensues if the context is canceled after
	// this check. More details at: https://gitlab.com/gitlab-org/gitaly/-/issues/5021
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	if len(nameAndArgs) == 0 {
		panic("command spawned without name")
	}

	if err := checkNullArgv(nameAndArgs); err != nil {
		return nil, err
	}

	var cfg config
	for _, opt := range opts {
		opt(&cfg)
	}

	cmdName := path.Base(nameAndArgs[0])

	// Record how long the whole spawn took, regardless of whether it succeeded.
	startForking := time.Now()
	defer func() {
		delta := time.Since(startForking)
		spawnForkingTimeHistogram.Observe(delta.Seconds())

		if customFields := log.CustomFieldsFromContext(ctx); customFields != nil {
			customFields.RecordSum("command.spawn_token_fork_ms", int(delta.Milliseconds()))
		}
	}()

	// logPid stays at -1 unless the command was spawned and fully set up, so the debug
	// message below distinguishes failed spawns from successful ones.
	logPid := -1
	defer func() {
		logger.WithFields(log.Fields{
			"pid":  logPid,
			"path": nameAndArgs[0],
			"args": nameAndArgs[1:],
		}).DebugContext(ctx, "spawn")
	}()

	// Prefer the explicitly configured "<command>-<subcommand>" as span name and fall back
	// to the executable's base name.
	var spanName string
	if cfg.commandName != "" && cfg.subcommandName != "" {
		spanName = fmt.Sprintf("%s-%s", cfg.commandName, cfg.subcommandName)
	} else {
		spanName = cmdName
	}

	// NOTE(review): nothing in this function ends the span on the error returns below.
	// Presumably it is finished when the command is waited on — confirm the error paths.
	span, ctx := tracing.StartSpanIfHasParent(
		ctx,
		spanName,
		tracing.Tags{
			"path": nameAndArgs[0],
			"args": strings.Join(nameAndArgs[1:], " "),
		},
	)

	cmd := exec.Command(nameAndArgs[0], nameAndArgs[1:]...)

	command := &Command{
		logger:                   logger,
		cmd:                      cmd,
		startTime:                time.Now(),
		context:                  ctx,
		span:                     span,
		finalizers:               cfg.finalizers,
		metricsCmd:               cfg.commandName,
		metricsSubCmd:            cfg.subcommandName,
		cmdGitVersion:            cfg.gitVersion,
		refBackend:               cfg.refBackend,
		subprocessLoggerDone:     make(chan struct{}),
		processExitedCh:          make(chan struct{}),
		completionErrorLogFilter: cfg.completionErrorLogFilter,
	}

	cmd.Dir = cfg.dir

	// Export allowed environment variables as set in the Gitaly process.
	cmd.Env = AllowedEnvironment(os.Environ())
	// Append environment variables explicitly requested by the caller.
	cmd.Env = append(cmd.Env, cfg.environment...)
	// And finally inject environment variables required for tracing into the command.
	cmd.Env = envInjector(ctx, cmd.Env)

	if (cfg.logConfiguration != log.Config{}) {
		// Create a pipe the process will send its logs over.
		logReader, logWriter, err := os.Pipe()
		if err != nil {
			return nil, fmt.Errorf("create log pipe: %w", err)
		}

		defer func() {
			// Close the file descriptor after spawning the command as we're not
			// the ones writing into it.
			if err := logWriter.Close(); err != nil {
				returnedErr = errors.Join(returnedErr, fmt.Errorf("close log writer: %w", err))
			}

			// If the command failed to be setup, it won't be waited on. Ensure
			// the logger has stopped before returning to clean up.
			if returnedErr != nil {
				// Close the reader to ensure the log consuming goroutine returns if the setup
				// fails after spawning the command. This shouldn't actually be needed as we close
				// our file descriptor for the write end of the pipe. If the child isn't started
				// or is terminated, there would be no open writers on the pipe and we'd receive
				// an EOF. However, New() is leaking child processes if the cgroup setup fails after
				// the child process was started. Once we no longer leave child processes running
				// when returning from New() with an error, we can remove this.
				//
				// Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/6228
				if err := logReader.Close(); err != nil {
					returnedErr = errors.Join(returnedErr, fmt.Errorf("close log reader: %w", err))
				}

				<-command.subprocessLoggerDone
			}
		}()

		// Start consuming logs right after registering the cleanup above and before any
		// step that can fail. The cleanup waits on subprocessLoggerDone whenever New()
		// returns an error, and only this goroutine closes that channel: starting it any
		// later would make an early error return block forever.
		go func() {
			defer close(command.subprocessLoggerDone)

			if err := command.handleSubprocessLogs(logReader); err != nil {
				logger.WithError(err).Error("failed handling subprocess logs")
			}
		}()

		// Pass the log writer to the command so it can write logs to it.
		cmd.ExtraFiles = append(cmd.ExtraFiles, logWriter)

		loggerEnv, err := envSubprocessLoggerConfiguration(subprocessConfiguration{
			// The first three file descriptors, indexed from zero, are
			// stdin, stdout and stderr. The log file descriptor is the
			// last one in the extra files.
			FileDescriptor: uintptr(2 + len(cmd.ExtraFiles)),
			Config:         cfg.logConfiguration,
		})
		if err != nil {
			return nil, fmt.Errorf("subprocess logger env: %w", err)
		}

		cmd.Env = append(cmd.Env, loggerEnv)
	} else {
		// If subprocess logger wasn't enabled, close the channel immediately as
		// we don't have a log consuming goroutine to wait for.
		close(command.subprocessLoggerDone)
	}

	// Start the command in its own process group (nice for signalling)
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	useCloneIntoCgroup := cfg.cgroupsManager != nil && cfg.cgroupsManager.SupportsCloneIntoCgroup()
	if useCloneIntoCgroup {
		// Configure the command to be executed in the correct cgroup.
		cgroupPath, fd, err := cfg.cgroupsManager.CloneIntoCgroup(cmd, cfg.cgroupsAddCommandOpts...)
		if err != nil {
			return nil, fmt.Errorf("clone into cgroup: %w", err)
		}
		// The descriptor is only needed while spawning; close it once New() returns.
		defer func() {
			if err := fd.Close(); err != nil {
				logger.WithError(err).ErrorContext(ctx, "failed to close cgroup file descriptor")
			}
		}()

		command.cgroupPath = cgroupPath
	}

	// If requested, we will set up the command such that `Write()` can be called on it directly. Otherwise,
	// we simply pass as stdin whatever the user has asked us to set up. If no `stdin` was set up, the command
	// will implicitly read from `/dev/null`.
	if _, ok := cfg.stdin.(stdinSentinel); ok {
		pipe, err := cmd.StdinPipe()
		if err != nil {
			return nil, fmt.Errorf("creating stdin pipe: %w", err)
		}

		command.writer = pipe
	} else {
		cmd.Stdin = cfg.stdin
	}

	// Similarly, if requested, we will set up the command such that `Read()` can be called on it directly.
	// Otherwise, we simply pass as stdout whatever the user has asked us to set up. If no `stdout` was set
	// up, the command will implicitly write to `/dev/null`.
	if _, ok := cfg.stdout.(stdoutSentinel); ok {
		pipe, err := cmd.StdoutPipe()
		if err != nil {
			return nil, fmt.Errorf("creating stdout pipe: %w", err)
		}

		command.reader = pipe
	} else {
		cmd.Stdout = cfg.stdout
	}

	// Use the caller's stderr if one was configured, otherwise capture stderr in a bounded
	// buffer owned by the command.
	if cfg.stderr != nil {
		cmd.Stderr = cfg.stderr
	} else {
		var err error
		command.stderrBuffer, err = newStderrBuffer(maxStderrBytes, maxStderrLineLength, []byte("\n"))
		if err != nil {
			return nil, fmt.Errorf("creating stderr buffer: %w", err)
		}

		cmd.Stderr = command.stderrBuffer
	}

	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("starting process %v: %w", cmd.Args, err)
	}

	inFlightCommandGauge.Inc()
	commandcounter.Increment()

	// The goroutine below is responsible for terminating and reaping the process when ctx is
	// canceled. While we must ensure that it does run when `cmd.Start()` was successful, it
	// must not run before we have fully set up the command. Otherwise, we may end up with racy
	// access patterns when the context gets terminated early.
	//
	// We thus defer spawning the Goroutine.
	defer func() {
		go func() {
			select {
			case <-ctx.Done():
				// Before we kill the child process we need to close the process' standard streams. If
				// we don't, it may happen that the signal gets delivered and that the process exits
				// before we close the streams in `command.Wait()`. This would cause downstream readers
				// to potentially miss those errors when reading stdout.
				command.teardownStandardStreams()

				// If the context has been cancelled and we didn't explicitly reap
				// the child process then we need to manually kill it and release
				// all associated resources.
				if cmd.Process.Pid > 0 {
					//nolint:errcheck // TODO: do we want to report errors?
					// Send SIGTERM to the process group of cmd
					syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM)
				}

				// We do not care for any potential error code, but just want to
				// make sure that the subprocess gets properly killed and processed.
				_ = command.Wait()
			case <-command.processExitedCh:
				// Otherwise, if the process has exited via a call to `wait()`
				// already then there is nothing we need to do.
			}
		}()
	}()

	// Without clone-into-cgroup support the process has to be added to its cgroup after it
	// was started.
	if cfg.cgroupsManager != nil && !useCloneIntoCgroup {
		cgroupPath, err := cfg.cgroupsManager.AddCommand(command.cmd, cfg.cgroupsAddCommandOpts...)
		if err != nil {
			return nil, err
		}

		command.cgroupPath = cgroupPath
	}

	logPid = cmd.Process.Pid

	return command, nil
}