internal/gitaly/service/smarthttp/upload_pack.go (135 lines of code) (raw):
package smarthttp
import (
"context"
"crypto/sha1"
"errors"
"fmt"
"io"
"gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) {
repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req)
if err != nil {
return nil, structerr.NewInvalidArgument("%w", err)
}
var sidechannelRetryableError sidechannel.RetryableError
conn, err := sidechannel.OpenSidechannel(ctx)
if err != nil {
if errors.As(err, &sidechannelRetryableError) {
// Clients of PostUploadPackWithSidechannel are configured to retry the RPC upon receiving
// Unavailable, so it should be OK to return it in this case.
//nolint:gitaly-linters
return nil, structerr.NewUnavailable("open sidechannel: %w", err)
}
return nil, structerr.NewInternal("open sidechannel: %w", err)
}
defer conn.Close()
stats, err := s.runUploadPack(ctx, req, repoPath, gitConfig, conn, conn)
if err != nil {
return nil, structerr.NewInternal("running upload-pack: %w", err)
}
if err := conn.Close(); err != nil {
return nil, structerr.NewInternal("close sidechannel connection: %w", err)
}
return &gitalypb.PostUploadPackWithSidechannelResponse{
PackfileNegotiationStatistics: stats.ToProto(),
}, nil
}
type statsCollector struct {
c io.Closer
statsCh chan stats.PackfileNegotiation
}
func (sc *statsCollector) finish() *stats.PackfileNegotiation {
sc.c.Close()
stats := <-sc.statsCh
return &stats
}
func (s *server) runStatsCollector(ctx context.Context, r io.Reader) (io.Reader, *statsCollector) {
pr, pw := io.Pipe()
sc := &statsCollector{
c: pw,
statsCh: make(chan stats.PackfileNegotiation, 1),
}
go func() {
defer close(sc.statsCh)
stats, err := stats.ParsePackfileNegotiation(pr)
if err != nil {
s.logger.WithError(err).DebugContext(ctx, "failed parsing packfile negotiation")
return
}
stats.UpdateMetrics(s.packfileNegotiationMetrics)
stats.UpdateLogFields(ctx)
sc.statsCh <- stats
}()
return io.TeeReader(r, pw), sc
}
func (s *server) validateUploadPackRequest(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (string, []gitcmd.ConfigPair, error) {
repository := req.GetRepository()
if err := s.locator.ValidateRepository(ctx, repository); err != nil {
return "", nil, err
}
repoPath, err := s.locator.GetRepoPath(ctx, repository)
if err != nil {
return "", nil, err
}
gitcmd.WarnIfTooManyBitmaps(ctx, s.logger, s.locator, repository.GetStorageName(), repoPath)
config, err := gitcmd.ConvertConfigOptions(req.GetGitConfigOptions())
if err != nil {
return "", nil, err
}
return repoPath, config, nil
}
func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest, repoPath string, gitConfig []gitcmd.ConfigPair, stdin io.Reader, stdout io.Writer) (stats *stats.PackfileNegotiation, _ error) {
h := sha1.New()
stdin = io.TeeReader(stdin, h)
stdin, collector := s.runStatsCollector(ctx, stdin)
defer func() {
if stats == nil {
stats = collector.finish()
}
}()
repo := s.localRepoFactory.Build(req.GetRepository())
if s.bundleURIManager != nil {
// Bundle generation is an optimization that is transparent to users.
// If it fails, we log the error but continue with the regular upload-pack
// operation without the bundle optimization.
// If successful, a goroutine is spawned to generate the bundle, in which case
// the bundle generation becomes independent of the RPC request.
if err := s.bundleURIManager.GenerateWithStrategy(ctx, repo); err != nil {
s.logger.WithError(err).Error("failed generating bundle")
}
gitConfig = append(gitConfig, s.bundleURIManager.UploadPackGitConfig(ctx, req.GetRepository())...)
} else {
gitConfig = append(gitConfig, bundleuri.CapabilitiesGitConfig(ctx, false)...)
}
objectHash, err := repo.ObjectHash(ctx)
if err != nil {
return nil, fmt.Errorf("detecting object hash: %w", err)
}
commandOpts := []gitcmd.CmdOpt{
gitcmd.WithStdin(stdin),
gitcmd.WithSetupStdout(),
gitcmd.WithGitProtocol(s.logger, req),
gitcmd.WithConfig(gitConfig...),
gitcmd.WithPackObjectsHookEnv(objectHash, req.GetRepository(), "http"),
}
cmd, err := repo.Exec(ctx, gitcmd.Command{
Name: "upload-pack",
Flags: []gitcmd.Option{gitcmd.Flag{Name: "--stateless-rpc"}},
Args: []string{repoPath},
}, commandOpts...)
if err != nil {
return nil, structerr.NewFailedPrecondition("spawning upload-pack: %w", err)
}
// Use a custom buffer size to minimize the number of system calls.
respBytes, err := io.CopyBuffer(stdout, cmd, make([]byte, 64*1024))
if err != nil {
return nil, structerr.NewFailedPrecondition("copying stdout from upload-pack: %w", err)
}
if err := cmd.Wait(); err != nil {
stats = collector.finish()
if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" {
// We have seen a 'deepen' message in the request. It is expected that
// git-upload-pack has a non-zero exit status: don't treat this as an
// error.
return stats, nil
}
return nil, structerr.NewFailedPrecondition("waiting for upload-pack: %w", err)
}
s.logger.WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).InfoContext(ctx, "request details")
return nil, nil
}