ste/sender-azureFile.go (365 lines of code) (raw):

// Copyright © Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ste

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/directory"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
	filesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"

	"github.com/Azure/azure-storage-azcopy/v10/common"
)

// FileClientStub is the minimal common surface of *file.Client and
// *directory.Client used here: both expose URL(). It lets azureFileSenderBase
// hold either kind of client in a single field (see fileOrDirClient) and lets
// helpers accept either one.
type FileClientStub interface {
	URL() string
}

// azureFileSenderBase implements both IFolderSender and (most of) IFileSender.
// Why implement both interfaces in the one type, even though they are largely unrelated? Because it
// makes functions like newAzureFilesUploader easier to reason about, since they always return the same type.
// It may also make it easier to describe what's needed when supporting a new backend - e.g. "to send to a new back end
// you need a sender that implements IFileSender and, if the back end is folder aware, it should also implement IFolderSender"
// (The alternative would be to have the likes of newAzureFilesUploader call sip.EntityType and return a different type
// if the entity type is folder).
type azureFileSenderBase struct {
	jptm IJobPartTransferMgr
	// true when the destination service client URL carries no SAS signature,
	// i.e. we are authenticating with OAuth.
	// NOTE(review): this field is set in newAzureFileSenderBase but is not read
	// anywhere in this file — presumably consumed elsewhere; verify before removing.
	addFileRequestIntent bool
	// Either a *file.Client (file transfers) or a *directory.Client (folder
	// property transfers); see the type assertions in getFileClient/getDirectoryClient.
	fileOrDirClient FileClientStub
	shareClient     *share.Client
	chunkSize       int64
	numChunks       uint32
	pacer           pacer
	ctx             context.Context
	sip             ISourceInfoProvider

	// Headers and other info that we will apply to the destination
	// object. For S2S, these come from the source service.
	// When sending local data, they are computed based on
	// the properties of the local file
	headersToApply       file.HTTPHeaders
	smbPropertiesToApply file.SMBProperties
	permissionsToApply   file.Permissions
	metadataToApply      common.Metadata
}

// newAzureFileSenderBase builds the shared sender state for a single transfer to
// Azure Files: it clamps the chunk size, derives the chunk count, parses the
// destination URL, and constructs either a file or a directory client depending
// on whether this transfer is a folder-properties transfer.
func newAzureFileSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (*azureFileSenderBase, error) {
	info := jptm.Info()

	// compute chunk size (irrelevant but harmless for folders)
	// If the given chunk Size for the Job is greater than maximum file chunk size i.e 4 MB
	// then chunk size will be 4 MB.
	chunkSize := info.BlockSize
	if chunkSize > common.DefaultAzureFileChunkSize {
		chunkSize = common.DefaultAzureFileChunkSize
		if jptm.ShouldLog(common.LogWarning) {
			jptm.Log(common.LogWarning,
				fmt.Sprintf("Block size %d larger than maximum file chunk size, 4 MB chunk size used", info.BlockSize))
		}
	}

	// compute num chunks (irrelevant but harmless for folders)
	numChunks := getNumChunks(info.SourceSize, chunkSize, chunkSize)

	// due to the REST parity feature added in 2019-02-02, the File APIs are no longer backward compatible
	// so we must use the latest SDK version to stay safe
	// TODO: Should we get rid of this one?
	props, err := sip.Properties()
	if err != nil {
		return nil, err
	}

	fileURLParts, err := file.ParseURL(destination)
	if err != nil {
		return nil, err
	}
	shareName := fileURLParts.ShareName
	shareSnapshot := fileURLParts.ShareSnapshot
	directoryOrFilePath := fileURLParts.DirectoryOrFilePath

	serviceClient, err := jptm.DstServiceClient().FileServiceClient()
	if err != nil {
		return nil, err
	}

	// No SAS signature on the service URL means we are authenticating with OAuth.
	sURL, _ := file.ParseURL(serviceClient.URL())
	addFileRequestIntent := (sURL.SAS.Signature() == "") // We are using oAuth

	shareClient := serviceClient.NewShareClient(shareName)
	if shareSnapshot != "" {
		shareClient, err = shareClient.WithSnapshot(shareSnapshot)
		if err != nil {
			return nil, err
		}
	}

	// Folder-properties transfers get a directory client (root client for the
	// share root); everything else gets a file client.
	var client FileClientStub
	if info.IsFolderPropertiesTransfer() {
		if directoryOrFilePath == "" {
			client = shareClient.NewRootDirectoryClient()
		} else {
			client = shareClient.NewDirectoryClient(directoryOrFilePath)
		}
	} else {
		client = shareClient.NewRootDirectoryClient().NewFileClient(directoryOrFilePath)
	}

	return &azureFileSenderBase{
		jptm:                 jptm,
		addFileRequestIntent: addFileRequestIntent,
		shareClient:          shareClient,
		fileOrDirClient:      client,
		chunkSize:            chunkSize,
		numChunks:            numChunks,
		pacer:                pacer,
		ctx:                  jptm.Context(),
		headersToApply:       props.SrcHTTPHeaders.ToFileHTTPHeaders(),
		smbPropertiesToApply: file.SMBProperties{},
		permissionsToApply:   file.Permissions{},
		sip:                  sip,
		metadataToApply:      props.SrcMetadata,
	}, nil
}

// getFileClient asserts fileOrDirClient is a *file.Client.
// Panics if this sender was constructed for a folder transfer — callers must
// only use it on file transfers.
func (u *azureFileSenderBase) getFileClient() *file.Client {
	return u.fileOrDirClient.(*file.Client)
}

// getDirectoryClient asserts fileOrDirClient is a *directory.Client.
// Panics if this sender was constructed for a file transfer.
func (u *azureFileSenderBase) getDirectoryClient() *directory.Client {
	return u.fileOrDirClient.(*directory.Client)
}

// ChunkSize returns the (possibly clamped) chunk size for this transfer.
func (u *azureFileSenderBase) ChunkSize() int64 {
	return u.chunkSize
}

// NumChunks returns the number of chunks this transfer will be split into.
func (u *azureFileSenderBase) NumChunks() uint32 {
	return u.numChunks
}

// RemoteFileExists probes the destination file with GetProperties and reports
// existence plus its last-modified time (via remoteObjectExists).
func (u *azureFileSenderBase) RemoteFileExists() (bool, time.Time, error) {
	props, err := u.getFileClient().GetProperties(u.ctx, nil)
	return remoteObjectExists(filePropertiesResponseAdapter{props}, err)
}

// Prologue creates the destination file (with headers, metadata, permissions and
// SMB properties), creating missing parent directories and retrying once if the
// first Create fails with ParentNotFound. Always reports the destination as
// modified, since the Create call is attempted unconditionally.
func (u *azureFileSenderBase) Prologue(state common.PrologueState) (destinationModified bool) {
	jptm := u.jptm
	info := jptm.Info()

	destinationModified = true

	if jptm.ShouldInferContentType() {
		// sometimes, specifically when reading local files, we have more info
		// about the file type at this time than what we had before
		u.headersToApply.ContentType = state.GetInferredContentType(u.jptm)
	}

	stage, err := u.addPermissionsToHeaders(info, u.getFileClient().URL())
	if err != nil {
		jptm.FailActiveSend(stage, err)
		return
	}

	stage, err = u.addSMBPropertiesToHeaders(info)
	if err != nil {
		jptm.FailActiveSend(stage, err)
		return
	}

	// Turn off readonly at creation time (because if its set at creation time, we won't be
	// able to upload any data to the file!). We'll set it in epilogue, if necessary.
	// Note: creationProperties is a copy, so the real smbPropertiesToApply keeps ReadOnly
	// for the Epilogue SetHTTPHeaders call. (Attributes is a pointer, so the copy shares it —
	// NOTE(review): clearing ReadOnly here also mutates u.smbPropertiesToApply.Attributes; the
	// Epilogue path presumably re-derives or tolerates this — confirm.)
	creationProperties := u.smbPropertiesToApply
	if creationProperties.Attributes != nil {
		creationProperties.Attributes.ReadOnly = false
	}

	err = common.DoWithOverrideReadOnlyOnAzureFiles(u.ctx, func() (interface{}, error) {
		return u.getFileClient().Create(u.ctx, info.SourceSize, &file.CreateOptions{HTTPHeaders: &u.headersToApply, Permissions: &u.permissionsToApply, SMBProperties: &creationProperties, Metadata: u.metadataToApply})
	}, u.fileOrDirClient, u.jptm.GetForceIfReadOnly())

	if fileerror.HasCode(err, fileerror.ParentNotFound) {
		// Create the parent directories of the file. Note the share must already exist,
		// as the files are listed from a share or directory.
		jptm.Log(common.LogError, fmt.Sprintf("%s: %s \n AzCopy is going to create parent directories of the Azure files", fileerror.ParentNotFound, err.Error()))
		err = AzureFileParentDirCreator{}.CreateParentDirToRoot(u.ctx, u.getFileClient(), u.shareClient, u.jptm.GetFolderCreationTracker())
		if err != nil {
			u.jptm.FailActiveUpload("Creating parent directory", err)
		}

		// retrying file creation
		err = common.DoWithOverrideReadOnlyOnAzureFiles(u.ctx, func() (interface{}, error) {
			return u.getFileClient().Create(u.ctx, info.SourceSize, &file.CreateOptions{
				HTTPHeaders:   &u.headersToApply,
				SMBProperties: &creationProperties,
				Permissions:   &u.permissionsToApply,
				Metadata:      u.metadataToApply,
			})
		}, u.fileOrDirClient, u.jptm.GetForceIfReadOnly())
	}

	if err != nil {
		jptm.FailActiveUpload("Creating file", err)
		return
	}

	return
}

// addPermissionsToHeaders populates u.permissionsToApply from the source's SDDL
// when SMB permission preservation is requested. Same-account/same-share
// Azure Files sources reuse the cached permission key to save a round trip;
// oversized SDDLs are persisted via the SecurityInfoPersistenceManager and
// referenced by key instead. Returns a stage label for error reporting.
func (u *azureFileSenderBase) addPermissionsToHeaders(info *TransferInfo, destURL string) (stage string, err error) {
	if !info.PreserveSMBPermissions.IsTruthy() {
		return "", nil
	}

	// Prepare to transfer SDDLs from the source.
	if sddlSIP, ok := u.sip.(ISMBPropertyBearingSourceInfoProvider); ok {
		// If both sides are Azure Files...
		if fSIP, ok := sddlSIP.(*fileSourceInfoProvider); ok {
			srcURLParts, err := file.ParseURL(info.Source)
			common.PanicIfErr(err)
			dstURLParts, err := file.ParseURL(destURL)
			common.PanicIfErr(err)

			// and happen to be the same account and share, we can get away with using the same key and save a trip.
			if srcURLParts.Host == dstURLParts.Host && srcURLParts.ShareName == dstURLParts.ShareName {
				u.permissionsToApply.PermissionKey = &fSIP.cachedPermissionKey
			}
		}

		// If we didn't do the workaround, then let's get the SDDL and put it later.
		if u.permissionsToApply.PermissionKey == nil || *u.permissionsToApply.PermissionKey == "" {
			pString, err := sddlSIP.GetSDDL()

			// Sending "" to the service is invalid, but the service will return it sometimes (e.g. on file shares)
			// Thus, we'll let the files SDK fill in "inherit" for us, so the service is happy.
			// NOTE(review): pString is consumed before err is checked — deliberate-looking, but
			// it stores a possibly-partial SDDL when GetSDDL also returns an error; confirm intent.
			if pString != "" {
				u.permissionsToApply.Permission = &pString
			}

			if err != nil {
				return "Getting permissions", err
			}
		}
	}

	if u.permissionsToApply.Permission != nil && len(*u.permissionsToApply.Permission) > FilesServiceMaxSDDLSize {
		sipm := u.jptm.SecurityInfoPersistenceManager()
		pkey, err := sipm.PutSDDL(*u.permissionsToApply.Permission, u.shareClient)
		u.permissionsToApply.PermissionKey = &pkey
		if err != nil {
			return "Putting permissions", err
		}

		// The service rejects sending both an inline permission and a key, so blank the inline one.
		ePermString := ""
		u.permissionsToApply.Permission = &ePermString
	}
	return "", nil
}

// addSMBPropertiesToHeaders populates u.smbPropertiesToApply (attributes,
// creation time, and optionally last-write time) from the source when SMB info
// preservation is requested. For Files-to-Files transfers it converts SDK
// panics while reading the raw properties into an error via recover.
func (u *azureFileSenderBase) addSMBPropertiesToHeaders(info *TransferInfo) (stage string, err error) {
	if !info.PreserveSMBInfo {
		return "", nil
	}
	if smbSIP, ok := u.sip.(ISMBPropertyBearingSourceInfoProvider); ok {
		smbProps, err := smbSIP.GetSMBProperties()
		if err != nil {
			return "Obtaining SMB properties", err
		}

		fromTo := u.jptm.FromTo()
		if fromTo.From() == common.ELocation.File() { // Files SDK can panic when the service hands it something unexpected!
			defer func() { // recover from potential panics and output raw properties for debug purposes
				if panicerr := recover(); panicerr != nil {
					stage = "Reading SMB properties"

					attr, _ := smbProps.FileAttributes()
					lwt := smbProps.FileLastWriteTime()
					fct := smbProps.FileCreationTime()

					err = fmt.Errorf("failed to read SMB properties (%w)! Raw data: attr: `%s` lwt: `%s`, fct: `%s`", err, attr, lwt, fct)
				}
			}()
		}

		attribs, _ := smbProps.FileAttributes()
		u.smbPropertiesToApply.Attributes = attribs

		if info.ShouldTransferLastWriteTime() {
			lwTime := smbProps.FileLastWriteTime()
			u.smbPropertiesToApply.LastWriteTime = &lwTime
		}

		creationTime := smbProps.FileCreationTime()
		u.smbPropertiesToApply.CreationTime = &creationTime
	}
	return "", nil
}

// Epilogue re-applies headers, permissions, and SMB properties after the file
// content has been uploaded (only when the transfer is still live and SMB info
// preservation was requested).
func (u *azureFileSenderBase) Epilogue() {
	// always set the SMB info again after the file content has been uploaded, for the following reasons:
	//   0. File attributes such as readOnly and archive need to be passed through another Set Properties call.
	//   1. The syntax for SMB permissions is slightly different for the create call vs the update call.
	//      This is not trivial but the Files Team has explicitly told us to perform this extra set call.
	//   2. The service started updating the last-write-time in March 2021 when the file is modified.
	//      So when we uploaded the ranges, we've unintentionally changed the last-write-time.
	if u.jptm.IsLive() && u.jptm.Info().PreserveSMBInfo {
		// This is an extra round trip, but we can live with that for these relatively rare cases
		_, err := u.getFileClient().SetHTTPHeaders(u.ctx, &file.SetHTTPHeadersOptions{
			HTTPHeaders:   &u.headersToApply,
			Permissions:   &u.permissionsToApply,
			SMBProperties: &u.smbPropertiesToApply,
		})
		if err != nil {
			u.jptm.FailActiveSend("Applying final attribute settings", err)
		}
	}
}

// Cleanup deletes the partially-written destination file when the transfer died
// in flight. It uses a fresh 2-minute-timeout context (with the service API
// version override) because the transfer's own context may already be cancelled.
func (u *azureFileSenderBase) Cleanup() {
	jptm := u.jptm

	// Cleanup
	if jptm.IsDeadInflight() {
		// transfer was either failed or cancelled
		// the file created in share needs to be deleted, since it's
		// contents will be at an unknown stage of partial completeness
		deletionContext, cancelFn := context.WithTimeout(context.WithValue(context.Background(), ServiceAPIVersionOverride, DefaultServiceApiVersion), 2*time.Minute)
		defer cancelFn()
		_, err := u.getFileClient().Delete(deletionContext, nil)
		if err != nil {
			jptm.Log(common.LogError, fmt.Sprintf("error deleting the (incomplete) file %s. Failed with error %s", u.fileOrDirClient.URL(), err.Error()))
		}
	}
}

// GetDestinationLength fetches the destination file's content length, returning
// -1 with an error when the call fails or the service omits the length.
func (u *azureFileSenderBase) GetDestinationLength() (int64, error) {
	prop, err := u.getFileClient().GetProperties(u.ctx, nil)
	if err != nil {
		return -1, err
	}
	if prop.ContentLength == nil {
		return -1, fmt.Errorf("destination content length not returned")
	}
	return *prop.ContentLength, nil
}

// EnsureFolderExists creates the destination directory (and any missing
// ancestors) for folder transfers.
func (u *azureFileSenderBase) EnsureFolderExists() error {
	return AzureFileParentDirCreator{}.CreateDirToRoot(u.ctx, u.shareClient, u.getDirectoryClient(), u.jptm.GetFolderCreationTracker())
}

// SetFolderProperties applies metadata, SMB properties, and permissions to the
// destination directory, temporarily overriding read-only if forced.
func (u *azureFileSenderBase) SetFolderProperties() error {
	info := u.jptm.Info()

	_, err := u.addPermissionsToHeaders(info, u.getDirectoryClient().URL())
	if err != nil {
		return err
	}

	_, err = u.addSMBPropertiesToHeaders(info)
	if err != nil {
		return err
	}

	err = common.DoWithOverrideReadOnlyOnAzureFiles(u.ctx, func() (interface{}, error) {
		_, err := u.getDirectoryClient().SetMetadata(u.ctx, &directory.SetMetadataOptions{Metadata: u.metadataToApply})
		if err != nil {
			return nil, err
		}

		return u.getDirectoryClient().SetProperties(u.ctx, &directory.SetPropertiesOptions{
			FileSMBProperties: &u.smbPropertiesToApply,
			FilePermissions:   &u.permissionsToApply,
		})
	}, u.fileOrDirClient, u.jptm.GetForceIfReadOnly())

	return err
}

// DirUrlToString returns the destination directory URL with the query string
// (e.g. SAS) and raw path stripped.
func (u *azureFileSenderBase) DirUrlToString() string {
	directoryURL := u.getDirectoryClient().URL()
	rawURL, err := url.Parse(directoryURL)
	common.PanicIfErr(err)
	rawURL.RawQuery = ""
	// To avoid encoding/decoding
	rawURL.RawPath = ""
	return rawURL.String()
}

// AzureFileParentDirCreator is a namespace for functions related to creating
// parent directories in Azure Files, to avoid free-floating global funcs.
type AzureFileParentDirCreator struct{}

// getParentDirectoryClient gets parent directory client of a path.
func (AzureFileParentDirCreator) getParentDirectoryClient(uh FileClientStub, shareClient *share.Client) (*directory.Client, error) { rawURL, _ := url.Parse(uh.URL()) rawURL.Path = rawURL.Path[:strings.LastIndex(rawURL.Path, "/")] directoryURLParts, err := filesas.ParseURL(rawURL.String()) if err != nil { return nil, err } directoryOrFilePath := directoryURLParts.DirectoryOrFilePath if directoryURLParts.ShareSnapshot != "" { shareClient, err = shareClient.WithSnapshot(directoryURLParts.ShareSnapshot) if err != nil { return nil, err } } return shareClient.NewRootDirectoryClient().NewSubdirectoryClient(directoryOrFilePath), nil } // verifyAndHandleCreateErrors handles create errors, StatusConflict is ignored, as specific level directory could be existing. // Report http.StatusForbidden, as user should at least have read and write permission of the destination, // and there is no permission on directory level, i.e. create directory is a general permission for each level directories for Azure file. func (AzureFileParentDirCreator) verifyAndHandleCreateErrors(err error) error { if err != nil { var respErr *azcore.ResponseError if errors.As(err, &respErr) && respErr.StatusCode == http.StatusConflict { // Note the ServiceCode actually be AuthenticationFailure when share failed to be created, if want to create share as well. return nil } return err } return nil } // splitWithoutToken splits string with a given token, and returns split results without token. func (AzureFileParentDirCreator) splitWithoutToken(str string, token rune) []string { return strings.FieldsFunc(str, func(c rune) bool { return c == token }) } // CreateParentDirToRoot creates parent directories of the Azure file if file's parent directory doesn't exist. 
// CreateParentDirToRoot resolves the parent directory of fileClient's URL and
// ensures the whole directory chain up from the share root exists.
func (d AzureFileParentDirCreator) CreateParentDirToRoot(ctx context.Context, fileClient *file.Client, shareClient *share.Client, t FolderCreationTracker) error {
	directoryClient, err := d.getParentDirectoryClient(fileClient, shareClient)
	if err != nil {
		return err
	}
	return d.CreateDirToRoot(ctx, shareClient, directoryClient, t)
}

// CreateDirToRoot creates every directory segment of directoryClient's path in
// order, from the share root downward. Creation of each segment is routed
// through the FolderCreationTracker (keyed by the segment URL with its query
// string stripped), and "already exists" conflicts are tolerated via
// verifyAndHandleCreateErrors.
func (d AzureFileParentDirCreator) CreateDirToRoot(ctx context.Context, shareClient *share.Client, directoryClient *directory.Client, t FolderCreationTracker) error {
	// ignoring error below because we're getting URL from a valid client.
	fileURLParts, _ := file.ParseURL(directoryClient.URL())

	// Try to create the parent directories. Split directories as segments.
	segments := d.splitWithoutToken(fileURLParts.DirectoryOrFilePath, '/')
	if len(segments) == 0 {
		// If we are trying to create root, perform GetProperties instead.
		// Azure Files has delayed creation of root, and if we do not perform GetProperties,
		// some operations like SetMetadata or SetProperties will fail.
		// TODO: Remove this block once the bug is fixed.
		_, err := directoryClient.GetProperties(ctx, nil)
		return err
	}

	currentDirectoryClient := shareClient.NewRootDirectoryClient() // Share directory should already exist, doesn't support creating share
	// Try to create the directories, parent-first: each iteration descends one
	// segment so ancestors are guaranteed to exist before their children.
	for i := 0; i < len(segments); i++ {
		currentDirectoryClient = currentDirectoryClient.NewSubdirectoryClient(segments[i])
		rawURL := currentDirectoryClient.URL()
		recorderURL, err := url.Parse(rawURL)
		if err != nil {
			return err
		}
		// Strip the query (e.g. SAS) so the tracker key is stable across requests.
		recorderURL.RawQuery = ""
		err = t.CreateFolder(recorderURL.String(), func() error {
			_, err := currentDirectoryClient.Create(ctx, nil)
			return err
		})
		if verifiedErr := d.verifyAndHandleCreateErrors(err); verifiedErr != nil {
			return verifiedErr
		}
	}
	return nil
}