// Copyright © Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// TODO: this file was forked from the cmd package; it needs to be cleaned up to keep only the necessary parts
package e2etest
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"flag"
"fmt"
"io"
"math/rand"
"mime"
"os"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
blobsas "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
blobservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
datalakesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/sas"
datalakeservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
filesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
fileservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"

"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/credentials"
chk "gopkg.in/check.v1"
)
var ctx = context.Background()
const (
blockBlobDefaultData = "AzCopy Random Test Data"
bucketPrefix = "s3bucket"
objectPrefix = "s3object"
objectDefaultData = "AzCopy default data for S3 object"
fileDefaultData = "AzCopy Random Test Data"
sharePrefix = "share"
azureFilePrefix = "azfile"
defaultAzureFileSizeInBytes = 1000
blobfsPrefix = "blobfs"
defaultBlobFSFileSizeInBytes = 1000
)
var runLocallyOnly = flag.Bool("local-tests", false, "Tests with this flag are run locally only")
func pointerTo[T any](in T) *T {
return &in
}
// isS3Disabled reports whether S3 tests are disabled: setting S3_TESTS_OFF to any non-empty value disables them.
func isS3Disabled() bool {
return os.Getenv("S3_TESTS_OFF") != ""
}
func skipIfS3Disabled(c asserter) {
if isS3Disabled() {
c.Skip("S3 testing is disabled for this unit test suite run.")
}
}
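// Illustrative usage sketch (not part of the original file; the test body is hypothetical,
// but the helpers and signatures are the ones defined in this file):
//
//	func exampleS3Test(c asserter) {
//		skipIfS3Disabled(c) // calls c.Skip when S3_TESTS_OFF is set
//		client, err := createS3ClientWithMinio(c, createS3ResOptions{Location: "us-east-1"})
//		c.AssertNoErr(err)
//		bucketName := createNewBucket(c, client, createS3ResOptions{Location: "us-east-1"})
//		defer deleteBucket(c, client, bucketName, false)
//	}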
func generateContainerName(c asserter) string {
return generateName(c, containerPrefix, 63)
}
func generateBlobName(c asserter) string {
return generateName(c, blobPrefix, 0)
}
func generateBucketName(c asserter) string {
return generateName(c, bucketPrefix, 63)
}
func generateBucketNameWithCustomizedPrefix(c asserter, customizedPrefix string) string {
return generateName(c, customizedPrefix, 63)
}
func generateObjectName(c asserter) string {
return generateName(c, objectPrefix, 0)
}
func generateShareName(c asserter) string {
return generateName(c, sharePrefix, 63)
}
func generateFilesystemName(c asserter) string {
return generateName(c, blobfsPrefix, 63)
}
func getShareURL(c asserter, fsc *fileservice.Client) (sc *share.Client, name string) {
name = generateShareName(c)
sc = fsc.NewShareClient(name)
return sc, name
}
func generateAzureFileName(c asserter) string {
return generateName(c, azureFilePrefix, 0)
}
func generateBfsFileName(c asserter) string {
return generateName(c, blobfsPrefix, 0)
}
func getContainerURL(c asserter, bsc *blobservice.Client) (cc *container.Client, name string) {
name = generateContainerName(c)
cc = bsc.NewContainerClient(name)
return
}
func getFilesystemURL(c asserter, dsc *datalakeservice.Client) (fsc *filesystem.Client, name string) {
name = generateFilesystemName(c)
fsc = dsc.NewFileSystemClient(name)
return
}
func getBlockBlobURL(c asserter, cc *container.Client, prefix string) (bc *blockblob.Client, name string) {
name = prefix + generateBlobName(c)
bc = cc.NewBlockBlobClient(name)
return bc, name
}
func getBfsFileURL(c asserter, fsc *filesystem.Client, prefix string) (fc *datalakefile.Client, name string) {
name = prefix + generateBfsFileName(c)
fc = fsc.NewFileClient(name)
return
}
func getAppendBlobURL(c asserter, cc *container.Client, prefix string) (bc *appendblob.Client, name string) {
name = generateBlobName(c)
bc = cc.NewAppendBlobClient(prefix + name)
return
}
func getPageBlobURL(c asserter, cc *container.Client, prefix string) (bc *pageblob.Client, name string) {
name = generateBlobName(c)
bc = cc.NewPageBlobClient(prefix + name)
return
}
func getAzureFileURL(c asserter, sc *share.Client, prefix string) (fc *sharefile.Client, name string) {
name = prefix + generateAzureFileName(c)
fc = sc.NewRootDirectoryClient().NewFileClient(name)
return
}
// TODO: consider whether to replace with common.NewRandomDataGenerator, which is believed to be faster
func getRandomDataAndReader(n int) (io.ReadSeekCloser, []byte) {
data := make([]byte, n)
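// math/rand.Read is documented to always fill len(data) bytes and return a nil error, so its return values are safely ignored.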
rand.Read(data)
return streaming.NopCloser(bytes.NewReader(data)), data
}
func createNewContainer(c asserter, bsc *blobservice.Client) (cc *container.Client, name string) {
cc, name = getContainerURL(c, bsc)
_, err := cc.Create(ctx, nil)
c.AssertNoErr(err)
return
}
func createNewFilesystem(c asserter, dsc *datalakeservice.Client) (fsc *filesystem.Client, name string) {
fsc, name = getFilesystemURL(c, dsc)
_, err := fsc.Create(ctx, nil)
c.AssertNoErr(err)
return
}
func createNewBfsFile(c asserter, fsc *filesystem.Client, prefix string) (fc *datalakefile.Client, name string) {
fc, name = getBfsFileURL(c, fsc, prefix)
// Create the file
_, err := fc.Create(ctx, nil)
c.AssertNoErr(err)
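// ADLS Gen2 writes are a three-step protocol: create the file, append bytes at a given offset, then flush with the total length (closing the handle) to commit.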
_, err = fc.AppendData(ctx, 0, streaming.NopCloser(bytes.NewReader(make([]byte, defaultBlobFSFileSizeInBytes))), nil)
c.AssertNoErr(err)
_, err = fc.FlushData(ctx, defaultBlobFSFileSizeInBytes, &datalakefile.FlushDataOptions{Close: to.Ptr(true)})
c.AssertNoErr(err)
return
}
func createNewBlockBlob(c asserter, cc *container.Client, prefix string) (bc *blockblob.Client, name string) {
bc, name = getBlockBlobURL(c, cc, prefix)
_, err := bc.Upload(ctx, streaming.NopCloser(strings.NewReader(blockBlobDefaultData)), nil)
c.AssertNoErr(err)
return
}
func createNewAzureShare(c asserter, fsc *fileservice.Client) (sc *share.Client, name string) {
sc, name = getShareURL(c, fsc)
_, err := sc.Create(ctx, nil)
c.AssertNoErr(err)
return sc, name
}
func createNewAzureFile(c asserter, sc *share.Client, prefix string) (fc *sharefile.Client, name string) {
fc, name = getAzureFileURL(c, sc, prefix)
// generate parents first
generateParentsForAzureFile(c, fc, sc)
_, err := fc.Create(ctx, defaultAzureFileSizeInBytes, nil)
c.AssertNoErr(err)
return
}
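// newNullFolderCreationTracker returns a tracker built with the NoFolders option, i.e. one that
// performs no folder-property tracking; sufficient for tests that only need parent directories created.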
func newNullFolderCreationTracker() ste.FolderCreationTracker {
return ste.NewFolderCreationTracker(common.EFolderPropertiesOption.NoFolders(), nil)
}
func getFileShareClient(c asserter) *share.Client {
accountName, accountKey := GlobalInputManager{}.GetAccountAndKey(EAccountType.Standard())
u := fmt.Sprintf("https://%s.file.core.windows.net/%s", accountName, generateShareName(c))
credential, err := share.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
panic(err)
}
client, err := share.NewClientWithSharedKeyCredential(u, credential, &share.ClientOptions{AllowTrailingDot: to.Ptr(true)})
if err != nil {
panic(err)
}
return client
}
func generateParentsForAzureFile(c asserter, fc *sharefile.Client, fsc *share.Client) {
err := ste.AzureFileParentDirCreator{}.CreateParentDirToRoot(ctx, fc, fsc, newNullFolderCreationTracker())
c.AssertNoErr(err)
}
func createNewAppendBlob(c asserter, cc *container.Client, prefix string) (bc *appendblob.Client, name string) {
bc, name = getAppendBlobURL(c, cc, prefix)
_, err := bc.Create(ctx, nil)
c.AssertNoErr(err)
return
}
func createNewPageBlob(c asserter, cc *container.Client, prefix string) (bc *pageblob.Client, name string) {
bc, name = getPageBlobURL(c, cc, prefix)
_, err := bc.Create(ctx, pageblob.PageBytes*10, nil)
c.AssertNoErr(err)
return
}
func deleteContainer(c asserter, cc *container.Client) {
_, err := cc.Delete(ctx, nil)
c.AssertNoErr(err)
}
func deleteFilesystem(c asserter, fsc *filesystem.Client) {
_, err := fsc.Delete(ctx, nil)
c.AssertNoErr(err)
}
type createS3ResOptions struct {
Location string
}
func createS3ClientWithMinio(c asserter, o createS3ResOptions) (*minio.Client, error) {
skipIfS3Disabled(c)
accessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")
secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
if accessKeyID == "" || secretAccessKey == "" {
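// No AWS credentials in the environment: fall back to anonymous (unsigned) access.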
cred := credentials.NewStatic("", "", "", credentials.SignatureAnonymous)
return minio.NewWithOptions("s3.amazonaws.com", &minio.Options{Creds: cred, Secure: true, Region: o.Location})
}
s3Client, err := minio.NewWithRegion("s3.amazonaws.com", accessKeyID, secretAccessKey, true, o.Location)
if err != nil {
return nil, err
}
return s3Client, nil
}
func createNewBucket(c asserter, client *minio.Client, o createS3ResOptions) string {
bucketName := generateBucketName(c)
err := client.MakeBucket(bucketName, o.Location)
c.AssertNoErr(err)
return bucketName
}
func createNewBucketWithName(c asserter, client *minio.Client, bucketName string, o createS3ResOptions) {
err := client.MakeBucket(bucketName, o.Location)
c.AssertNoErr(err)
}
func createNewObject(c asserter, client *minio.Client, bucketName string, prefix string) (objectKey string) {
objectKey = prefix + generateObjectName(c)
size := int64(len(objectDefaultData))
n, err := client.PutObject(bucketName, objectKey, strings.NewReader(objectDefaultData), size, minio.PutObjectOptions{})
c.AssertNoErr(err)
c.Assert(n, equals(), size)
return
}
func deleteBucket(_ asserter, client *minio.Client, bucketName string, waitQuarterMinute bool) {
// If we error out in this function, simply just skip over deleting the bucket.
// Some of our buckets have become "ghost" buckets in the past.
// Ghost buckets show up in list calls but can't actually be interacted with.
// Some ghost buckets are temporary, others are permanent.
// As such, we need a way to deal with them when they show up.
// By doing this, they'll just be cleaned up on the next test run instead of failing all tests.
objectsCh := make(chan string)
go func() {
defer close(objectsCh)
// List all objects from a bucket-name with a matching prefix.
for object := range client.ListObjectsV2(bucketName, "", true, context.Background().Done()) {
if object.Err != nil {
return
}
objectsCh <- object.Key
}
}()
// Drain the removal channel, deleting all the objects in the bucket; bail out on the first error.
errChn := client.RemoveObjects(bucketName, objectsCh)
for rmObjErr := range errChn {
if rmObjErr.Err != nil {
return
}
}
// Remove the (now empty) bucket itself.
if err := client.RemoveBucket(bucketName); err != nil {
return
}
if waitQuarterMinute {
time.Sleep(time.Second * 15)
}
}
func cleanS3Account(c asserter, client *minio.Client) {
buckets, err := client.ListBuckets()
if err != nil {
return
}
for _, bucket := range buckets {
if strings.Contains(bucket.Name, "elastic") {
continue
}
deleteBucket(c, client, bucket.Name, false)
}
time.Sleep(time.Minute)
}
func cleanBlobAccount(c asserter, sc *blobservice.Client) {
pager := sc.NewListContainersPager(nil)
for pager.More() {
resp, err := pager.NextPage(ctx)
c.AssertNoErr(err)
for _, v := range resp.ContainerItems {
_, err = sc.NewContainerClient(*v.Name).Delete(ctx, nil)
c.AssertNoErr(err)
}
}
}
func cleanFileAccount(c asserter, sc *fileservice.Client) {
pager := sc.NewListSharesPager(nil)
for pager.More() {
resp, err := pager.NextPage(ctx)
c.AssertNoErr(err)
for _, v := range resp.Shares {
_, err = sc.NewShareClient(*v.Name).Delete(ctx, nil)
c.AssertNoErr(err)
}
}
time.Sleep(time.Minute)
}
func deleteShare(c asserter, sc *share.Client) {
_, err := sc.Delete(ctx, &share.DeleteOptions{DeleteSnapshots: to.Ptr(share.DeleteSnapshotsOptionTypeInclude)})
c.AssertNoErr(err)
}
// Some tests require setting service properties. It can take up to 30 seconds for the new properties to be reflected across all FEs.
// We will enable the necessary property and try to run the test implementation. If it fails with an error that should be due to
// those changes not being reflected yet, we will wait 30 seconds and try the test again. If it fails this time for any reason,
// we fail the test. It is the responsibility of the testImplFunc to determine which error string indicates the test should be retried.
// There can only be one such string. All errors that cannot be due to this detail should be asserted and not returned as an error string.
func runTestRequiringServiceProperties(c *chk.C, bsc *blobservice.Client, code string,
enableServicePropertyFunc func(*chk.C, *blobservice.Client),
testImplFunc func(*chk.C, *blobservice.Client) error,
disableServicePropertyFunc func(*chk.C, *blobservice.Client)) {
enableServicePropertyFunc(c, bsc)
defer disableServicePropertyFunc(c, bsc)
err := testImplFunc(c, bsc)
// We cannot assume that the error indicating the slow update will necessarily be a StorageError (it is not, for example, in the ListBlobs case).
if err != nil && err.Error() == code {
time.Sleep(time.Second * 30)
err = testImplFunc(c, bsc)
c.Assert(err, chk.IsNil)
}
}
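// Illustrative usage sketch (enableSoftDelete, testSoftDeleteImpl, and disableSoftDelete are
// hypothetical helpers matching the parameter signatures; "BlobNotFound" is only an example retry string):
//
//	runTestRequiringServiceProperties(c, bsc, "BlobNotFound",
//		enableSoftDelete,   // func(*chk.C, *blobservice.Client): turns the service property on
//		testSoftDeleteImpl, // func(*chk.C, *blobservice.Client) error: returns "BlobNotFound" while propagation lags
//		disableSoftDelete,  // func(*chk.C, *blobservice.Client): always restored via defer
//	)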
func getContainerURLWithSAS(c asserter, credential *blob.SharedKeyCredential, containerName string) *container.Client {
rawURL := fmt.Sprintf("https://%s.blob.core.windows.net/%s",
credential.AccountName(), containerName)
cc, err := container.NewClientWithSharedKeyCredential(rawURL, credential, nil)
c.AssertNoErr(err)
sasURL, err := cc.GetSASURL(blobsas.ContainerPermissions{Read: true, Add: true, Write: true, Create: true, Delete: true, List: true, Tag: true},
time.Now().UTC().Add(48*time.Hour), nil)
c.AssertNoErr(err)
cc, err = container.NewClientWithNoCredential(sasURL, nil)
c.AssertNoErr(err)
return cc
}
func getBlobServiceURLWithSAS(c asserter, credential *blob.SharedKeyCredential) *blobservice.Client {
rawURL := fmt.Sprintf("https://%s.blob.core.windows.net/",
credential.AccountName())
bsc, err := blobservice.NewClientWithSharedKeyCredential(rawURL, credential, nil)
c.AssertNoErr(err)
sasURL, err := bsc.GetSASURL(blobsas.AccountResourceTypes{Service: true, Container: true, Object: true},
blobsas.AccountPermissions{Read: true, List: true, Write: true, Delete: true, DeletePreviousVersion: true, Add: true, Create: true, Update: true, Process: true},
time.Now().UTC().Add(48*time.Hour), nil)
c.AssertNoErr(err)
bsc, err = blobservice.NewClientWithNoCredential(sasURL, nil)
c.AssertNoErr(err)
return bsc
}
func getFileServiceURLWithSAS(c asserter, credential *sharefile.SharedKeyCredential) *fileservice.Client {
rawURL := fmt.Sprintf("https://%s.file.core.windows.net/",
credential.AccountName())
fsc, err := fileservice.NewClientWithSharedKeyCredential(rawURL, credential, nil)
c.AssertNoErr(err)
sasURL, err := fsc.GetSASURL(filesas.AccountResourceTypes{Service: true, Container: true, Object: true},
filesas.AccountPermissions{Read: true, List: true, Write: true, Delete: true, Create: true},
time.Now().UTC().Add(48*time.Hour), nil)
c.AssertNoErr(err)
fsc, err = fileservice.NewClientWithNoCredential(sasURL, nil)
c.AssertNoErr(err)
return fsc
}
func getShareURLWithSAS(c asserter, credential *sharefile.SharedKeyCredential, shareName string) *share.Client {
rawURL := fmt.Sprintf("https://%s.file.core.windows.net/%s",
credential.AccountName(), shareName)
sc, err := share.NewClientWithSharedKeyCredential(rawURL, credential, nil)
c.AssertNoErr(err)
sasURL, err := sc.GetSASURL(filesas.SharePermissions{Read: true, Write: true, Create: true, Delete: true, List: true},
time.Now().UTC().Add(48*time.Hour), nil)
c.AssertNoErr(err)
sc, err = share.NewClientWithNoCredential(sasURL, nil)
c.AssertNoErr(err)
return sc
}
func getAdlsServiceURLWithSAS(c asserter, credential *azdatalake.SharedKeyCredential) *datalakeservice.Client {
rawURL := fmt.Sprintf("https://%s.dfs.core.windows.net/", credential.AccountName())
dsc, err := datalakeservice.NewClientWithSharedKeyCredential(rawURL, credential, nil)
c.AssertNoErr(err)
sasURL, err := dsc.GetSASURL(datalakesas.AccountResourceTypes{Service: true, Container: true, Object: true},
datalakesas.AccountPermissions{Read: true, Write: true, Create: true, Delete: true, List: true, Add: true, Update: true, Process: true},
time.Now().UTC().Add(48*time.Hour), nil)
c.AssertNoErr(err)
dsc, err = datalakeservice.NewClientWithNoCredential(sasURL, nil)
c.AssertNoErr(err)
return dsc
}
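// The *WithSAS helpers above all share one pattern: build a client with a shared key credential,
// mint a SAS via GetSASURL, then rebuild the client from the SAS URL with no credential, so the
// returned client authenticates purely through the SAS query parameters. An illustrative sketch
// of how a test might obtain one (assuming blob.NewSharedKeyCredential, which the azblob SDK provides):
//
//	accountName, accountKey := GlobalInputManager{}.GetAccountAndKey(EAccountType.Standard())
//	credential, err := blob.NewSharedKeyCredential(accountName, accountKey)
//	c.AssertNoErr(err)
//	bsc := getBlobServiceURLWithSAS(c, credential)
//	cc := getContainerURLWithSAS(c, credential, generateContainerName(c))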
// check.v1 style "StringContains" checker
type stringContainsChecker struct {
*chk.CheckerInfo
}
// Check implements the chk.Checker interface; parameters beyond the first two are ignored.
func (checker *stringContainsChecker) Check(params []interface{}, _ []string) (result bool, error string) {
if len(params) < 2 {
return false, "StringContains requires two parameters"
}
// Ensure params[0] and params[1] are strings.
aStr, aOK := params[0].(string)
bStr, bOK := params[1].(string)
if !aOK || !bOK {
return false, "All parameters must be strings"
}
if strings.Contains(aStr, bStr) {
return true, ""
}
return false, fmt.Sprintf("Failed to find substring in source string:\n\n"+
"SOURCE: %s\n"+
"EXPECTED: %s\n", aStr, bStr)
}
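// To use the checker with check.v1, tests need a Checker instance; a minimal sketch
// (the variable name and Params labels are illustrative):
//
//	var StringContains = &stringContainsChecker{
//		&chk.CheckerInfo{Name: "StringContains", Params: []string{"obtained", "expected to find"}},
//	}
//
// after which an assertion reads: c.Assert(output, StringContains, "some substring")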
func GetContentTypeMap(fileExtensions []string) map[string]string {
extensionsMap := make(map[string]string)
for _, ext := range fileExtensions {
if guessedType := mime.TypeByExtension(ext); guessedType != "" {
extensionsMap[ext] = strings.Split(guessedType, ";")[0]
}
}
return extensionsMap
}
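// Illustrative example (results come from the host's MIME database, so exact values can vary):
//
//	GetContentTypeMap([]string{".txt", ".json", ".unknownext"})
//	// => map[".txt":"text/plain", ".json":"application/json"]; extensions with no
//	// known MIME type are omitted, and any "; charset=..." suffix is stripped.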
// BlockIDIntToBase64 converts an int block ID to a base-64 encoded string (see the inverse sketch below).
func BlockIDIntToBase64(blockID int) string {
binaryBlockID := (&[4]byte{})[:]
binary.LittleEndian.PutUint32(binaryBlockID, uint32(blockID))
return base64.StdEncoding.EncodeToString(binaryBlockID)
}
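// The "vice versa" direction mentioned in the forked source is not part of this file; a minimal
// sketch of the inverse, assuming well-formed 4-byte little-endian input (the name is hypothetical):
//
//	func blockIDBase64ToInt(blockID string) int {
//		buf, _ := base64.StdEncoding.DecodeString(blockID) // error ignored: input assumed well-formed
//		return int(binary.LittleEndian.Uint32(buf))
//	}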