cmd/syncProcessor.go (318 lines of code) (raw):
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cmd
import (
"context"
"encoding/json"
"fmt"
"net/url"
"os"
"path"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
)
// extract the right info from cooked arguments and instantiate a generic copy transfer processor from it
func newSyncTransferProcessor(cca *cookedSyncCmdArgs,
numOfTransfersPerPart int,
fpo common.FolderPropertyOption,
copyJobTemplate *common.CopyJobPartOrderRequest) *copyTransferProcessor {
reportFirstPart := func(jobStarted bool) { cca.setFirstPartOrdered() } // for compatibility with the way sync has always worked, we don't check jobStarted here
reportFinalPart := func() { cca.isEnumerationComplete = true }
// note that the source and destination, along with the template are given to the generic processor's constructor
// this means that given an object with a relative path, this processor already knows how to schedule the right kind of transfers
return newCopyTransferProcessor(copyJobTemplate, numOfTransfersPerPart, cca.source, cca.destination,
reportFirstPart, reportFinalPart, cca.preserveAccessTier, cca.dryrunMode)
}
// base for delete processors targeting different resources
type interactiveDeleteProcessor struct {
// the plugged-in deleter that performs the actual deletion
deleter objectProcessor
// whether we should ask the user for permission the first time we delete a file
shouldPromptUser bool
// note down whether any delete should happen
shouldDelete bool
// used for prompt message
// examples: "blob", "local file", etc.
objectTypeToDisplay string
// used for prompt message
// examples: a directory path, or url to container
objectLocationToDisplay string
// count the deletions that happened
incrementDeletionCount func()
// dryrunMode
dryrunMode bool
}
func (d *interactiveDeleteProcessor) removeImmediately(object StoredObject) (err error) {
if d.shouldPromptUser {
d.shouldDelete, d.shouldPromptUser = d.promptForConfirmation(object) // note down the user's decision
}
if !d.shouldDelete {
return nil
}
if d.dryrunMode {
glcm.Dryrun(func(format common.OutputFormat) string {
if format == common.EOutputFormat.Json() {
deleteTarget := common.ELocation.Local()
if d.objectTypeToDisplay != LocalFileObjectType {
_ = deleteTarget.Parse(d.objectTypeToDisplay)
}
tx := DryrunTransfer{
Source: common.GenerateFullPath(d.objectLocationToDisplay, object.relativePath),
BlobType: common.FromBlobType(object.blobType),
EntityType: object.entityType,
FromTo: common.FromToValue(deleteTarget, common.ELocation.Unknown()),
}
jsonOutput, err := json.Marshal(tx)
common.PanicIfErr(err)
return string(jsonOutput)
} else { // remove for sync
return fmt.Sprintf("DRYRUN: remove %v",
common.GenerateFullPath(d.objectLocationToDisplay, object.relativePath))
}
})
return nil
}
err = d.deleter(object)
if err != nil {
msg := fmt.Sprintf("error %s deleting the object %s", err.Error(), object.relativePath)
glcm.Info(msg + "; check the scanning log file for more details")
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogError, msg+": "+err.Error())
}
}
if d.incrementDeletionCount != nil {
d.incrementDeletionCount()
}
return nil // Missing a file is an error, but it's not show-stopping. We logged it earlier; that's OK.
}
func (d *interactiveDeleteProcessor) promptForConfirmation(object StoredObject) (shouldDelete bool, keepPrompting bool) {
answer := glcm.Prompt(fmt.Sprintf("The %s '%s' does not exist at the source. "+
"Do you wish to delete it from the destination(%s)?",
d.objectTypeToDisplay, object.relativePath, d.objectLocationToDisplay),
common.PromptDetails{
PromptType: common.EPromptType.DeleteDestination(),
PromptTarget: object.relativePath,
ResponseOptions: []common.ResponseOption{
common.EResponseOption.Yes(),
common.EResponseOption.No(),
common.EResponseOption.YesForAll(),
common.EResponseOption.NoForAll()},
},
)
switch answer {
case common.EResponseOption.Yes():
// print nothing, since the deleter is expected to log the message when the delete happens
return true, true
case common.EResponseOption.YesForAll():
glcm.Info(fmt.Sprintf("Confirmed. All the extra %ss will be deleted.", d.objectTypeToDisplay))
return true, false
case common.EResponseOption.No():
glcm.Info(fmt.Sprintf("Keeping extra %s: %s", d.objectTypeToDisplay, object.relativePath))
return false, true
case common.EResponseOption.NoForAll():
glcm.Info("No deletions will happen from now onwards.")
return false, false
default:
glcm.Info(fmt.Sprintf("Unrecognizable answer, keeping extra %s: %s.", d.objectTypeToDisplay, object.relativePath))
return false, true
}
}
func newInteractiveDeleteProcessor(deleter objectProcessor, deleteDestination common.DeleteDestination,
objectTypeToDisplay string, objectLocationToDisplay common.ResourceString, incrementDeletionCounter func(), dryrun bool) *interactiveDeleteProcessor {
return &interactiveDeleteProcessor{
deleter: deleter,
objectTypeToDisplay: objectTypeToDisplay,
objectLocationToDisplay: objectLocationToDisplay.Value,
incrementDeletionCount: incrementDeletionCounter,
shouldPromptUser: deleteDestination == common.EDeleteDestination.Prompt(),
shouldDelete: deleteDestination == common.EDeleteDestination.True(), // if shouldPromptUser is true, this will start as false, but we will determine its value later
dryrunMode: dryrun,
}
}
const LocalFileObjectType = "local file"
func newSyncLocalDeleteProcessor(cca *cookedSyncCmdArgs, fpo common.FolderPropertyOption) *interactiveDeleteProcessor {
localDeleter := localFileDeleter{rootPath: cca.destination.ValueLocal(), fpo: fpo, folderManager: common.NewFolderDeletionManager(context.Background(), fpo, azcopyScanningLogger)}
return newInteractiveDeleteProcessor(localDeleter.deleteFile, cca.deleteDestination, LocalFileObjectType, cca.destination, cca.incrementDeletionCount, cca.dryrunMode)
}
type localFileDeleter struct {
rootPath string
fpo common.FolderPropertyOption
folderManager common.FolderDeletionManager
}
func (l *localFileDeleter) getObjectURL(object StoredObject) *url.URL {
return &url.URL{
Scheme: "local",
Path: "/" + strings.ReplaceAll(object.relativePath, "\\", "/"), // consolidate to forward slashes
}
}
func (l *localFileDeleter) deleteFile(object StoredObject) error {
objectURI := l.getObjectURL(object)
l.folderManager.RecordChildExists(objectURI)
if object.entityType == common.EEntityType.File() {
msg := "Deleting extra file: " + object.relativePath
glcm.Info(msg)
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogInfo, msg)
}
err := os.Remove(common.GenerateFullPath(l.rootPath, object.relativePath))
l.folderManager.RecordChildDeleted(objectURI)
return err
} else if object.entityType == common.EEntityType.Folder() && l.fpo != common.EFolderPropertiesOption.NoFolders() {
msg := "Deleting extra folder: " + object.relativePath
glcm.Info(msg)
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogInfo, msg)
}
l.folderManager.RequestDeletion(objectURI, func(ctx context.Context, logger common.ILogger) bool {
return os.Remove(common.GenerateFullPath(l.rootPath, object.relativePath)) == nil
})
}
return nil
}
func newSyncDeleteProcessor(cca *cookedSyncCmdArgs, fpo common.FolderPropertyOption, dstClient *common.ServiceClient) (*interactiveDeleteProcessor, error) {
rawURL, err := cca.destination.FullURL()
if err != nil {
return nil, err
}
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
deleter, err := newRemoteResourceDeleter(ctx, dstClient, rawURL, cca.fromTo.To(), fpo, cca.forceIfReadOnly)
if err != nil {
return nil, err
}
return newInteractiveDeleteProcessor(deleter.delete, cca.deleteDestination, cca.fromTo.To().String(), cca.destination, cca.incrementDeletionCount, cca.dryrunMode), nil
}
type remoteResourceDeleter struct {
remoteClient *common.ServiceClient
containerName string // name of target container/share/filesystem
rootPath string
ctx context.Context
targetLocation common.Location
folderManager common.FolderDeletionManager
folderOption common.FolderPropertyOption
forceIfReadOnly bool
}
func newRemoteResourceDeleter(ctx context.Context, remoteClient *common.ServiceClient, rawRootURL *url.URL, targetLocation common.Location, fpo common.FolderPropertyOption, forceIfReadOnly bool) (*remoteResourceDeleter, error) {
containerName, rootPath, err := common.SplitContainerNameFromPath(rawRootURL.String())
if err != nil {
return nil, err
}
return &remoteResourceDeleter{
containerName: containerName,
rootPath: rootPath,
remoteClient: remoteClient,
ctx: ctx,
targetLocation: targetLocation,
folderManager: common.NewFolderDeletionManager(ctx, fpo, azcopyScanningLogger),
folderOption: fpo,
forceIfReadOnly: forceIfReadOnly,
}, nil
}
func (b *remoteResourceDeleter) getObjectURL(objectURL string) (*url.URL, error) {
u, err := url.Parse(objectURL)
if err != nil {
return nil, err
}
return u, nil
}
func (b *remoteResourceDeleter) delete(object StoredObject) error {
/* knarasim: This needs to be taken care of
if b.targetLocation == common.ELocation.BlobFS() && object.entityType == common.EEntityType.Folder() {
b.clientOptions.PerCallPolicies = append([]policy.Policy{common.NewRecursivePolicy()}, b.clientOptions.PerCallPolicies...)
}
*/
objectPath := path.Join(b.rootPath, object.relativePath)
if object.relativePath == "\x00" && b.targetLocation != common.ELocation.Blob() {
return nil // Do nothing, we don't want to accidentally delete the root.
} else if object.relativePath == "\x00" { // this is acceptable on blob, though. Dir stubs are a thing, and they aren't necessary for normal function.
objectPath = b.rootPath
}
if strings.HasSuffix(object.relativePath, "/") && !strings.HasSuffix(objectPath, "/") && b.targetLocation == common.ELocation.Blob() {
// If we were targeting a directory, we still need to be. path.join breaks that.
// We also want to defensively code around this, and make sure we are not putting folder// or trying to put a weird URI in to an endpoint that can't do this.
objectPath += "/"
}
sc := b.remoteClient
if object.entityType == common.EEntityType.File() {
// TODO: use b.targetLocation.String() in the next line, instead of "object", if we can make it come out as string
msg := "Deleting extra object: " + object.relativePath
glcm.Info(msg)
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogInfo, msg)
}
var err error
var objURL *url.URL
switch b.targetLocation {
case common.ELocation.Blob():
bsc, _ := sc.BlobServiceClient()
var blobClient *blob.Client = bsc.NewContainerClient(b.containerName).NewBlobClient(objectPath)
objURL, err = b.getObjectURL(blobClient.URL())
if err != nil {
break
}
b.folderManager.RecordChildExists(objURL)
defer b.folderManager.RecordChildDeleted(objURL)
_, err = blobClient.Delete(b.ctx, nil)
case common.ELocation.File():
fsc, _ := sc.FileServiceClient()
fileClient := fsc.NewShareClient(b.containerName).NewRootDirectoryClient().NewFileClient(objectPath)
objURL, err = b.getObjectURL(fileClient.URL())
if err != nil {
break
}
b.folderManager.RecordChildExists(objURL)
defer b.folderManager.RecordChildDeleted(objURL)
err = common.DoWithOverrideReadOnlyOnAzureFiles(b.ctx, func() (interface{}, error) {
return fileClient.Delete(b.ctx, nil)
}, fileClient, b.forceIfReadOnly)
case common.ELocation.BlobFS():
dsc, _ := sc.DatalakeServiceClient()
fileClient := dsc.NewFileSystemClient(b.containerName).NewFileClient(objectPath)
objURL, err = b.getObjectURL(fileClient.DFSURL())
if err != nil {
break
}
b.folderManager.RecordChildExists(objURL)
defer b.folderManager.RecordChildDeleted(objURL)
_, err = fileClient.Delete(b.ctx, nil)
default:
panic("not implemented, check your code")
}
if err != nil {
msg := fmt.Sprintf("error %s deleting the object %s", err.Error(), object.relativePath)
glcm.Info(msg + "; check the scanning log file for more details")
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogError, msg+": "+err.Error())
}
return err
}
return nil
} else {
if b.folderOption == common.EFolderPropertiesOption.NoFolders() {
return nil
}
var deleteFunc func(ctx context.Context, logger common.ILogger) bool
var objURL *url.URL
var err error
switch b.targetLocation {
case common.ELocation.Blob():
bsc, _ := sc.BlobServiceClient()
blobClient := bsc.NewContainerClient(b.containerName).NewBlobClient(objectPath)
// HNS endpoint doesn't like delete snapshots on a directory
objURL, err = b.getObjectURL(blobClient.URL())
if err != nil {
return err
}
deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
_, err = blobClient.Delete(b.ctx, nil)
return (err == nil)
}
case common.ELocation.File():
fsc, _ := sc.FileServiceClient()
dirClient := fsc.NewShareClient(b.containerName).NewDirectoryClient(objectPath)
objURL, err = b.getObjectURL(dirClient.URL())
if err != nil {
return err
}
deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
err = common.DoWithOverrideReadOnlyOnAzureFiles(b.ctx, func() (interface{}, error) {
return dirClient.Delete(b.ctx, nil)
}, dirClient, b.forceIfReadOnly)
return (err == nil)
}
case common.ELocation.BlobFS():
dsc, _ := sc.DatalakeServiceClient()
directoryClient := dsc.NewFileSystemClient(b.containerName).NewDirectoryClient(objectPath)
objURL, err = b.getObjectURL(directoryClient.DFSURL())
if err != nil {
return err
}
deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
recursiveContext := common.WithRecursive(b.ctx, false)
_, err = directoryClient.Delete(recursiveContext, nil)
return (err == nil)
}
default:
panic("not implemented, check your code")
}
b.folderManager.RecordChildExists(objURL)
b.folderManager.RequestDeletion(objURL, deleteFunc)
return nil
}
}