internal/git/catfile/object_reader.go (167 lines of code) (raw):
package catfile
import (
"bufio"
"context"
"fmt"
"strings"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
)
// ObjectReader returns information about an object referenced by a given revision.
type ObjectReader interface {
	cacheable

	// Info returns object information for the given revision.
	Info(context.Context, git.Revision) (*ObjectInfo, error)

	// Object returns a new Object for the given revision. The Object must be fully consumed
	// before another object is requested.
	Object(context.Context, git.Revision) (*Object, error)

	// Queue returns a Queue that can be used to batch multiple requests. Using the queue is
	// more efficient than calling `Object()` repeatedly when many objects are requested. The
	// returned function must be called once use of the Queue has finished. Both object
	// contents and object information can be requested from the same queue, but they must be
	// read back in the same order in which they were requested.
	Queue(context.Context) (Queue, func(), error)
}
// Queue allows for requesting and reading objects independently of each other. The number of
// RequestObject+RequestInfo calls and ReadObject+ReadInfo calls must match, and their ordering
// must be maintained: ReadObject/ReadInfo may only be called after the corresponding object has
// already been requested. The order of objects returned by ReadObject/ReadInfo is the same as the
// order in which objects have been requested. Users of this interface must call `Flush()` after
// all requests have been queued up such that all requested objects become readable.
type Queue interface {
	// RequestObject requests the given revision's object from git-cat-file(1). Read it back
	// with ReadObject.
	RequestObject(context.Context, git.Revision) error
	// ReadObject reads an object which has previously been requested.
	ReadObject(context.Context) (*Object, error)
	// RequestInfo requests information about the given revision from git-cat-file(1). Read it
	// back with ReadInfo.
	RequestInfo(context.Context, git.Revision) error
	// ReadInfo reads object info which has previously been requested.
	ReadInfo(context.Context) (*ObjectInfo, error)
	// Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
	// have been requested up to this point.
	Flush(context.Context) error
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
// --batch-command` process such that we do not have to spawn a new process for each object we
// are about to read.
//
// A single requestQueue is shared by all callers; queue() hands it out to at most one user at a
// time, guarded by queueInUse.
type objectReader struct {
	// cmd is the spawned cat-file process; q reads from and writes to it.
	cmd *command.Command
	// counter is passed to startTrace each time the queue is acquired.
	counter *prometheus.CounterVec
	// q is the request queue wrapping cmd's stdin/stdout.
	q requestQueue
	// queueInUse is 1 while a caller holds q and 0 otherwise. It must only be accessed via
	// sync/atomic.
	queueInUse int32
}
// objectReaderConfig collects the settings that ObjectReaderOptions adjust before
// newObjectReader spawns the cat-file process.
type objectReaderConfig struct {
	// disableMailmap keeps `--use-mailmap` off the cat-file command line even when the
	// MailmapOptions feature flag is enabled.
	disableMailmap bool
}

// ObjectReaderOption is an option which can be passed to newObjectReader to set required
// configuration.
type ObjectReaderOption func(cfg *objectReaderConfig)

// WithoutMailmap returns an option which ensures mailmap entries are not considered.
func WithoutMailmap() ObjectReaderOption {
	disable := func(c *objectReaderConfig) {
		c.disableMailmap = true
	}
	return disable
}
// newObjectReader spawns a long-lived `git cat-file -Z --batch-command --buffer` process for the
// given repository and returns an objectReader wrapping its stdin and stdout.
//
// `--use-mailmap` is prepended to the flags when the MailmapOptions feature flag is enabled and
// WithoutMailmap was not passed. The repository's object hash is resolved before the process is
// spawned. Otherwise a failure to detect the hash would return early and leave the freshly
// started cat-file process behind without anyone reaping it.
//
// Errors from spawning the process are returned unwrapped; errors from object hash detection are
// wrapped with "detecting object hash".
func newObjectReader(
	ctx context.Context,
	repo gitcmd.RepositoryExecutor,
	counter *prometheus.CounterVec,
	opts ...ObjectReaderOption,
) (*objectReader, error) {
	var cfg objectReaderConfig
	for _, opt := range opts {
		opt(&cfg)
	}

	// Resolve everything that can fail before spawning the process, so that an error here
	// cannot leak a running cat-file command.
	objectHash, err := repo.ObjectHash(ctx)
	if err != nil {
		return nil, fmt.Errorf("detecting object hash: %w", err)
	}

	flags := []gitcmd.Option{
		gitcmd.Flag{Name: "-Z"},
		gitcmd.Flag{Name: "--batch-command"},
		gitcmd.Flag{Name: "--buffer"},
	}
	if featureflag.MailmapOptions.IsEnabled(ctx) && !cfg.disableMailmap {
		flags = append([]gitcmd.Option{gitcmd.Flag{Name: "--use-mailmap"}}, flags...)
	}

	batchCmd, err := repo.Exec(ctx,
		gitcmd.Command{
			Name:  "cat-file",
			Flags: flags,
		},
		gitcmd.WithSetupStdin(),
		gitcmd.WithSetupStdout(),
		gitcmd.WithCompletionErrorLogFilter(func(cmd *command.Command, stderr string) bool {
			return isEvictedCatfileProcessWithMigratedQuarantine(cmd.Env(), stderr)
		}),
	)
	if err != nil {
		return nil, err
	}

	return &objectReader{
		cmd:     batchCmd,
		counter: counter,
		q: requestQueue{
			objectHash: objectHash,
			stdout:     bufio.NewReader(batchCmd),
			stdin:      bufio.NewWriter(batchCmd),
		},
	}, nil
}
// close closes the request queue and then waits for the cat-file process to exit.
func (o *objectReader) close() {
	o.q.close()

	// The exit error is deliberately discarded: close is best-effort teardown.
	// NOTE(review): presumably failures surface through the command's completion logging
	// configured via WithCompletionErrorLogFilter — confirm.
	_ = o.cmd.Wait()
}

// isClosed reports whether the underlying request queue has been closed.
func (o *objectReader) isClosed() bool {
	return o.q.isClosed()
}

// isDirty reports whether the reader is dirty. This is the case either while a caller still
// holds the queue (queueInUse is set) or when the queue itself reports being dirty.
func (o *objectReader) isDirty() bool {
	held := atomic.LoadInt32(&o.queueInUse) != 0
	return held || o.q.isDirty()
}
// queue claims exclusive use of the reader's request queue and starts a trace named tracedMethod.
//
// It fails with "object queue already in use" if another caller currently holds the queue. On
// success the returned function must be called to release the queue again; it also finishes the
// trace.
func (o *objectReader) queue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) {
	if acquired := atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1); !acquired {
		return nil, nil, fmt.Errorf("object queue already in use")
	}

	t := startTrace(ctx, o.counter, tracedMethod)
	o.q.trace = t

	release := func() {
		atomic.StoreInt32(&o.queueInUse, 0)
		t.finish()
	}
	return &o.q, release, nil
}
// Object returns a new Object for the given revision. The Object must be fully consumed
// before another object is requested.
//
// It acquires the reader's queue, requests and flushes the single revision, and reads back the
// resulting Object. The queue is released again before Object returns. On any error the
// returned Object is nil.
func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Object, error) {
	q, release, err := o.queue(ctx, "catfile.Object")
	if err != nil {
		return nil, err
	}
	defer release()

	// Flush only runs when the request was queued successfully.
	err = q.RequestObject(ctx, revision)
	if err == nil {
		err = q.Flush(ctx)
	}
	if err != nil {
		return nil, err
	}

	obj, err := q.ReadObject(ctx)
	if err != nil {
		return nil, err
	}
	return obj, nil
}
// Queue returns a Queue that can be used to batch multiple requests. Using the queue is more
// efficient than calling `Object()` repeatedly when many objects are requested. The returned
// function must be called once use of the Queue has finished. Both object contents and object
// information can be requested from the queue, but they must be read back in the order in which
// they were requested.
func (o *objectReader) Queue(ctx context.Context) (Queue, func(), error) {
	rq, release, err := o.queue(ctx, "catfile.Queue")
	if err == nil {
		return rq, release, nil
	}
	// Return literal nils rather than rq: on error rq is a nil *requestQueue, and wrapping it
	// in the Queue interface would produce a non-nil interface value.
	return nil, nil, err
}
// Info returns object information for the given revision.
//
// It acquires the reader's queue, requests and flushes info for the single revision, and reads
// it back. The queue is released again before Info returns. On any error the returned
// ObjectInfo is nil.
func (o *objectReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
	q, release, err := o.queue(ctx, "catfile.Info")
	if err != nil {
		return nil, err
	}
	defer release()

	if err = q.RequestInfo(ctx, revision); err != nil {
		return nil, err
	}
	if err = q.Flush(ctx); err != nil {
		return nil, err
	}

	info, err := q.ReadInfo(ctx)
	if err != nil {
		return nil, err
	}
	return info, nil
}
// isEvictedCatfileProcessWithMigratedQuarantine reports whether a command with the given env and
// stderr output is likely a cat-file process that was operating on a quarantine repository. When
// a quarantine is applied, GIT_OBJECT_DIRECTORY and GIT_ALTERNATE_OBJECT_DIRECTORIES are set to
// point Git at the quarantined objects.
//
// It returns true only if stderr starts with "fatal: not a git repository" and both variables are
// present in env. env entries are expected in "KEY=VALUE" form. Each variable is matched by its
// exact name (the part before the first '='), so:
//   - similarly prefixed variables (e.g. GIT_OBJECT_DIRECTORY_FOO) do not count, and
//   - duplicate entries for one variable can neither stand in for the other nor push the
//     result to false.
func isEvictedCatfileProcessWithMigratedQuarantine(env []string, stderr string) bool {
	if !strings.HasPrefix(stderr, "fatal: not a git repository") {
		return false
	}

	var hasObjectDir, hasAlternates bool
	for _, kv := range env {
		name, _, _ := strings.Cut(kv, "=")
		switch name {
		case "GIT_OBJECT_DIRECTORY":
			hasObjectDir = true
		case "GIT_ALTERNATE_OBJECT_DIRECTORIES":
			hasAlternates = true
		}
	}

	return hasObjectDir && hasAlternates
}