// Copyright © Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ste

import (
"context"
"fmt"
datalakesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/sas"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/datalakeerror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
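
// DatalakeClientStub is the narrow interface satisfied by both *file.Client
// and *directory.Client from the azdatalake SDK, letting the sender hold
// either one and still resolve the destination's DFS- and Blob-endpoint URLs.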
type DatalakeClientStub interface {
	DFSURL() string
	BlobURL() string
}
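
// blobFSSenderBase is the shared scaffolding for senders that write to an
// ADLS Gen2 (Blob FS) destination. It holds a Datalake client for the file or
// directory itself, plus a block blob client for the operations (such as
// setting metadata and uploading symlink data) that go through the blob endpoint.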
type blobFSSenderBase struct {
	jptm                IJobPartTransferMgr
	sip                 ISourceInfoProvider
	blobClient          blockblob.Client
	fileOrDirClient     DatalakeClientStub
	parentDirClient     *directory.Client
	chunkSize           int64
	numChunks           uint32
	pacer               pacer
	creationTimeHeaders *file.HTTPHeaders
	flushThreshold      int64
	metadataToSet       common.Metadata
}
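
// newBlobFSSenderBase derives the chunk size and chunk count from the source
// size and block size, and wires up clients for the destination path, its
// parent directory, and the equivalent block blob at the blob endpoint.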
func newBlobFSSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (*blobFSSenderBase, error) {
	info := jptm.Info()

	// compute chunk size and number of chunks
	chunkSize := info.BlockSize
	numChunks := getNumChunks(info.SourceSize, chunkSize, chunkSize)

	props, err := sip.Properties()
	if err != nil {
		return nil, err
	}
	headers := props.SrcHTTPHeaders.ToBlobFSHTTPHeaders()

	s, err := jptm.DstServiceClient().DatalakeServiceClient()
	if err != nil {
		return nil, err
	}
	datalakeURLParts, err := azdatalake.ParseURL(destination)
	if err != nil {
		return nil, err
	}
	fsc := s.NewFileSystemClient(datalakeURLParts.FileSystemName)

	directoryOrFilePath := datalakeURLParts.PathName
	parentPath := ""
	if strings.LastIndex(directoryOrFilePath, "/") != -1 {
		parentPath = directoryOrFilePath[:strings.LastIndex(directoryOrFilePath, "/")]
	}

	var destClient DatalakeClientStub
	if info.IsFolderPropertiesTransfer() {
		destClient = fsc.NewDirectoryClient(directoryOrFilePath)
	} else {
		destClient = fsc.NewFileClient(directoryOrFilePath)
	}
	bsc, _ := jptm.DstServiceClient().BlobServiceClient()

	return &blobFSSenderBase{
		jptm:                jptm,
		sip:                 sip,
		blobClient:          *bsc.NewContainerClient(info.DstContainer).NewBlockBlobClient(info.DstFilePath),
		fileOrDirClient:     destClient,
		parentDirClient:     fsc.NewDirectoryClient(parentPath),
		chunkSize:           chunkSize,
		numChunks:           numChunks,
		pacer:               pacer,
		creationTimeHeaders: &headers,
		flushThreshold:      chunkSize * int64(ADLSFlushThreshold),
		metadataToSet:       props.SrcMetadata,
	}, nil
}
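
// getFileClient asserts that the destination client is a *file.Client.
// It panics if this sender was constructed for a folder transfer.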
func (u *blobFSSenderBase) getFileClient() *file.Client {
	return u.fileOrDirClient.(*file.Client)
}
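
// getDirectoryClient asserts that the destination client is a *directory.Client.
// It panics if this sender was constructed for a file transfer.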
func (u *blobFSSenderBase) getDirectoryClient() *directory.Client {
	return u.fileOrDirClient.(*directory.Client)
}
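
// SendableEntityType reports whether this sender targets a folder or a file,
// based on the concrete type of the destination client.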
func (u *blobFSSenderBase) SendableEntityType() common.EntityType {
	if _, ok := u.fileOrDirClient.(*directory.Client); ok {
		return common.EEntityType.Folder()
	} else {
		return common.EEntityType.File()
	}
}

func (u *blobFSSenderBase) ChunkSize() int64 {
	return u.chunkSize
}

func (u *blobFSSenderBase) NumChunks() uint32 {
	return u.numChunks
}
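
// RemoteFileExists checks whether the destination file already exists,
// returning its last-modified time if it does.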
func (u *blobFSSenderBase) RemoteFileExists() (bool, time.Time, error) {
	props, err := u.getFileClient().GetProperties(u.jptm.Context(), nil)
	return remoteObjectExists(datalakePropertiesResponseAdapter{props}, err)
}
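
// Prologue creates the parent directory and then the destination file itself,
// before any chunks are sent. It always reports the destination as modified,
// since creation is attempted unconditionally.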
func (u *blobFSSenderBase) Prologue(state common.PrologueState) (destinationModified bool) {
	destinationModified = true

	// Create the parent directory separately.
	// This "burns" an extra IO operation, unfortunately, but it's the only way we can make our
	// folderCreationTracker work, and we need that for our overwrite logic for folders.
	// (Even though there's not much in the way of properties to set in ADLS Gen 2 on folders, at least, not
	// that we support right now, we still run the same folder logic here to be consistent with our other
	// folder-aware sources.)
	err := u.doEnsureDirExists(u.parentDirClient)
	if err != nil {
		u.jptm.FailActiveUpload("Ensuring parent directory exists", err)
		return
	}

	// Create the file with the source's HTTP headers. "Create" actually calls "create path",
	// so if we didn't need to track folder creation, we could just let this call create the
	// parent folder as needed.
	_, err = u.getFileClient().Create(u.jptm.Context(), &file.CreateOptions{HTTPHeaders: u.creationTimeHeaders})
	if err != nil {
		u.jptm.FailActiveUpload("Creating file", err)
		return
	}

	return
}
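
// Cleanup deletes the destination file if the transfer failed or was
// cancelled while in flight, so a partially written file isn't left behind.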
func (u *blobFSSenderBase) Cleanup() {
	jptm := u.jptm

	// Clean up if the transfer is now failed or cancelled: the file created at the
	// destination needs to be deleted, since its contents will be at an unknown
	// stage of partial completeness.
	if jptm.IsDeadInflight() {
		deletionContext, cancelFn := context.WithTimeout(context.WithValue(context.Background(), ServiceAPIVersionOverride, DefaultServiceApiVersion), 2*time.Minute)
		defer cancelFn()
		_, err := u.getFileClient().Delete(deletionContext, nil)
		if err != nil {
			jptm.Log(common.LogError, fmt.Sprintf("error deleting the (incomplete) file %s. Failed with error %s", u.getFileClient().DFSURL(), err.Error()))
		}
	}
}
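
// GetDestinationLength returns the content length reported by the destination
// file's properties.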
func (u *blobFSSenderBase) GetDestinationLength() (int64, error) {
	prop, err := u.getFileClient().GetProperties(u.jptm.Context(), nil)
	if err != nil {
		return -1, err
	}
	if prop.ContentLength == nil {
		return -1, fmt.Errorf("destination content length not returned")
	}
	return *prop.ContentLength, nil
}
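
// EnsureFolderExists creates the destination directory, if it doesn't already
// exist, when this sender represents a folder transfer.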
func (u *blobFSSenderBase) EnsureFolderExists() error {
	return u.doEnsureDirExists(u.getDirectoryClient())
}
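
// isFilesystemRoot reports whether the directory client points at the root of
// the filesystem, i.e. its URL has an empty path component (for example,
// "https://account.dfs.core.windows.net/myfs" rather than ".../myfs/dir").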
func isFilesystemRoot(directoryClient *directory.Client) (bool, error) {
	datalakeURLParts, err := azdatalake.ParseURL(directoryClient.DFSURL())
	if err != nil {
		return false, err
	}
	return datalakeURLParts.PathName == "", nil
}
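
// doEnsureDirExists creates the given directory through the job's folder-creation
// tracker, so the folder-overwrite logic can tell folders created by this job
// apart from pre-existing ones. The IfNoneMatch: ETagAny access condition turns
// the call into a "create only if not exists"; a PathAlreadyExists error is
// therefore swallowed.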
func (u *blobFSSenderBase) doEnsureDirExists(directoryClient *directory.Client) error {
	isFSRoot, err := isFilesystemRoot(directoryClient)
	if err != nil {
		return err
	}
	if isFSRoot {
		return nil // nothing to do, there's no directory component to create
	}

	// We must always do this, regardless of whether we are called in a file-centric code path
	// or a folder-centric one, since with the parallelism we use, we don't actually
	// know which will happen first.
	err = u.jptm.GetFolderCreationTracker().CreateFolder(directoryClient.DFSURL(), func() error {
		_, err := directoryClient.Create(u.jptm.Context(), &directory.CreateOptions{
			AccessConditions: &directory.AccessConditions{
				ModifiedAccessConditions: &directory.ModifiedAccessConditions{IfNoneMatch: to.Ptr(azcore.ETagAny)},
			},
		})
		return err
	})
	if datalakeerror.HasCode(err, datalakeerror.PathAlreadyExists) {
		return nil // not an error as far as we are concerned; it just already exists
	}
	return err
}
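
// GetSourcePOSIXProperties returns the source's UNIX stat data when the source
// info provider can supply it, and nil (with no error) when it cannot.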
func (u *blobFSSenderBase) GetSourcePOSIXProperties() (common.UnixStatAdapter, error) {
	if unixSIP, ok := u.sip.(IUNIXPropertyBearingSourceInfoProvider); ok {
		statAdapter, err := unixSIP.GetUNIXProperties()
		if err != nil {
			return nil, err
		}
		return statAdapter, nil
	} else {
		return nil, nil // no properties present!
	}
}
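
// SetPOSIXProperties stores the source's UNIX stat data as blob metadata on
// the destination. The POSIX folder marker is removed first, since it cannot
// be set on hierarchical-namespace accounts.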
func (u *blobFSSenderBase) SetPOSIXProperties() error {
	adapter, err := u.GetSourcePOSIXProperties()
	if err != nil {
		return fmt.Errorf("failed to get POSIX properties: %w", err)
	} else if adapter == nil {
		return nil
	}

	meta := u.metadataToSet
	common.AddStatToBlobMetadata(adapter, meta)
	delete(meta, common.POSIXFolderMeta) // Can't be set on HNS accounts.

	_, err = u.blobClient.SetMetadata(u.jptm.Context(), meta, nil)
	return err
}
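
// SetFolderProperties applies metadata to a destination folder, preserving
// POSIX properties when the job requests it.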
func (u *blobFSSenderBase) SetFolderProperties() error {
	if u.jptm.Info().PreservePOSIXProperties {
		return u.SetPOSIXProperties()
	} else if len(u.metadataToSet) > 0 {
		_, err := u.blobClient.SetMetadata(u.jptm.Context(), u.metadataToSet, nil)
		if err != nil {
			return fmt.Errorf("failed to set metadata: %w", err)
		}
	}
	return nil
}
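
// DirUrlToString returns the destination directory URL as a string, with any
// SAS token and other query parameters stripped.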
func (u *blobFSSenderBase) DirUrlToString() string {
	directoryURL := u.getDirectoryClient().DFSURL()
	parts, err := datalakesas.ParseURL(directoryURL)
	common.PanicIfErr(err)
	parts.SAS = datalakesas.QueryParameters{}
	parts.UnparsedParams = ""
	if parts.PathName == "/" {
		parts.PathName = ""
	}
	return parts.String()
}
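
// SendSymlink uploads the link target as the content of a block blob and
// marks the blob as a symlink through POSIX metadata. It goes through the
// blob endpoint because the symlink marker is blob metadata.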
func (u *blobFSSenderBase) SendSymlink(linkData string) error {
	meta := common.Metadata{} // meta isn't traditionally supported for dfs, but still exists
	adapter, err := u.GetSourcePOSIXProperties()
	if err != nil {
		return fmt.Errorf("when polling for POSIX properties: %w", err)
	} else if adapter == nil {
		return nil // No-op
	}

	common.AddStatToBlobMetadata(adapter, meta)
	meta[common.POSIXSymlinkMeta] = to.Ptr("true") // just in case there isn't any metadata

	blobHeaders := blob.HTTPHeaders{ // translate headers, since those still apply
		BlobContentType:        u.creationTimeHeaders.ContentType,
		BlobContentEncoding:    u.creationTimeHeaders.ContentEncoding,
		BlobContentLanguage:    u.creationTimeHeaders.ContentLanguage,
		BlobContentDisposition: u.creationTimeHeaders.ContentDisposition,
		BlobCacheControl:       u.creationTimeHeaders.CacheControl,
		BlobContentMD5:         u.creationTimeHeaders.ContentMD5,
	}

	_, err = u.blobClient.Upload(
		u.jptm.Context(),
		streaming.NopCloser(strings.NewReader(linkData)),
		&blockblob.UploadOptions{
			HTTPHeaders: &blobHeaders,
			Metadata:    meta,
		})
	return err
}