internal/gitaly/service/ssh/receive_pack.go (134 lines of code) (raw):

package ssh import ( "context" "errors" "fmt" "io" "os" "strings" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/receivepack" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" ) func (s *server) SSHReceivePack(stream gitalypb.SSHService_SSHReceivePackServer) error { req, err := stream.Recv() // First request contains only Repository, GlId, and GlUsername if err != nil { return structerr.NewInternal("%w", err) } s.logger.WithFields(log.Fields{ "GlID": req.GetGlId(), "GlRepository": req.GetGlRepository(), "GlUsername": req.GetGlUsername(), "GitConfigOptions": req.GetGitConfigOptions(), "GitProtocol": req.GetGitProtocol(), }).DebugContext(stream.Context(), "SSHReceivePack") if err = validateFirstReceivePackRequest(stream.Context(), s.locator, req); err != nil { return structerr.NewInvalidArgument("%w", err) } if err := s.sshReceivePack(stream, req); err != nil { return structerr.NewInternal("%w", err) } // In cases where all reference updates are rejected by git-receive-pack(1), we would end up // with no transactional votes at all. We thus do a final vote which concludes this RPC to // ensure there's always at least one vote. In case there was diverging behaviour in // git-receive-pack(1) which led to a different outcome across voters, then this final vote // would fail because the sequence of votes would be different. if err := transaction.VoteOnContext(stream.Context(), s.txManager, voting.Vote{}, voting.Committed); err != nil { // When the pre-receive hook failed, git-receive-pack(1) exits with code 0. // It's arguable whether this is the expected behavior, but anyhow it means // cmd.Wait() did not error out. On the other hand, the gitaly-hooks command did // stop the transaction upon failure. So this final vote fails. // To avoid this error being presented to the end user, ignore it when the // transaction was stopped. if !errors.Is(err, transaction.ErrTransactionStopped) { return structerr.NewAborted("final transactional vote: %w", err) } } return nil } func (s *server) sshReceivePack(stream gitalypb.SSHService_SSHReceivePackServer, req *gitalypb.SSHReceivePackRequest) (returnedErr error) { ctx := stream.Context() stdin := streamio.NewReader(func() ([]byte, error) { request, err := stream.Recv() return request.GetStdin(), err }) var m sync.Mutex stdout := streamio.NewSyncWriter(&m, func(p []byte) error { return stream.Send(&gitalypb.SSHReceivePackResponse{Stdout: p}) }) // We both need to listen in on the stderr stream in order to be able to judge what exactly // is happening, but also relay the output to the client. We thus create a MultiWriter to // enable both at the same time. var stderrBuilder strings.Builder stderr := streamio.NewSyncWriter(&m, func(p []byte) error { return stream.Send(&gitalypb.SSHReceivePackResponse{Stderr: p}) }) stderr = io.MultiWriter(&stderrBuilder, stderr) repoPath, err := s.locator.GetRepoPath(ctx, req.GetRepository()) if err != nil { return err } config, err := gitcmd.ConvertConfigOptions(req.GetGitConfigOptions()) if err != nil { return err } // When an `exec.Cmd` has its `cmd.Stdin` configured with an `io.Reader` // that is not also of type `os.File` a goroutine is automatically // configured that performs an `io.Copy()` between the reader and a newly // created pipe. A problem with this can arise when `cmd.Wait()` is invoked // because it waits not only for the process to complete but also all the // goroutine to end. If the configured `cmd.Stdin` is only of type // `io.Reader` and never closed, the goroutine will never end. This leads to // `cmd.Wait()` being blocked indefinitely. // // Within Gitaly this problem can manifest itself when a git process crashes // before `stdin` reaches EOF. To date this has only been noticed as a // problem for the `SSHReceivePack` RPC, so a pipe and goroutine have been // created explicitly to prevent `cmd.Wait()` from blocking indefinitely. pr, pw, err := os.Pipe() if err != nil { return fmt.Errorf("creating pipe: %w", err) } go func() { _, _ = io.Copy(pw, stdin) _ = pw.Close() }() repo := s.localRepoFactory.Build(req.GetRepository()) transactionID := storage.ExtractTransactionID(ctx) transactionsEnabled := transactionID > 0 if transactionsEnabled { procReceiveCleanup, err := receivepack.RegisterProcReceiveHook( ctx, s.logger, s.cfg, req, repo, s.hookManager, hook.NewTransactionRegistry(s.txRegistry), transactionID, ) if err != nil { return err } defer func() { if err := procReceiveCleanup(); err != nil && returnedErr == nil { returnedErr = err } }() } objectHash, err := repo.ObjectHash(ctx) if err != nil { return fmt.Errorf("detecting object hash: %w", err) } cmd, err := repo.Exec(ctx, gitcmd.Command{Name: "receive-pack", Args: []string{repoPath}}, gitcmd.WithStdin(pr), gitcmd.WithStdout(stdout), gitcmd.WithStderr(stderr), gitcmd.WithReceivePackHooks(objectHash, req, "ssh", transactionsEnabled), gitcmd.WithGitProtocol(s.logger, req), gitcmd.WithConfig(config...), ) if err != nil { return fmt.Errorf("start cmd: %w", err) } if err := cmd.Wait(); err != nil { status, ok := command.ExitStatus(err) if !ok { return fmt.Errorf("extracting exit status: %w", err) } // When the command has failed we both want to send its exit status as well as // return an error from this RPC call. Otherwise we'd fail the RPC, but return with // an `OK` error code to the client. if errSend := stream.Send(&gitalypb.SSHReceivePackResponse{ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, }); errSend != nil { s.logger.WithError(errSend).ErrorContext(ctx, "send final status code") } // Detect the case where the user has cancelled the push and log it with a proper // gRPC error code. We can't do anything about this error anyway and it is a totally // valid outcome. if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" { return structerr.NewCanceled("user canceled the push") } return fmt.Errorf("cmd wait: %w", err) } return nil } func validateFirstReceivePackRequest(ctx context.Context, locator storage.Locator, req *gitalypb.SSHReceivePackRequest) error { if req.GetGlId() == "" { return errors.New("empty GlId") } if req.Stdin != nil { return errors.New("non-empty data in first request") } return locator.ValidateRepository(ctx, req.GetRepository()) }