in registry/storage/driver/testsuites/testsuites.go [1986:2125]
func (s *DriverSuite) TestStatCall() {
// NOTE(prozlach): We explicitly need a different blob here to confirm that
// in-place overwrite was indeed successful.
// NOTE(prozlach): The idea is to create a hierarchy in the s3 bucket
// where:
// * there is one common directory for all the blobs (dirPathBase)
// * there are two files in two different "subdirectories" under the same
// common directory (dirA, dirB and filePath, filePathAux)
// * both subdirectories should share the same prefix so that we could test
// a stat on inexistant dir which is actually a common prefix for existing
// subdirectories (DirPartialPrefix)
contentAB := s.blobberFactory.GetBlobber(4096 * 2).GetAllBytes()
contentA := contentAB[:4096]
contentB := contentAB[4096:]
dirPathBase := dtestutil.RandomPath(1, 24)
dirA := "foo" + dtestutil.RandomFilename(13)
dirB := "foo" + dtestutil.RandomFilename(13)
partialPath := path.Join(dirPathBase, "foo")
dirPath := path.Join(dirPathBase, dirA)
dirPathAux := path.Join(dirPathBase, dirB)
fileName := dtestutil.RandomFilename(32)
filePath := path.Join(dirPath, fileName)
// Trigger a case where for given prefix there is more than one object
filePathAux := path.Join(dirPathAux, fileName)
s.T().Logf("directory: %s, filename: %s, filename aux: %s", dirPath, fileName, filePathAux)
err := s.StorageDriver.PutContent(s.ctx, filePath, contentA)
require.NoError(s.T(), err)
err = s.StorageDriver.PutContent(s.ctx, filePathAux, contentA)
require.NoError(s.T(), err)
defer s.deletePath(s.StorageDriver, firstPart(dirPath))
// Call to stat on root directory. The storage healthcheck performs this
// exact call to Stat.
// PathNotFoundErrors are not considered health check failures. Some
// drivers will return a not found here, while others will not return an
// error at all. If we get an error, ensure it's a not found.
s.Run("RootDirectory", func() {
fi, err := s.StorageDriver.Stat(s.ctx, "/")
if err != nil {
assert.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
} else {
assert.NotNil(s.T(), fi)
assert.Equal(s.T(), "/", fi.Path())
assert.True(s.T(), fi.IsDir())
}
})
s.Run("NonExistentDir", func() {
fi, err := s.StorageDriver.Stat(s.ctx, dirPath+"foo")
require.Error(s.T(), err)
assert.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{ // nolint: testifylint
DriverName: s.StorageDriver.Name(),
Path: dirPath + "foo",
})
assert.Nil(s.T(), fi)
})
s.Run("NonExistentPath", func() {
fi, err := s.StorageDriver.Stat(s.ctx, filePath+"bar")
require.Error(s.T(), err)
assert.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{ // nolint: testifylint
DriverName: s.StorageDriver.Name(),
Path: filePath + "bar",
})
assert.Nil(s.T(), fi)
})
s.Run("FileExists", func() {
fi, err := s.StorageDriver.Stat(s.ctx, filePath)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
assert.Equal(s.T(), filePath, fi.Path())
assert.Equal(s.T(), int64(len(contentA)), fi.Size())
assert.False(s.T(), fi.IsDir())
})
s.Run("ModTime", func() {
fi, err := s.StorageDriver.Stat(s.ctx, filePath)
require.NoError(s.T(), err)
assert.NotNil(s.T(), fi)
createdTime := fi.ModTime()
// Sleep and modify the file
time.Sleep(time.Second * 10)
err = s.StorageDriver.PutContent(s.ctx, filePath, contentB)
require.NoError(s.T(), err)
fi, err = s.StorageDriver.Stat(s.ctx, filePath)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
modTime := fi.ModTime()
// Check if the modification time is after the creation time.
// In case of cloud storage services, storage frontend nodes might have
// time drift between them, however that should be solved with sleeping
// before update.
assert.Greaterf(
s.T(),
modTime,
createdTime,
"modtime (%s) is before the creation time (%s)", modTime, createdTime,
)
})
// Call on directory with one "file"
// (do not check ModTime as dirs don't need to support it)
s.Run("DirWithFile", func() {
fi, err := s.StorageDriver.Stat(s.ctx, dirPath)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
assert.Equal(s.T(), dirPath, fi.Path())
assert.Zero(s.T(), fi.Size())
assert.True(s.T(), fi.IsDir())
})
// Call on directory with another "subdirectory"
s.Run("DirWithSubDir", func() {
fi, err := s.StorageDriver.Stat(s.ctx, dirPathBase)
require.NoError(s.T(), err)
require.NotNil(s.T(), fi)
assert.Equal(s.T(), dirPathBase, fi.Path())
assert.Zero(s.T(), fi.Size())
assert.True(s.T(), fi.IsDir())
})
// Call on a partial name of the directory. This should result in
// not-found, as partial match is still not a match for a directory.
s.Run("DirPartialPrefix", func() {
fi, err := s.StorageDriver.Stat(s.ctx, partialPath)
require.Error(s.T(), err)
assert.ErrorIs(s.T(), err, storagedriver.PathNotFoundError{ // nolint: testifylint
DriverName: s.StorageDriver.Name(),
Path: partialPath,
})
assert.Nil(s.T(), fi)
})
}