cmd/removeEnumerator.go

// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cmd

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"path"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"

	"github.com/Azure/azure-storage-azcopy/v10/common"
	"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
	"github.com/Azure/azure-storage-azcopy/v10/ste"
)

var NothingToRemoveError = errors.New("nothing found to remove")
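
// A minimal sketch of how a caller might distinguish the benign "nothing
// matched" outcome from a real failure (hypothetical call site, for
// illustration only; enumerate() stands in for whatever drives the returned
// enumerator):
//
//	if err := enumerator.enumerate(); errors.Is(err, NothingToRemoveError) {
//		// benign: the traverser and filters simply matched nothing
//	} else if err != nil {
//		return err
//	}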

// provide an enumerator that lists a given resource (Blob, File)
// and schedules delete transfers to remove them
// TODO: merge this into the other copy refactor code
// TODO: initEnumerator is significantly more verbose at this point; evaluate the impact of switching over
func newRemoveEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator, err error) {
	var sourceTraverser ResourceTraverser

	ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)

	// Include-path is handled by ListOfFilesChannel.
	sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo,
		common.ESymlinkHandlingType.Skip(), cca.ListOfFilesChannel, cca.Recursive, true, cca.IncludeDirectoryStubs,
		cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false,
		common.ESyncHashType.None(), common.EPreservePermissionsOption.None(), azcopyLogVerbosity,
		cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, nil, cca.excludeContainer, false)

	// report failure to create traverser
	if err != nil {
		return nil, err
	}

	includeFilters := buildIncludeFilters(cca.IncludePatterns)
	excludeFilters := buildExcludeFilters(cca.ExcludePatterns, false)
	excludePathFilters := buildExcludeFilters(cca.ExcludePathPatterns, true)
	includeSoftDelete := buildIncludeSoftDeleted(cca.permanentDeleteOption)

	// set up the filters in the right order
	filters := append(includeFilters, excludeFilters...)
	filters = append(filters, excludePathFilters...)
	filters = append(filters, includeSoftDelete...)

	if cca.IncludeBefore != nil {
		filters = append(filters, &IncludeBeforeDateFilter{Threshold: *cca.IncludeBefore})
	}
	if cca.IncludeAfter != nil {
		filters = append(filters, &IncludeAfterDateFilter{Threshold: *cca.IncludeAfter})
	}

	// decide our folder transfer strategy
	// (Must enumerate folders when deleting from a folder-aware location. Can't do folder deletion
	// just based on file deletion, because that would not handle folders that were empty at the
	// start of the job.)
	// isHNStoHNS is IGNORED here, because BlobFS locations don't take this route currently.
	fpo, message := NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, false, false, false, false, cca.IncludeDirectoryStubs)
	// do not print the info message in dry-run mode
	if !cca.dryrunMode {
		glcm.Info(message)
	}
	if jobsAdmin.JobsAdmin != nil {
		jobsAdmin.JobsAdmin.LogToJobLog(message, common.LogInfo)
	}

	from := cca.FromTo.From()
	if !from.SupportsTrailingDot() {
		cca.trailingDot = common.ETrailingDotOption.Disable()
	}

	var reauthTok *common.ScopedAuthenticator
	if at, ok := cca.credentialInfo.OAuthTokenInfo.TokenCredential.(common.AuthenticateToken); ok {
		// We don't need two different tokens here, since the token gets passed in the same way either way.
		// This will cause a re-auth with StorageScope, which is fine; that's the original Authenticate call as it stands.
		reauthTok = (*common.ScopedAuthenticator)(common.NewScopedCredential(at, common.ECredentialType.OAuthToken()))
	}

	options := createClientOptions(common.AzcopyCurrentJobLogger, nil, reauthTok)

	var fileClientOptions any
	if cca.FromTo.From() == common.ELocation.File() {
		fileClientOptions = &common.FileClientOptions{AllowTrailingDot: cca.trailingDot.IsEnabled()}
	}

	targetServiceClient, err := common.GetServiceClientForLocation(
		cca.FromTo.From(),
		cca.Source,
		cca.credentialInfo.CredentialType,
		cca.credentialInfo.OAuthTokenInfo.TokenCredential,
		&options,
		fileClientOptions,
	)
	if err != nil {
		return nil, err
	}

	transferScheduler := newRemoveTransferProcessor(cca, NumOfFilesPerDispatchJobPart, fpo, targetServiceClient)

	finalize := func() error {
		jobInitiated, err := transferScheduler.dispatchFinalPart()
		if err != nil {
			if cca.dryrunMode {
				return nil
			} else if err == NothingScheduledError {
				// No log file needed. Logging begins as a part of awaiting job completion.
				return NothingToRemoveError
			}

			return err
		}

		// TODO: this appears to be obsolete due to the above err == NothingScheduledError. Review/discuss.
		if !jobInitiated {
			if cca.isCleanupJob {
				glcm.Error("Cleanup completed (nothing needed to be deleted)")
			} else {
				glcm.Error("Nothing to delete. Please verify that the recursive flag is set properly if targeting a directory.")
			}
		}

		return nil
	}

	return NewCopyEnumerator(sourceTraverser, filters, transferScheduler.scheduleCopyTransfer, finalize), nil
}
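
// For orientation: this enumerator-based path serves Blob and File removals,
// while BlobFS deletions are routed to removeBfsResources below. A rough
// sketch of that dispatch (hypothetical; the actual routing lives with the
// cooked-argument processing elsewhere in this package, and enumerate() here
// stands in for whatever drives the enumerator):
//
//	if cca.FromTo == common.EFromTo.BlobFSTrash() {
//		return removeBfsResources(cca)
//	}
//	enumerator, err := newRemoveEnumerator(cca)
//	if err != nil {
//		return err
//	}
//	return enumerator.enumerate()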

// TODO: move after ADLS/Blob interop goes public
// TODO: this simple remove command is only here to support the scenario temporarily
// Ultimately, this code can be merged into newRemoveEnumerator
func removeBfsResources(cca *CookedCopyCmdArgs) (err error) {
	ctx := context.WithValue(context.Background(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)

	sourceURL, _ := cca.Source.String()

	var reauthTok *common.ScopedAuthenticator
	if at, ok := cca.credentialInfo.OAuthTokenInfo.TokenCredential.(common.AuthenticateToken); ok {
		// We don't need two different tokens here, since the token gets passed in the same way either way.
		// This will cause a re-auth with StorageScope, which is fine; that's the original Authenticate call as it stands.
		reauthTok = (*common.ScopedAuthenticator)(common.NewScopedCredential(at, common.ECredentialType.OAuthToken()))
	}

	options := createClientOptions(common.AzcopyCurrentJobLogger, nil, reauthTok)

	targetServiceClient, err := common.GetServiceClientForLocation(cca.FromTo.From(), cca.Source, cca.credentialInfo.CredentialType, cca.credentialInfo.OAuthTokenInfo.TokenCredential, &options, nil)
	if err != nil {
		return err
	}
	dsc, _ := targetServiceClient.DatalakeServiceClient() // We've just created the client above; no need to verify the error here.

	transferProcessor := newRemoveTransferProcessor(cca, NumOfFilesPerDispatchJobPart, common.EFolderPropertiesOption.AllFolders(), targetServiceClient)

	// return an error if unsupported options were passed in,
	// because we would otherwise just ignore them and delete the root
	if len(cca.InitModularFilters()) > 0 {
		return errors.New("filter options, such as include/exclude, are not supported for this destination")
	}

	// patterns are not supported
	if strings.Contains(cca.Source.Value, "*") {
		return errors.New("pattern matches are not supported in this command")
	}

	// parse the given source URL into parts, which separates the filesystem name and the directory/file path
	datalakeURLParts, err := azdatalake.ParseURL(sourceURL)
	if err != nil {
		return err
	}

	if cca.ListOfFilesChannel == nil {
		if cca.dryrunMode {
			return dryrunRemoveSingleDFSResource(ctx, dsc, datalakeURLParts, cca.Recursive)
		} else {
			err := transferProcessor.scheduleCopyTransfer(newStoredObject(
				nil,
				path.Base(datalakeURLParts.PathName),
				"",
				common.EEntityType.File(), // blobfs deleter doesn't differentiate
				time.Now(),
				0,
				noContentProps,
				noContentProps,
				nil,
				"",
			))
			if err != nil {
				return err
			}
		}
	} else {
		// a list of files was given; record the parent path
		parentPath := datalakeURLParts.PathName

		// read from the list-of-files channel to find out what needs to be deleted
		childPath, ok := <-cca.ListOfFilesChannel
		for ; ok; childPath, ok = <-cca.ListOfFilesChannel {
			// remove the child path
			datalakeURLParts.PathName = common.GenerateFullPath(parentPath, childPath)
			if cca.dryrunMode {
				return dryrunRemoveSingleDFSResource(ctx, dsc, datalakeURLParts, cca.Recursive)
			} else {
				err := transferProcessor.scheduleCopyTransfer(newStoredObject(
					nil,
					path.Base(datalakeURLParts.PathName),
					childPath,
					common.EEntityType.File(), // blobfs deleter doesn't differentiate
					time.Now(),
					0,
					noContentProps,
					noContentProps,
					nil,
					"",
				))
				if err != nil {
					return err
				}
			}
		}
	}

	_, err = transferProcessor.dispatchFinalPart()
	return err
}
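
// For reference, azdatalake.ParseURL splits a DFS endpoint URL into the parts
// used above (illustrative values, assuming a standard dfs.core.windows.net
// endpoint):
//
//	parts, _ := azdatalake.ParseURL("https://account.dfs.core.windows.net/myfs/dir/sub/file.txt")
//	// parts.FileSystemName == "myfs"
//	// parts.PathName       == "dir/sub/file.txt"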

func dryrunRemoveSingleDFSResource(ctx context.Context, dsc *service.Client, datalakeURLParts azdatalake.URLParts, recursive bool) error {
	// deleting a filesystem
	if datalakeURLParts.PathName == "" {
		glcm.Dryrun(func(of common.OutputFormat) string {
			switch of {
			case of.Text():
				return fmt.Sprintf("DRYRUN: remove %s", dsc.NewFileSystemClient(datalakeURLParts.FileSystemName).DFSURL())
			case of.Json():
				tx := DryrunTransfer{
					EntityType: common.EEntityType.Folder(),
					FromTo:     common.EFromTo.BlobFSTrash(),
					Source:     dsc.NewFileSystemClient(datalakeURLParts.FileSystemName).DFSURL(),
				}

				buf, _ := json.Marshal(tx)
				return string(buf)
			default:
				panic("unsupported output format " + of.String())
			}
		})
		return nil
	}

	// we do not know if the source is a file or a directory,
	// so we assume it is a directory and get its properties
	directoryClient := dsc.NewFileSystemClient(datalakeURLParts.FileSystemName).NewDirectoryClient(datalakeURLParts.PathName)
	props, err := directoryClient.GetProperties(ctx, nil)
	if err != nil {
		return fmt.Errorf("cannot verify resource due to error: %s", err)
	}

	// if the source URL is actually a file,
	// then we should short-circuit and simply remove that file
	resourceType := common.IffNotNil(props.ResourceType, "")
	if strings.EqualFold(resourceType, "file") {
		glcm.Dryrun(func(of common.OutputFormat) string {
			switch of {
			case of.Text():
				return fmt.Sprintf("DRYRUN: remove %s", directoryClient.DFSURL())
			case of.Json():
				tx := DryrunTransfer{
					EntityType: common.EEntityType.File(),
					FromTo:     common.EFromTo.BlobFSTrash(),
					Source:     directoryClient.DFSURL(),
				}

				buf, _ := json.Marshal(tx)
				return string(buf)
			default:
				panic("unsupported output format " + of.String())
			}
		})
		return nil
	}

	// the source is a directory: list everything under it and report each path
	pathName := datalakeURLParts.PathName
	datalakeURLParts.PathName = ""
	pager := dsc.NewFileSystemClient(datalakeURLParts.FileSystemName).NewListPathsPager(recursive, &filesystem.ListPathsOptions{Prefix: &pathName})
	for pager.More() {
		resp, err := pager.NextPage(ctx)
		if err != nil {
			return err
		}

		for _, v := range resp.Paths {
			entityType := "directory"
			if v.IsDirectory == nil || !*v.IsDirectory {
				entityType = "file"
			}

			glcm.Dryrun(func(of common.OutputFormat) string {
				uri := dsc.NewFileSystemClient(datalakeURLParts.FileSystemName).NewFileClient(*v.Name).DFSURL()
				switch of {
				case of.Text():
					return fmt.Sprintf("DRYRUN: remove %s", uri)
				case of.Json():
					tx := DryrunTransfer{
						EntityType: common.Iff(entityType == "directory", common.EEntityType.Folder(), common.EEntityType.File()),
						FromTo:     common.EFromTo.BlobFSTrash(),
						Source:     uri,
					}

					buf, _ := json.Marshal(tx)
					return string(buf)
				default:
					panic("unsupported output format " + of.String())
				}
			})
		}
	}

	return nil
}
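
// Example of the text-format line glcm.Dryrun emits for a single entry
// (illustrative URL; in JSON mode the same entry is instead rendered by
// marshaling the DryrunTransfer struct built above, with whatever field
// names its JSON marshaling defines):
//
//	DRYRUN: remove https://account.dfs.core.windows.net/myfs/dir/file.txt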