internal/gitaly/service/hook/pack_objects.go (332 lines of code) (raw):

package hook import ( "bufio" "bytes" "context" "crypto/sha256" "encoding/hex" "errors" "fmt" "hash" "io" "net" "os" "strings" "syscall" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/stream" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/protobuf/encoding/protojson" ) var ( packObjectsServedBytes = promauto.NewCounter(prometheus.CounterOpts{ Name: "gitaly_pack_objects_served_bytes_total", Help: "Number of bytes of git-pack-objects data served to clients", }) packObjectsCacheLookups = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "gitaly_pack_objects_cache_lookups_total", Help: "Number of lookups in the PackObjectsHook cache, divided by hit/miss", }, []string{"result"}) packObjectsGeneratedBytes = promauto.NewCounter(prometheus.CounterOpts{ Name: "gitaly_pack_objects_generated_bytes_total", Help: "Number of bytes generated in PackObjectsHook by running git-pack-objects", }) ) func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error { cacheKey, stdin, err := s.computeCacheKey(ctx, req, stdinReader) if err != nil { return err } // We do not know yet who has to close stdin. In case of a cache hit, it // is us. In case of a cache miss, a separate goroutine will run // git-pack-objects, and that goroutine may outlive the current request. // In that case, that separate goroutine will be responsible for closing // stdin. closeStdin := true defer func() { if closeStdin { stdin.Close() } }() servedBytes, created, err := s.packObjectsCache.Fetch(ctx, cacheKey, output, func(w io.Writer) error { ipAddr := net.ParseIP(req.GetRemoteIp()) if ipAddr == nil { // Best effort, maybe the remote IP includes source port if ip, _, err := net.SplitHostPort(req.GetRemoteIp()); err == nil { ipAddr = net.ParseIP(ip) } } // Ignore loop-back IPs if ipAddr != nil && !ipAddr.IsLoopback() { return s.runPackObjectsLimited( ctx, w, ipAddr.String(), req, args, stdin, cacheKey, ) } return s.runPackObjects(ctx, w, req, args, stdin, cacheKey) }) if err != nil { return err } if created { closeStdin = false packObjectsCacheLookups.WithLabelValues("miss").Inc() } else { packObjectsCacheLookups.WithLabelValues("hit").Inc() } customFields := log.CustomFieldsFromContext(ctx) if customFields != nil { customFields.RecordMetadata("pack_objects_cache.key", cacheKey) customFields.RecordSum("pack_objects_cache.served_bytes", int(servedBytes)) if created { customFields.RecordMetadata("pack_objects_cache.hit", "false") } else { customFields.RecordMetadata("pack_objects_cache.hit", "true") } } packObjectsServedBytes.Add(float64(servedBytes)) return nil } // computeCacheKey returns the cache key used for caching pack-objects. A cache key is made up of // both the requested objects and essential parameters that could impact the content of the // generated packfile. Including any insignificant information could result in a lower cache hit rate. func (s *server) computeCacheKey(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) { cacheHash := sha256.New() repository := req.GetRepository() if tx := storage.ExtractTransaction(ctx); tx != nil { // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten // to point to the transaction's repository, the handler sees each request as different even if they point to // the same repository. Restore the original request to ensure identical requests get the same key. repository = tx.OriginalRepository(req.GetRepository()) } cacheKeyPrefix, err := protojson.Marshal(&gitalypb.PackObjectsHookWithSidechannelRequest{ Repository: repository, Args: req.GetArgs(), GitProtocol: req.GetGitProtocol(), }) if err != nil { return "", nil, err } if _, err := cacheHash.Write(cacheKeyPrefix); err != nil { return "", nil, err } stdin, err := bufferStdin(stdinReader, cacheHash) if err != nil { return "", nil, err } cacheKey := hex.EncodeToString(cacheHash.Sum(nil)) return cacheKey, stdin, nil } func (s *server) runPackObjects( ctx context.Context, w io.Writer, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdin io.ReadCloser, key string, ) error { // We want to keep the context for logging, but we want to block all its // cancellation signals (deadline, cancel etc.). This is because of // the following scenario. Imagine client1 calls PackObjectsHook and // causes runPackObjects to run in a goroutine. Now suppose that client2 // calls PackObjectsHook with the same arguments and stdin, so it joins // client1 in waiting for this goroutine. Now client1 hangs up before the // runPackObjects goroutine is done. // // If the cancellation of client1 propagated into the runPackObjects // goroutine this would affect client2. We don't want that. So to prevent // that, we suppress the cancellation of the originating context. ctx = context.WithoutCancel(ctx) ctx, cancel := context.WithCancel(ctx) defer cancel() defer stdin.Close() return s.runPackObjectsFn(ctx, s.gitCmdFactory, w, req, args, stdin, key) } func (s *server) runPackObjectsLimited( ctx context.Context, w io.Writer, limitkey string, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdin io.ReadCloser, key string, ) error { ctx = context.WithoutCancel(ctx) ctx, cancel := context.WithCancel(ctx) defer cancel() defer stdin.Close() if _, err := s.packObjectsLimiter.Limit( ctx, limitkey, func() (interface{}, error) { return nil, s.runPackObjectsFn( ctx, s.gitCmdFactory, w, req, args, stdin, key, ) }, ); err != nil { return err } return nil } func runPackObjects( ctx context.Context, gitCmdFactory gitcmd.CommandFactory, w io.Writer, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdin io.Reader, key string, ) error { repo := req.GetRepository() counter := &helper.CountingWriter{W: w} sw := pktline.NewSidebandWriter(counter) stdout := bufio.NewWriterSize(sw.Writer(stream.BandStdout), pktline.MaxSidebandData) stderrBuf := &bytes.Buffer{} stderr := io.MultiWriter(sw.Writer(stream.BandStderr), stderrBuf) defer func() { packObjectsGeneratedBytes.Add(float64(counter.N)) customFields := log.CustomFieldsFromContext(ctx) if customFields != nil { customFields.RecordMetadata("pack_objects_cache.key", key) customFields.RecordSum("pack_objects_cache.generated_bytes", int(counter.N)) if total := totalMessage(stderrBuf.Bytes()); total != "" { customFields.RecordMetadata("pack_objects.compression_statistics", total) } } }() cmd, err := gitCmdFactory.New(ctx, repo, args.subcmd(), gitcmd.WithStdin(stdin), gitcmd.WithStdout(stdout), gitcmd.WithStderr(stderr), gitcmd.WithGlobalOption(args.globals()...), ) if err != nil { return err } if err := cmd.Wait(); err != nil { return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrBuf.String(), err) } if err := stdout.Flush(); err != nil { return fmt.Errorf("flush stdout: %w", err) } return nil } func totalMessage(stderr []byte) string { start := bytes.Index(stderr, []byte("Total ")) if start < 0 { return "" } end := bytes.Index(stderr[start:], []byte("\n")) if end < 0 { return "" } return string(stderr[start : start+end]) } var ( errNoPackObjects = errors.New("missing pack-objects") errNonFlagArg = errors.New("non-flag argument") errNoStdout = errors.New("missing --stdout") ) func parsePackObjectsArgs(args []string) (*packObjectsArgs, error) { result := &packObjectsArgs{} // Check for special argument used with shallow clone: // https://gitlab.com/gitlab-org/git/-/blob/v2.30.0/upload-pack.c#L287-290 if len(args) >= 2 && args[0] == "--shallow-file" && args[1] == "" { result.shallowFile = true args = args[2:] } if len(args) < 1 || args[0] != "pack-objects" { return nil, errNoPackObjects } args = args[1:] // There should always be "--stdout" somewhere. Git-pack-objects can // write to a file too but we don't want that in this RPC. // https://gitlab.com/gitlab-org/git/-/blob/v2.30.0/upload-pack.c#L296 seenStdout := false for _, a := range args { if !strings.HasPrefix(a, "-") { return nil, errNonFlagArg } if a == "--stdout" { seenStdout = true } else { result.flags = append(result.flags, a) } } if !seenStdout { return nil, errNoStdout } return result, nil } type packObjectsArgs struct { shallowFile bool flags []string } func (p *packObjectsArgs) globals() []gitcmd.GlobalOption { var globals []gitcmd.GlobalOption if p.shallowFile { globals = append(globals, gitcmd.ValueFlag{Name: "--shallow-file", Value: ""}) } return globals } func (p *packObjectsArgs) subcmd() gitcmd.Command { sc := gitcmd.Command{ Name: "pack-objects", Flags: []gitcmd.Option{gitcmd.Flag{Name: "--stdout"}}, } for _, f := range p.flags { sc.Flags = append(sc.Flags, gitcmd.Flag{Name: f}) } return sc } func bufferStdin(r io.Reader, h hash.Hash) (_ io.ReadCloser, err error) { f, err := os.CreateTemp("", "PackObjectsHook-stdin") if err != nil { return nil, err } defer func() { if err != nil { f.Close() } }() if err := os.Remove(f.Name()); err != nil { return nil, err } _, err = io.Copy(f, io.TeeReader(r, h)) if err != nil { return nil, err } if _, err := f.Seek(0, io.SeekStart); err != nil { return nil, err } return f, nil } func (s *server) PackObjectsHookWithSidechannel(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest) (*gitalypb.PackObjectsHookWithSidechannelResponse, error) { if err := s.locator.ValidateRepository(ctx, req.GetRepository()); err != nil { return nil, structerr.NewInvalidArgument("%w", err) } args, err := parsePackObjectsArgs(req.GetArgs()) if err != nil { return nil, structerr.NewInvalidArgument("invalid pack-objects command: %v: %w", req.GetArgs(), err) } c, err := gitalyhook.GetSidechannel(ctx) if err != nil { if errors.As(err, &gitalyhook.InvalidSidechannelAddressError{}) { return nil, structerr.NewInvalidArgument("%w", err) } return nil, structerr.NewInternal("get side-channel: %w", err) } defer c.Close() hookPayload, err := gitcmd.HooksPayloadFromEnv(req.GetEnvironmentVariables()) if err != nil { return nil, fmt.Errorf("hook payload from env: %w", err) } if hookPayload.TransactionID > 0 { // If we're running with transactions, we need to restore the transaction into // the context so the helpers we use everywhere work in this context as well. // This handler is invoked through git and gitaly-hooks which means we're not // using the same context as in the actual RPC handler that the led to this call. tx, err := s.txRegistry.Get(hookPayload.TransactionID) if err != nil { return nil, fmt.Errorf("get transaction: %w", err) } ctx = storage.ContextWithTransaction(ctx, tx) } if err := s.packObjectsHook(ctx, req, args, c, c); err != nil { if errors.Is(err, syscall.EPIPE) { // EPIPE is the error we get if we try to write to c after the client has // closed its side of the connection. By convention, we label server side // errors caused by the client disconnecting with the Canceled gRPC code. err = structerr.NewCanceled("%w", err) } return nil, structerr.NewInternal("pack objects hook: %w", err) } if err := c.Close(); err != nil { return nil, structerr.NewInternal("close side-channel: %w", err) } return &gitalypb.PackObjectsHookWithSidechannelResponse{}, nil }