// testSuite/cmd/clean.go
package cmd
import (
gcpUtils "cloud.google.com/go/storage"
"context"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
blobservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
fileservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"google.golang.org/api/iterator"
"net/http"
"net/url"
"os"
"reflect"
"strings"
"time"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/JeffreyRichter/enum/enum"
"github.com/spf13/cobra"
)
var EResourceType = ResourceType(0)
// ResourceType defines the different types of resources the clean command can target
type ResourceType uint8
func (ResourceType) SingleFile() ResourceType { return ResourceType(0) }
func (ResourceType) Bucket() ResourceType { return ResourceType(1) }
func (ResourceType) Account() ResourceType { return ResourceType(2) }
func (ct ResourceType) String() string {
return enum.StringInt(ct, reflect.TypeOf(ct))
}
func (ct *ResourceType) Parse(s string) error {
val, err := enum.ParseInt(reflect.TypeOf(ct), s, true, true)
if err == nil {
*ct = val.(ResourceType)
}
return err
}
var EServiceType = ServiceType(0)
// ServiceType defines the different storage services the clean command can target
type ServiceType uint8
func (ServiceType) Blob() ServiceType { return ServiceType(0) }
func (ServiceType) File() ServiceType { return ServiceType(1) }
func (ServiceType) BlobFS() ServiceType { return ServiceType(2) }
func (ServiceType) S3() ServiceType { return ServiceType(3) }
func (ServiceType) GCP() ServiceType { return ServiceType(4) }
func (ct ServiceType) String() string {
return enum.StringInt(ct, reflect.TypeOf(ct))
}
func (ct *ServiceType) Parse(s string) error {
val, err := enum.ParseInt(reflect.TypeOf(ct), s, true, true)
if err == nil {
*ct = val.(ServiceType)
}
return err
}
// initializes the clean command, its aliases and description.
func init() {
resourceURL := ""
serviceType := EServiceType.Blob()
resourceType := EResourceType.SingleFile()
var serviceTypeStr string
var resourceTypeStr string
cleanCmd := &cobra.Command{
Use: "clean",
Aliases: []string{"clean"},
Short: "clean deletes everything inside the container.",
Args: func(cmd *cobra.Command, args []string) error {
			if len(args) != 1 {
				return fmt.Errorf("invalid arguments for clean command, expected exactly one resource URL")
			}
			resourceURL = args[0]
return nil
},
Run: func(cmd *cobra.Command, args []string) {
err := (&serviceType).Parse(serviceTypeStr)
if err != nil {
panic(fmt.Errorf("fail to parse service type %q, %v", serviceTypeStr, err))
}
err = (&resourceType).Parse(resourceTypeStr)
if err != nil {
panic(fmt.Errorf("fail to parse resource type %q, %v", resourceTypeStr, err))
}
switch serviceType {
case EServiceType.Blob():
switch resourceType {
case EResourceType.Bucket():
cleanContainer(resourceURL)
case EResourceType.SingleFile():
cleanBlob(resourceURL)
case EResourceType.Account():
cleanBlobAccount(resourceURL)
}
case EServiceType.File():
switch resourceType {
case EResourceType.Bucket():
cleanShare(resourceURL)
case EResourceType.SingleFile():
cleanFile(resourceURL)
case EResourceType.Account():
cleanFileAccount(resourceURL)
}
case EServiceType.BlobFS():
switch resourceType {
case EResourceType.Bucket():
cleanFileSystem(resourceURL)
case EResourceType.SingleFile():
cleanBfsFile(resourceURL)
case EResourceType.Account():
cleanBfsAccount(resourceURL)
}
case EServiceType.S3():
switch resourceType {
case EResourceType.Bucket():
cleanBucket(resourceURL)
case EResourceType.SingleFile():
cleanObject(resourceURL)
case EResourceType.Account():
cleanS3Account(resourceURL)
}
case EServiceType.GCP():
switch resourceType {
case EResourceType.Bucket():
cleanBucket(resourceURL)
case EResourceType.SingleFile():
cleanObject(resourceURL)
case EResourceType.Account():
cleanGCPAccount(resourceURL)
}
default:
panic(fmt.Errorf("illegal resourceType %q", resourceType))
}
},
}
rootCmd.AddCommand(cleanCmd)
	cleanCmd.PersistentFlags().StringVar(&resourceTypeStr, "resourceType", "SingleFile", "Resource type: SingleFile, Bucket, or Account.")
	cleanCmd.PersistentFlags().StringVar(&serviceTypeStr, "serviceType", "Blob", "Service type: Blob, File, BlobFS, S3, or GCP.")
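	// Example invocation (the binary name, account, and SAS below are placeholders, not real values):
	//   <testSuiteBinary> clean "https://myaccount.blob.core.windows.net/mycontainer?<SAS>" --serviceType=Blob --resourceType=Bucket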
}
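// cleanContainer ensures the container exists and then deletes every blob (and its snapshots) inside it.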
func cleanContainer(resourceURL string) {
containerClient := createContainerClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
// Create the container. This will fail if it's already present but this saves us the pain of a container being missing for one reason or another.
_, _ = containerClient.Create(ctx, nil)
// perform a list blob
pager := containerClient.NewListBlobsFlatPager(nil)
for pager.More() {
		// a flat listing returns every blob in the container, including blobs under virtual directories
listBlob, err := pager.NextPage(ctx)
if err != nil {
fmt.Println("error listing blobs inside the container. Please check the container sas", err)
os.Exit(1)
}
// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
for _, blobInfo := range listBlob.Segment.BlobItems {
_, err := containerClient.NewBlobClient(*blobInfo.Name).Delete(ctx, &blob.DeleteOptions{DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude)})
if err != nil {
fmt.Println("error deleting the blob from container ", blobInfo.Name)
os.Exit(1)
}
}
}
}
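// cleanBlob deletes the given blob, including its snapshots.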
func cleanBlob(resourceURL string) {
blobClient := createBlobClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
_, err := blobClient.Delete(ctx, &blob.DeleteOptions{DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude)})
if err != nil {
fmt.Println("error deleting the blob ", err)
os.Exit(1)
}
}
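// cleanShare deletes the given share (including snapshots), waits for the deletion to complete, and then re-creates it empty.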
func cleanShare(resourceURL string) {
shareClient := createShareClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
	// Create the share. This will fail if it's already present, but it saves us the pain of the share being missing for one reason or another.
_, _ = shareClient.Create(ctx, nil)
_, err := shareClient.Delete(ctx, &share.DeleteOptions{DeleteSnapshots: to.Ptr(share.DeleteSnapshotsOptionTypeInclude)})
if err != nil {
var respErr *azcore.ResponseError
if errors.As(err, &respErr) && respErr.StatusCode != http.StatusNotFound {
fmt.Fprintf(os.Stdout, "error deleting the share for clean share, error '%v'\n", err)
os.Exit(1)
}
}
	// Sleep to give the share deletion time to complete before re-creating the share.
time.Sleep(45 * time.Second)
_, err = shareClient.Create(ctx, nil)
if err != nil {
fmt.Fprintf(os.Stdout, "error creating the share for clean share, error '%v'\n", err)
os.Exit(1)
}
}
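// cleanFile deletes the given Azure Files file.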
func cleanFile(resourceURL string) {
fileClient := createShareFileClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
_, err := fileClient.Delete(ctx, nil)
if err != nil {
fmt.Println("error deleting the file ", err)
os.Exit(1)
}
}
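// createBlobClient builds a blob client for the resource URL, applying any snapshot or version ID present in the URL.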
func createBlobClient(resourceURL string) *blob.Client {
blobURLParts, err := blob.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
containerClient := createContainerClient(resourceURL)
blobClient := containerClient.NewBlobClient(blobURLParts.BlobName)
if blobURLParts.Snapshot != "" {
blobClient, err = blobClient.WithSnapshot(blobURLParts.Snapshot)
if err != nil {
fmt.Println("Failed to create snapshot client")
os.Exit(1)
}
}
if blobURLParts.VersionID != "" {
blobClient, err = blobClient.WithVersionID(blobURLParts.VersionID)
if err != nil {
fmt.Println("Failed to create version id client")
os.Exit(1)
}
}
return blobClient
}
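// createContainerClient builds a container client for the container named in the resource URL.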
func createContainerClient(resourceURL string) *container.Client {
blobURLParts, err := blob.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
return createBlobServiceClient(resourceURL).NewContainerClient(blobURLParts.ContainerName)
}
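// createBlobServiceClient builds a blob service client, preferring a SAS token in the URL over the ACCOUNT_NAME/ACCOUNT_KEY environment variables.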
func createBlobServiceClient(resourceURL string) *blobservice.Client {
blobURLParts, err := blob.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
blobURLParts.ContainerName = ""
blobURLParts.BlobName = ""
blobURLParts.VersionID = ""
blobURLParts.Snapshot = ""
// create the pipeline, preferring SAS over account name/key
if blobURLParts.SAS.Encode() != "" {
bsc, err := blobservice.NewClientWithNoCredential(blobURLParts.String(), nil)
if err != nil {
fmt.Println("Failed to create blob service client")
os.Exit(1)
}
return bsc
}
// Get name and key variables from environment.
name := os.Getenv("ACCOUNT_NAME")
key := os.Getenv("ACCOUNT_KEY")
	// Both ACCOUNT_NAME and ACCOUNT_KEY must be set when no SAS token is present in the URL.
	if name == "" || key == "" {
		fmt.Println("ACCOUNT_NAME and ACCOUNT_KEY should be set, or a SAS token should be supplied, before cleaning the resource")
os.Exit(1)
}
c, err := blob.NewSharedKeyCredential(name, key)
if err != nil {
fmt.Println("Failed to create shared key credential!")
os.Exit(1)
}
bsc, err := blobservice.NewClientWithSharedKeyCredential(blobURLParts.String(), c, nil)
if err != nil {
fmt.Println("Failed to create blob service client")
os.Exit(1)
}
return bsc
}
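// createShareFileClient builds an Azure Files file client for the path named in the resource URL.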
func createShareFileClient(resourceURL string) *sharefile.Client {
fileURLParts, err := sharefile.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
shareClient := createShareClient(resourceURL)
fileClient := shareClient.NewRootDirectoryClient().NewFileClient(fileURLParts.DirectoryOrFilePath)
return fileClient
}
//func createShareDirectoryClient(resourceURL string) *sharedirectory.Client {
// fileURLParts, err := sharefile.ParseURL(resourceURL)
// if err != nil {
// fmt.Println("Failed to parse url")
// os.Exit(1)
// }
// shareClient := createShareClient(resourceURL)
// if fileURLParts.DirectoryOrFilePath == "" {
// return shareClient.NewRootDirectoryClient()
// } else {
// return shareClient.NewDirectoryClient(fileURLParts.DirectoryOrFilePath)
// }
//}
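// createShareClient builds a share client for the share named in the resource URL, applying any share snapshot present in the URL.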
func createShareClient(resourceURL string) *share.Client {
fileURLParts, err := sharefile.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
sc := createFileServiceClient(resourceURL).NewShareClient(fileURLParts.ShareName)
if fileURLParts.ShareSnapshot != "" {
sc, err = sc.WithSnapshot(fileURLParts.ShareSnapshot)
if err != nil {
fmt.Println("Failed to parse snapshot")
os.Exit(1)
}
}
return sc
}
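// createFileServiceClient builds a file service client, preferring a SAS token in the URL over the ACCOUNT_NAME/ACCOUNT_KEY environment variables.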
func createFileServiceClient(resourceURL string) *fileservice.Client {
fileURLParts, err := sharefile.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
fileURLParts.ShareName = ""
fileURLParts.ShareSnapshot = ""
fileURLParts.DirectoryOrFilePath = ""
// create the pipeline, preferring SAS over account name/key
if fileURLParts.SAS.Encode() != "" {
fsc, err := fileservice.NewClientWithNoCredential(fileURLParts.String(), nil)
if err != nil {
fmt.Println("Failed to create blob service client")
os.Exit(1)
}
return fsc
}
// Get name and key variables from environment.
name := os.Getenv("ACCOUNT_NAME")
key := os.Getenv("ACCOUNT_KEY")
	// Both ACCOUNT_NAME and ACCOUNT_KEY must be set when no SAS token is present in the URL.
	if name == "" || key == "" {
		fmt.Println("ACCOUNT_NAME and ACCOUNT_KEY should be set, or a SAS token should be supplied, before cleaning the resource")
os.Exit(1)
}
c, err := sharefile.NewSharedKeyCredential(name, key)
if err != nil {
fmt.Println("Failed to create shared key credential!")
os.Exit(1)
}
fsc, err := fileservice.NewClientWithSharedKeyCredential(fileURLParts.String(), c, nil)
if err != nil {
fmt.Println("Failed to create blob service client")
os.Exit(1)
}
return fsc
}
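// createFileSystemClient builds a Data Lake file system client, preferring a SAS token in the URL over the ACCOUNT_NAME/ACCOUNT_KEY environment variables.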
func createFileSystemClient(resourceURL string) *filesystem.Client {
datalakeURLParts, err := azdatalake.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
	// Strip only the path so that the client still points at the file system itself.
	datalakeURLParts.PathName = ""
// create the pipeline, preferring SAS over account name/key
if datalakeURLParts.SAS.Encode() != "" {
fsc, err := filesystem.NewClientWithNoCredential(datalakeURLParts.String(), nil)
if err != nil {
fmt.Println("Failed to create filesystem client")
os.Exit(1)
}
return fsc
}
// Get name and key variables from environment.
name := os.Getenv("ACCOUNT_NAME")
key := os.Getenv("ACCOUNT_KEY")
	// Both ACCOUNT_NAME and ACCOUNT_KEY must be set when no SAS token is present in the URL.
	if name == "" || key == "" {
fmt.Println("ACCOUNT_NAME and ACCOUNT_KEY should be set, or a SAS token should be supplied before cleaning the file system")
os.Exit(1)
}
c, err := azdatalake.NewSharedKeyCredential(name, key)
if err != nil {
fmt.Println("Failed to create shared key credential!")
os.Exit(1)
}
	fsc, err := filesystem.NewClientWithSharedKeyCredential(datalakeURLParts.String(), c, nil)
if err != nil {
fmt.Println("Failed to create filesystem client")
os.Exit(1)
}
return fsc
}
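// createDatalakeFileClient builds a Data Lake file client for the path named in the resource URL.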
func createDatalakeFileClient(resourceURL string) *datalakefile.Client {
datalakeURLParts, err := azdatalake.ParseURL(resourceURL)
if err != nil {
fmt.Println("Failed to parse url")
os.Exit(1)
}
fileSystemClient := createFileSystemClient(resourceURL)
fileClient := fileSystemClient.NewFileClient(datalakeURLParts.PathName)
return fileClient
}
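// cleanFileSystem deletes the file system, waits for the deletion to complete, and then re-creates it empty.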
func cleanFileSystem(resourceURL string) {
fsc := createFileSystemClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
// Instead of error checking the delete, error check the create.
// If the filesystem is deleted somehow, this recovers us from CI hell.
_, err := fsc.Delete(ctx, nil)
if err != nil {
fmt.Println(fmt.Fprintf(os.Stdout, "error deleting the file system for cleaning, %v", err))
// don't fail just log
}
	// Sleep to give the file system deletion time to complete before re-creating it.
time.Sleep(45 * time.Second)
_, err = fsc.Create(ctx, nil)
if err != nil {
fmt.Println(fmt.Fprintf(os.Stdout, "error creating the file system for cleaning, %v", err))
os.Exit(1)
}
}
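// cleanBfsFile deletes the given Data Lake (blob FS) file.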
func cleanBfsFile(resourceURL string) {
fc := createDatalakeFileClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
_, err := fc.Delete(ctx, nil)
if err != nil {
fmt.Printf("error deleting the blob FS file, %v\n", err)
os.Exit(1)
}
}
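// cleanBlobAccount deletes every container in the blob account.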
func cleanBlobAccount(resourceURL string) {
serviceClient := createBlobServiceClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
	// enumerate all the containers in the account
pager := serviceClient.NewListContainersPager(nil)
for pager.More() {
lResp, err := pager.NextPage(ctx)
if err != nil {
fmt.Println("error listing containers, please check the container sas, ", err)
os.Exit(1)
}
for _, containerItem := range lResp.ContainerItems {
_, err := serviceClient.NewContainerClient(*containerItem.Name).Delete(ctx, nil)
if err != nil {
fmt.Println("error deleting the container from account, ", err)
os.Exit(1)
}
}
}
}
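// cleanFileAccount deletes every share (including snapshots) in the file account.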
func cleanFileAccount(resourceURL string) {
serviceClient := createFileServiceClient(resourceURL)
ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
	// enumerate all the shares in the account
pager := serviceClient.NewListSharesPager(nil)
for pager.More() {
lResp, err := pager.NextPage(ctx)
if err != nil {
fmt.Println("error listing shares, please check the share sas, ", err)
os.Exit(1)
}
for _, shareItem := range lResp.Shares {
_, err := serviceClient.NewShareClient(*shareItem.Name).Delete(ctx, &share.DeleteOptions{DeleteSnapshots: to.Ptr(share.DeleteSnapshotsOptionTypeInclude)})
if err != nil {
fmt.Println("error deleting the share from account, ", err)
os.Exit(1)
}
}
}
}
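// cleanS3Account deletes all objects and buckets created by the s2s copy tests (bucket names prefixed with "s2scopybucket").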
func cleanS3Account(resourceURL string) {
u, err := url.Parse(resourceURL)
if err != nil {
fmt.Println("fail to parse the S3 service URL, ", err)
os.Exit(1)
}
s3URLParts, err := common.NewS3URLParts(*u)
if err != nil {
fmt.Println("new S3 URL parts, ", err)
os.Exit(1)
}
s3Client := createS3ClientWithMinio(createS3ResOptions{
Location: s3URLParts.Region,
})
buckets, err := s3Client.ListBuckets()
if err != nil {
fmt.Println("error listing S3 service, ", err)
os.Exit(1)
}
for _, bucket := range buckets {
		// Only clean buckets created by the s2s copy tests.
		if !strings.HasPrefix(bucket.Name, "s2scopybucket") {
			continue
}
objectsCh := make(chan string)
go func() {
defer close(objectsCh)
			// Stream every object key in the bucket to the removal channel.
for object := range s3Client.ListObjectsV2(bucket.Name, "", true, context.Background().Done()) {
if object.Err != nil {
fmt.Printf("error listing the objects from bucket %q, %v\n", bucket.Name, err)
return
}
objectsCh <- object.Key
}
}()
// List bucket, and delete all the objects in the bucket
_ = s3Client.RemoveObjects(bucket.Name, objectsCh)
// Remove the bucket.
if err := s3Client.RemoveBucket(bucket.Name); err != nil {
fmt.Printf("error deleting the bucket %q from account, %v\n", bucket.Name, err)
}
}
}
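// cleanGCPAccount deletes all s2s copy test buckets (names prefixed with "s2scopybucket") in the project named by GOOGLE_CLOUD_PROJECT.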
func cleanGCPAccount(resourceURL string) {
u, err := url.Parse(resourceURL)
if err != nil {
fmt.Println("fail to parse the GCP service URL, ", err)
os.Exit(1)
}
_, err = common.NewGCPURLParts(*u)
if err != nil {
fmt.Println("new GCP URL parts, ", err)
os.Exit(1)
}
gcpClient, _ := createGCPClientWithGCSSDK()
it := gcpClient.Buckets(context.Background(), os.Getenv("GOOGLE_CLOUD_PROJECT"))
	for {
		battrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			fmt.Printf("error listing the buckets in the GCP project, %v\n", err)
			break
		}
		// Only clean buckets created by the s2s copy tests.
		if !strings.HasPrefix(battrs.Name, "s2scopybucket") {
			continue
		}
		// deleteGCPBucket removes every object in the bucket before deleting the bucket itself.
		deleteGCPBucket(gcpClient, battrs.Name)
	}
}
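// deleteGCPBucket deletes every object in the given GCS bucket and then deletes the bucket itself.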
func deleteGCPBucket(client *gcpUtils.Client, bucketName string) {
bucket := client.Bucket(bucketName)
ctx := context.Background()
it := bucket.Objects(ctx, &gcpUtils.Query{Prefix: ""})
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			fmt.Printf("error listing the objects from bucket %q, %v\n", bucketName, err)
			return
		}
		if err := bucket.Object(attrs.Name).Delete(ctx); err != nil {
			fmt.Printf("could not delete object %q from GCS bucket %q, %v\n", attrs.Name, bucketName, err)
			return
		}
	}
	if err := bucket.Delete(ctx); err != nil {
		fmt.Printf("failed to delete GCS bucket %q, %v\n", bucketName, err)
	}
}
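// The remaining service/resource combinations are not exercised by the test suite and are intentionally left unimplemented.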
func cleanBfsAccount(resourceURL string) {
panic("not implemented: not used")
}
func cleanBucket(resourceURL string) {
panic("not implemented: not used")
}
func cleanObject(resourceURL string) {
panic("not implemented: not used")
}