internal/gitaly/storage/storage.go (103 lines of code) (raw):
// Package storage contains the storage layer of Gitaly.
//
// Each Gitaly node contains one or more storages. Each storage has a name,
// and points to a directory on the file system where it stores its state.
//
// Each storage can contain one or more partitions. Storages are a collection of
// partitions. Data is stored within partitions.
//
// Partitions are accessed through transactions.
package storage
import (
"context"
"errors"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
housekeepingcfg "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
var (
// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions.
ErrTransactionProcessingStopped = errors.New("transaction processing stopped")
// ErrTransactionAlreadyCommitted is returned when attempting to rollback or commit a transaction that
// already had commit called on it.
ErrTransactionAlreadyCommitted = errors.New("transaction already committed")
// ErrTransactionAlreadyRollbacked is returned when attempting to rollback or commit a transaction that
// already had rollback called on it.
ErrTransactionAlreadyRollbacked = errors.New("transaction already rollbacked")
// ErrAlternatePointsToSelf is returned when a repository's alternate points to the
// repository itself.
ErrAlternatePointsToSelf = errors.New("repository's alternate points to self")
// ErrAlternateHasAlternate is returned when a repository's alternate itself has an
// alternate listed.
ErrAlternateHasAlternate = errors.New("repository's alternate has an alternate itself")
// ErrPartitionAssignmentNotFound is returned when attempting to access a
// partition assignment in the database that doesn't yet exist.
ErrPartitionAssignmentNotFound = errors.New("partition assignment not found")
)
// PartitionIterator provides an interface for iterating over partition IDs.
type PartitionIterator interface {
// Next advances the iterator to the next valid partition ID.
Next() bool
// GetPartitionID returns the current partition ID of the iterator.
GetPartitionID() PartitionID
// Err returns the error of the iterator.
Err() error
// Close closes the iterator and discards the underlying transaction
Close()
}
// FS is the transaction's file system snapshot.
//
// All of the input paths must be relative to Root().
type FS interface {
// Root is the absolute path to the root of the transaction's file system snapshot.
Root() string
// RecordRead records the given path as read by the transaction.
RecordRead(path string) error
// RecordFile records a file creation into the transaction.
RecordFile(path string) error
// RecordLink records a hard link creation into the transaction.
RecordLink(sourcePath, destinationPath string) error
// RecordDirectory records a directory creation into the transaction.
RecordDirectory(path string) error
// RecordRemoval records a directory entry removal into the transaction.
RecordRemoval(path string) error
}
// Transaction is a single unit-of-work that executes as a whole.
type Transaction interface {
// Commit commits the transaction. It returns once the transaction's
// changes have been durably persisted.
Commit(context.Context) error
// Rollback aborts the transactions and discards all of its changes.
Rollback(context.Context) error
// SnapshotLSN returns the Log Sequence Number (LSN) of the transaction's snapshot. This value is used to track and order transactions.
SnapshotLSN() LSN
// KV returns a ReadWriter that can be used to read or write the key-value state
// in the transaction's snapshot.
KV() keyvalue.ReadWriter
// FS is the interface of the transaction's file system snapshot.
FS() FS
// UpdateReferences updates the given references as part of the transaction. Each call is treated as
// a different reference transaction. This allows for performing directory-file conflict inducing
// changes in a transaction. For example:
//
// - First call - delete 'refs/heads/parent'
// - Second call - create 'refs/heads/parent/child'
//
// If a reference is updated multiple times during a transaction, its first recorded old OID used as
// the old OID when verifying the reference update, and the last recorded new OID is used as the new
// OID in the final commit. This means updates like 'oid-1 -> oid-2 -> oid-3' will ultimately be
// committed as 'oid-1 -> oid-3'. The old OIDs of the intermediate states are not verified when
// committing the write to the actual repository and are discarded from the final committed log
// entry.
UpdateReferences(context.Context, git.ReferenceUpdates) error
// RecordInitialReferenceValues records the initial values of the references for the next UpdateReferences call. If oid is
// not a zero OID, it's used as the initial value. If oid is a zero value, the reference's actual value is resolved.
//
// The reference's first recorded value is used as its old OID in the update. RecordInitialReferenceValues can be used to
// record the value without staging an update in the transaction. This is useful for example generally recording the initial
// value in the 'prepare' phase of the reference transaction hook before any changes are made without staging any updates
// before the 'committed' phase is reached. The recorded initial values are only used for the next UpdateReferences call.
RecordInitialReferenceValues(context.Context, map[git.ReferenceName]git.Reference) error
// IncludeObject includes the given object and its dependencies in the transaction's logged pack file even
// if the object is unreachable from the references.
IncludeObject(git.ObjectID)
// DeleteRepository deletes the repository when the transaction is committed.
DeleteRepository()
// PackRefs runs reference repacking housekeeping when the transaction commits. If this
// is called, the transaction is limited to running only other housekeeping tasks. No other
// updates are allowed.
PackRefs()
// Repack runs object repacking housekeeping task when the transaction commits. If this
// is called, the transaction is limited to running only other housekeeping tasks. No other
// updates are allowed.
Repack(housekeepingcfg.RepackObjectsConfig)
// WriteCommitGraphs rewrites the commit graphs when the transaction commits. If this
// is called, the transaction is limited to running only other housekeeping tasks. No other
// updates are allowed.
WriteCommitGraphs(housekeepingcfg.WriteCommitGraphConfig)
// RewriteRepository rewrites the repository to point to the transaction's snapshot.
RewriteRepository(*gitalypb.Repository) *gitalypb.Repository
// OriginalRepository returns the repository as it was before rewriting it to point to the snapshot.
OriginalRepository(*gitalypb.Repository) *gitalypb.Repository
// PartitionRelativePaths returns all known repository relative paths for
// the transactions partition.
PartitionRelativePaths() []string
// SetOffloadingConfig configures the transaction with repository offloading parameters
// and schedules the offloading operation to execute when the transaction commits.
// It stores the offloading configuration in the transaction's runOffloading struct.
SetOffloadingConfig(housekeepingcfg.OffloadingConfig)
}
// BeginOptions are used to configure a transaction that is being started.
type BeginOptions struct {
// Write indicates whether this is a write transaction. Transactions
// are read-only by default.
Write bool
// RelativePaths can be set to filter the relative paths that are included
// in the transaction's snapshot. When set, only the contained relative paths
// are included in the transaction's disk snapshot. If empty, nothing is
// included in the transactions disk snapshot. If nil, no filtering is done
// and the partition's full disk state is included in the snapshot.
//
// The first relative path is the target repository, and is the only repository
// that can be written into.
RelativePaths []string
// ForceExclusiveSnapshot forces the transaction to use an exclusive snapshot.
// This is a temporary workaround for some RPCs that do not work well with shared
// read-only snapshots yet.
ForceExclusiveSnapshot bool
}
// LogConsumer is the interface of a log consumer that is passed to a TransactionManager.
// The LogConsumer may perform read-only operations against the on-disk log entry.
// The TransactionManager notifies the consumer of new transactions by invoking the
// NotifyNewTransaction method after they are committed.
type LogConsumer interface {
// NotifyNewEntries alerts the LogConsumer that new log entries are available for
// consumption. The method invoked both when the TransactionManager
// initializes and when new transactions are committed. Both the low and high water mark
// LSNs are sent so that a newly initialized consumer is aware of the full range of
// entries it can process.
NotifyNewEntries(storageName string, partitionID PartitionID, lowWaterMark, highWaterMark LSN)
}
// LogReader consumes and acknowledges entries from the Write-Ahead Log.
type LogReader interface {
// GetEntryPath returns the path of the log entry's root directory.
GetEntryPath(lsn LSN) string
// AcknowledgePosition acknowledges log entries up and including lsn as successfully processed
// for the specified position type.
AcknowledgePosition(PositionType, LSN) error
// AppendedLSN returns the LSN of the latest appended log entry.
AppendedLSN() LSN
// LowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than
// this mark are removed.
LowWaterMark() LSN
}
// LogWriter adds entries to the Write-Ahead Log.
type LogWriter interface {
// AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns
// the Log Sequence Number (LSN) of the appended log entry.
AppendLogEntry(logEntryPath string) (LSN, error)
// CompareAndAppendLogEntry is a variant of AppendLogEntry. It appends the log entry to the write-ahead log if and only
// if the inserting position matches the expected LSN.
CompareAndAppendLogEntry(lsn LSN, logEntryPath string) (LSN, error)
// NotifyNewEntries sends a signal to the notification queue. This signal indicates that new log entries were
// inserted into the write-ahead log. The listener of GetNotificationQueue() should act accordingly. By default,
// only errors are reported through that channel. Typically, the caller inserts log entries via AppendLogEntry
// or CompareAndAppendLogEntry. The result is returned immediately. Sending a signal is redundant. On rarer
// occasions, another caller inserts a log entries out-of-band. Thus, it needs to trigger this notification
// manually.
NotifyNewEntries()
// DeleteLogEntry deletes the log entry at the given LSN from the log.
DeleteLogEntry(lsn LSN) error
}
// LogManager is the interface used to manage the underlying Write-Ahead Log entries.
type LogManager interface {
LogReader
LogWriter
// Initialize sets up the initial state of the LogManager, preparing it to manage log entries.
// It ensures the environment is ready, and previous states are resumed correctly.
Initialize(ctx context.Context, appliedLSN LSN) error
// Close stops the log manager, cleans up resources, and stops internal workers. The caller
// is blocked until complete.
Close() error
// GetNotificationQueue returns a channel that is used to notify external components of changes.
GetNotificationQueue() <-chan error
}
// PositionType implements storage.LogPositionType. It's a specific type of position to be tracked in the
// Write-Ahead Log (WAL) tracking system. It defines whether changes to this position type should trigger notifications.
type PositionType struct {
Name string
ShouldNotify bool
}
// Partition is responsible for a single partition of data.
type Partition interface {
// Begin begins a transaction against the partition.
Begin(context.Context, BeginOptions) (Transaction, error)
// Close closes the partition handle to signal the caller is done using it.
Close()
// GetLogReader provides controlled access to underlying log management system for log consumption purpose.
// It allows the consumers to access to on-disk location of a LSN and acknowledge consumed position.
GetLogReader() LogReader
// GetLogWriter provides controlled access to underlying log management system for log appending purpose.
GetLogWriter() LogWriter
}
// TransactionOptions are used to pass transaction options into Begin.
type TransactionOptions struct {
// ReadOnly indicates whether this is a read-only transaction. Read-only transactions are not
// configured with a quarantine directory and do not commit a log entry.
ReadOnly bool
// RelativePath specifies which repository in the partition will be the target.
RelativePath string
// AlternateRelativePath specifies a repository to include in the transaction's snapshot as well.
AlternateRelativePath string
// AllowPartitionAssignmentWithoutRepository determines whether a partition assignment should be
// written out even if repository does not exist.
AllowPartitionAssignmentWithoutRepository bool
// ForceExclusiveSnapshot forces the transactions to use an exclusive snapshot. This is a temporary
// workaround for some RPCs that do not work well with shared read-only snapshots yet.
ForceExclusiveSnapshot bool
}
// Storage is the interface of a storage.
type Storage interface {
// ListPartitions returns a partition iterator for listing the partitions.
ListPartitions(partitionID PartitionID) (PartitionIterator, error)
// MaybeAssignToPartition ensures that the repository at relativePath is assigned to a partition.
MaybeAssignToPartition(ctx context.Context, relativePath string) (PartitionID, error)
// GetAssignedPartitionID returns the assigned ID of the partition the relative path
// has been assigned to.
GetAssignedPartitionID(relativePath string) (PartitionID, error)
// Begin begins a transaction against a partition.
Begin(context.Context, TransactionOptions) (Transaction, error)
// GetPartition returns a new handle to a given partition. The caller must call
// Partition.Close() when the Partition is no longer needed.
GetPartition(context.Context, PartitionID) (Partition, error)
}
// Node is the interface of a node. Each Node may have zero or more storages.
type Node interface {
// GetStorage retrieves a handle to a Storage by its name.
GetStorage(storageName string) (Storage, error)
}