testSuite/cmd/create.go (376 lines of code) (raw):
package cmd
import (
"bytes"
gcpUtils "cloud.google.com/go/storage"
"context"
"crypto/md5"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
sharedirectory "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/directory"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"
"net/url"
"os"
"time"
"io"
"math/rand"
"net/http"
"strings"
"github.com/Azure/azure-storage-azcopy/v10/common"
minio "github.com/minio/minio-go"
"github.com/spf13/cobra"
)
// charset is the alphabet that generated payloads are drawn from: the ASCII
// lower-case letters, the upper-case letters, then the decimal digits.
const charset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// createStringWithRandomChars builds a string of exactly length bytes, each
// byte chosen from charset using the package-level math/rand source.
func createStringWithRandomChars(length int) string {
	alphabetSize := len(charset)
	buf := make([]byte, length)
	for pos := 0; pos < length; pos++ {
		buf[pos] = charset[rand.Int()%alphabetSize]
	}
	return string(buf)
}
// genMD5 is bound to the --generate-md5 flag of the create command. When true,
// createBlob and createFile hash the generated payload with MD5 and store the
// digest in the content-MD5 header, overriding any value passed by the caller.
var genMD5 = false
// init registers the "create" command, its alias, description and flags on rootCmd.
//
// The command takes exactly one positional argument, the URL of the resource to
// create. --serviceType and --resourceType select which helper is invoked:
// a bucket-like resource (container, share/directory, S3 bucket, GCP bucket) or
// a single file/blob/object filled with blob-size random characters.
// Unsupported combinations panic with a descriptive error.
func init() {
	resourceURL := ""

	serviceType := EServiceType.Blob()
	resourceType := EResourceType.SingleFile()
	serviceTypeStr := ""
	resourceTypeStr := ""

	blobSize := uint32(0)
	metadata := ""
	contentType := ""
	contentEncoding := ""
	contentDisposition := ""
	contentLanguage := ""
	cacheControl := ""
	contentMD5 := ""
	// location is bound to the --location flag below but is not read anywhere else in this function.
	location := ""
	// tier is never assigned from a flag, so blobs are always uploaded with a nil access tier.
	var tier *blob.AccessTier = nil

	createCmd := &cobra.Command{
		Use:     "create",
		Aliases: []string{"create"},
		Short:   "create creates resource.",

		Args: func(cmd *cobra.Command, args []string) error {
			// Exactly one argument (the resource URL) is required. Checking only
			// for "too many" would let an empty argument list reach args[0] and panic.
			if len(args) != 1 {
				return fmt.Errorf("invalid arguments for create command")
			}
			resourceURL = args[0]
			return nil
		},
		Run: func(cmd *cobra.Command, args []string) {
			err := (&serviceType).Parse(serviceTypeStr)
			if err != nil {
				panic(fmt.Errorf("fail to parse service type %q, %v", serviceTypeStr, err))
			}
			err = (&resourceType).Parse(resourceTypeStr)
			if err != nil {
				panic(fmt.Errorf("fail to parse resource type %q, %v", resourceTypeStr, err))
			}

			// The --content-md5 value is forwarded as the raw bytes of the flag string;
			// it stays nil when the flag is empty.
			var md5 []byte
			if contentMD5 != "" {
				md5 = []byte(contentMD5)
			}

			switch serviceType {
			case EServiceType.Blob():
				switch resourceType {
				case EResourceType.Bucket():
					createContainer(resourceURL)
				case EResourceType.SingleFile():
					createBlob(
						resourceURL,
						blobSize,
						getMetadata(metadata),
						&blob.HTTPHeaders{
							BlobContentType:        &contentType,
							BlobContentDisposition: &contentDisposition,
							BlobContentEncoding:    &contentEncoding,
							BlobContentLanguage:    &contentLanguage,
							BlobContentMD5:         md5,
							BlobCacheControl:       &cacheControl,
						}, tier)
				default:
					panic(fmt.Errorf("not implemented %v", resourceType))
				}
			case EServiceType.File():
				switch resourceType {
				case EResourceType.Bucket():
					createShareOrDirectory(resourceURL)
				case EResourceType.SingleFile():
					createFile(
						resourceURL,
						blobSize,
						getMetadata(metadata),
						&sharefile.HTTPHeaders{
							ContentType:        &contentType,
							ContentDisposition: &contentDisposition,
							ContentEncoding:    &contentEncoding,
							ContentLanguage:    &contentLanguage,
							ContentMD5:         md5,
							CacheControl:       &cacheControl,
						})
				default:
					panic(fmt.Errorf("not implemented %v", resourceType))
				}
			case EServiceType.S3():
				switch resourceType {
				case EResourceType.Bucket():
					createBucket(resourceURL)
				case EResourceType.SingleFile():
					// For S3, no content-MD5 will be returned during HEAD, i.e. no content-MD5 will be preserved during copy.
					// And content-MD5 header is not set during upload. E.g. in S3 management portal, no property content-MD5 can be set.
					// So here create object without content-MD5 as common practice.
					createObject(
						resourceURL,
						blobSize,
						minio.PutObjectOptions{
							ContentType:        contentType,
							ContentDisposition: contentDisposition,
							ContentEncoding:    contentEncoding,
							ContentLanguage:    contentLanguage,
							CacheControl:       cacheControl,
							UserMetadata:       getS3Metadata(metadata),
						})
				default:
					panic(fmt.Errorf("not implemented %v", resourceType))
				}
			case EServiceType.GCP():
				switch resourceType {
				case EResourceType.Bucket():
					createGCPBucket(resourceURL)
				case EResourceType.SingleFile():
					createGCPObject(resourceURL, blobSize, gcpUtils.ObjectAttrsToUpdate{
						ContentType:        contentType,
						ContentDisposition: contentDisposition,
						ContentEncoding:    contentEncoding,
						ContentLanguage:    contentLanguage,
						CacheControl:       cacheControl,
						Metadata:           getS3Metadata(metadata),
					})
				default:
					// Match the other services: an unsupported resource type must not be a silent no-op.
					panic(fmt.Errorf("not implemented %v", resourceType))
				}
			case EServiceType.BlobFS():
				panic(fmt.Errorf("not implemented %v", serviceType))
			default:
				// This is the service-type switch, so report the service type.
				panic(fmt.Errorf("illegal serviceType %q", serviceType))
			}
		},
	}
	rootCmd.AddCommand(createCmd)

	createCmd.PersistentFlags().StringVar(&serviceTypeStr, "serviceType", "Blob", "Service type, could be blob, file or blobFS currently.")
	createCmd.PersistentFlags().StringVar(&resourceTypeStr, "resourceType", "SingleFile", "Resource type, could be a single file, bucket.")
	createCmd.PersistentFlags().Uint32Var(&blobSize, "blob-size", 0, "")
	createCmd.PersistentFlags().StringVar(&metadata, "metadata", "", "metadata for blob.")
	createCmd.PersistentFlags().StringVar(&contentType, "content-type", "", "content type for blob.")
	createCmd.PersistentFlags().StringVar(&contentEncoding, "content-encoding", "", "content encoding for blob.")
	createCmd.PersistentFlags().StringVar(&contentDisposition, "content-disposition", "", "content disposition for blob.")
	createCmd.PersistentFlags().StringVar(&contentLanguage, "content-language", "", "content language for blob.")
	createCmd.PersistentFlags().StringVar(&cacheControl, "cache-control", "", "cache control for blob.")
	createCmd.PersistentFlags().StringVar(&contentMD5, "content-md5", "", "content MD5 for blob.")
	createCmd.PersistentFlags().StringVar(&location, "location", "", "Location of the Azure account or S3 bucket to create")
	createCmd.PersistentFlags().BoolVar(&genMD5, "generate-md5", false, "auto-generate MD5 for a new blob")
}
// getMetadata parses a metadata flag value of the form "k1=v1;k2=v2" into the
// pointer-valued map used by the Azure blob and file upload options.
//
// Pairs are separated by ';' and each pair is split at its first '=', so a
// value may itself contain '='. Empty segments (for example the one produced
// by a trailing ';') are skipped, and a segment without '=' yields its key
// with an empty value instead of panicking. An empty metadataString returns a
// nil map.
func getMetadata(metadataString string) map[string]*string {
	if len(metadataString) == 0 {
		return nil
	}

	metadata := map[string]*string{}
	for _, keyAndValue := range strings.Split(metadataString, ";") {
		if keyAndValue == "" {
			continue
		}
		// value is declared inside the loop body, so every entry gets its own address.
		key, value, _ := strings.Cut(keyAndValue, "=")
		metadata[key] = &value
	}
	return metadata
}
// getS3Metadata parses a metadata flag value of the form "k1=v1;k2=v2" into a
// plain string map, as used for S3 user metadata and GCP object metadata.
//
// Pairs are separated by ';' and each pair is split at its first '=', so a
// value may itself contain '='. Empty segments (for example the one produced
// by a trailing ';') are skipped, and a segment without '=' yields its key
// with an empty value instead of panicking. The returned map is never nil.
func getS3Metadata(metadataString string) map[string]string {
	metadata := make(map[string]string)
	if len(metadataString) == 0 {
		return metadata
	}

	for _, keyAndValue := range strings.Split(metadataString, ";") {
		if keyAndValue == "" {
			continue
		}
		key, value, _ := strings.Cut(keyAndValue, "=")
		metadata[key] = value
	}
	return metadata
}
// createContainer creates the blob container addressed by containerURL using a
// client without credentials (so the URL is expected to carry any required
// authorization itself, e.g. a SAS token — confirm with callers).
//
// Can be used for overwrite scenarios: errors filtered out by
// ignoreStorageConflictStatus are tolerated. Any other failure is printed and
// the process exits with status 1.
func createContainer(containerURL string) {
	containerClient, err := container.NewClientWithNoCredential(containerURL, nil)
	if err != nil {
		// Without this check a failed construction would surface as a nil dereference below.
		fmt.Println("fail to create container client, ", err)
		os.Exit(1)
	}

	_, err = containerClient.Create(context.Background(), nil)
	if ignoreStorageConflictStatus(err) != nil {
		fmt.Println("fail to create container, ", err)
		os.Exit(1)
	}
}
// createBlob uploads a block blob of blobSize random characters to blobURL.
//
// metadata, blobHTTPHeaders and tier are passed through to the upload. When no
// content type is supplied (nil pointer or empty string) it is detected from
// the generated payload, and when genMD5 is set the payload's MD5 replaces
// BlobContentMD5. blobHTTPHeaders is modified in place; a nil pointer is
// treated as empty headers. Any failure is printed and the process exits with
// status 1.
func createBlob(blobURL string, blobSize uint32, metadata map[string]*string, blobHTTPHeaders *blob.HTTPHeaders, tier *blob.AccessTier) {
	blobClient, err := blockblob.NewClientWithNoCredential(blobURL, nil)
	if err != nil {
		fmt.Printf("error creating the blob client %v\n", err)
		os.Exit(1)
	}
	if blobHTTPHeaders == nil {
		blobHTTPHeaders = &blob.HTTPHeaders{}
	}

	randomString := createStringWithRandomChars(int(blobSize))
	// The create command always passes a pointer to a (possibly empty) string, so
	// a nil-only check would never trigger detection; treat "" as unset as well,
	// matching the S3 and GCP helpers.
	if blobHTTPHeaders.BlobContentType == nil || *blobHTTPHeaders.BlobContentType == "" {
		blobHTTPHeaders.BlobContentType = to.Ptr(strings.Split(http.DetectContentType([]byte(randomString)), ";")[0])
	}

	// Generate a content MD5 for the new blob if requested
	if genMD5 {
		digest := md5.Sum([]byte(randomString))
		blobHTTPHeaders.BlobContentMD5 = digest[:]
	}

	_, err = blobClient.Upload(context.Background(), streaming.NopCloser(strings.NewReader(randomString)),
		&blockblob.UploadOptions{
			HTTPHeaders: blobHTTPHeaders,
			Metadata:    metadata,
			Tier:        tier,
		})
	if err != nil {
		fmt.Printf("error uploading the blob %v\n", err)
		os.Exit(1)
	}
}
// createShareOrDirectory creates the Azure Files share or directory addressed
// by shareOrDirectoryURLStr.
//
// The URL is treated as a share when it has a share name and no
// directory/file path; otherwise it is treated as a directory. Errors filtered
// out by ignoreStorageConflictStatus are tolerated during creation. After a
// one-second pause the directory (the root directory, in the share's case) is
// queried to confirm it exists. Any failure is printed and the process exits
// with status 1.
func createShareOrDirectory(shareOrDirectoryURLStr string) {
	fileURLParts, err := sharefile.ParseURL(shareOrDirectoryURLStr)
	if err != nil {
		fmt.Println("error createShareOrDirectory with URL, ", err)
		os.Exit(1)
	}

	isShare := fileURLParts.ShareName != "" && fileURLParts.DirectoryOrFilePath == ""
	if isShare {
		// This is a share
		shareClient, clientErr := share.NewClientWithNoCredential(shareOrDirectoryURLStr, nil)
		if clientErr != nil {
			fmt.Println("fail to create share client, ", clientErr)
			os.Exit(1)
		}
		_, err = shareClient.Create(context.Background(), nil)
		if ignoreStorageConflictStatus(err) != nil {
			fmt.Println("fail to create share, ", err)
			os.Exit(1)
		}
	}

	// i.e. root directory, in share's case
	directoryClient, err := sharedirectory.NewClientWithNoCredential(shareOrDirectoryURLStr, nil)
	if err != nil {
		fmt.Println("fail to create directory client, ", err)
		os.Exit(1)
	}

	if !isShare {
		_, err = directoryClient.Create(context.Background(), nil)
		if ignoreStorageConflictStatus(err) != nil {
			fmt.Println("fail to create directory, ", err)
			os.Exit(1)
		}
	}

	// Finally validate that the directory with the specified URL exists; if it
	// doesn't, report the create as failed.
	time.Sleep(1 * time.Second)
	_, err = directoryClient.GetProperties(context.Background(), nil)
	if err != nil {
		fmt.Println("error createShareOrDirectory with URL, ", err)
		os.Exit(1)
	}
}
// createFile creates an Azure Files file of fileSize bytes at fileURLStr and,
// when fileSize is non-zero, fills it with random characters.
//
// metadata and fileHTTPHeaders are passed to the create call. When no content
// type is supplied (nil pointer or empty string) it is detected from the
// generated payload, and when genMD5 is set the payload's MD5 replaces
// ContentMD5. fileHTTPHeaders is modified in place; a nil pointer is treated
// as empty headers. Any failure is printed and the process exits with status 1.
func createFile(fileURLStr string, fileSize uint32, metadata map[string]*string, fileHTTPHeaders *sharefile.HTTPHeaders) {
	fileClient, err := sharefile.NewClientWithNoCredential(fileURLStr, nil)
	if err != nil {
		fmt.Printf("error creating the file client %v\n", err)
		os.Exit(1)
	}
	if fileHTTPHeaders == nil {
		fileHTTPHeaders = &sharefile.HTTPHeaders{}
	}

	randomString := createStringWithRandomChars(int(fileSize))
	// The create command always passes a pointer to a (possibly empty) string, so
	// a nil-only check would never trigger detection; treat "" as unset as well,
	// matching the S3 and GCP helpers.
	if fileHTTPHeaders.ContentType == nil || *fileHTTPHeaders.ContentType == "" {
		fileHTTPHeaders.ContentType = to.Ptr(strings.Split(http.DetectContentType([]byte(randomString)), ";")[0])
	}

	// Generate a content MD5 for the new file if requested
	if genMD5 {
		digest := md5.Sum([]byte(randomString))
		fileHTTPHeaders.ContentMD5 = digest[:]
	}

	_, err = fileClient.Create(context.Background(), int64(fileSize), &sharefile.CreateOptions{HTTPHeaders: fileHTTPHeaders, Metadata: metadata})
	if err != nil {
		fmt.Printf("error creating the file %v\n", err)
		os.Exit(1)
	}

	// A zero-length file has nothing to upload once it has been created.
	if fileSize > 0 {
		err = fileClient.UploadBuffer(context.Background(), []byte(randomString), nil)
		if err != nil {
			fmt.Printf("error uploading the file %v\n", err)
			os.Exit(1)
		}
	}
}
// createBucket creates the S3 bucket addressed by bucketURLStr in the region
// encoded in that URL.
//
// If bucket creation fails but the bucket turns out to exist, the failure is
// tolerated. Otherwise the original creation error is printed and the process
// exits with status 1.
func createBucket(bucketURLStr string) {
	u, err := url.Parse(bucketURLStr)
	if err != nil {
		fmt.Println("fail to parse the bucket URL, ", err)
		os.Exit(1)
	}

	s3URLParts, err := common.NewS3URLParts(*u)
	if err != nil {
		fmt.Println("new S3 URL parts, ", err)
		os.Exit(1)
	}

	s3Client := createS3ClientWithMinio(createS3ResOptions{
		Location: s3URLParts.Region,
	})

	if makeErr := s3Client.MakeBucket(s3URLParts.BucketName, s3URLParts.Region); makeErr != nil {
		// Keep the creation error under its own name: reusing err for the existence
		// check would replace it, and a plain "not found" would then print <nil>.
		exists, existsErr := s3Client.BucketExists(s3URLParts.BucketName)
		if existsErr != nil {
			fmt.Println("fail to create bucket, ", makeErr, "; fail to check bucket existence, ", existsErr)
			os.Exit(1)
		}
		if !exists {
			fmt.Println("fail to create bucket, ", makeErr)
			os.Exit(1)
		}
	}
}
// createGCPBucket creates the GCS bucket addressed by bucketURLStr in the
// project named by the GOOGLE_CLOUD_PROJECT environment variable.
//
// If creation fails but the bucket's attributes can be read (i.e. it already
// exists), the failure is tolerated. Otherwise the creation error is printed
// and the process exits with status 1.
func createGCPBucket(bucketURLStr string) {
	u, err := url.Parse(bucketURLStr)
	if err != nil {
		fmt.Println("fail to parse the bucket URL, ", err)
		os.Exit(1)
	}

	gcpURLParts, err := common.NewGCPURLParts(*u)
	if err != nil {
		fmt.Println("new GCP URL parts, ", err)
		os.Exit(1)
	}

	gcpClient, err := createGCPClientWithGCSSDK()
	if err != nil {
		// Exit here: continuing would use a client that could not be created.
		fmt.Println("Failed to create GCS Client: ", err)
		os.Exit(1)
	}

	bkt := gcpClient.Bucket(gcpURLParts.BucketName)
	createErr := bkt.Create(context.Background(), os.Getenv("GOOGLE_CLOUD_PROJECT"), &gcpUtils.BucketAttrs{})
	if createErr != nil {
		// Creation is only a real failure when the bucket is not there afterwards;
		// a successful Attrs call means it already exists.
		if _, attrsErr := bkt.Attrs(context.Background()); attrsErr != nil {
			fmt.Println("fail to create bucket, ", createErr)
			os.Exit(1)
		}
	}
}
// createObject uploads an S3 object of objectSize random characters to the
// bucket and key encoded in objectURLStr, using the put options in o.
//
// When o.ContentType is empty it is filled in from the generated payload. Any
// failure is printed and the process exits with status 1.
func createObject(objectURLStr string, objectSize uint32, o minio.PutObjectOptions) {
	parsedURL, err := url.Parse(objectURLStr)
	if err != nil {
		fmt.Println("fail to parse the object URL, ", err)
		os.Exit(1)
	}

	parts, err := common.NewS3URLParts(*parsedURL)
	if err != nil {
		fmt.Println("new S3 URL parts, ", err)
		os.Exit(1)
	}

	clientOptions := createS3ResOptions{Location: parts.Region}
	client := createS3ClientWithMinio(clientOptions)

	// Convert the payload to bytes once and reuse it for sniffing and upload.
	payload := []byte(createStringWithRandomChars(int(objectSize)))
	if len(o.ContentType) == 0 {
		detected := http.DetectContentType(payload)
		// Keep only the media type, dropping any "; charset=..." parameter.
		o.ContentType = strings.Split(detected, ";")[0]
	}

	if _, err = client.PutObject(parts.BucketName, parts.ObjectKey, bytes.NewReader(payload), int64(objectSize), o); err != nil {
		fmt.Println("fail to upload file to S3 object, ", err)
		os.Exit(1)
	}
}
// createGCPObject uploads a GCS object of objectSize random characters to the
// bucket and key encoded in objectURLStr, then applies the attributes in o.
//
// When o.ContentType is the empty string it is filled in from the generated
// payload. Any failure (client creation, write, close or attribute update)
// is printed and the process exits with status 1.
func createGCPObject(objectURLStr string, objectSize uint32, o gcpUtils.ObjectAttrsToUpdate) {
	u, err := url.Parse(objectURLStr)
	if err != nil {
		fmt.Println("fail to parse the object URL, ", err)
		os.Exit(1)
	}

	gcpURLParts, err := common.NewGCPURLParts(*u)
	if err != nil {
		fmt.Println("new GCP URL parts, ", err)
		os.Exit(1)
	}

	gcpClient, err := createGCPClientWithGCSSDK()
	if err != nil {
		fmt.Println("Failed to create GCS Client: ", err)
		os.Exit(1)
	}

	randomString := createStringWithRandomChars(int(objectSize))
	if o.ContentType == "" {
		o.ContentType = http.DetectContentType([]byte(randomString))
	}

	obj := gcpClient.Bucket(gcpURLParts.BucketName).Object(gcpURLParts.ObjectKey)
	wc := obj.NewWriter(context.Background())
	if _, err = io.Copy(wc, strings.NewReader(randomString)); err != nil {
		// Best-effort cleanup; the copy error is the one worth reporting.
		_ = wc.Close()
		fmt.Println("fail to upload file to GCP object, ", err)
		os.Exit(1)
	}
	// Per the GCS client docs, a write is only known to have succeeded once Close
	// returns nil, so this error must not be discarded.
	if err = wc.Close(); err != nil {
		fmt.Println("fail to upload file to GCP object, ", err)
		os.Exit(1)
	}

	_, err = obj.Update(context.Background(), o)
	if err != nil {
		fmt.Println("fail to set attributes on GCP object, ", err)
		os.Exit(1)
	}
}