testSuite/cmd/testblobFS.go (248 lines of code) (raw):
package cmd
import (
"context"
"crypto/md5"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"io"
"os"
"path/filepath"
"strings"
"github.com/spf13/cobra"
)
// TestBlobFSCommand carries the inputs of the testBlobFS command, which
// validates the result of an azcopy upload to, or download from, the
// Blob FS service.
type TestBlobFSCommand struct {
	// Object is the local resource (file or directory path) to be validated
	// against the remote resource.
	Object string

	// Subject is the URL of the remote resource that Object is validated
	// against.
	Subject string

	// IsObjectDirectory tells whether Object is a directory. Directories are
	// validated through a different code path than single files.
	IsObjectDirectory bool
}
// init builds the testBlobFS cobra command (alias tBlobFS), binds its two
// positional arguments and the is-object-dir flag to a TestBlobFSCommand,
// and attaches the command to rootCmd.
func init() {
	var input TestBlobFSCommand

	// parseArgs accepts exactly two positional arguments and stores them on
	// the shared input value.
	parseArgs := func(_ *cobra.Command, args []string) error {
		if len(args) != 2 {
			return fmt.Errorf("invalid arguments for test blob command")
		}
		// args[0] is the resource name (object), args[1] is the subject it
		// is validated against.
		input.Object, input.Subject = args[0], args[1]
		return nil
	}

	// run performs the validation with whatever parseArgs and the flag
	// parsing stored in input.
	run := func(_ *cobra.Command, _ []string) {
		input.processTest()
	}

	command := &cobra.Command{
		Use:     "testBlobFS",
		Aliases: []string{"tBlobFS"},
		Short:   "tests the blob created using AZCopy v2",
		Args:    parseArgs,
		Run:     run,
	}
	rootCmd.AddCommand(command)

	// Flags supported by the command.
	command.PersistentFlags().BoolVar(&input.IsObjectDirectory, "is-object-dir", false, "set the type of object to verify against the subject")
}
// processTest runs the verification that matches the kind of object:
// the directory comparison when IsObjectDirectory is set, otherwise the
// single-file comparison.
func (tbfsc TestBlobFSCommand) processTest() {
	if !tbfsc.IsObjectDirectory {
		tbfsc.verifyRemoteFile()
		return
	}
	tbfsc.verifyRemoteDir()
}
// verifyRemoteFile verifies the local file (Object) against the file on the
// remote file system (Subject) by comparing their sizes and then their MD5
// digests.
//
// Authentication uses the SAS token embedded in the Subject URL when one is
// present; otherwise a shared key credential is built from the ACCOUNT_NAME
// and ACCOUNT_KEY environment variables.
//
// Every failure is reported on stdout and terminates the process with exit
// code 1. A successful validation returns normally.
func (tbfsc TestBlobFSCommand) verifyRemoteFile() {
	// Get BFS url parts to test SAS
	datalakeURLParts, err := azdatalake.ParseURL(tbfsc.Subject)
	if err != nil {
		fmt.Println("error parsing the datalake sas ", tbfsc.Subject)
		os.Exit(1)
	}
	hasSAS := datalakeURLParts.SAS.Encode() != ""

	// Get the Account Name and Key variables from environment
	name := os.Getenv("ACCOUNT_NAME")
	key := os.Getenv("ACCOUNT_KEY")
	// Shared key authentication needs BOTH values, so a single missing one is
	// fatal unless a SAS was supplied in the subject URL.
	if !hasSAS && (name == "" || key == "") {
		fmt.Println("ACCOUNT_NAME and ACCOUNT_KEY should be set before executing the test, OR a SAS token should be supplied in the subject URL.")
		os.Exit(1)
	}

	var fc *file.Client
	ctx := context.Background()
	if hasSAS {
		fc, err = file.NewClientWithNoCredential(datalakeURLParts.String(), nil)
	} else {
		var cred *azdatalake.SharedKeyCredential
		cred, err = azdatalake.NewSharedKeyCredential(name, key)
		if err != nil {
			fmt.Printf("error creating shared key cred. failed with error %s\n", err.Error())
			os.Exit(1)
		}
		perCallPolicies := []policy.Policy{ste.NewVersionPolicy()}
		fc, err = file.NewClientWithSharedKeyCredential(datalakeURLParts.String(), cred, &file.ClientOptions{ClientOptions: azcore.ClientOptions{PerCallPolicies: perCallPolicies}})
		ctx = context.WithValue(ctx, ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
	}
	if err != nil {
		fmt.Printf("error creating client. failed with error %s\n", err.Error())
		os.Exit(1)
	}

	// Start the download of the remote file.
	dResp, err := fc.DownloadStream(ctx, nil)
	if err != nil {
		fmt.Printf("error downloading the subject %s. Failed with error %s\n", datalakeURLParts.String(), err.Error())
		os.Exit(1)
	}
	// Release the response body on every normal return path.
	defer dResp.Body.Close()

	// ContentLength is a pointer; guard it so a response without the header
	// produces a diagnostic instead of a nil dereference.
	if dResp.ContentLength == nil {
		fmt.Printf("error downloading the subject %s. The response did not report a content length\n", datalakeURLParts.String())
		os.Exit(1)
	}
	downloadedLength := *dResp.ContentLength

	// open the local file
	f, err := os.Open(tbfsc.Object)
	if err != nil {
		fmt.Println("error opening the object ", tbfsc.Object, " failed with error ", err.Error())
		os.Exit(1)
	}
	defer f.Close()

	fInfo, err := f.Stat()
	if err != nil {
		fmt.Println("error getting the file Info of opened file ", tbfsc.Object, " failed with error ", err.Error())
		os.Exit(1)
	}

	// If the length of file at two location is not same
	// validation has failed
	if downloadedLength != fInfo.Size() {
		fmt.Printf("validation failed because there is difference in the source size %d and destination size %d\n", fInfo.Size(), downloadedLength)
		os.Exit(1)
	}

	// Both sides are empty: there is nothing to hash, so the validation has
	// passed. Return normally (success) rather than exiting with a failure
	// code.
	if fInfo.Size() == 0 {
		return
	}

	// read the downloaded content into the buffer
	downloadedBuffer := make([]byte, downloadedLength)
	if _, err = io.ReadFull(dResp.Body, downloadedBuffer); err != nil {
		fmt.Println("error reading the downloaded body ", err.Error())
		os.Exit(1)
	}

	// memory map the local file
	mMap, err := NewMMF(f, false, 0, fInfo.Size())
	if err != nil {
		fmt.Println("error memory mapping the file ", tbfsc.Object, " failed with error ", err.Error())
		os.Exit(1)
	}
	defer mMap.Unmap()

	// calculate the md5Sum of object and subject; a mismatch fails validation
	objMd5 := md5.Sum(mMap)
	subjMd5 := md5.Sum(downloadedBuffer)
	if objMd5 != subjMd5 {
		fmt.Println("object md5 is not equal to the downloaded md5")
		os.Exit(1)
	}
}
// verifyRemoteDir validates the local directory (object) against the directory
// on filesystem (subject)
func (tbfsc TestBlobFSCommand) verifyRemoteDir() {
// Get BFS url parts to test SAS
datalakeURLParts, err := azdatalake.ParseURL(tbfsc.Subject)
if err != nil {
fmt.Println("error parsing the datalake sas ", tbfsc.Subject)
os.Exit(1)
}
// break the remote Url into parts
// and save the directory path
currentDirectoryPath := datalakeURLParts.PathName
datalakeURLParts.PathName = ""
// Get the Account Name and Key variables from environment
name := os.Getenv("ACCOUNT_NAME")
key := os.Getenv("ACCOUNT_KEY")
// If ACCOUNT_NAME or ACCOUNT_KEY is not supplied AND a SAS is not supplied
if (name == "" && key == "") && datalakeURLParts.SAS.Encode() == "" {
fmt.Println("ACCOUNT_NAME and ACCOUNT_KEY should be set before executing the test, OR a SAS token should be supplied in the subject URL.")
os.Exit(1)
}
var fsc *filesystem.Client
ctx := context.Background()
if datalakeURLParts.SAS.Encode() != "" {
fsc, err = filesystem.NewClientWithNoCredential(datalakeURLParts.String(), nil)
} else {
var cred *azdatalake.SharedKeyCredential
cred, err = azdatalake.NewSharedKeyCredential(name, key)
if err != nil {
fmt.Printf("error creating shared key cred. failed with error %s\n", err.Error())
os.Exit(1)
}
perCallPolicies := []policy.Policy{ste.NewVersionPolicy()}
fsc, err = filesystem.NewClientWithSharedKeyCredential(datalakeURLParts.String(), cred, &filesystem.ClientOptions{ClientOptions: azcore.ClientOptions{PerCallPolicies: perCallPolicies}})
ctx = context.WithValue(ctx, ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
}
if err != nil {
fmt.Printf("error creating client. failed with error %s\n", err.Error())
os.Exit(1)
}
// Get the object Info and If the object is not a directory
// validation fails since validation has two be done between directories
// local and remote
objectInfo, err := os.Stat(tbfsc.Object)
if err != nil {
fmt.Printf("error getting the file info for dir %s. failed with error %s\n", tbfsc.Object, err.Error())
os.Exit(1)
}
if !objectInfo.IsDir() {
fmt.Printf("the source provided %s is not a directory path\n", tbfsc.Object)
os.Exit(1)
}
// List the directory
pager := fsc.NewListPathsPager(true, &filesystem.ListPathsOptions{Prefix: ¤tDirectoryPath})
// numberOfFilesinSubject keeps the count of number of files of at the destination
numberOfFilesinSubject := int(0)
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
fmt.Printf("error listing the directory path defined by url %s. Failed with error %s\n", datalakeURLParts.String(), err.Error())
os.Exit(1)
}
paths := resp.PathList.Paths
numberOfFilesinSubject += len(paths)
for _, p := range paths {
// Get the file path
// remove the directory path from the file path
// to get the relative path
filePath := *p.Name
filePath = strings.Replace(filePath, currentDirectoryPath, "", 1)
relativefilepath := strings.Trim(filePath, "/")
// replace the "/" with os path separator
relativefilepath = strings.Replace(relativefilepath, "/", string(os.PathSeparator), -1)
// create the expected local path of remote file
filepathLocal := filepath.Join(tbfsc.Object, relativefilepath)
// open the filePath locally and calculate the md5
fpLocal, err := os.Open(filepathLocal)
if err != nil {
fmt.Printf("error opening the file %s. failed with error %s\n", filepathLocal, err.Error())
os.Exit(1)
}
// Get the fileInfo to get size.
fpLocalInfo, err := fpLocal.Stat()
if err != nil {
fmt.Printf("error getting the file info for file %s. failed with error %s\n", filepathLocal, err.Error())
os.Exit(1)
}
// Check the size of file
// If the size of file doesn't matches, then exit with error
if fpLocalInfo.Size() != *p.ContentLength {
fmt.Println("the size of local file does not match the remote file")
os.Exit(1)
}
// If the size of file is zero then continue to next file
if fpLocalInfo.Size() == 0 {
continue
}
defer fpLocal.Close()
// memory map the file
fpMMf, err := NewMMF(fpLocal, false, 0, fpLocalInfo.Size())
if err != nil {
fmt.Printf("error memory mapping the file %s. failed with error %s\n", filepathLocal, err.Error())
os.Exit(1)
}
defer fpMMf.Unmap()
// calculated the source md5
objMd5 := md5.Sum(fpMMf)
// Download the remote file and calculate md5
tempUrlParts := datalakeURLParts
tempUrlParts.PathName = *p.Name
fc := fsc.NewFileClient(tempUrlParts.PathName)
fResp, err := fc.DownloadStream(ctx, nil)
if err != nil {
fmt.Printf("error downloading the file %s. failed with error %s\n", fc.DFSURL(), err.Error())
os.Exit(1)
}
downloadedBuffer := make([]byte, *p.ContentLength) // byte buffer in which file will be downloaded to
_, err = io.ReadFull(fResp.Body, downloadedBuffer)
if err != nil {
fmt.Println("error reading the downloaded body ", err.Error())
os.Exit(1)
}
// calculate the downloaded file Md5
subjMd5 := md5.Sum(downloadedBuffer)
if objMd5 != subjMd5 {
fmt.Printf("source file %s doesn't match the remote file %s\n", filepathLocal, fc.DFSURL())
os.Exit(1)
}
}
}
// walk through the directory and count the number of files inside the local directory
numberOFFilesInObject := int(0)
err = filepath.Walk(tbfsc.Object, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
numberOFFilesInObject++
}
return nil
})
if err != nil {
fmt.Printf("validation failed with error %s walking inside the source %s\n", err.Error(), tbfsc.Object)
os.Exit(1)
}
// If the number of files inside the directories locally and remote
// is not same, validation fails.
if numberOFFilesInObject != numberOfFilesinSubject {
fmt.Println("validation failed since there is difference in the number of files in source and destination")
os.Exit(1)
}
fmt.Printf("successfully validated the source %s and destination %s\n", tbfsc.Object, tbfsc.Subject)
}