// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cmd

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"strings"
	"sync/atomic"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"

	"github.com/Azure/azure-storage-azcopy/v10/common"
	"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
)
// -------------------------------------- Implemented Enumerators -------------------------------------- \\
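
// initEnumerator builds the syncEnumerator for a cooked sync command: it constructs the source and
// destination traversers, validates credentials and resource types, assembles the filter chain,
// prepares the copy job template, and wires up the comparator plus the finalize callback.
//
// Minimal usage sketch (the sync command drives this through cooked args):
//
//	enumerator, err := cca.initEnumerator(ctx)
//	if err != nil {
//		return err
//	}
//	return enumerator.enumerate()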
func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *syncEnumerator, err error) {
srcCredInfo, _, err := GetCredentialInfoForLocation(ctx, cca.fromTo.From(), cca.source, true, cca.cpkOptions)
if err != nil {
return nil, err
}
	if cca.fromTo.IsS2S() && srcCredInfo.CredentialType != common.ECredentialType.Anonymous() {
		if srcCredInfo.CredentialType.IsAzureOAuth() && cca.fromTo.To().CanForwardOAuthTokens() {
			// no-op, this is OK
		} else if srcCredInfo.CredentialType == common.ECredentialType.GoogleAppCredentials() || srcCredInfo.CredentialType == common.ECredentialType.S3AccessKey() || srcCredInfo.CredentialType == common.ECredentialType.S3PublicBucket() {
			// this, too, is OK
		} else {
			// Note: an Anonymous source can't reach this branch; the enclosing condition already excludes it.
			return nil, fmt.Errorf("the source of a %s->%s sync must either be public, or authorized with a SAS token; blob destinations can forward OAuth", cca.fromTo.From(), cca.fromTo.To())
		}
	}
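	// Directory stubs must be enumerated whenever HNS ACLs are preserved end to end, or when the user explicitly asked for them.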
includeDirStubs := (cca.fromTo.From().SupportsHnsACLs() && cca.fromTo.To().SupportsHnsACLs() && cca.preservePermissions.IsTruthy()) || cca.includeDirectoryStubs
// TODO: enable symlink support in a future release after evaluating the implications
// TODO: Consider passing an errorChannel so that enumeration errors during sync can be conveyed to the caller.
	// GetProperties is enabled by default because sync supports both upload and download.
	// Only Files and S3 honor this property at the moment, but given that Files sync is coming soon, enable it now to avoid stepping on that work.
dest := cca.fromTo.To()
sourceTraverser, err := InitResourceTraverser(cca.source, cca.fromTo.From(), &ctx, &srcCredInfo, common.ESymlinkHandlingType.Skip(), nil, cca.recursive, true, includeDirStubs, common.EPermanentDeleteOption.None(), func(entityType common.EntityType) {
if entityType == common.EEntityType.File() {
atomic.AddUint64(&cca.atomicSourceFilesScanned, 1)
}
}, nil, cca.s2sPreserveBlobTags, cca.compareHash, cca.preservePermissions, azcopyLogVerbosity, cca.cpkOptions, nil, false, cca.trailingDot, &dest, nil, false)
if err != nil {
return nil, err
}
	// Because we can't trust cca.credentialInfo (it's for the overall job, not the individual traversers), we fetch credential info again here.
dstCredInfo, _, err := GetCredentialInfoForLocation(ctx, cca.fromTo.To(), cca.destination, false, cca.cpkOptions)
if err != nil {
return nil, err
}
// TODO: enable symlink support in a future release after evaluating the implications
	// GetProperties is enabled by default because sync supports both upload and download.
	// Only Files and S3 honor this property at the moment, but given that Files sync is coming soon, enable it now to avoid stepping on that work.
destinationTraverser, err := InitResourceTraverser(cca.destination, cca.fromTo.To(), &ctx, &dstCredInfo, common.ESymlinkHandlingType.Skip(), nil, cca.recursive, true, includeDirStubs, common.EPermanentDeleteOption.None(), func(entityType common.EntityType) {
if entityType == common.EEntityType.File() {
atomic.AddUint64(&cca.atomicDestinationFilesScanned, 1)
}
}, nil, cca.s2sPreserveBlobTags, cca.compareHash, cca.preservePermissions, azcopyLogVerbosity, cca.cpkOptions, nil, false, cca.trailingDot, nil, nil, false)
if err != nil {
return nil, err
}
// verify that the traversers are targeting the same type of resources
sourceIsDir, _ := sourceTraverser.IsDirectory(true)
destIsDir, err := destinationTraverser.IsDirectory(true)
	var resourceMismatchError = errors.New("trying to sync between different resource types (either file <-> directory or directory <-> file), which is not allowed. " +
		"Sync must happen between a source and destination of the same type, e.g. either file <-> file or directory <-> directory. " +
		"To make sure the target is handled as a directory, add a trailing '/' to the target.")
if cca.fromTo.To() == common.ELocation.Blob() || cca.fromTo.To() == common.ELocation.BlobFS() {
		/*
			This is an "opinionated" choice. Blob has no formal understanding of directories, so we don't care whether the target is one.
			If they sync a lone blob, they sync a lone blob.
			If it lands on a directory stub, FNS is OK with this, but HNS isn't, and it'll fail in that case. This is still semantically valid in FNS.
			If they sync a prefix of blobs, they sync a prefix of blobs. This will always succeed, and won't break any semantics of FNS.
			So my (Adele's) opinion moving forward is:
			- Hierarchies don't exist in flat namespaces.
			- Instead, there are objects and prefixes.
			- Stubs exist to clarify prefixes.
			- Stubs do not exist to enforce naming conventions.
			- We are a tool, and tools can be misused. It is up to the customer to validate everything they intend to do.
		*/
		if bloberror.HasCode(err, bloberror.ContainerNotFound) { // We can resolve a missing container. Let's create it.
			// The type assertion is relatively safe: a Blob destination gets a blobTraverser, and its construction succeeded above.
			bt := destinationTraverser.(*blobTraverser)
			sc := bt.serviceClient
			bUrlParts, _ := blob.ParseURL(bt.rawURL) // parsing should have succeeded well before now
			_, err = sc.NewContainerClient(bUrlParts.ContainerName).Create(ctx, nil) // If this doesn't work out, it will surely bubble up later anyway. It won't be long.
if err != nil {
glcm.Warn(fmt.Sprintf("Failed to create the missing destination container: %v", err))
}
// At this point, we'll let the destination be written to with the original resource type.
}
} else if err != nil && fileerror.HasCode(err, fileerror.ShareNotFound) { // We can resolve a missing share. Let's create it.
ft := destinationTraverser.(*fileTraverser)
sc := ft.serviceClient
fUrlParts, _ := file.ParseURL(ft.rawURL) // this should have succeeded by now.
_, err = sc.NewShareClient(fUrlParts.ShareName).Create(ctx, nil) // If it doesn't work out, this will surely bubble up later anyway. It won't be long.
if err != nil {
glcm.Warn(fmt.Sprintf("Failed to create the missing destination container: %v", err))
}
// At this point, we'll let the destination be written to with the original resource type, as it will get created in this transfer.
} else if err == nil && sourceIsDir != destIsDir {
		// If the destination exists and isn't Blob, though, the source and destination resource types have to match.
return nil, resourceMismatchError
}
// set up the filters in the right order
// Note: includeFilters and includeAttrFilters are ANDed
// They must both pass to get the file included
// Same rule applies to excludeFilters and excludeAttrFilters
filters := buildIncludeFilters(cca.includePatterns)
if cca.fromTo.From() == common.ELocation.Local() {
includeAttrFilters := buildAttrFilters(cca.includeFileAttributes, cca.source.ValueLocal(), true)
filters = append(filters, includeAttrFilters...)
}
filters = append(filters, buildExcludeFilters(cca.excludePatterns, false)...)
filters = append(filters, buildExcludeFilters(cca.excludePaths, true)...)
if cca.fromTo.From() == common.ELocation.Local() {
excludeAttrFilters := buildAttrFilters(cca.excludeFileAttributes, cca.source.ValueLocal(), false)
filters = append(filters, excludeAttrFilters...)
}
// includeRegex
filters = append(filters, buildRegexFilters(cca.includeRegex, true)...)
filters = append(filters, buildRegexFilters(cca.excludeRegex, false)...)
// after making all filters, log any search prefix computed from them
if jobsAdmin.JobsAdmin != nil {
if prefixFilter := FilterSet(filters).GetEnumerationPreFilter(cca.recursive); prefixFilter != "" {
jobsAdmin.JobsAdmin.LogToJobLog("Search prefix, which may be used to optimize scanning, is: "+prefixFilter, common.LogInfo) // "May be used" because we don't know here which enumerators will use it
}
}
// decide our folder transfer strategy
// sync always acts like stripTopDir=true, but if we intend to persist the root, we must tell NewFolderPropertyOption stripTopDir=false.
fpo, folderMessage := NewFolderPropertyOption(cca.fromTo, cca.recursive, !cca.includeRoot, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), false, strings.EqualFold(cca.destination.Value, common.Dev_Null), cca.includeDirectoryStubs)
if !cca.dryrunMode {
glcm.Info(folderMessage)
}
if jobsAdmin.JobsAdmin != nil {
jobsAdmin.JobsAdmin.LogToJobLog(folderMessage, common.LogInfo)
}
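	// Trailing-dot support is only meaningful when both endpoints understand it; otherwise, downgrade to Disable.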
if cca.trailingDot == common.ETrailingDotOption.Enable() && !cca.fromTo.BothSupportTrailingDot() {
cca.trailingDot = common.ETrailingDotOption.Disable()
}
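	// This template seeds every job part that the transfer processor dispatches below.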
copyJobTemplate := &common.CopyJobPartOrderRequest{
JobID: cca.jobID,
CommandString: cca.commandString,
FromTo: cca.fromTo,
Fpo: fpo,
SymlinkHandlingType: cca.symlinkHandling,
SourceRoot: cca.source.CloneWithConsolidatedSeparators(),
DestinationRoot: cca.destination.CloneWithConsolidatedSeparators(),
CredentialInfo: cca.credentialInfo,
// flags
BlobAttributes: common.BlobTransferAttributes{
PreserveLastModifiedTime: cca.preserveSMBInfo, // true by default for sync so that future syncs have this information available
PutMd5: cca.putMd5,
MD5ValidationOption: cca.md5ValidationOption,
BlockSizeInBytes: cca.blockSize,
PutBlobSizeInBytes: cca.putBlobSize,
DeleteDestinationFileIfNecessary: cca.deleteDestinationFileIfNecessary,
},
ForceWrite: common.EOverwriteOption.True(), // once we decide to transfer for a sync operation, we overwrite the destination regardless
ForceIfReadOnly: cca.forceIfReadOnly,
LogLevel: azcopyLogVerbosity,
PreserveSMBPermissions: cca.preservePermissions,
PreserveSMBInfo: cca.preserveSMBInfo,
PreservePOSIXProperties: cca.preservePOSIXProperties,
S2SSourceChangeValidation: true,
DestLengthValidation: true,
S2SGetPropertiesInBackend: true,
S2SInvalidMetadataHandleOption: common.EInvalidMetadataHandleOption.RenameIfInvalid(),
CpkOptions: cca.cpkOptions,
S2SPreserveBlobTags: cca.s2sPreserveBlobTags,
S2SSourceCredentialType: cca.s2sSourceCredentialType,
FileAttributes: common.FileTransferAttributes{
TrailingDot: cca.trailingDot,
},
}
	var srcReauthTok *common.ScopedAuthenticator
	if at, ok := srcCredInfo.OAuthTokenInfo.TokenCredential.(common.AuthenticateToken); ok {
		// This will cause a reauth with StorageScope, which is fine; that's the original Authenticate call as it stands.
		srcReauthTok = (*common.ScopedAuthenticator)(common.NewScopedCredential(at, common.ECredentialType.OAuthToken()))
	}
options := createClientOptions(common.AzcopyCurrentJobLogger, nil, srcReauthTok)
// Create Source Client.
var azureFileSpecificOptions any
if cca.fromTo.From() == common.ELocation.File() {
azureFileSpecificOptions = &common.FileClientOptions{
AllowTrailingDot: cca.trailingDot == common.ETrailingDotOption.Enable(),
}
}
copyJobTemplate.SrcServiceClient, err = common.GetServiceClientForLocation(
cca.fromTo.From(),
cca.source,
srcCredInfo.CredentialType,
srcCredInfo.OAuthTokenInfo.TokenCredential,
&options,
azureFileSpecificOptions,
)
if err != nil {
return nil, err
}
// Create Destination client
	if cca.fromTo.To() == common.ELocation.File() {
		azureFileSpecificOptions = &common.FileClientOptions{
			AllowTrailingDot: cca.trailingDot == common.ETrailingDotOption.Enable(),
			// Trailing dots on the source only matter when the source is also an Azure Files endpoint.
			AllowSourceTrailingDot: cca.trailingDot == common.ETrailingDotOption.Enable() && cca.fromTo.From() == common.ELocation.File(),
		}
	}
	var dstReauthTok *common.ScopedAuthenticator
	if at, ok := dstCredInfo.OAuthTokenInfo.TokenCredential.(common.AuthenticateToken); ok {
		// This will cause a reauth with StorageScope, which is fine; that's the original Authenticate call as it stands.
		dstReauthTok = (*common.ScopedAuthenticator)(common.NewScopedCredential(at, common.ECredentialType.OAuthToken()))
	}
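	// For OAuth-authorized S2S transfers, the destination pipeline also carries the source token so it can authorize reads from the copy source.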
var srcTokenCred *common.ScopedToken
if cca.fromTo.IsS2S() && srcCredInfo.CredentialType.IsAzureOAuth() {
srcTokenCred = common.NewScopedCredential(srcCredInfo.OAuthTokenInfo.TokenCredential, srcCredInfo.CredentialType)
}
options = createClientOptions(common.AzcopyCurrentJobLogger, srcTokenCred, dstReauthTok)
	copyJobTemplate.DstServiceClient, err = common.GetServiceClientForLocation(
		cca.fromTo.To(),
		cca.destination,
		dstCredInfo.CredentialType,
		dstCredInfo.OAuthTokenInfo.TokenCredential,
		&options,
		azureFileSpecificOptions,
	)
	if err != nil {
		return nil, err
	}
transferScheduler := newSyncTransferProcessor(cca, NumOfFilesPerDispatchJobPart, fpo, copyJobTemplate)
// set up the comparator so that the source/destination can be compared
indexer := newObjectIndexer()
var comparator objectProcessor
var finalize func() error
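	// Traversal order decides which side gets indexed in memory: the first traverser handed to
	// newSyncEnumerator is enumerated fully into the indexer, and the second is then streamed against it.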
switch cca.fromTo {
case common.EFromTo.LocalBlob(), common.EFromTo.LocalFile():
		// Upload implies transferring from a local disk to a remote resource.
		// In this scenario, the local disk (source) is scanned/indexed first, on the assumption that local
		// file systems are faster to enumerate than remote resources.
		// The destination is then scanned and compared against the source index.
destinationCleaner, err := newSyncDeleteProcessor(cca, fpo, copyJobTemplate.DstServiceClient)
if err != nil {
return nil, fmt.Errorf("unable to instantiate destination cleaner due to: %s", err.Error())
}
destCleanerFunc := newFpoAwareProcessor(fpo, destinationCleaner.removeImmediately)
// when uploading, we can delete remote objects immediately, because as we traverse the remote location
// we ALREADY have available a complete map of everything that exists locally
// so as soon as we see a remote destination object we can know whether it exists in the local source
comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary
finalize = func() error {
// schedule every local file that doesn't exist at the destination
err = indexer.traverse(transferScheduler.scheduleCopyTransfer, filters)
if err != nil {
return err
}
jobInitiated, err := transferScheduler.dispatchFinalPart()
// sync cleanly exits if nothing is scheduled.
if err != nil && err != NothingScheduledError {
return err
}
quitIfInSync(jobInitiated, cca.getDeletionCount() > 0, cca)
cca.setScanningComplete()
return nil
}
return newSyncEnumerator(sourceTraverser, destinationTraverser, indexer, filters, comparator, finalize), nil
default:
indexer.isDestinationCaseInsensitive = IsDestinationCaseInsensitive(cca.fromTo)
// in all other cases (download and S2S), the destination is scanned/indexed first
// then the source is scanned and filtered based on what the destination contains
comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary
finalize = func() error {
			// remove the extra files at the destination that were not present at the source
			// we can only know what needs to be deleted once we have FINISHED traversing the remote source,
			// since only then do we know which destination files definitely don't exist at the source
var deleteScheduler objectProcessor
switch cca.fromTo.To() {
case common.ELocation.Blob(), common.ELocation.File(), common.ELocation.BlobFS():
deleter, err := newSyncDeleteProcessor(cca, fpo, copyJobTemplate.DstServiceClient)
if err != nil {
return err
}
deleteScheduler = newFpoAwareProcessor(fpo, deleter.removeImmediately)
default:
deleteScheduler = newFpoAwareProcessor(fpo, newSyncLocalDeleteProcessor(cca, fpo).removeImmediately)
}
err = indexer.traverse(deleteScheduler, nil)
if err != nil {
return err
}
// let the deletions happen first
// otherwise if the final part is executed too quickly, we might quit before deletions could finish
jobInitiated, err := transferScheduler.dispatchFinalPart()
// sync cleanly exits if nothing is scheduled.
if err != nil && err != NothingScheduledError {
return err
}
quitIfInSync(jobInitiated, cca.getDeletionCount() > 0, cca)
cca.setScanningComplete()
return nil
}
return newSyncEnumerator(destinationTraverser, sourceTraverser, indexer, filters, comparator, finalize), nil
}
}
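
// IsDestinationCaseInsensitive reports whether the destination treats paths case-insensitively;
// today that is only true when downloading onto Windows.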
func IsDestinationCaseInsensitive(fromTo common.FromTo) bool {
	return fromTo.IsDownload() && runtime.GOOS == "windows"
}
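
// quitIfInSync exits the process with a success code when no transfer was scheduled, distinguishing
// "already in sync" (nothing changed) from "now in sync" (only deletions were needed).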
func quitIfInSync(transferJobInitiated, anyDestinationFileDeleted bool, cca *cookedSyncCmdArgs) {
	if transferJobInitiated {
		return
	}
	message := "The source and destination are already in sync."
	if anyDestinationFileDeleted {
		// some files were deleted, but no transfer was scheduled
		message = "The source and destination are now in sync."
	}
	cca.reportScanningProgress(glcm, 0)
	glcm.Exit(func(format common.OutputFormat) string {
		return message
	}, common.EExitCode.Success())
}