registry/storage/driver/testsuites/testsuites.go (2,250 lines of code) (raw):
package testsuites
import (
"bytes"
"context"
"errors"
"fmt"
"io"
mrand "math/rand/v2"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"slices"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/docker/distribution"
"github.com/docker/distribution/reference"
"github.com/docker/libtrust"
"github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
azure_v1 "github.com/docker/distribution/registry/storage/driver/azure/v1"
azure_v2 "github.com/docker/distribution/registry/storage/driver/azure/v2"
dtestutil "github.com/docker/distribution/registry/storage/driver/internal/testutil"
s3_common "github.com/docker/distribution/registry/storage/driver/s3-aws/common"
"github.com/docker/distribution/testutil"
)
// rngCacheSize is used to pre-allocate a blob of pseudo-random data that is
// later re-used among tests. This way we speed up tests by not having to
// re-generate it multiple times and save memory as all blobs used by tests
// reference to this blob (or slices of it).
const rngCacheSize int64 = 2 * 1 << 30
// DriverConstructor is a function which returns a new
// storagedriver.StorageDriver.
type DriverConstructor func() (storagedriver.StorageDriver, error)
// DriverTeardown is a function which cleans up a suite's
// storagedriver.StorageDriver.
type DriverTeardown func() error
func NewDriverSuite(
ctx context.Context,
constructor, constructorRootLess DriverConstructor,
destructor DriverTeardown,
) *DriverSuite {
return &DriverSuite{
ctx: ctx,
Constructor: constructor,
ConstructorRootless: constructorRootLess,
Teardown: destructor,
}
}
// DriverSuite is a test suite designed to test a
// storagedriver.StorageDriver.
type DriverSuite struct {
suite.Suite
Constructor DriverConstructor
// The purpose for ConstructorRootless is to enable testing un-prefixed
// functionality of storage drivers. Care needs to be taken though as the
// tests are going to use the same storage container so there is a risk for
// collisions.
ConstructorRootless DriverConstructor
Teardown DriverTeardown
StorageDriver storagedriver.StorageDriver
StorageDriverRootless storagedriver.StorageDriver
ctx context.Context
blobberFactory *testutil.BlobberFactory
seed [32]byte
}
// SetupSuite sets up the test suite for tests.
func (s *DriverSuite) SetupSuite() {
s.setupSuiteGeneric(s.T())
}
// SetupSuiteWithB sets up the test suite for benchmarks.
func (s *DriverSuite) SetupSuiteWithB(b *testing.B) {
s.setupSuiteGeneric(b)
}
func (s *DriverSuite) setupSuiteGeneric(t testing.TB) {
driver, err := s.Constructor()
require.NoError(t, err)
s.StorageDriver = driver
if s.ConstructorRootless != nil {
driver, err = s.ConstructorRootless()
require.NoError(t, err)
s.StorageDriverRootless = driver
}
s.seed = testutil.SeedFromUnixNano(time.Now().UnixNano())
t.Logf("using rng seed %v for blobbers", s.seed)
s.blobberFactory = testutil.NewBlobberFactory(rngCacheSize, s.seed)
}
// TearDownSuite tears down the test suite when testing.
func (s *DriverSuite) TearDownSuite() {
s.tearDownSuiteGeneric(s.T())
}
// TearDownSuiteWithB tears down the test suite when benchmarking.
func (s *DriverSuite) TearDownSuiteWithB(b *testing.B) {
s.tearDownSuiteGeneric(b)
}
func (s *DriverSuite) tearDownSuiteGeneric(t require.TestingT) {
if s.Teardown == nil {
return
}
err := s.Teardown()
require.NoError(t, err)
}
// TearDownTest tears down the test.
// This causes the suite to abort if any files are left around in the storage
// driver.
func (s *DriverSuite) TearDownTest() {
files, err := s.StorageDriver.List(s.ctx, "/")
assert.NoError(s.T(), err)
assert.Empty(s.T(), files, "Storage driver did not clean up properly")
}
type syncDigestSet struct {
sync.Mutex
members map[digest.Digest]struct{}
}
func newSyncDigestSet() syncDigestSet {
return syncDigestSet{sync.Mutex{}, make(map[digest.Digest]struct{})}
}
// idempotently adds a digest to the set.
func (s *syncDigestSet) add(d digest.Digest) {
s.Lock()
defer s.Unlock()
s.members[d] = struct{}{}
}
// contains reports the digest's membership within the set.
func (s *syncDigestSet) contains(d digest.Digest) bool {
s.Lock()
defer s.Unlock()
_, ok := s.members[d]
return ok
}
// len returns the number of members within the set.
func (s *syncDigestSet) len() int {
s.Lock()
defer s.Unlock()
return len(s.members)
}
type BenchmarkFunc struct {
Name string
Func func(b *testing.B)
}
// EnumerateBenchmarks finds all the benchmark functions for the given object.
// testify suite does not have built-in benchmarking capabilities, so we need
// to write this ourselves. This function is not recursive so it does not work
// for embedded objects!
// NOTE(prozlch): I wrote this function as I did not want to hardcode the list
// of benchmarks defined in the suite. This would require careful
// maintenance/updating compared to simply automating it.
func (s *DriverSuite) EnumerateBenchmarks() []BenchmarkFunc {
benchmarks := make([]BenchmarkFunc, 0)
st := reflect.TypeOf(s)
sv := reflect.ValueOf(s)
for i := 0; i < st.NumMethod(); i++ {
method := st.Method(i)
if !strings.HasPrefix(method.Name, "Benchmark") {
continue
}
benchMethod := sv.Method(i)
benchFunc := func(b *testing.B) {
benchMethod.Call([]reflect.Value{reflect.ValueOf(b)})
}
benchmarks = append(benchmarks, BenchmarkFunc{
Name: strings.TrimPrefix(method.Name, "Benchmark"),
Func: benchFunc,
})
}
return benchmarks
}
// TestRootExists ensures that all storage drivers have a root path by default.
func (s *DriverSuite) TestRootExists() {
_, err := s.StorageDriver.List(s.ctx, "/")
require.NoError(s.T(), err, `the root path "/" should always exist`)
}
// TestValidPaths checks that various valid file paths are accepted by the
// storage driver.
func (s *DriverSuite) TestValidPaths() {
contents := s.blobberFactory.GetBlobber(64).GetAllBytes()
validFiles := []string{
"/a",
"/2",
"/aa",
"/a.a",
"/0-9/abcdefg",
"/abcdefg/z.75",
"/abc/1.2.3.4.5-6_zyx/123.z/4",
"/docker/docker-registry",
"/123.abc",
"/abc./abc",
"/.abc",
"/a--b",
"/a-.b",
"/_.abc",
"/Docker/docker-registry",
"/Abc/Cba",
}
for _, filename := range validFiles {
err := s.StorageDriver.PutContent(s.ctx, filename, contents)
// nolint: revive // defer
defer s.deletePath(s.StorageDriver, firstPart(filename))
require.NoError(s.T(), err)
received, err := s.StorageDriver.GetContent(s.ctx, filename)
require.NoError(s.T(), err)
assert.Equal(s.T(), contents, received)
}
}
func (s *DriverSuite) deletePath(driver storagedriver.StorageDriver, targetPath string) {
dtestutil.EnsurePathDeleted(s.ctx, s.T(), driver, targetPath)
}
func (s *DriverSuite) skipIfWalkParallelIsNotSupported() {
d := s.StorageDriver.Name()
switch d {
case "filesystem", azure_v1.DriverName, azure_v2.DriverName:
s.T().Skipf("%s driver does not support true WalkParallel", d)
case "gcs":
parallelWalk := os.Getenv("GCS_PARALLEL_WALK")
var parallelWalkBool bool
var err error
if parallelWalk != "" {
parallelWalkBool, err = strconv.ParseBool(parallelWalk)
require.NoError(s.T(), err)
}
if !parallelWalkBool || parallelWalk == "" {
s.T().Skipf("%s driver is not configured with parallelwalk", d)
}
}
}
// TestInvalidPaths checks that various invalid file paths are rejected by the
// storage driver.
func (s *DriverSuite) TestInvalidPaths() {
contents := s.blobberFactory.GetBlobber(64).GetAllBytes()
invalidFiles := []string{
"",
"/",
"abc",
"123.abc",
"//bcd",
"/abc_123/",
}
for _, filename := range invalidFiles {
err := s.StorageDriver.PutContent(s.ctx, filename, contents)
// only delete if file was successfully written
if err == nil {
// nolint: revive // defer
defer s.deletePath(s.StorageDriver, firstPart(filename))
}
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.InvalidPathError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
_, err = s.StorageDriver.GetContent(s.ctx, filename)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.InvalidPathError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
}
}
// TestWriteRead1 tests a simple write-read workflow.
func (s *DriverSuite) TestWriteRead1() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
contents := []byte("a")
s.writeReadCompare(s.T(), filename, contents)
}
// TestWriteRead2 tests a simple write-read workflow with unicode data.
func (s *DriverSuite) TestWriteRead2() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
contents := []byte("\xc3\x9f")
s.writeReadCompare(s.T(), filename, contents)
}
// TestWriteRead3 tests a simple write-read workflow with a small string.
func (s *DriverSuite) TestWriteRead3() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
s.writeReadCompare(s.T(), filename, contents)
}
// TestWriteRead4 tests a simple write-read workflow with 1MB of data.
func (s *DriverSuite) TestWriteRead4() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
contents := s.blobberFactory.GetBlobber(1 << 20).GetAllBytes()
s.writeReadCompare(s.T(), filename, contents)
}
// TestWriteReadNonUTF8 tests that non-utf8 data may be written to the storage
// driver safely.
func (s *DriverSuite) TestWriteReadNonUTF8() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
contents := []byte{0x80, 0x80, 0x80, 0x80}
s.writeReadCompare(s.T(), filename, contents)
}
// TestTruncate tests that putting smaller contents than an original file does
// remove the excess contents.
func (s *DriverSuite) TestTruncate() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
// NOTE(prozlach): We explicitly need a different blob to confirm that
// in-place overwrite and truncation was indeed successful. In order to
// avoid allocations, we simply request a bigger blobber and then slice it.
contents := s.blobberFactory.GetBlobber((1024 + 1) * 1024).GetAllBytes()
s.writeReadCompare(s.T(), filename, contents[:1024*1024])
s.writeReadCompare(s.T(), filename, contents[1024*1024:])
}
// TestReadNonexistent tests reading content from an empty path.
func (s *DriverSuite) TestReadNonexistent() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
_, err := s.StorageDriver.GetContent(s.ctx, filename)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
}
// TestWriteReadStreams1 tests a simple write-read streaming workflow.
func (s *DriverSuite) TestWriteReadStreams1() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
blobber := testutil.NewBlober([]byte("a"))
s.writeReadCompareStreams(s.T(), filename, blobber)
}
// TestWriteReadStreams2 tests a simple write-read streaming workflow with
// unicode data.
func (s *DriverSuite) TestWriteReadStreams2() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
blobber := testutil.NewBlober([]byte("\xc3\x9f"))
s.writeReadCompareStreams(s.T(), filename, blobber)
}
// TestWriteReadStreams3 tests a simple write-read streaming workflow with a
// small amount of data.
func (s *DriverSuite) TestWriteReadStreams3() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
blobber := s.blobberFactory.GetBlobber(32)
s.writeReadCompareStreams(s.T(), filename, blobber)
}
// TestWriteReadStreams4 tests a simple write-read streaming workflow with 1MB
// of data.
func (s *DriverSuite) TestWriteReadStreams4() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
blobber := s.blobberFactory.GetBlobber(1 << 20)
s.writeReadCompareStreams(s.T(), filename, blobber)
}
// TestWriteReadStreamsNonUTF8 tests that non-utf8 data may be written to the
// storage driver safely.
func (s *DriverSuite) TestWriteReadStreamsNonUTF8() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
blobber := testutil.NewBlober([]byte{0x80, 0x80, 0x80, 0x80})
s.writeReadCompareStreams(s.T(), filename, blobber)
}
// TestWriteReadLargeStreams tests that a 2GB file may be written to the storage
// driver safely.
func (s *DriverSuite) TestWriteReadLargeStreams() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
var fileSize int64 = 2 * 1 << 30
if testing.Short() {
fileSize = 256 * 1 << 20
s.T().Log("Reducing file size to 256MiB for short mode")
}
blobber := s.blobberFactory.GetBlobber(fileSize)
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(s.T(), err)
// NOTE(prozlach): VERY IMPORTANT - DO NOT WRAP contents into a another
// Reader that does not support WriterTo interface or limit the size of the
// copy buffer. Doing so could cause us to miss critical test cases where a large enough buffer
// is sent to the storage, triggering the driver's Write() method and causing chunking.
// If chunking is not properly handled, this may result in errors such as the following on Azure:
//
// RESPONSE 413: 413 The request body is too large and exceeds the maximum permissible limit.
//
written, err := io.CopyBuffer(writer, blobber.GetReader(), make([]byte, 256*1<<20))
require.NoError(s.T(), err)
require.EqualValues(s.T(), fileSize, written)
if s.StorageDriver.Name() != "gcs" || os.Getenv("REGISTRY_GCS_DRIVER") == "next" {
require.EqualValues(s.T(), fileSize, writer.Size())
}
err = writer.Commit()
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(s.T(), err)
defer reader.Close()
blobber.RequireStreamEqual(s.T(), reader, 0, "main stream")
}
// TestWriteReadSmallStream tests that a small file may be written to the storage
// driver safely using Writer() interface.
func (s *DriverSuite) TestWriteReadSmallStream() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
var fileSize int64 = 2 * 1 << 10
blobber := s.blobberFactory.GetBlobber(fileSize)
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(s.T(), err)
written, err := io.CopyBuffer(writer, blobber.GetReader(), make([]byte, 2*1<<20))
require.NoError(s.T(), err)
require.EqualValues(s.T(), fileSize, written)
if s.StorageDriver.Name() != "gcs" || os.Getenv("REGISTRY_GCS_DRIVER") == "next" {
require.EqualValues(s.T(), fileSize, writer.Size())
}
err = writer.Commit()
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(s.T(), err)
defer reader.Close()
blobber.RequireStreamEqual(s.T(), reader, 0, "main stream")
}
// TestConcurentWriteCausesError tests that a concurent write to the same file
// will cause an error instead of data corruption.
func (s *DriverSuite) TestConcurentWriteCausesError() {
if !slices.Contains([]string{azure_v2.DriverName, s3_common.V2DriverName}, s.StorageDriver.Name()) {
s.T().Skip("only Azure v2 and S3 v2 drivers supports this strong consistency")
}
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
// Must be greater than chunk size so that there is more than one request
// to the backend while writing:
var fileSize int64 = 32 * 1 << 20
contentsAB := s.blobberFactory.GetBlobber(fileSize).GetAllBytes()
blobberA1 := testutil.NewBlober(contentsAB[:fileSize>>2])
blobberA2 := testutil.NewBlober(contentsAB[fileSize>>2 : fileSize>>1])
blobberB := testutil.NewBlober(contentsAB[:fileSize>>1])
writerA, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(s.T(), err)
written, err := io.Copy(writerA, blobberA1.GetReader())
require.NoError(s.T(), err)
require.EqualValues(s.T(), blobberA1.Size(), written)
if s.StorageDriver.Name() != "gcs" || os.Getenv("REGISTRY_GCS_DRIVER") == "next" {
require.EqualValues(s.T(), blobberA1.Size(), writerA.Size())
}
writerB, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(s.T(), err)
written, err = io.Copy(writerB, blobberB.GetReader())
require.NoError(s.T(), err)
require.EqualValues(s.T(), blobberB.Size(), written)
if s.StorageDriver.Name() != "gcs" || os.Getenv("REGISTRY_GCS_DRIVER") == "next" {
require.EqualValues(s.T(), blobberB.Size(), writerB.Size())
}
_, err = io.Copy(writerA, blobberA2.GetReader())
require.Error(s.T(), err)
err = writerB.Commit()
require.NoError(s.T(), err)
err = writerB.Close()
require.NoError(s.T(), err)
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(s.T(), err)
defer reader.Close()
blobberB.RequireStreamEqual(s.T(), reader, 0, "main stream")
}
// TestReaderWithOffset tests that the appropriate data is streamed when
// reading with a given offset.
func (s *DriverSuite) TestReaderWithOffset() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
var chunkSize int64 = 32
blobber := s.blobberFactory.GetBlobber(3 * chunkSize)
contentsChunk123 := blobber.GetAllBytes()
contentsChunk1 := contentsChunk123[0:chunkSize]
contentsChunk2 := contentsChunk123[chunkSize : 2*chunkSize]
contentsChunk3 := contentsChunk123[2*chunkSize : 3*chunkSize]
err := s.StorageDriver.PutContent(s.ctx, filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...))
require.NoError(s.T(), err)
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(s.T(), err)
defer reader.Close()
blobber.RequireStreamEqual(s.T(), reader, 0, "offset 0")
reader, err = s.StorageDriver.Reader(s.ctx, filename, chunkSize)
require.NoError(s.T(), err)
defer reader.Close()
blobber.RequireStreamEqual(s.T(), reader, chunkSize, "offset equal to chunkSize")
reader, err = s.StorageDriver.Reader(s.ctx, filename, chunkSize*2)
require.NoError(s.T(), err)
defer reader.Close()
blobber.RequireStreamEqual(s.T(), reader, 2*chunkSize, "offset equal to 2*chunkSize")
// Ensure we get invalid offset for negative offsets.
reader, err = s.StorageDriver.Reader(s.ctx, filename, -1)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.InvalidOffsetError{
DriverName: s.StorageDriver.Name(),
Path: filename,
Offset: -1,
})
require.Nil(s.T(), reader)
// Read past the end of the content and make sure we get a reader that
// returns 0 bytes and io.EOF
reader, err = s.StorageDriver.Reader(s.ctx, filename, chunkSize*3)
require.NoError(s.T(), err)
defer reader.Close()
buf := make([]byte, chunkSize)
n, err := reader.Read(buf)
require.ErrorIs(s.T(), err, io.EOF)
require.Zero(s.T(), n)
// Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF.
reader, err = s.StorageDriver.Reader(s.ctx, filename, chunkSize*3-1)
require.NoError(s.T(), err)
defer reader.Close()
n, err = reader.Read(buf)
require.Equal(s.T(), 1, n)
// We don't care whether the io.EOF comes on the this read or the first
// zero read, but the only error acceptable here is io.EOF.
if err != nil {
require.ErrorIs(s.T(), err, io.EOF)
}
// Any more reads should result in zero bytes and io.EOF
n, err = reader.Read(buf)
assert.Zero(s.T(), n)
assert.ErrorIs(s.T(), err, io.EOF)
}
// TestContinueStreamAppendLarge tests that a stream write can be appended to without
// corrupting the data with a large chunk size.
func (s *DriverSuite) TestContinueStreamAppendLarge() {
// NOTE(prozlach): the size of the stream must be larger than minimum chunk
// size for all drivers times three. Otherwise we will not exercise all the
// relevant code paths.
var streamSize int64 = 25 * 1 << 20
require.Greater(s.T(), streamSize, s3_common.MinChunkSize*3)
// In case of azure_v2, we want to make sure that the chunking loop executes
// at least twice:
require.Greater(s.T(), streamSize, azure_v2.MaxChunkSize*3*2)
// gcs needs to be refactored, we can't import the constant directly yet as
// there are going to be import loops
// minChunkSize * 20 == defaultChunkSizea, 3KiB is meant to test saving
// state session when chunk size is not even.
require.Greater(s.T(), streamSize, int64(20*256*1024+3*2<<10))
s.testContinueStreamAppend(s.T(), streamSize)
}
// TestContinueStreamAppendSmall is the same as TestContinueStreamAppendLarge, but only
// with a tiny chunk size in order to test corner cases for some cloud storage drivers.
func (s *DriverSuite) TestContinueStreamAppendSmall() {
s.testContinueStreamAppend(s.T(), 1024)
}
func (s *DriverSuite) testContinueStreamAppend(t *testing.T, chunkSize int64) {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
t.Logf("blob path used for testing: %s", filename)
blobber := s.blobberFactory.GetBlobber(3 * chunkSize)
contentsChunk123 := blobber.GetAllBytes()
// Use uneven sizes to trigger more code paths
contentsChunk1 := contentsChunk123[0:chunkSize]
contentsChunk2 := contentsChunk123[chunkSize : chunkSize+101] // intentionally a prime
contentsChunk3 := contentsChunk123[chunkSize+101 : 3*chunkSize]
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(t, err)
nn, err := io.Copy(writer, bytes.NewReader(contentsChunk1))
require.NoError(t, err)
require.EqualValues(t, len(contentsChunk1), nn)
require.NoError(t, writer.Close())
require.EqualValues(t, len(contentsChunk1), writer.Size())
writer, err = s.StorageDriver.Writer(s.ctx, filename, true)
require.NoError(t, err)
require.EqualValues(t, len(contentsChunk1), writer.Size())
nn, err = io.Copy(writer, bytes.NewReader(contentsChunk2))
require.NoError(t, err)
require.EqualValues(t, len(contentsChunk2), nn)
require.NoError(t, writer.Close())
require.EqualValues(t, len(contentsChunk2)+len(contentsChunk1), writer.Size())
writer, err = s.StorageDriver.Writer(s.ctx, filename, true)
require.NoError(t, err)
require.EqualValues(t, len(contentsChunk2)+len(contentsChunk1), writer.Size())
nn, err = io.Copy(writer, bytes.NewReader(contentsChunk3))
require.NoError(t, err)
require.EqualValues(t, len(contentsChunk3), nn)
require.NoError(t, writer.Commit())
require.NoError(t, writer.Close())
received, err := s.StorageDriver.GetContent(s.ctx, filename)
require.NoError(t, err)
require.Equal(t, blobber.GetAllBytes(), received)
}
func (s *DriverSuite) TestContinueAppendZeroSizeBlob() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
// Chosen arbitrary
var fileSize int64 = 2 * 1 << 20
blobber := s.blobberFactory.GetBlobber(fileSize)
contentsChunk := blobber.GetAllBytes()
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(s.T(), err)
nn, err := writer.Write(make([]byte, 0))
require.NoError(s.T(), err)
require.Zero(s.T(), nn)
require.NoError(s.T(), writer.Close())
require.Zero(s.T(), writer.Size())
writer, err = s.StorageDriver.Writer(s.ctx, filename, true)
require.NoError(s.T(), err)
require.Zero(s.T(), writer.Size())
nnn, err := io.Copy(writer, bytes.NewReader(contentsChunk))
require.NoError(s.T(), err)
require.EqualValues(s.T(), fileSize, nnn)
require.NoError(s.T(), writer.Commit())
require.NoError(s.T(), writer.Close())
require.EqualValues(s.T(), fileSize, writer.Size())
received, err := s.StorageDriver.GetContent(s.ctx, filename)
require.NoError(s.T(), err)
require.Equal(s.T(), blobber.GetAllBytes(), received)
}
func (s *DriverSuite) TestMaxUploadSize() {
if s.StorageDriver.Name() == "inmemory" {
s.T().Skip("In-memory driver is known to have OOM issues with large uploads.")
}
if slices.Contains([]string{s3_common.V1DriverName, s3_common.V1DriverNameAlt}, s.StorageDriver.Name()) {
s.T().Skip("S3 v1 driver has chunk size limitations which aren't planned to be fixed")
}
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
rng := mrand.NewChaCha8(s.seed)
bigChunkSize := int64(3 * 1 << 30)
smallChunkSize := s3_common.MinChunkSize - 128 // has to be just a little bit smaller than the minimum
blobMaxSize := int64(10 * 1 << 30)
// Docker blob limits is 10GiB, round it up to simplify logic
blobSize := blobMaxSize - (blobMaxSize % (bigChunkSize + smallChunkSize))
buf := make([]byte, 32*1<<20)
doAppend := false
totalWritten := int64(0)
// We alternate between small and big uploads in order to trigger/test a
// specific bug in multipart upload.
//
// The code was trying to re-upload objects as a single s3 multi-part
// upload part irrespective of their size which in certain conditions led
// to violating aws max multipart upload chunk size limit. The fix is
// simply split the files bigger than 5GiB into 5GiB chunks.
//
// In order to trigger the bug one could simply:
// * upload layer/layers that in total are just below 5 GiB in size
// * upload a small layer, < 5MiB in size
// * upload a layer that will bring the total size of all layers above 5GiB.
for {
bigChunk := &io.LimitedReader{
R: rng,
N: bigChunkSize,
}
writer, err := s.StorageDriver.Writer(s.ctx, filename, doAppend)
require.NoError(s.T(), err)
nn, err := io.CopyBuffer(writer, bigChunk, buf)
require.NoError(s.T(), err)
require.EqualValues(s.T(), bigChunkSize, nn)
totalWritten += nn
require.NoError(s.T(), writer.Close())
require.EqualValues(s.T(), totalWritten, writer.Size())
if !doAppend {
doAppend = true
}
smallChunk := &io.LimitedReader{
R: rng,
N: smallChunkSize,
}
writer, err = s.StorageDriver.Writer(s.ctx, filename, doAppend)
require.NoError(s.T(), err)
nn, err = io.Copy(writer, smallChunk)
require.NoError(s.T(), err)
require.EqualValues(s.T(), smallChunkSize, nn)
totalWritten += nn
if totalWritten >= blobSize {
// This is the last write, commit the upload:
require.NoError(s.T(), writer.Commit())
}
require.NoError(s.T(), writer.Close())
require.EqualValues(s.T(), totalWritten, writer.Size())
if totalWritten >= blobSize {
break
}
}
// Verify the uploaded file:
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(s.T(), err)
rng.Seed(s.seed) // reset the RNG
readBytesBuffer := make([]byte, 32<<20)
expectedBytesBuffer := make([]byte, 32<<20)
// NOTE(prozlach): `assert.*` calls do a lot in the background, and this is
// a tight loop that is executed many times, hence we optimize.
for chunkNumber, currentOffset := 0, int64(0); currentOffset < totalWritten; {
readBytesRemaining := totalWritten - currentOffset
rbc, err := reader.Read(readBytesBuffer)
readBytesCount := int64(rbc) // nolint: gosec // The Read() method in Go's standard library should always return a non-negative number for n
if readBytesCount > readBytesRemaining {
require.LessOrEqualf(s.T(), readBytesCount, readBytesRemaining, "the object stored in the backend is shorter than expected")
}
limitReader := &io.LimitedReader{
R: rng,
N: readBytesCount,
}
_, _ = limitReader.Read(expectedBytesBuffer)
// nolint: revive // max-control-nesting
if readBytesCount > 0 && !bytes.Equal(readBytesBuffer[:readBytesCount], expectedBytesBuffer[:readBytesCount]) {
// We can't simply display two 32MiB byte slices side by side and
// expect human to be able to parse it, hence we find the first
// differing 512 bytes chunk by re-using the code that testify
// already provides:
chunkSize := int64(512)
for i := int64(0); i < (readBytesCount+chunkSize-1)/chunkSize; i++ {
startByte := i * chunkSize
endByte := startByte + chunkSize
if endByte > readBytesCount {
endByte = readBytesCount
}
require.Equalf(
s.T(), expectedBytesBuffer[startByte:endByte], readBytesBuffer[startByte:endByte],
"difference found. Chunk number %d, offset: %d ", chunkNumber, currentOffset+startByte,
)
}
}
currentOffset += readBytesCount
chunkNumber++
if err != nil {
if err == io.EOF {
require.EqualValues(s.T(), currentOffset, totalWritten, "the object stored in the backend is shorter than expected")
} else {
require.NoError(s.T(), err, "reading data back failed")
}
}
}
}
// TestReadNonexistentStream tests that reading a stream for a nonexistent path
// fails.
func (s *DriverSuite) TestReadNonexistentStream() {
filename := dtestutil.RandomPath(1, 32)
s.T().Logf("blob path used for testing: %s", filename)
_, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
_, err = s.StorageDriver.Reader(s.ctx, filename, 64)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
}
// TestList1File tests the validity of List calls for 1 file.
func (s *DriverSuite) TestList1File() {
s.testList(s.T(), 1)
}
// TestList1200Files tests the validity of List calls for 1200 files.
func (s *DriverSuite) TestList1200Files() {
s.testList(s.T(), 1200)
}
// testList checks the returned list of keys after populating a directory tree.
func (s *DriverSuite) testList(t *testing.T, numFiles int) {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
t.Logf("root directory path used for testing: %s", rootDirectory)
doesnotexist := path.Join(rootDirectory, "nonexistent")
_, err := s.StorageDriver.List(s.ctx, doesnotexist)
require.ErrorIs(t, err, storagedriver.PathNotFoundError{
Path: doesnotexist,
DriverName: s.StorageDriver.Name(),
})
parentDirectory := rootDirectory + "/" + dtestutil.RandomFilenameRange(8, 8)
childFiles := s.buildFiles(t, parentDirectory, numFiles, 8)
sort.Strings(childFiles)
keys, err := s.StorageDriver.List(s.ctx, "/")
require.NoError(t, err)
require.Equal(t, []string{rootDirectory}, keys)
keys, err = s.StorageDriver.List(s.ctx, rootDirectory)
require.NoError(t, err)
require.Equal(t, []string{parentDirectory}, keys)
keys, err = s.StorageDriver.List(s.ctx, parentDirectory)
require.NoError(t, err)
sort.Strings(keys)
require.Equal(t, childFiles, keys)
// A few checks to add here (check out #819 for more discussion on this):
// 1. Ensure that all paths are absolute.
// 2. Ensure that listings only include direct children.
// 3. Ensure that we only respond to directory listings that end with a slash (maybe?).
}
// TestListOverlappingPaths checks that listing a directory only returns the paths
// directly under that directory, not paths with similar prefixes elsewhere.
func (s *DriverSuite) TestListOverlappingPaths() {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
s.T().Logf("root directory path used for testing: %s", rootDirectory)
// Create the structure
// /rootDirectory/foo/bar/baz1/baza
// /rootDirectory/foo/bar/baz2/bazb
// /rootDirectory/foo/bar/baz3
// /rootDirectory/foo/barman/baz3/baza
// /rootDirectory/foo/barman/baz4/bazb
// /rootDirectory/foo/barman/baz5
basePaths := []string{
filepath.Join(rootDirectory, "foo/bar/baz1/baza"),
filepath.Join(rootDirectory, "foo/bar/baz2/bazb"),
filepath.Join(rootDirectory, "foo/bar/baz3"),
filepath.Join(rootDirectory, "foo/barman/baz3/baza"),
filepath.Join(rootDirectory, "foo/barman/baz4/bazb"),
filepath.Join(rootDirectory, "foo/barman/baz5"),
}
// Create files at each path
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
for _, basePath := range basePaths {
err := s.StorageDriver.PutContent(s.ctx, basePath, contents)
require.NoError(s.T(), err)
}
// Test listing /rootDirectory/foo/bar
barPath := filepath.Join(rootDirectory, "foo/bar")
keys, err := s.StorageDriver.List(s.ctx, barPath)
require.NoError(s.T(), err)
// Expected results for /rootDirectory/foo/bar
expectedBarResults := []string{
filepath.Join(rootDirectory, "foo/bar/baz1"),
filepath.Join(rootDirectory, "foo/bar/baz2"),
filepath.Join(rootDirectory, "foo/bar/baz3"),
}
// Sort both slices for comparison
sort.Strings(keys)
sort.Strings(expectedBarResults)
require.Equal(s.T(), expectedBarResults, keys, "Listing %s returned unexpected results", barPath)
// Test listing /rootDirectory/foo
fooPath := filepath.Join(rootDirectory, "foo")
keys, err = s.StorageDriver.List(s.ctx, fooPath)
require.NoError(s.T(), err)
// Expected results for /rootDirectory/foo
expectedFooResults := []string{
filepath.Join(rootDirectory, "foo/bar"),
filepath.Join(rootDirectory, "foo/barman"),
}
// Sort both slices for comparison
sort.Strings(keys)
sort.Strings(expectedFooResults)
require.Equal(s.T(), expectedFooResults, keys, "Listing %s returned unexpected results", fooPath)
}
// TestWalkOverlappingPaths checks that both Walk and WalkParallel functions
// correctly handle directories with overlapping path prefixes.
func (s *DriverSuite) TestWalkOverlappingPaths() {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
s.T().Logf("root directory path used for testing: %s", rootDirectory)
// Create the structure
// /rootDirectory/foo/bar/baz1/baza
// /rootDirectory/foo/bar/baz2/bazb
// /rootDirectory/foo/bar/baz3
// /rootDirectory/foo/barman/baz3/baza
// /rootDirectory/foo/barman/baz4/bazb
// /rootDirectory/foo/barman/baz5
basePaths := []string{
filepath.Join(rootDirectory, "foo/bar/baz1/baza"),
filepath.Join(rootDirectory, "foo/bar/baz2/bazb"),
filepath.Join(rootDirectory, "foo/bar/baz3"),
filepath.Join(rootDirectory, "foo/barman/baz3/baza"),
filepath.Join(rootDirectory, "foo/barman/baz4/bazb"),
filepath.Join(rootDirectory, "foo/barman/baz5"),
}
// Create files at each path
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
for _, basePath := range basePaths {
err := s.StorageDriver.PutContent(s.ctx, basePath, contents)
require.NoError(s.T(), err)
}
// Helper function to verify walking works correctly for a specific path
verifyWalk := func(t *testing.T, startPath string, expectedFiles, expectedDirs []string, walkFn func(context.Context, string, storagedriver.WalkFn) error) {
var mu sync.Mutex
var actualFiles []string
var actualDirs []string
collectFn := func(fileInfo storagedriver.FileInfo) error {
mu.Lock()
defer mu.Unlock()
if fileInfo.IsDir() {
actualDirs = append(actualDirs, fileInfo.Path())
} else {
actualFiles = append(actualFiles, fileInfo.Path())
}
return nil
}
err := walkFn(s.ctx, startPath, collectFn)
require.NoError(t, err)
// Sort the actual results before comparison
sort.Strings(actualFiles)
sort.Strings(actualDirs)
require.Equal(t, expectedFiles, actualFiles, "Unexpected files for path %s", startPath)
require.Equal(t, expectedDirs, actualDirs, "Unexpected directories for path %s", startPath)
}
// Expected files and directories when walking from foo/bar
barExpectedFiles := []string{
filepath.Join(rootDirectory, "foo/bar/baz1/baza"),
filepath.Join(rootDirectory, "foo/bar/baz2/bazb"),
filepath.Join(rootDirectory, "foo/bar/baz3"),
}
sort.Strings(barExpectedFiles)
barExpectedDirs := []string{
filepath.Join(rootDirectory, "foo/bar/baz1"),
filepath.Join(rootDirectory, "foo/bar/baz2"),
}
sort.Strings(barExpectedDirs)
// Expected files and directories when walking from foo
fooExpectedFiles := []string{
filepath.Join(rootDirectory, "foo/barman/baz3/baza"),
filepath.Join(rootDirectory, "foo/barman/baz4/bazb"),
filepath.Join(rootDirectory, "foo/barman/baz5"),
}
fooExpectedFiles = append(fooExpectedFiles, barExpectedFiles...)
sort.Strings(fooExpectedFiles)
fooExpectedDirs := []string{
filepath.Join(rootDirectory, "foo/bar"),
filepath.Join(rootDirectory, "foo/barman"),
filepath.Join(rootDirectory, "foo/barman/baz3"),
filepath.Join(rootDirectory, "foo/barman/baz4"),
}
fooExpectedDirs = append(fooExpectedDirs, barExpectedDirs...)
sort.Strings(fooExpectedDirs)
// Run subtests for both Walk and WalkParallel on different paths
s.Run("Walk_BarPath", func() {
barPath := filepath.Join(rootDirectory, "foo/bar")
verifyWalk(s.T(), barPath, barExpectedFiles, barExpectedDirs, s.StorageDriver.Walk)
})
s.Run("WalkParallel_BarPath", func() {
s.skipIfWalkParallelIsNotSupported()
barPath := filepath.Join(rootDirectory, "foo/bar")
verifyWalk(s.T(), barPath, barExpectedFiles, barExpectedDirs, s.StorageDriver.WalkParallel)
})
s.Run("Walk_FooPath", func() {
fooPath := filepath.Join(rootDirectory, "foo")
verifyWalk(s.T(), fooPath, fooExpectedFiles, fooExpectedDirs, s.StorageDriver.Walk)
})
s.Run("WalkParallel_FooPath", func() {
s.skipIfWalkParallelIsNotSupported()
fooPath := filepath.Join(rootDirectory, "foo")
verifyWalk(s.T(), fooPath, fooExpectedFiles, fooExpectedDirs, s.StorageDriver.WalkParallel)
})
}
// TestListUnprefixed checks if listing root directory with no prefix
// configured works.
func (s *DriverSuite) TestListUnprefixed() {
if s.StorageDriver.Name() == "filesystem" {
s.T().Skip("filesystem driver does not support prefix-less operation")
}
// NOTE(prozlach): we are sharing the storage root with other tests, so the
// idea is to create a very uniqe file name and simply look for it in the
// results. This way there should be no collisions.
destPath := "/" + dtestutil.RandomFilename(64)
defer s.deletePath(s.StorageDriverRootless, destPath)
s.T().Logf("destination blob path used for testing: %s", destPath)
destContents := s.blobberFactory.GetBlobber(64).GetAllBytes()
err := s.StorageDriverRootless.PutContent(s.ctx, destPath, destContents)
require.NoError(s.T(), err)
keys, err := s.StorageDriverRootless.List(s.ctx, "/")
require.NoError(s.T(), err)
require.Contains(s.T(), keys, destPath)
}
func (s *DriverSuite) TestCreateListDeleteTightLoop() {
destPath := "/" + dtestutil.RandomFilename(64)
destContents := s.blobberFactory.GetBlobber(64).GetAllBytes()
s.T().Logf("blob path used for testing: %s", destPath)
assertEventually := func(i int, invertCheck bool) {
keys, err := s.StorageDriver.List(s.ctx, "/")
require.NoErrorf(s.T(), err, "iteration %d", i)
// NOTE(prozlach): the blob name not showing imediatelly after is
// already a problem, but we want to also know after how much time it
// shows up (if it does at all), hence only do the `assert` and not
// `require`
if !invertCheck && assert.Containsf(s.T(), keys, destPath, "List() call inconsistency detected, iteration %d - element should have been found", i) {
return
}
if invertCheck && assert.NotContainsf(s.T(), keys, destPath, "List() call inconsistency detected, iteration %d - element should not have been found", i) {
return
}
// NOTE(prozlach): we need to see if the file not showing up in the
// list is a temporary or a permament situation.
ticker := time.NewTicker(700 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case t := <-ticker.C:
keys, err = s.StorageDriver.List(s.ctx, "/")
require.NoErrorf(s.T(), err, "iteration %d, time %s", i, t.String())
if !invertCheck && slices.Contains(keys, destPath) {
s.T().Logf("OK: found - iteration %d, time %s", i, t.String())
return
}
if invertCheck && !slices.Contains(keys, destPath) {
s.T().Logf("OK: not found - iteration %d, time %s", i, t.String())
return
}
case t := <-timer.C:
notWord := "not"
if invertCheck {
notWord = "can be"
}
require.FailNow(s.T(), fmt.Sprintf("iteration %d, time %s - blob still %s found in output of List()", i, t.String(), notWord))
return
}
}
}
assertEventuallyContains := func(i int) { assertEventually(i, false) }
assertEventuallyDoesNotContain := func(i int) { assertEventually(i, true) }
// NOTE(prozlach): Number of repetitions was chosen basing on how many
// iterations on average are needed to reproduce issues with List call when
// running things locally.
for i := 0; i < 40; i++ {
err := s.StorageDriver.PutContent(s.ctx, destPath, destContents)
require.NoErrorf(s.T(), err, "iteration %d", i)
assertEventuallyContains(i)
err = s.StorageDriver.Delete(s.ctx, destPath)
require.NoErrorf(s.T(), err, "iteration %d", i)
assertEventuallyDoesNotContain(i)
}
}
// TestMovePutContentBlob checks that driver can indeed move an object, and
// that the object no longer exists at the source path and does exist at the
// destination after the move.
// NOTE(prozlach): The reason why we differentiate between blobs created by
// PutContent() and Write() methods is to make sure that there are no
// differences in handling blobs created by PutContent() and Writer() calls
// during move operation.
func (s *DriverSuite) TestMovePutContentBlob() {
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
sourcePath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(sourcePath))
s.T().Logf("source blob path used for testing: %s", sourcePath)
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
err := s.StorageDriver.PutContent(s.ctx, sourcePath, contents)
require.NoError(s.T(), err)
err = s.StorageDriver.Move(s.ctx, sourcePath, destPath)
require.NoError(s.T(), err)
received, err := s.StorageDriver.GetContent(s.ctx, destPath)
require.NoError(s.T(), err)
require.Equal(s.T(), contents, received)
_, err = s.StorageDriver.GetContent(s.ctx, sourcePath)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: sourcePath,
})
}
// TestMoveWritterBlob checks that driver can indeed move an object, and
// that the object no longer exists at the source path and does exist at the
// destination after the move.
// NOTE(prozlach): The reason why we differentiate between blobs created by
// PutContent() and Write() methods is to make sure that there are no
// differences in handling blobs created by PutContent() and Writer() calls
// during move operation.
func (s *DriverSuite) TestMoveWritterBlob() {
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
sourcePath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(sourcePath))
s.T().Logf("source blob path used for testing: %s", sourcePath)
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, sourcePath, false)
require.NoError(s.T(), err, "unexpected error from driver.Writer")
_, err = writer.Write(contents)
require.NoError(s.T(), err, "writer.Write: unexpected error")
// NOTE(prozlach): For some drivers, Close(), does not imply Commit()
err = writer.Commit()
require.NoError(s.T(), err, "writer.Commit: unexpected error")
err = writer.Close()
require.NoError(s.T(), err, "writer.Close: unexpected error")
err = s.StorageDriver.Move(s.ctx, sourcePath, destPath)
require.NoError(s.T(), err)
received, err := s.StorageDriver.GetContent(s.ctx, destPath)
require.NoError(s.T(), err)
require.Equal(s.T(), contents, received)
_, err = s.StorageDriver.GetContent(s.ctx, sourcePath)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: sourcePath,
})
}
func (s *DriverSuite) TestAppendInexistentBlob() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
_, err := s.StorageDriver.Writer(s.ctx, destPath, true)
require.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
}
func (s *DriverSuite) TestWriterDoubleClose() {
destPath := dtestutil.RandomPath(1, 32)
s.T().Logf("destination path for blob: %s", destPath)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
err = writer.Close()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterDoubleCommit() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination path for blob: %s", destPath)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Commit()
require.NoError(s.T(), err)
err = writer.Commit()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCommited)
}
func (s *DriverSuite) TestWriterWriteAfterClose() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination path for blob: %s", destPath)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterCancelAfterClose() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
err = writer.Cancel()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterCommitAfterClose() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
err = writer.Commit()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterCommitAfterCancel() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Commit()
require.NoError(s.T(), err)
err = writer.Cancel()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCommited)
}
func (s *DriverSuite) TestWriterCancelAfterCommit() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Cancel()
require.NoError(s.T(), err)
err = writer.Commit()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCanceled)
}
func (s *DriverSuite) TestWriterWriteAfterCommit() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Commit()
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCommited)
}
func (s *DriverSuite) TestWriterWriteAfterCancel() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Cancel()
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCanceled)
}
func (s *DriverSuite) TestWriterCancel() {
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Cancel()
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
_, err = s.StorageDriver.Stat(s.ctx, destPath)
require.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
}
// TestOverwriteAppendBlob checks that driver can overwrite blob created using
// Write() call with PutContent() call. In case of e.g. Azure, Write() creates
// AppendBlob and PutContent() creates a BlockBlob and there is no in-place
// conversion - blob needs to be deleted and re-created when overwritting it.
func (s *DriverSuite) TestOverwriteAppendBlob() {
contents := s.blobberFactory.GetBlobber(64).GetAllBytes()
// NOTE(prozlach): We explicitly need a different blob here to confirm that
// in-place overwrite was indeed successful. In order to avoid extra
// allocations, we simply request a blobber double the size.
contentsAppend := contents[:32]
contentsBlock := contents[32:]
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err, "unexpected error from driver.Writer")
_, err = writer.Write(contentsAppend)
require.NoError(s.T(), err, "writer.Write: unexpected error")
// NOTE(prozlach): For some drivers, Close(), does not imply Commit()
err = writer.Commit()
require.NoError(s.T(), err, "writer.Commit: unexpected error")
err = writer.Close()
require.NoError(s.T(), err, "writer.Close: unexpected error")
err = s.StorageDriver.PutContent(s.ctx, destPath, contentsBlock)
require.NoError(s.T(), err)
received, err := s.StorageDriver.GetContent(s.ctx, destPath)
require.NoError(s.T(), err)
require.Equal(s.T(), contentsBlock, received)
}
// TestOverwriteBlockBlob checks that driver can overwrite blob created using
// PutContent() call using Write() call. In case of e.g. Azure Write() creates
// Append blob and Write() creates a Block blob and there is no in-place
// conversion - blob needs to be deleted.
func (s *DriverSuite) TestOverwriteBlockBlob() {
contents := s.blobberFactory.GetBlobber(64).GetAllBytes()
// NOTE(prozlach): We explicitly need a different blob here to confirm that
// in-place overwrite was indeed successful. In order to avoid extra
// allocations, we simply request a blobber double the size.
contentsAppend := contents[:32]
contentsBlock := contents[32:]
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
err := s.StorageDriver.PutContent(s.ctx, destPath, contentsBlock)
require.NoError(s.T(), err)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err, "unexpected error from driver.Writer")
_, err = writer.Write(contentsAppend)
require.NoError(s.T(), err, "writer.Write: unexpected error")
// NOTE(prozlach): For some drivers, Close(), does not imply Commit()
err = writer.Commit()
require.NoError(s.T(), err, "writer.Commit: unexpected error")
err = writer.Close()
require.NoError(s.T(), err, "writer.Close: unexpected error")
received, err := s.StorageDriver.GetContent(s.ctx, destPath)
require.NoError(s.T(), err)
require.Equal(s.T(), contentsAppend, received)
}
// TestMoveOverwrite checks that a moved object no longer exists at the source
// path and overwrites the contents at the destination.
func (s *DriverSuite) TestMoveOverwrite() {
sourcePath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(sourcePath))
s.T().Logf("source blob path used for testing: %s", sourcePath)
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
// NOTE(prozlach): We explicitly need a different blob here to confirm that
// in-place overwrite was indeed successful. In order to avoid extra
// allocations, we simply request a blobber double the size.
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
sourceContents := contents[:32]
destContents := contents[32:]
err := s.StorageDriver.PutContent(s.ctx, sourcePath, sourceContents)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, destPath, destContents)
require.NoError(s.T(), err)
err = s.StorageDriver.Move(s.ctx, sourcePath, destPath)
require.NoError(s.T(), err)
received, err := s.StorageDriver.GetContent(s.ctx, destPath)
require.NoError(s.T(), err)
require.Equal(s.T(), sourceContents, received)
_, err = s.StorageDriver.GetContent(s.ctx, sourcePath)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: sourcePath,
})
}
// TestMoveNonexistent checks that moving a nonexistent key fails and does not
// delete the data at the destination path.
func (s *DriverSuite) TestMoveNonexistent() {
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
sourcePath := dtestutil.RandomPath(1, 32)
s.T().Logf("source blob path used for testing: %s", sourcePath)
destPath := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(destPath))
s.T().Logf("destination blob path used for testing: %s", destPath)
err := s.StorageDriver.PutContent(s.ctx, destPath, contents)
require.NoError(s.T(), err)
err = s.StorageDriver.Move(s.ctx, sourcePath, destPath)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: sourcePath,
})
received, err := s.StorageDriver.GetContent(s.ctx, destPath)
require.NoError(s.T(), err)
require.Equal(s.T(), contents, received)
}
// TestMoveInvalid provides various checks for invalid moves.
func (s *DriverSuite) TestMoveInvalid() {
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
// Create a regular file.
err := s.StorageDriver.PutContent(s.ctx, "/notadir", contents)
require.NoError(s.T(), err)
defer s.deletePath(s.StorageDriver, "/notadir")
// Now try to move a non-existent file under it.
err = s.StorageDriver.Move(s.ctx, "/notadir/foo", "/notadir/bar")
require.Error(s.T(), err)
}
// TestDelete checks that the delete operation removes data from the storage
// driver
func (s *DriverSuite) TestDelete() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
err := s.StorageDriver.PutContent(s.ctx, filename, contents)
require.NoError(s.T(), err)
err = s.StorageDriver.Delete(s.ctx, filename)
require.NoError(s.T(), err)
_, err = s.StorageDriver.GetContent(s.ctx, filename)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
}
// TestDeleteDir1File ensures the driver is able to delete all objects in a
// directory with 1 file.
func (s *DriverSuite) TestDeleteDir1File() {
s.testDeleteDir(s.T(), 1)
}
// TestDeleteDir1200Files ensures the driver is able to delete all objects in a
// directory with 1200 files.
func (s *DriverSuite) TestDeleteDir1200Files() {
s.testDeleteDir(s.T(), 1200)
}
func (s *DriverSuite) testDeleteDir(t *testing.T, numFiles int) {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
t.Logf("root directory path used for testing: %s", rootDirectory)
parentDirectory := rootDirectory + "/" + dtestutil.RandomFilenameRange(8, 8)
childFiles := s.buildFiles(t, parentDirectory, numFiles, 8)
err := s.StorageDriver.Delete(s.ctx, parentDirectory)
require.NoError(t, err)
// Most storage backends delete files in lexicographic order, so we'll access
// them in the same way, this should help point out errors due to deletion order.
sort.Strings(childFiles)
// This test can be flaky when large numbers of objects are deleted for
// storage backends which are eventually consistent. We'll log any files we
// encounter which are not delete and fail later. This way information about
// the failure/flake can be preserved to aid in debugging.
var filesRemaining bool
for i, f := range childFiles {
if _, err = s.StorageDriver.GetContent(s.ctx, f); err == nil {
filesRemaining = true
t.Logf("able to access file %d after deletion", i)
} else {
require.Error(t, err)
require.ErrorIs(t, err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: f,
})
}
}
require.False(t, filesRemaining, "Encountered files remaining after deletion")
}
func (s *DriverSuite) buildFiles(t testing.TB, parentDirectory string, numFiles int, size int64) []string {
return dtestutil.BuildFiles(
s.ctx,
t,
s.StorageDriver,
parentDirectory,
numFiles,
s.blobberFactory.GetBlobber(size),
)
}
// assertPathNotFound asserts that path does not exist in the storage driver filesystem.
func (s *DriverSuite) assertPathNotFound(t require.TestingT, p ...string) {
for _, p := range p {
_, err := s.StorageDriver.GetContent(s.ctx, p)
require.Errorf(t, err, "path %q expected to be not found but it is still there", p)
require.ErrorIs(t, err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: p,
})
}
}
// TestDeleteFiles checks that DeleteFiles removes data from the storage driver for a random (<10) number of files.
func (s *DriverSuite) TestDeleteFiles() {
parentDir := dtestutil.RandomPath(1, 8)
defer s.deletePath(s.StorageDriver, firstPart(parentDir))
s.T().Logf("parent directory path used for testing: %s", parentDir)
/* #nosec G404 */
blobPaths := s.buildFiles(s.T(), parentDir, mrand.IntN(10), 32)
count, err := s.StorageDriver.DeleteFiles(s.ctx, blobPaths)
require.NoError(s.T(), err)
require.Equal(s.T(), len(blobPaths), count)
s.assertPathNotFound(s.T(), blobPaths...)
}
// TestDeleteFileEqualFolderFileName is a regression test for deleting files
// where the file name and folder where the file resides have the same names
func (s *DriverSuite) TestDeleteFileEqualFolderFileName() {
parentDir := dtestutil.RandomPath(1, 8)
fileName := "Maryna"
p := path.Join(parentDir, fileName, fileName)
defer s.deletePath(s.StorageDriver, firstPart(parentDir))
s.T().Logf("parent directory path used for testing: %s", parentDir)
err := s.StorageDriver.PutContent(s.ctx, p, s.blobberFactory.GetBlobber(32).GetAllBytes())
require.NoError(s.T(), err)
err = s.StorageDriver.Delete(s.ctx, p)
require.NoError(s.T(), err)
s.assertPathNotFound(s.T(), p)
}
// TestDeleteFilesNotFound checks that DeleteFiles is idempotent and doesn't return an error if a file was not found.
func (s *DriverSuite) TestDeleteFilesNotFound() {
parentDir := dtestutil.RandomPath(1, 8)
defer s.deletePath(s.StorageDriver, firstPart(parentDir))
s.T().Logf("parent directory path used for testing: %s", parentDir)
blobPaths := s.buildFiles(s.T(), parentDir, 5, 32)
// delete the 1st, 3rd and last file so that they don't exist anymore
s.deletePath(s.StorageDriver, blobPaths[0])
s.deletePath(s.StorageDriver, blobPaths[2])
s.deletePath(s.StorageDriver, blobPaths[4])
count, err := s.StorageDriver.DeleteFiles(s.ctx, blobPaths)
require.NoError(s.T(), err)
require.Equal(s.T(), len(blobPaths), count)
s.assertPathNotFound(s.T(), blobPaths...)
}
// benchmarkDeleteFiles benchmarks DeleteFiles for an amount of num files.
func (s *DriverSuite) benchmarkDeleteFiles(b *testing.B, num int) {
parentDir := dtestutil.RandomPath(1, 8)
defer s.deletePath(s.StorageDriver, firstPart(parentDir))
for i := 0; i < b.N; i++ {
b.StopTimer()
paths := s.buildFiles(b, parentDir, num, 32)
b.StartTimer()
count, err := s.StorageDriver.DeleteFiles(s.ctx, paths)
b.StopTimer()
require.NoError(b, err)
require.Len(b, paths, count)
s.assertPathNotFound(b, paths...)
}
}
// BenchmarkDeleteFiles1File benchmarks DeleteFiles for 1 file.
func (s *DriverSuite) BenchmarkDeleteFiles1File(b *testing.B) {
s.benchmarkDeleteFiles(b, 1)
}
// BenchmarkDeleteFiles100Files benchmarks DeleteFiles for 100 files.
func (s *DriverSuite) BenchmarkDeleteFiles100Files(b *testing.B) {
s.benchmarkDeleteFiles(b, 100)
}
// TestURLFor checks that the URLFor method functions properly, but only if it
// is implemented
func (s *DriverSuite) TestURLFor() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
s.T().Logf("blob path used for testing: %s", filename)
err := s.StorageDriver.PutContent(s.ctx, filename, contents)
require.NoError(s.T(), err)
url, err := s.StorageDriver.URLFor(s.ctx, filename, nil)
if errors.As(err, new(storagedriver.ErrUnsupportedMethod)) {
return
}
require.NoError(s.T(), err)
response, err := http.Get(url)
require.NoError(s.T(), err)
defer response.Body.Close()
read, err := io.ReadAll(response.Body)
require.NoError(s.T(), err)
require.Equal(s.T(), contents, read)
url, err = s.StorageDriver.URLFor(s.ctx, filename, map[string]any{"method": http.MethodHead})
if errors.As(err, new(storagedriver.ErrUnsupportedMethod)) {
return
}
require.NoError(s.T(), err)
response, err = http.Head(url)
require.NoError(s.T(), err)
err = response.Body.Close()
require.NoError(s.T(), err)
assert.Equal(s.T(), http.StatusOK, response.StatusCode)
assert.Equal(s.T(), int64(32), response.ContentLength)
}
// TestDeleteNonexistent checks that removing a nonexistent key fails.
func (s *DriverSuite) TestDeleteNonexistent() {
filename := dtestutil.RandomPath(1, 32)
err := s.StorageDriver.Delete(s.ctx, filename)
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: filename,
})
}
// TestDeleteFolder checks that deleting a folder removes all child elements.
func (s *DriverSuite) TestDeleteFolder() {
dirname := dtestutil.RandomPath(1, 32)
filename1 := dtestutil.RandomPath(1, 32)
filename2 := dtestutil.RandomPath(1, 32)
filename3 := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(dirname))
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
s.T().Logf("parent directory: %s, filename1: %s, filename2: %s, filename3: %s", dirname, filename1, filename2, filename3)
err := s.StorageDriver.PutContent(s.ctx, path.Join(dirname, filename1), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, path.Join(dirname, filename2), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, path.Join(dirname, filename3), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.Delete(s.ctx, path.Join(dirname, filename1))
require.NoError(s.T(), err)
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename1))
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: path.Join(dirname, filename1),
})
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename2))
require.NoError(s.T(), err)
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename3))
require.NoError(s.T(), err)
err = s.StorageDriver.Delete(s.ctx, dirname)
require.NoError(s.T(), err)
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename1))
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: path.Join(dirname, filename1),
})
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename2))
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: path.Join(dirname, filename2),
})
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename3))
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: path.Join(dirname, filename3),
})
}
// TestDeleteOnlyDeletesSubpaths checks that deleting path A does not
// delete path B when A is a prefix of B but B is not a subpath of A (so that
// deleting "/a" does not delete "/ab"). This matters for services like S3 that
// do not implement directories.
func (s *DriverSuite) TestDeleteOnlyDeletesSubpaths() {
dirname := dtestutil.RandomPath(1, 32)
filename := dtestutil.RandomPath(1, 32)
contents := s.blobberFactory.GetBlobber(32).GetAllBytes()
defer s.deletePath(s.StorageDriver, firstPart(dirname))
s.T().Logf("directory: %s, filename: %s", dirname, filename)
err := s.StorageDriver.PutContent(s.ctx, path.Join(dirname, filename), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, path.Join(dirname, filename+"suffix"), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, path.Join(dirname, dirname, filename), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, path.Join(dirname, dirname+"suffix", filename), contents)
require.NoError(s.T(), err)
err = s.StorageDriver.Delete(s.ctx, path.Join(dirname, filename))
require.NoError(s.T(), err)
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename))
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: path.Join(dirname, filename),
})
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, filename+"suffix"))
require.NoError(s.T(), err)
err = s.StorageDriver.Delete(s.ctx, path.Join(dirname, dirname))
require.NoError(s.T(), err)
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, dirname, filename))
require.Error(s.T(), err)
require.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{
DriverName: s.StorageDriver.Name(),
Path: path.Join(dirname, dirname, filename),
})
_, err = s.StorageDriver.GetContent(s.ctx, path.Join(dirname, dirname+"suffix", filename))
require.NoError(s.T(), err)
}
// TestStatCall runs verifies the implementation of the storagedriver's Stat call.
func (s *DriverSuite) TestStatCall() {
// NOTE(prozlach): We explicitly need a different blob here to confirm that
// in-place overwrite was indeed successful.
// NOTE(prozlach): The idea is to create a hierarchy in the s3 bucket
// where:
// * there is one common directory for all the blobs (dirPathBase)
// * there are two files in two different "subdirectories" under the same
// common directory (dirA, dirB and filePath, filePathAux)
// * both subdirectories should share the same prefix so that we could test
// a stat on inexistant dir which is actually a common prefix for existing
// subdirectories (DirPartialPrefix)
contentAB := s.blobberFactory.GetBlobber(4096 * 2).GetAllBytes()
contentA := contentAB[:4096]
contentB := contentAB[4096:]
dirPathBase := dtestutil.RandomPath(1, 24)
dirA := "foo" + dtestutil.RandomFilename(13)
dirB := "foo" + dtestutil.RandomFilename(13)
partialPath := path.Join(dirPathBase, "foo")
dirPath := path.Join(dirPathBase, dirA)
dirPathAux := path.Join(dirPathBase, dirB)
fileName := dtestutil.RandomFilename(32)
filePath := path.Join(dirPath, fileName)
// Trigger a case where for given prefix there is more than one object
filePathAux := path.Join(dirPathAux, fileName)
s.T().Logf("directory: %s, filename: %s, filename aux: %s", dirPath, fileName, filePathAux)
err := s.StorageDriver.PutContent(s.ctx, filePath, contentA)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, filePathAux, contentA)
require.NoError(s.T(), err)
defer s.deletePath(s.StorageDriver, firstPart(dirPath))
// Call to stat on root directory. The storage healthcheck performs this
// exact call to Stat.
// PathNotFoundErrors are not considered health check failures. Some
// drivers will return a not found here, while others will not return an
// error at all. If we get an error, ensure it's a not found.
s.Run("RootDirectory", func() {
fi, err := s.StorageDriver.Stat(s.ctx, "/")
if err != nil {
assert.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
} else {
assert.NotNil(s.T(), fi)
assert.Equal(s.T(), "/", fi.Path())
assert.True(s.T(), fi.IsDir())
}
})
s.Run("NonExistentDir", func() {
fi, err := s.StorageDriver.Stat(s.ctx, dirPath+"foo")
require.Error(s.T(), err)
assert.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{ // nolint: testifylint
DriverName: s.StorageDriver.Name(),
Path: dirPath + "foo",
})
assert.Nil(s.T(), fi)
})
s.Run("NonExistentPath", func() {
fi, err := s.StorageDriver.Stat(s.ctx, filePath+"bar")
require.Error(s.T(), err)
assert.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{ // nolint: testifylint
DriverName: s.StorageDriver.Name(),
Path: filePath + "bar",
})
assert.Nil(s.T(), fi)
})
s.Run("FileExists", func() {
fi, err := s.StorageDriver.Stat(s.ctx, filePath)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
assert.Equal(s.T(), filePath, fi.Path())
assert.Equal(s.T(), int64(len(contentA)), fi.Size())
assert.False(s.T(), fi.IsDir())
})
s.Run("ModTime", func() {
fi, err := s.StorageDriver.Stat(s.ctx, filePath)
require.NoError(s.T(), err)
assert.NotNil(s.T(), fi)
createdTime := fi.ModTime()
// Sleep and modify the file
time.Sleep(time.Second * 10)
err = s.StorageDriver.PutContent(s.ctx, filePath, contentB)
require.NoError(s.T(), err)
fi, err = s.StorageDriver.Stat(s.ctx, filePath)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
modTime := fi.ModTime()
// Check if the modification time is after the creation time.
// In case of cloud storage services, storage frontend nodes might have
// time drift between them, however that should be solved with sleeping
// before update.
assert.Greaterf(
s.T(),
modTime,
createdTime,
"modtime (%s) is before the creation time (%s)", modTime, createdTime,
)
})
// Call on directory with one "file"
// (do not check ModTime as dirs don't need to support it)
s.Run("DirWithFile", func() {
fi, err := s.StorageDriver.Stat(s.ctx, dirPath)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
assert.Equal(s.T(), dirPath, fi.Path())
assert.Zero(s.T(), fi.Size())
assert.True(s.T(), fi.IsDir())
})
// Call on directory with another "subdirectory"
s.Run("DirWithSubDir", func() {
fi, err := s.StorageDriver.Stat(s.ctx, dirPathBase)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
assert.Equal(s.T(), dirPathBase, fi.Path())
assert.Zero(s.T(), fi.Size())
assert.True(s.T(), fi.IsDir())
})
// Call on a partial name of the directory. This should result in
// not-found, as partial match is still not a match for a directory.
s.Run("DirPartialPrefix", func() {
fi, err := s.StorageDriver.Stat(s.ctx, partialPath)
require.Error(s.T(), err)
assert.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{ // nolint: testifylint
DriverName: s.StorageDriver.Name(),
Path: partialPath,
})
assert.Nil(s.T(), fi)
})
}
// TestPutContentMultipleTimes checks that if storage driver can overwrite the content
// in the subsequent puts. Validates that PutContent does not have to work
// with an offset like Writer does and overwrites the file entirely
// rather than writing the data to the [0,len(data)) of the file.
func (s *DriverSuite) TestPutContentMultipleTimes() {
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
// NOTE(prozlach): We explicitly need a different blob here to confirm that
// in-place overwrite was indeed successful.
contentsAB := s.blobberFactory.GetBlobber(4096 + 2048).GetAllBytes()
contentsA := contentsAB[:4096]
contentsB := contentsAB[4096:]
err := s.StorageDriver.PutContent(s.ctx, filename, contentsA)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, filename, contentsB)
require.NoError(s.T(), err)
readContents, err := s.StorageDriver.GetContent(s.ctx, filename)
require.NoError(s.T(), err)
require.Equal(s.T(), contentsB, readContents)
}
// TestConcurrentStreamReads checks that multiple clients can safely read from
// the same file simultaneously with various offsets.
func (s *DriverSuite) TestConcurrentStreamReads() {
var fileSize int64 = 128 * 1 << 20
if testing.Short() {
fileSize = 10 * 1 << 20
s.T().Logf("Reducing file size to 10MB for short mode")
}
filename := dtestutil.RandomPath(1, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path used for testing: %s", filename)
blobber := s.blobberFactory.GetBlobber(fileSize)
err := s.StorageDriver.PutContent(s.ctx, filename, blobber.GetAllBytes())
require.NoError(s.T(), err)
var wg sync.WaitGroup
readContents := func() {
defer wg.Done()
/* #nosec G404 */
offset := mrand.Int64N(fileSize)
reader, err := s.StorageDriver.Reader(s.ctx, filename, offset)
// nolint: testifylint // require-error
if !assert.NoError(s.T(), err) {
return
}
defer reader.Close()
blobber.AssertStreamEqual(s.T(), reader, offset, fmt.Sprintf("filename: %s, offset: %d", filename, offset))
}
wg.Add(10)
for i := 0; i < 10; i++ {
go readContents()
}
wg.Wait()
}
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
// in to Writer concurrently without hanging.
func (s *DriverSuite) TestConcurrentFileStreams() {
numStreams := 32
if testing.Short() {
numStreams = 8
s.T().Log("Reducing number of streams to 8 for short mode")
}
var wg sync.WaitGroup
testStream := func(size int64) {
defer wg.Done()
s.testFileStreams(s.T(), size)
}
wg.Add(numStreams)
for i := numStreams; i > 0; i-- {
go testStream(int64(numStreams) * 1 << 20)
}
wg.Wait()
}
// TODO (brianbland): evaluate the relevancy of this test
// TestEventualConsistency checks that if stat says that a file is a certain size, then
// you can freely read from the file (this is the only guarantee that the driver needs to provide)
// func (suite *DriverTestSuite) TestEventualConsistency() {
// if testing.Short() {
// suite.T().Skip("Skipping test in short mode")
// }
//
// filename := randomPath(1,32)
// defer suite.deletePath(suite.T(), firstPart(filename), false)
//
// var offset int64
// var misswrites int
// var chunkSize int64 = 32
//
// for i := 0; i < 1024; i++ {
// contents := s.blobberFactory.GetBlober(chunkSize).GetAllBytes()
// read, err := suite.StorageDriver.Writer(suite.ctx, filename, offset, bytes.NewReader(contents))
// require.NoError(suite.T(), err)
//
// fi, err := suite.StorageDriver.Stat(suite.ctx, filename)
// require.NoError(suite.T(), err)
//
// // We are most concerned with being able to read data as soon as Stat declares
// // it is uploaded. This is the strongest guarantee that some drivers (that guarantee
// // at best eventual consistency) absolutely need to provide.
// if fi.Size() == offset+chunkSize {
// reader, err := suite.StorageDriver.Reader(suite.ctx, filename, offset)
// require.NoError(suite.T(), err)
//
// readContents, err := io.ReadAll(reader)
// require.NoError(suite.T(), err)
//
// require.Equal(suite.T(), contents, readContents)
//
// reader.Close()
// offset += read
// } else {
// misswrites++
// }
// }
//
// if misswrites > 0 {
// c.Log("There were " + string(misswrites) + " occurrences of a write not being instantly available.")
// }
//
// require.NotEqual(suite.T(), 1024, misswrites)
// }
// TestWalk ensures that all files are visted by WalkParallel.
func (s *DriverSuite) TestWalk() {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
s.T().Logf("root directory used for testing: %s", rootDirectory)
numWantedFiles := 10
wantedFiles := dtestutil.RandomBranchingFiles(rootDirectory, numWantedFiles)
wantedDirectoriesSet := make(map[string]struct{})
for i := 0; i < numWantedFiles; i++ {
// Gather unique directories from the full path, excluding the root directory.
p := path.Dir(wantedFiles[i])
for {
// Guard against non-terminating loops: path.Dir returns "." if the path is empty.
if p == rootDirectory || p == "." {
break
}
wantedDirectoriesSet[p] = struct{}{}
p = path.Dir(p)
}
/* #nosec G404 */
err := s.StorageDriver.PutContent(s.ctx, wantedFiles[i], s.blobberFactory.GetBlobber(8+mrand.Int64N(8)).GetAllBytes())
require.NoError(s.T(), err)
}
verifyResults := func(actualFiles, actualDirectories []string) {
require.ElementsMatch(s.T(), wantedFiles, actualFiles)
// Convert from a set of wanted directories into a slice.
wantedDirectories := make([]string, len(wantedDirectoriesSet))
var i int
for k := range wantedDirectoriesSet {
wantedDirectories[i] = k
i++
}
require.ElementsMatch(s.T(), wantedDirectories, actualDirectories)
}
s.Run("PararellWalk", func() {
s.skipIfWalkParallelIsNotSupported()
fChan := make(chan string)
dChan := make(chan string)
var actualFiles []string
var actualDirectories []string
var wg sync.WaitGroup
go func() {
defer wg.Done()
wg.Add(1)
for f := range fChan {
actualFiles = append(actualFiles, f)
}
}()
go func() {
defer wg.Done()
wg.Add(1)
for d := range dChan {
actualDirectories = append(actualDirectories, d)
}
}()
err := s.StorageDriver.WalkParallel(s.ctx, rootDirectory, func(fInfo storagedriver.FileInfo) error {
// Use append here to prevent a panic if walk finds more than we expect.
if fInfo.IsDir() {
dChan <- fInfo.Path()
} else {
fChan <- fInfo.Path()
}
return nil
})
require.NoError(s.T(), err)
close(fChan)
close(dChan)
wg.Wait()
verifyResults(actualFiles, actualDirectories)
})
s.Run("PlainWalk", func() {
var actualFiles []string
var actualDirectories []string
err := s.StorageDriver.Walk(s.ctx, rootDirectory, func(fInfo storagedriver.FileInfo) error {
if fInfo.IsDir() {
actualDirectories = append(actualDirectories, fInfo.Path())
} else {
actualFiles = append(actualFiles, fInfo.Path())
}
return nil
})
require.NoError(s.T(), err)
verifyResults(actualFiles, actualDirectories)
})
}
// TestWalkError ensures that walk reports WalkFn errors.
func (s *DriverSuite) TestWalkError() {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
s.T().Logf("root directory used for testing: %s", rootDirectory)
wantedFiles := dtestutil.RandomBranchingFiles(rootDirectory, 100)
for _, file := range wantedFiles {
/* #nosec G404 */
err := s.StorageDriver.PutContent(s.ctx, file, s.blobberFactory.GetBlobber(8+mrand.Int64N(8)).GetAllBytes())
require.NoError(s.T(), err)
}
innerErr := errors.New("walk: expected test error")
errorFile := wantedFiles[0]
s.Run("PararellWalk", func() {
s.skipIfWalkParallelIsNotSupported()
err := s.StorageDriver.WalkParallel(s.ctx, rootDirectory, func(fInfo storagedriver.FileInfo) error {
if fInfo.Path() == errorFile {
return innerErr
}
return nil
})
// Drivers may or may not return a multierror here, check that the innerError
// is present in the error returned by walk.
require.ErrorIs(s.T(), err, innerErr)
})
s.Run("PlainWalk", func() {
err := s.StorageDriver.Walk(s.ctx, rootDirectory, func(fInfo storagedriver.FileInfo) error {
if fInfo.Path() == errorFile {
return innerErr
}
return nil
})
// Drivers may or may not return a multierror here, check that the innerError
// is present in the error returned by walk.
require.ErrorIs(s.T(), err, innerErr)
})
}
// TestWalkSkipDir tests that the Walk and WalkParallel functions
// properly handle the ErrSkipDir special case.
func (s *DriverSuite) TestWalkSkipDir() {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
s.T().Logf("root directory used for testing: %s", rootDirectory)
// Create directories with a structure like:
// rootDirectory/
// ├── dir1/
// │ ├── file1-1
// │ └── file1-2
// ├── dir2/ (this one will be skipped)
// │ ├── file2-1
// │ └── file2-2
// └── dir3/
// ├── file3-1
// └── file3-2
dir1 := path.Join(rootDirectory, "dir1")
dir2 := path.Join(rootDirectory, "dir2") // this will be skipped
dir3 := path.Join(rootDirectory, "dir3")
// Create map of all files to create with full paths
fileStructure := map[string][]string{
dir1: {
path.Join(dir1, "file1-1"),
path.Join(dir1, "file1-2"),
},
dir2: { // dir2 will be skipped during walk
path.Join(dir2, "file2-1"),
path.Join(dir2, "file2-2"),
},
dir3: {
path.Join(dir3, "file3-1"),
path.Join(dir3, "file3-2"),
},
}
// Create all files in all directories
for _, files := range fileStructure {
for _, filePath := range files {
err := s.StorageDriver.PutContent(s.ctx, filePath, s.blobberFactory.GetBlobber(32).GetAllBytes())
require.NoError(s.T(), err)
}
}
// Build expected and unexpected paths
expectedPaths := []string{
dir1,
dir3,
fileStructure[dir1][0],
fileStructure[dir1][1],
fileStructure[dir3][0],
fileStructure[dir3][1],
}
// Unexpected paths are dir2 and all its files
unexpectedPaths := []string{
dir2,
fileStructure[dir2][0],
fileStructure[dir2][1],
}
// Helper function to create a walk function that skips dir2
createWalkFunc := func(pathCollector *sync.Map) storagedriver.WalkFn {
return func(fInfo storagedriver.FileInfo) error {
path := fInfo.Path()
// Skip dir2
if path == dir2 {
return storagedriver.ErrSkipDir
}
// Record this path was visited
pathCollector.Store(path, struct{}{})
return nil
}
}
// Verify all expected paths were visited and unexpected were not
verifyPaths := func(visitedPaths *sync.Map) {
// Check expected paths were visited
for _, expectedPath := range expectedPaths {
_, found := visitedPaths.Load(expectedPath)
assert.Truef(s.T(), found, "Path %s should have been visited", expectedPath)
}
// Check unexpected paths were not visited
for _, unexpectedPath := range unexpectedPaths {
_, found := visitedPaths.Load(unexpectedPath)
assert.Falsef(s.T(), found, "Path %s should not have been visited", unexpectedPath)
}
}
s.Run("PlainWalk", func() {
var visitedPaths sync.Map
err := s.StorageDriver.Walk(s.ctx, rootDirectory, createWalkFunc(&visitedPaths))
require.NoError(s.T(), err)
verifyPaths(&visitedPaths)
})
s.Run("ParallelWalk", func() {
s.skipIfWalkParallelIsNotSupported()
var visitedPaths sync.Map
err := s.StorageDriver.WalkParallel(s.ctx, rootDirectory, createWalkFunc(&visitedPaths))
require.NoError(s.T(), err)
verifyPaths(&visitedPaths)
})
}
// TestWalkErrorPathNotFound ensures that walk reports an error on a path not
// found.
func (s *DriverSuite) TestWalkErrorPathNotFound() {
s.Run("PararellWalk", func() {
s.skipIfWalkParallelIsNotSupported()
err := s.StorageDriver.WalkParallel(s.ctx, "/maryna/boryna", func(_ storagedriver.FileInfo) error {
return nil
})
// Drivers may or may not return a multierror here, check that the innerError
// is present in the error returned by walk.
require.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
})
s.Run("PlainWalk", func() {
err := s.StorageDriver.Walk(s.ctx, "/maryna/boryna", func(_ storagedriver.FileInfo) error {
return nil
})
// Drivers may or may not return a multierror here, check that the innerError
// is present in the error returned by walk.
require.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
})
}
// TestWalkParallelStopsProcessingOnError ensures that walk stops processing when an error is encountered.
func (s *DriverSuite) TestWalkParallelStopsProcessingOnError() {
s.skipIfWalkParallelIsNotSupported()
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
defer s.deletePath(s.StorageDriver, rootDirectory)
s.T().Logf("root directory used for testing: %s", rootDirectory)
numWantedFiles := 1000
wantedFiles := dtestutil.RandomBranchingFiles(rootDirectory, numWantedFiles)
// Add a file right under the root directory, so that processing is stopped
// early in the walk cycle.
errorFile := filepath.Join(rootDirectory, dtestutil.RandomFilenameRange(8, 8))
wantedFiles = append(wantedFiles, errorFile)
for _, file := range wantedFiles {
/* #nosec G404 */
err := s.StorageDriver.PutContent(s.ctx, file, s.blobberFactory.GetBlobber(8+mrand.Int64N(8)).GetAllBytes())
require.NoError(s.T(), err)
}
processingTime := time.Second * 1
// Rough limit that should scale with longer or shorter processing times. Shorter than full uncancled runtime.
limit := time.Second * time.Duration(int64(processingTime)*4)
start := time.Now()
s.StorageDriver.WalkParallel(s.ctx, rootDirectory, func(fInfo storagedriver.FileInfo) error {
if fInfo.Path() == errorFile {
return errors.New("")
}
// Imitate workload.
time.Sleep(processingTime)
return nil
})
end := time.Now()
require.Less(s.T(), end.Sub(start), limit)
}
// BenchmarkPutGetEmptyFiles benchmarks PutContent/GetContent for 0B files
func (s *DriverSuite) BenchmarkPutGetEmptyFiles(b *testing.B) {
s.benchmarkPutGetFiles(b, 0)
}
// BenchmarkPutGet1KBFiles benchmarks PutContent/GetContent for 1KB files
func (s *DriverSuite) BenchmarkPutGet1KBFiles(b *testing.B) {
s.benchmarkPutGetFiles(b, 1024)
}
// BenchmarkPutGet1MBFiles benchmarks PutContent/GetContent for 1MB files
func (s *DriverSuite) BenchmarkPutGet1MBFiles(b *testing.B) {
s.benchmarkPutGetFiles(b, 1024*1024)
}
// BenchmarkPutGet1GBFiles benchmarks PutContent/GetContent for 1GB files
func (s *DriverSuite) BenchmarkPutGet1GBFiles(b *testing.B) {
s.benchmarkPutGetFiles(b, 1024*1024*1024)
}
func (s *DriverSuite) benchmarkPutGetFiles(b *testing.B, size int64) {
b.SetBytes(size)
parentDir := dtestutil.RandomPath(1, 8)
defer func() {
b.StopTimer()
s.StorageDriver.Delete(s.ctx, firstPart(parentDir))
}()
for i := 0; i < b.N; i++ {
filename := path.Join(parentDir, dtestutil.RandomPath(4, 32))
err := s.StorageDriver.PutContent(s.ctx, filename, s.blobberFactory.GetBlobber(size).GetAllBytes())
require.NoError(b, err)
_, err = s.StorageDriver.GetContent(s.ctx, filename)
require.NoError(b, err)
}
}
// BenchmarkStreamEmptyFiles benchmarks Writer/Reader for 0B files
func (s *DriverSuite) BenchmarkStreamEmptyFiles(b *testing.B) {
if s.StorageDriver.Name() == "s3aws" {
s.T().Skip("S3 multipart uploads require at least 1 chunk (>0B)")
}
s.benchmarkStreamFiles(b, 0)
}
// BenchmarkStream1KBFiles benchmarks Writer/Reader for 1KB files
func (s *DriverSuite) BenchmarkStream1KBFiles(b *testing.B) {
s.benchmarkStreamFiles(b, 1024)
}
// BenchmarkStream1MBFiles benchmarks Writer/Reader for 1MB files
func (s *DriverSuite) BenchmarkStream1MBFiles(b *testing.B) {
s.benchmarkStreamFiles(b, 1024*1024)
}
// BenchmarkStream1GBFiles benchmarks Writer/Reader for 1GB files
func (s *DriverSuite) BenchmarkStream1GBFiles(b *testing.B) {
s.benchmarkStreamFiles(b, 1024*1024*1024)
}
func (s *DriverSuite) benchmarkStreamFiles(b *testing.B, size int64) {
b.SetBytes(size)
parentDir := dtestutil.RandomPath(1, 8)
defer func() {
b.StopTimer()
s.StorageDriver.Delete(s.ctx, firstPart(parentDir))
}()
for i := 0; i < b.N; i++ {
filename := path.Join(parentDir, dtestutil.RandomPath(4, 32))
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(b, err)
written, err := io.Copy(writer, s.blobberFactory.GetBlobber(size).GetReader())
require.NoError(b, err)
require.Equal(b, size, written)
err = writer.Commit()
require.NoError(b, err)
err = writer.Close()
require.NoError(b, err)
rc, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(b, err)
rc.Close()
}
}
// BenchmarkList5Files benchmarks List for 5 small files
func (s *DriverSuite) BenchmarkList5Files(b *testing.B) {
s.benchmarkListFiles(b, 5)
}
// BenchmarkList50Files benchmarks List for 50 small files
func (s *DriverSuite) BenchmarkList50Files(b *testing.B) {
s.benchmarkListFiles(b, 50)
}
func (s *DriverSuite) benchmarkListFiles(b *testing.B, numFiles int64) {
parentDir := dtestutil.RandomPath(1, 8)
defer func() {
b.StopTimer()
s.StorageDriver.Delete(s.ctx, firstPart(parentDir))
}()
for i := int64(0); i < numFiles; i++ {
err := s.StorageDriver.PutContent(s.ctx, path.Join(parentDir, dtestutil.RandomPath(4, 32)), nil)
require.NoError(b, err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
files, err := s.StorageDriver.List(s.ctx, parentDir)
require.NoError(b, err)
require.Equal(b, numFiles, int64(len(files)))
}
}
// BenchmarkDelete5Files benchmarks Delete for 5 small files
func (s *DriverSuite) BenchmarkDelete5Files(b *testing.B) {
s.benchmarkDelete(b, 5)
}
// BenchmarkDelete50Files benchmarks Delete for 50 small files
func (s *DriverSuite) BenchmarkDelete50Files(b *testing.B) {
s.benchmarkDelete(b, 50)
}
func (s *DriverSuite) benchmarkDelete(b *testing.B, numFiles int64) {
for i := 0; i < b.N; i++ {
parentDir := dtestutil.RandomPath(4, 12)
// nolint: revive// defer
defer s.deletePath(s.StorageDriver, firstPart(parentDir))
b.StopTimer()
for j := int64(0); j < numFiles; j++ {
err := s.StorageDriver.PutContent(s.ctx, path.Join(parentDir, dtestutil.RandomPath(4, 32)), nil)
require.NoError(b, err)
}
b.StartTimer()
// This is the operation we're benchmarking
err := s.StorageDriver.Delete(s.ctx, firstPart(parentDir))
require.NoError(b, err)
}
}
// BenchmarkWalkParallelNop10Files benchmarks WalkParallel with a Nop function that visits 10 files
func (s *DriverSuite) BenchmarkWalkParallelNop10Files(b *testing.B) {
s.benchmarkWalkParallel(b, 10, func(_ storagedriver.FileInfo) error {
return nil
})
}
// BenchmarkWalkParallelNop500Files benchmarks WalkParallel with a Nop function that visits 500 files
func (s *DriverSuite) BenchmarkWalkParallelNop500Files(b *testing.B) {
s.benchmarkWalkParallel(b, 500, func(_ storagedriver.FileInfo) error {
return nil
})
}
func (s *DriverSuite) benchmarkWalkParallel(b *testing.B, numFiles int, f storagedriver.WalkFn) {
for i := 0; i < b.N; i++ {
rootDirectory := "/" + dtestutil.RandomFilenameRange(8, 8)
// nolint: revive // defer
defer s.deletePath(s.StorageDriver, rootDirectory)
b.StopTimer()
wantedFiles := dtestutil.RandomBranchingFiles(rootDirectory, numFiles)
// NOTE(prozlach): We are creating random size blobs, so we
// pre-allocate blobber that is able to accommodate the worst-case where
// all blobs have maximum size.
blobber := s.blobberFactory.GetBlobber(int64(numFiles * (8 + 8)))
offset := 0
for i := 0; i < numFiles; i++ {
/* #nosec G404 */
newOffset := 8 + mrand.IntN(8) + offset
contents := blobber.GetAllBytes()[offset:newOffset]
offset = newOffset
err := s.StorageDriver.PutContent(s.ctx, wantedFiles[i], contents)
require.NoError(b, err)
}
b.StartTimer()
err := s.StorageDriver.WalkParallel(s.ctx, rootDirectory, f)
require.NoError(b, err)
}
}
func (s *DriverSuite) createRegistry(t require.TestingT) distribution.Namespace {
k, err := libtrust.GenerateECP256PrivateKey()
require.NoError(t, err)
opts := []storage.RegistryOption{
storage.EnableDelete,
storage.Schema1SigningKey(k),
storage.EnableSchema1,
}
registry, err := storage.NewRegistry(s.ctx, s.StorageDriver, opts...)
require.NoError(t, err, "Failed to construct namespace")
return registry
}
func (s *DriverSuite) makeRepository(t require.TestingT, registry distribution.Namespace, name string) distribution.Repository {
named, err := reference.WithName(name)
require.NoErrorf(t, err, "Failed to parse name %s", name)
repo, err := registry.Repository(s.ctx, named)
require.NoError(t, err, "Failed to construct repository")
return repo
}
// BenchmarkMarkAndSweep10ImagesKeepUntagged uploads 10 images, deletes half
// and runs garbage collection on the registry without removing untaged images.
func (s *DriverSuite) BenchmarkMarkAndSweep10ImagesKeepUntagged(b *testing.B) {
s.benchmarkMarkAndSweep(b, 10, false)
}
// BenchmarkMarkAndSweep50ImagesKeepUntagged uploads 50 images, deletes half
// and runs garbage collection on the registry without removing untaged images.
func (s *DriverSuite) BenchmarkMarkAndSweep50ImagesKeepUntagged(b *testing.B) {
s.benchmarkMarkAndSweep(b, 50, false)
}
func (s *DriverSuite) benchmarkMarkAndSweep(b *testing.B, numImages int, removeUntagged bool) {
// Setup for this test takes a long time, even with small numbers of images,
// so keep the skip logic here in the sub test.
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
for n := 0; n < b.N; n++ {
b.StopTimer()
registry := s.createRegistry(b)
repo := s.makeRepository(b, registry, fmt.Sprintf("benchmarks-repo-%d", n))
manifests, err := repo.Manifests(s.ctx)
require.NoError(b, err)
images := make([]testutil.Image, numImages)
for i := 0; i < numImages; i++ {
// Alternate between Schema1 and Schema2 images
if i%2 == 0 {
images[i], err = testutil.UploadRandomSchema1Image(repo)
require.NoError(b, err)
} else {
images[i], err = testutil.UploadRandomSchema2Image(repo)
require.NoError(b, err)
}
// Delete the manifests, so that their blobs can be garbage collected.
manifests.Delete(s.ctx, images[i].ManifestDigest)
}
b.StartTimer()
// Run GC
err = storage.MarkAndSweep(context.Background(), s.StorageDriver, registry, storage.GCOpts{
DryRun: false,
RemoveUntagged: removeUntagged,
})
require.NoError(b, err)
}
}
func (*DriverSuite) buildBlobs(t require.TestingT, repo distribution.Repository, n int) []digest.Digest {
dgsts := make([]digest.Digest, 0, n)
// build and upload random layers
layers, err := testutil.CreateRandomLayers(n)
require.NoError(t, err, "failed to create random digest")
err = testutil.UploadBlobs(repo, layers)
require.NoError(t, err, "failed to upload blob")
// collect digests from layers map
for d := range layers {
dgsts = append(dgsts, d)
}
return dgsts
}
// TestRemoveBlob checks that storage.Vacuum is able to delete a single blob.
func (s *DriverSuite) TestRemoveBlob() {
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
registry := s.createRegistry(s.T())
repoName := dtestutil.RandomFilename(5)
repo := s.makeRepository(s.T(), registry, repoName)
s.T().Logf("repo name used for testing: %s", repoName)
v := storage.NewVacuum(s.StorageDriver)
// build two blobs, one more than the number to delete, otherwise there will be no /docker/registry/v2/blobs path
// for validation after delete
blobs := s.buildBlobs(s.T(), repo, 2)
blob := blobs[0]
err := v.RemoveBlob(s.ctx, blob)
require.NoError(s.T(), err)
blobService := registry.Blobs()
blobsLeft := newSyncDigestSet()
err = blobService.Enumerate(s.ctx, func(desc distribution.Descriptor) error {
blobsLeft.add(desc.Digest)
return nil
})
require.NoError(s.T(), err, "error getting all blobs")
assert.Equal(s.T(), 1, blobsLeft.len())
assert.Falsef(s.T(), blobsLeft.contains(blob), "blob %q was not deleted", blob.String())
}
func (s *DriverSuite) benchmarkRemoveBlob(b *testing.B, numBlobs int) {
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
registry := s.createRegistry(b)
repoName := dtestutil.RandomFilename(5)
repo := s.makeRepository(s.T(), registry, repoName)
s.T().Logf("repo name used for testing: %s", repoName)
v := storage.NewVacuum(s.StorageDriver)
for n := 0; n < b.N; n++ {
b.StopTimer()
blobs := s.buildBlobs(b, repo, numBlobs)
b.StartTimer()
for _, bl := range blobs {
err := v.RemoveBlob(s.ctx, bl)
require.NoError(b, err)
}
}
}
// BenchmarkRemoveBlob1Blob creates 1 blob and deletes it using the storage.Vacuum.RemoveBlob method.
func (s *DriverSuite) BenchmarkRemoveBlob1Blob(b *testing.B) {
s.benchmarkRemoveBlob(b, 1)
}
// BenchmarkRemoveBlob10Blobs creates 10 blobs and deletes them using the storage.Vacuum.RemoveBlob method.
func (s *DriverSuite) BenchmarkRemoveBlob10Blobs(b *testing.B) {
s.benchmarkRemoveBlob(b, 10)
}
// BenchmarkRemoveBlob100Blobs creates 100 blobs and deletes them using the storage.Vacuum.RemoveBlob method.
func (s *DriverSuite) BenchmarkRemoveBlob100Blobs(b *testing.B) {
s.benchmarkRemoveBlob(b, 100)
}
// TestRemoveBlobs checks that storage.Vacuum is able to delete a set of blobs in bulk.
func (s *DriverSuite) TestRemoveBlobs() {
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
registry := s.createRegistry(s.T())
repoName := dtestutil.RandomFilename(5)
repo := s.makeRepository(s.T(), registry, repoName)
s.T().Logf("repo name used for testing: %s", repoName)
v := storage.NewVacuum(s.StorageDriver)
// build some blobs and remove half of them, otherwise there will be no /docker/registry/v2/blobs path to look at
// for validation if there are no blobs left
blobs := s.buildBlobs(s.T(), repo, 4)
blobs = blobs[:2]
err := v.RemoveBlobs(s.ctx, blobs)
require.NoError(s.T(), err)
// assert that blobs were deleted
blobService := registry.Blobs()
blobsLeft := newSyncDigestSet()
err = blobService.Enumerate(s.ctx, func(desc distribution.Descriptor) error {
blobsLeft.add(desc.Digest)
return nil
})
require.NoError(s.T(), err, "error getting all blobs")
require.Equal(s.T(), 2, blobsLeft.len())
for _, b := range blobs {
assert.Falsef(s.T(), blobsLeft.contains(b), "blob %q was not deleted", b.String())
}
}
func (s *DriverSuite) benchmarkRemoveBlobs(b *testing.B, numBlobs int) {
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
registry := s.createRegistry(b)
repoName := dtestutil.RandomFilename(5)
repo := s.makeRepository(s.T(), registry, repoName)
s.T().Logf("repo name used for testing: %s", repoName)
v := storage.NewVacuum(s.StorageDriver)
for n := 0; n < b.N; n++ {
b.StopTimer()
blobs := s.buildBlobs(b, repo, numBlobs)
b.StartTimer()
err := v.RemoveBlobs(s.ctx, blobs)
assert.NoError(b, err)
}
}
// BenchmarkRemoveBlobs1Blob creates 1 blob and deletes it using the storage.Vacuum.RemoveBlobs method.
func (s *DriverSuite) BenchmarkRemoveBlobs1Blob(b *testing.B) {
s.benchmarkRemoveBlobs(b, 1)
}
// BenchmarkRemoveBlobs10Blobs creates 10 blobs and deletes them using the storage.Vacuum.RemoveBlobs method.
func (s *DriverSuite) BenchmarkRemoveBlobs10Blobs(b *testing.B) {
s.benchmarkRemoveBlobs(b, 10)
}
// BenchmarkRemoveBlobs100Blobs creates 100 blobs and deletes them using the storage.Vacuum.RemoveBlobs method.
func (s *DriverSuite) BenchmarkRemoveBlobs100Blobs(b *testing.B) {
s.benchmarkRemoveBlobs(b, 100)
}
// BenchmarkRemoveBlobs1000Blobs creates 1000 blobs and deletes them using the storage.Vacuum.RemoveBlobs method.
func (s *DriverSuite) BenchmarkRemoveBlobs1000Blobs(b *testing.B) {
s.benchmarkRemoveBlobs(b, 1000)
}
func (s *DriverSuite) buildManifests(t require.TestingT, repo distribution.Repository, numManifests, numTagsPerManifest int) []storage.ManifestDel {
images := make([]testutil.Image, numManifests)
manifests := make([]storage.ManifestDel, 0)
repoName := repo.Named().Name()
var err error
for i := 0; i < numManifests; i++ {
// build images, alternating between Schema1 and Schema2 manifests
if i%2 == 0 {
images[i], err = testutil.UploadRandomSchema1Image(repo)
} else {
images[i], err = testutil.UploadRandomSchema2Image(repo)
}
require.NoError(t, err)
// build numTags tags per manifest
tags := make([]string, 0, numTagsPerManifest)
for j := 0; j < numTagsPerManifest; j++ {
rfn := dtestutil.RandomFilename(5)
d := images[i].ManifestDigest
err := repo.Tags(s.ctx).Tag(s.ctx, rfn, distribution.Descriptor{Digest: d})
require.NoError(t, err)
tags = append(tags, rfn)
}
manifests = append(manifests, storage.ManifestDel{
Name: repoName,
Digest: images[i].ManifestDigest,
Tags: tags,
})
}
return manifests
}
// TestRemoveManifests checks that storage.Vacuum is able to delete a set of manifests in bulk.
func (s *DriverSuite) TestRemoveManifests() {
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
registry := s.createRegistry(s.T())
repoName := dtestutil.RandomFilename(5)
repo := s.makeRepository(s.T(), registry, repoName)
s.T().Logf("repo name used for testing: %s", repoName)
// build some manifests
manifests := s.buildManifests(s.T(), repo, 3, 1)
v := storage.NewVacuum(s.StorageDriver)
// remove all manifests except one, otherwise there will be no `_manifests/revisions` folder to look at for
// validation (empty "folders" are not preserved)
numToDelete := len(manifests) - 1
toDelete := manifests[:numToDelete]
err := v.RemoveManifests(s.ctx, toDelete)
require.NoError(s.T(), err)
// assert that toDelete manifests were actually deleted
manifestsLeft := newSyncDigestSet()
manifestService, err := repo.Manifests(s.ctx)
require.NoError(s.T(), err, "error building manifest service")
manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
require.True(s.T(), ok, "unable to convert ManifestService into ManifestEnumerator")
err = manifestEnumerator.Enumerate(s.ctx, func(dgst digest.Digest) error {
manifestsLeft.add(dgst)
return nil
})
require.NoError(s.T(), err, "error getting all manifests")
require.Equal(s.T(), len(manifests)-numToDelete, manifestsLeft.len())
for _, m := range toDelete {
assert.Falsef(s.T(), manifestsLeft.contains(m.Digest), "manifest %q was not deleted as expected", m.Digest)
}
}
func (s *DriverSuite) testRemoveManifestsPathBuild(numManifests, numTagsPerManifest int) {
v := storage.NewVacuum(s.StorageDriver)
var tags []string
for i := 0; i < numTagsPerManifest; i++ {
tags = append(tags, "foo")
}
var toDelete []storage.ManifestDel
for i := 0; i < numManifests; i++ {
m := storage.ManifestDel{
Name: dtestutil.RandomFilename(10),
Digest: digest.FromString(dtestutil.RandomFilename(20)),
Tags: tags,
}
toDelete = append(toDelete, m)
}
err := v.RemoveManifests(s.ctx, toDelete)
require.NoError(s.T(), err)
}
// TestRemoveManifestsPathBuildLargeScale simulates the execution of vacuum.RemoveManifests for repositories with large
// numbers of manifests eligible for deletion. No files are created in this test, we only simulate their existence so
// that we can test and profile the execution of the path build process within vacuum.RemoveManifests. The storage
// drivers DeleteFiles method is idempotent, so no error will be raised by attempting to delete non-existing files.
// However, to avoid large number of HTTP requests against cloud storage backends, it's recommended to run this test
// against the filesystem storage backend only. For safety, the test is skipped when not using the filesystem storage
// backend. Tweak the method locally to test use cases with different sizes and/or storage drivers.
func (s *DriverSuite) TestRemoveManifestsPathBuildLargeScale() {
if s.StorageDriver.Name() != "filesystem" {
s.T().Skipf("Skipping test for the %s driver", s.StorageDriver.Name())
}
numManifests := 100
numTagsPerManifest := 10
s.testRemoveManifestsPathBuild(numManifests, numTagsPerManifest)
}
func (s *DriverSuite) benchmarkRemoveManifests(b *testing.B, numManifests, numTagsPerManifest int) {
defer s.deletePath(s.StorageDriver, firstPart("docker/"))
registry := s.createRegistry(b)
repoName := dtestutil.RandomFilename(5)
repo := s.makeRepository(s.T(), registry, repoName)
s.T().Logf("repo name used for testing: %s", repoName)
for n := 0; n < b.N; n++ {
b.StopTimer()
manifests := s.buildManifests(b, repo, numManifests, numTagsPerManifest)
v := storage.NewVacuum(s.StorageDriver)
b.StartTimer()
err := v.RemoveManifests(s.ctx, manifests)
require.NoError(b, err)
}
}
// BenchmarkRemoveManifests1Manifest0Tags creates 1 manifest with no tags and deletes it using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests1Manifest0Tags(b *testing.B) {
s.benchmarkRemoveManifests(b, 1, 0)
}
// BenchmarkRemoveManifests1Manifest1Tag creates 1 manifest with 1 tag and deletes them using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests1Manifest1Tag(b *testing.B) {
s.benchmarkRemoveManifests(b, 1, 1)
}
// BenchmarkRemoveManifests10Manifests0TagsEach creates 10 manifests with no tags and deletes them using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests10Manifests0TagsEach(b *testing.B) {
s.benchmarkRemoveManifests(b, 10, 0)
}
// BenchmarkRemoveManifests10Manifests1TagEach creates 10 manifests with 1 tag each and deletes them using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests10Manifests1TagEach(b *testing.B) {
s.benchmarkRemoveManifests(b, 10, 1)
}
// BenchmarkRemoveManifests100Manifests0TagsEach creates 100 manifests with no tags and deletes them using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests100Manifests0TagsEach(b *testing.B) {
s.benchmarkRemoveManifests(b, 100, 0)
}
// BenchmarkRemoveManifests100Manifests1TagEach creates 100 manifests with 1 tag each and deletes them using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests100Manifests1TagEach(b *testing.B) {
s.benchmarkRemoveManifests(b, 100, 1)
}
// BenchmarkRemoveManifests100Manifests20TagsEach creates 100 manifests with 20 tags each and deletes them using the
// storage.Vacuum.RemoveManifests method.
func (s *DriverSuite) BenchmarkRemoveManifests100Manifests20TagsEach(b *testing.B) {
s.benchmarkRemoveManifests(b, 100, 20)
}
// NOTE(prozlach) testFileStreams is used in a goroutine, we can't use
// `require` here
func (s *DriverSuite) testFileStreams(t *testing.T, size int64) {
filename := dtestutil.RandomPath(4, 32)
defer s.deletePath(s.StorageDriver, firstPart(filename))
s.T().Logf("blob path for this stream: %s", filename)
blobber := s.blobberFactory.GetBlobber(size)
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
// nolint: testifylint // require-error
if !assert.NoError(t, err) {
return
}
nn, err := io.Copy(writer, blobber.GetReader())
// nolint: testifylint // require-error
if !assert.NoError(t, err) {
return
}
if !assert.EqualValues(t, size, nn) {
return
}
if s.StorageDriver.Name() != "gcs" || os.Getenv("REGISTRY_GCS_DRIVER") == "next" {
if !assert.EqualValues(t, size, writer.Size()) {
return
}
}
// nolint: testifylint // require-error
if !assert.NoError(t, writer.Commit()) {
return
}
// nolint: testifylint // require-error
if !assert.NoError(t, writer.Close()) {
return
}
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
// nolint: testifylint // require-error
if !assert.NoError(t, err) {
return
}
defer reader.Close()
streamID := fmt.Sprintf("file stream of size %d, path %s", size, filename)
blobber.AssertStreamEqual(t, reader, 0, streamID)
}
func (s *DriverSuite) writeReadCompare(t *testing.T, filename string, contents []byte) {
defer s.deletePath(s.StorageDriver, firstPart(filename))
err := s.StorageDriver.PutContent(s.ctx, filename, contents)
require.NoError(t, err)
readContents, err := s.StorageDriver.GetContent(s.ctx, filename)
require.NoError(t, err)
require.Equal(t, contents, readContents)
}
func (s *DriverSuite) writeReadCompareStreams(t *testing.T, filename string, blobber *testutil.Blobber) {
defer s.deletePath(s.StorageDriver, firstPart(filename))
writer, err := s.StorageDriver.Writer(s.ctx, filename, false)
require.NoError(t, err)
nn, err := io.Copy(writer, blobber.GetReader())
require.NoError(t, err)
require.EqualValues(t, blobber.Size(), nn)
err = writer.Commit()
require.NoError(t, err)
err = writer.Close()
require.NoError(t, err)
reader, err := s.StorageDriver.Reader(s.ctx, filename, 0)
require.NoError(t, err)
defer reader.Close()
blobber.AssertStreamEqual(s.T(), reader, 0, fmt.Sprintf("filename: %s", filename))
}
func firstPart(filePath string) string {
if filePath == "" {
return "/"
}
for {
if filePath[len(filePath)-1] == '/' {
filePath = filePath[:len(filePath)-1]
}
dir, file := path.Split(filePath)
if dir == "" && file == "" {
return "/"
}
if dir == "/" || dir == "" {
return "/" + file
}
if file == "" {
return dir
}
filePath = dir
}
}