in internal/gitaly/storage/storagemgr/partition/snapshot/manager.go [152:302]
func (mgr *Manager) GetSnapshot(ctx context.Context, relativePaths []string, exclusive bool) (_ FileSystem, returnedErr error) {
defer trace.StartRegion(ctx, "GetSnapshot").End()
if exclusive {
mgr.metrics.createdExclusiveSnapshotTotal.Inc()
snapshot, err := mgr.newSnapshot(ctx, relativePaths, false)
if err != nil {
return nil, fmt.Errorf("new exclusive snapshot: %w", err)
}
mgr.logSnapshotCreation(ctx, exclusive, snapshot.stats)
return closeWrapper{
snapshot: snapshot,
close: func() error {
defer trace.StartRegion(ctx, "close exclusive snapshot").End()
mgr.metrics.destroyedExclusiveSnapshotTotal.Inc()
// Exclusive snapshots are not shared, so it can be removed as soon
// as the user finishes with it.
if err := snapshot.Close(); err != nil {
return fmt.Errorf("close exclusive snapshot: %w", err)
}
return nil
},
}, nil
}
// This is a shared snapshot.
key := mgr.key(relativePaths)
mgr.mutex.Lock()
lsn := mgr.currentLSN
if mgr.activeSharedSnapshots[lsn] == nil {
mgr.activeSharedSnapshots[lsn] = make(map[string]*sharedSnapshot)
}
// Check the active snapshots whether there's already a snapshot we could
// reuse.
wrapper, ok := mgr.activeSharedSnapshots[lsn][key]
if !ok {
// If no one is actively using a similar snapshot, check whether the
// snapshot cache contains snapshot of the data we're looking to access.
if wrapper, ok = mgr.inactiveSharedSnapshots.Get(key); ok {
// There was a suitable snapshot in the cache. Remove it from the cache
// and place it in active snapshots.
mgr.activeSharedSnapshots[lsn][key] = wrapper
mgr.inactiveSharedSnapshots.Remove(key)
} else {
// If there isn't a snapshot yet, create the synchronization
// state to ensure other goroutines won't concurrently create
// another snapshot, and instead wait for us to take the
// snapshot.
//
// Once the synchronization state is in place, we'll release
// the lock to allow other repositories to be concurrently
// snapshotted. The goroutines waiting for this snapshot
// wait on the `ready` channel.
wrapper = &sharedSnapshot{ready: make(chan struct{})}
mgr.activeSharedSnapshots[lsn][key] = wrapper
}
}
// Increment the reference counter to record that we are using
// the snapshot.
wrapper.referenceCount++
mgr.mutex.Unlock()
cleanup := func() error {
defer trace.StartRegion(ctx, "close shared snapshot").End()
var snapshotToRemove *sharedSnapshot
mgr.mutex.Lock()
wrapper.referenceCount--
if wrapper.referenceCount == 0 {
// If we were the last user of the snapshot, remove it.
delete(mgr.activeSharedSnapshots[lsn], key)
// If this was the last snapshot on the given LSN, also
// clear the LSNs entry.
if len(mgr.activeSharedSnapshots[lsn]) == 0 {
delete(mgr.activeSharedSnapshots, lsn)
}
// We need to remove the file system state of the snapshot
// only if it was successfully created.
if wrapper.snapshot != nil {
snapshotToRemove = wrapper
// If this snapshot is up to date, cache it instead of removing it.
if lsn == mgr.currentLSN {
// Since we're caching the snapshot, we don't want to remove it anymore.
snapshotToRemove = nil
// Evict the oldest snapshot from the cache if we're at the limit.
if mgr.inactiveSharedSnapshots.Len() == mgr.maxInactiveSharedSnapshots {
_, snapshotToRemove, _ = mgr.inactiveSharedSnapshots.RemoveOldest()
}
mgr.inactiveSharedSnapshots.Add(key, wrapper)
}
}
}
mgr.mutex.Unlock()
if snapshotToRemove != nil {
mgr.metrics.destroyedSharedSnapshotTotal.Inc()
if err := snapshotToRemove.snapshot.Close(); err != nil {
return fmt.Errorf("close shared snapshot: %w", err)
}
}
return nil
}
defer func() {
if returnedErr != nil {
if err := cleanup(); err != nil {
returnedErr = errors.Join(returnedErr, fmt.Errorf("clean failed snapshot: %w", err))
}
}
}()
if !ok {
mgr.metrics.createdSharedSnapshotTotal.Inc()
// If there was no existing snapshot, we need to create it.
wrapper.snapshot, wrapper.snapshotErr = mgr.newSnapshot(ctx, relativePaths, true)
// Other goroutines are waiting on the ready channel for us to finish the snapshotting
// so close it to signal the process is finished.
close(wrapper.ready)
if wrapper.snapshotErr == nil {
mgr.logSnapshotCreation(ctx, exclusive, wrapper.snapshot.stats)
}
} else {
mgr.metrics.reusedSharedSnapshotTotal.Inc()
}
select {
case <-wrapper.ready:
if wrapper.snapshotErr != nil {
return nil, fmt.Errorf("new shared snapshot: %w", wrapper.snapshotErr)
}
case <-ctx.Done():
return nil, ctx.Err()
}
return closeWrapper{
snapshot: wrapper.snapshot,
close: cleanup,
}, nil
}