func()

in internal/gitaly/storage/storagemgr/partition/snapshot/manager.go [152:302]


// GetSnapshot returns a file system snapshot covering relativePaths. The
// returned value is a closeWrapper that pairs the snapshot with the function
// that releases it.
//
// If exclusive is true, a new snapshot is created for this caller alone. It
// is never shared and is closed as soon as the wrapper's close function runs.
//
// If exclusive is false, the snapshot is shared. Shared snapshots are
// identified by the LSN that was current when the call was made and by the
// key derived from relativePaths via mgr.key. They are reference counted:
//
//   - If an active snapshot exists for the LSN and key, it is reused.
//   - Otherwise, if the cache of inactive snapshots holds one for the key, it
//     is moved back to the active set and reused.
//   - Otherwise, this call creates the snapshot. Concurrent callers asking for
//     the same snapshot wait until the creation finishes or their ctx is done.
//
// When the last user releases a shared snapshot, it is cached if its LSN is
// still the manager's current LSN, and closed otherwise. Caching may evict
// and close the oldest cached snapshot.
//
// If an error is returned, no snapshot is returned and the reference taken on
// a shared snapshot has already been released.
func (mgr *Manager) GetSnapshot(ctx context.Context, relativePaths []string, exclusive bool) (_ FileSystem, returnedErr error) {
	defer trace.StartRegion(ctx, "GetSnapshot").End()
	if exclusive {
		// The metric is incremented before the snapshot is created, so it
		// also counts attempts that fail.
		mgr.metrics.createdExclusiveSnapshotTotal.Inc()
		// NOTE(review): the third argument differs between the exclusive (false)
		// and shared (true) paths; presumably a read-only flag — confirm
		// against newSnapshot.
		snapshot, err := mgr.newSnapshot(ctx, relativePaths, false)
		if err != nil {
			return nil, fmt.Errorf("new exclusive snapshot: %w", err)
		}

		mgr.logSnapshotCreation(ctx, exclusive, snapshot.stats)

		return closeWrapper{
			snapshot: snapshot,
			close: func() error {
				defer trace.StartRegion(ctx, "close exclusive snapshot").End()

				mgr.metrics.destroyedExclusiveSnapshotTotal.Inc()
				// Exclusive snapshots are not shared, so they can be removed
				// as soon as the user finishes with them.
				if err := snapshot.Close(); err != nil {
					return fmt.Errorf("close exclusive snapshot: %w", err)
				}

				return nil
			},
		}, nil
	}

	// This is a shared snapshot.
	key := mgr.key(relativePaths)

	mgr.mutex.Lock()
	// Capture the LSN while holding the lock. The cleanup function below uses
	// this captured value to locate the entry in the active snapshots and to
	// decide whether the snapshot is still up to date when it is released.
	lsn := mgr.currentLSN
	if mgr.activeSharedSnapshots[lsn] == nil {
		mgr.activeSharedSnapshots[lsn] = make(map[string]*sharedSnapshot)
	}

	// Check the active snapshots whether there's already a snapshot we could
	// reuse.
	wrapper, ok := mgr.activeSharedSnapshots[lsn][key]
	if !ok {
		// If no one is actively using a similar snapshot, check whether the
		// snapshot cache contains snapshot of the data we're looking to access.
		if wrapper, ok = mgr.inactiveSharedSnapshots.Get(key); ok {
			// There was a suitable snapshot in the cache. Remove it from the cache
			// and place it in active snapshots.
			mgr.activeSharedSnapshots[lsn][key] = wrapper
			mgr.inactiveSharedSnapshots.Remove(key)
		} else {
			// If there isn't a snapshot yet, create the synchronization
			// state to ensure other goroutines won't concurrently create
			// another snapshot, and instead wait for us to take the
			// snapshot.
			//
			// Once the synchronization state is in place, we'll release
			// the lock to allow other repositories to be concurrently
			// snapshotted. The goroutines waiting for this snapshot
			// wait on the `ready` channel.
			wrapper = &sharedSnapshot{ready: make(chan struct{})}
			mgr.activeSharedSnapshots[lsn][key] = wrapper
		}
	}
	// Increment the reference counter to record that we are using
	// the snapshot.
	wrapper.referenceCount++
	mgr.mutex.Unlock()

	// cleanup releases this caller's reference to the shared snapshot. It is
	// handed to the caller as the close function on success, and invoked by
	// the deferred function below if GetSnapshot itself fails.
	cleanup := func() error {
		defer trace.StartRegion(ctx, "close shared snapshot").End()

		// Set while holding the lock to the snapshot whose file system state
		// must be closed. The close itself happens after the lock is released.
		var snapshotToRemove *sharedSnapshot

		mgr.mutex.Lock()
		wrapper.referenceCount--
		if wrapper.referenceCount == 0 {
			// If we were the last user of the snapshot, remove it.
			delete(mgr.activeSharedSnapshots[lsn], key)

			// If this was the last snapshot on the given LSN, also
			// clear the LSNs entry.
			if len(mgr.activeSharedSnapshots[lsn]) == 0 {
				delete(mgr.activeSharedSnapshots, lsn)
			}

			// We need to remove the file system state of the snapshot
			// only if it was successfully created.
			if wrapper.snapshot != nil {
				snapshotToRemove = wrapper

				// If this snapshot is up to date, cache it instead of removing it.
				if lsn == mgr.currentLSN {
					// Since we're caching the snapshot, we don't want to remove it anymore.
					snapshotToRemove = nil
					// Evict the oldest snapshot from the cache if we're at the limit.
					// The evicted snapshot, if any, is the one closed below.
					if mgr.inactiveSharedSnapshots.Len() == mgr.maxInactiveSharedSnapshots {
						_, snapshotToRemove, _ = mgr.inactiveSharedSnapshots.RemoveOldest()
					}

					mgr.inactiveSharedSnapshots.Add(key, wrapper)
				}
			}
		}
		mgr.mutex.Unlock()

		// Close the snapshot outside of the critical section so the manager's
		// lock is not held while the snapshot is being closed.
		if snapshotToRemove != nil {
			mgr.metrics.destroyedSharedSnapshotTotal.Inc()
			if err := snapshotToRemove.snapshot.Close(); err != nil {
				return fmt.Errorf("close shared snapshot: %w", err)
			}
		}

		return nil
	}

	// From this point on we hold a reference to the snapshot. If we fail to
	// hand it to the caller, release the reference here, as the caller never
	// receives the close function. Any cleanup error is joined with the
	// original error.
	defer func() {
		if returnedErr != nil {
			if err := cleanup(); err != nil {
				returnedErr = errors.Join(returnedErr, fmt.Errorf("clean failed snapshot: %w", err))
			}
		}
	}()

	// ok is false only when the snapshot was found neither in the active
	// snapshots nor in the cache, which makes this call responsible for
	// creating it.
	if !ok {
		mgr.metrics.createdSharedSnapshotTotal.Inc()
		// If there was no existing snapshot, we need to create it.
		// NOTE(review): the third argument is true here and false for exclusive
		// snapshots; presumably a read-only flag — confirm against newSnapshot.
		wrapper.snapshot, wrapper.snapshotErr = mgr.newSnapshot(ctx, relativePaths, true)
		// Other goroutines are waiting on the ready channel for us to finish the snapshotting
		// so close it to signal the process is finished.
		close(wrapper.ready)

		if wrapper.snapshotErr == nil {
			mgr.logSnapshotCreation(ctx, exclusive, wrapper.snapshot.stats)
		}
	} else {
		mgr.metrics.reusedSharedSnapshotTotal.Inc()
	}

	// Wait until the snapshot creation has finished, or give up if the context
	// is done first. The creation result is only read after the ready channel
	// has been closed.
	select {
	case <-wrapper.ready:
		if wrapper.snapshotErr != nil {
			return nil, fmt.Errorf("new shared snapshot: %w", wrapper.snapshotErr)
		}
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	return closeWrapper{
		snapshot: wrapper.snapshot,
		close:    cleanup,
	}, nil
}