in cmd/zc_traverser_file.go [136:417]
// Traverse enumerates the Azure Files target that t.rawURL points to (a single file, a
// directory, or a whole share), converts each entry into a StoredObject, applies the given
// filters, and forwards the survivors to processor. preprocessor is applied while each
// StoredObject is constructed.
//
// Enumeration is a two-stage parallel pipeline: a parallel directory crawl, followed by a
// parallel conversion of listing entries into StoredObjects (the conversion may hit the
// network when t.getProperties is set, so it benefits from parallelism as well).
func (t *fileTraverser) Traverse(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) (err error) {
	// invalidBlobOrWindowsName reports whether path would be an invalid object name at the
	// destination. Blob/BlobFS and local Windows filesystems reject names in which the final
	// (or any intermediate) path segment ends with a period.
	invalidBlobOrWindowsName := func(path string) bool {
		if t.destination != nil {
			// The unsafe-destination opt-out is honored only for destinations that can actually
			// store trailing-dot names (i.e. not Blob/BlobFS). BUGFIX: this previously used "||",
			// which is always true (a value cannot differ from both Blob and BlobFS at once) and
			// therefore disabled the check for every destination, including Blob/BlobFS.
			if t.trailingDot == common.ETrailingDotOption.AllowToUnsafeDestination() && (*t.destination != common.ELocation.Blob() && *t.destination != common.ELocation.BlobFS()) { // Allow only Local, Trailing dot files not supported in Blob
				return false // Please let me shoot myself in the foot!
			}

			if (t.destination.IsLocal() && runtime.GOOS == "windows") || *t.destination == common.ELocation.Blob() || *t.destination == common.ELocation.BlobFS() {
				/* Blob or Windows object name is invalid if it ends with period or
				   one of (virtual) directories in path ends with period.
				   This list is not exhaustive
				*/
				return strings.HasSuffix(path, ".") ||
					strings.Contains(path, "./")
			}
		}
		return false
	}

	targetURLParts, err := file.ParseURL(t.rawURL)
	if err != nil {
		return err
	}

	// We stop remove operations if file/dir name is only dots: with trailing-dot support
	// disabled the service strips the dots and the wrong object could be targeted.
	checkAllDots := func(path string) bool {
		return strings.Trim(path, ".") == ""
	}

	// If not pointing to a share, check if we are pointing to a single file.
	if targetURLParts.DirectoryOrFilePath != "" {
		if invalidBlobOrWindowsName(targetURLParts.DirectoryOrFilePath) {
			WarnStdoutAndScanningLog(fmt.Sprintf(invalidNameErrorMsg, targetURLParts.DirectoryOrFilePath))
			return common.EAzError.InvalidBlobOrWindowsName()
		}
		if !t.trailingDot.IsEnabled() && strings.HasSuffix(targetURLParts.DirectoryOrFilePath, ".") {
			// Guard added for consistency: other call sites nil-check the scanning logger.
			if azcopyScanningLogger != nil {
				azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf(trailingDotErrMsg, getObjectNameOnly(targetURLParts.DirectoryOrFilePath)))
			}
		}

		// Abort remove operation for files with only dots. i.e a file named "dir/..." with trailing dot flag Disabled.
		// The dot is stripped and the file is seen as a directory; incorrectly removing all other files within the parent dir/
		// with Disable, "..." is seen as "dir/..." folder and other child files of dir would be wrongly deleted.
		if !t.trailingDot.IsEnabled() && checkAllDots(getObjectNameOnly(targetURLParts.DirectoryOrFilePath)) {
			glcm.Error(fmt.Sprintf(allDotsErrorMsg, getObjectNameOnly(targetURLParts.DirectoryOrFilePath)))
		}

		// Check if the URL points to a single file.
		fileProperties, isFile, err := t.getPropertiesIfSingleFile()
		if err != nil {
			return err
		}
		if isFile {
			if azcopyScanningLogger != nil {
				azcopyScanningLogger.Log(common.LogDebug, "Detected the root as a file.")
			}

			storedObject := newStoredObject(
				preprocessor,
				getObjectNameOnly(targetURLParts.DirectoryOrFilePath),
				"", // relative path is empty: the root itself is the file
				common.EEntityType.File(),
				*fileProperties.LastModified,
				*fileProperties.ContentLength,
				shareFilePropertiesAdapter{fileProperties},
				noBlobProps,
				fileProperties.Metadata,
				targetURLParts.ShareName,
			)
			storedObject.smbLastModifiedTime = *fileProperties.FileLastWriteTime

			if t.incrementEnumerationCounter != nil {
				t.incrementEnumerationCounter(common.EEntityType.File())
			}
			err := processIfPassedFilters(filters, storedObject, processor)
			_, err = getProcessingError(err)
			return err
		}
	}

	// Else, it's not just one file.

	// convertToStoredObject turns a listing entity into a StoredObject, optionally fetching
	// full properties over the network. This func must be threadsafe/goroutine safe.
	convertToStoredObject := func(input parallel.InputObject) (parallel.OutputObject, error) {
		f := input.(azfileEntity)
		// Compute the relative path of the file with respect to the target directory.
		fileURLParts, err := file.ParseURL(f.url)
		if err != nil {
			return nil, err
		}
		targetPath := strings.TrimSuffix(targetURLParts.DirectoryOrFilePath, common.AZCOPY_PATH_SEPARATOR_STRING)
		relativePath := strings.TrimPrefix(fileURLParts.DirectoryOrFilePath, targetPath)
		relativePath = strings.TrimPrefix(relativePath, common.AZCOPY_PATH_SEPARATOR_STRING)

		size := f.contentLength
		// We need to omit some properties if we don't get properties.
		var lmt time.Time
		var smbLMT time.Time
		var contentProps contentPropsProvider = noContentProps
		var metadata common.Metadata

		// Only get the properties if we're told to.
		if t.getProperties {
			var fullProperties filePropsProvider
			fullProperties, err = f.propertyGetter(t.ctx)
			if err != nil {
				// Return the relative path so the caller can report which object failed.
				return StoredObject{
					relativePath: relativePath,
				}, err
			}
			lmt = fullProperties.LastModified()
			smbLMT = fullProperties.FileLastWriteTime()
			contentProps = fullProperties
			// Get an up-to-date size, because it's documented that the size returned by the listing might not be up-to-date,
			// if an SMB client has modified but not yet closed the file. (See https://docs.microsoft.com/en-us/rest/api/storageservices/list-directories-and-files)
			// Doing this here makes sure that our size is just as up-to-date as our LMT.
			// (If s2s-detect-source-changed is false, then this code won't run. If it's false, we don't check for modifications anyway,
			// so it's fair to assume that the size will stay equal to that returned by the listing operation.)
			size = fullProperties.ContentLength()
			metadata = fullProperties.Metadata()
		}
		obj := newStoredObject(
			preprocessor,
			getObjectNameOnly(f.name),
			relativePath,
			f.entityType,
			lmt,
			size,
			contentProps,
			noBlobProps,
			metadata,
			targetURLParts.ShareName,
		)
		obj.smbLastModifiedTime = smbLMT
		return obj, nil
	}

	// processStoredObject counts the entity and runs it through the filters/processor chain.
	processStoredObject := func(s StoredObject) error {
		if t.incrementEnumerationCounter != nil {
			t.incrementEnumerationCounter(s.entityType)
		}
		err := processIfPassedFilters(filters, s, processor)
		_, err = getProcessingError(err)
		return err
	}

	// Get the directory URL so that we can list the files.
	directoryClient, err := createDirectoryClientFromServiceClient(targetURLParts, t.serviceClient)
	if err != nil {
		return err
	}

	// Our rule is that enumerators of folder-aware sources should include the root folder's properties.
	// So include the root dir/share in the enumeration results, if it exists or is just the share root.
	_, err = directoryClient.GetProperties(t.ctx, nil)
	if err == nil || targetURLParts.DirectoryOrFilePath == "" {
		s, err := convertToStoredObject(newAzFileRootDirectoryEntity(directoryClient, ""))
		if err != nil {
			return err
		}
		err = processStoredObject(s.(StoredObject))
		if err != nil {
			return err
		}
	}

	// Define how to enumerate a directory's contents.
	// This func must be threadsafe/goroutine safe.
	enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
		currentDirectoryClient := dir.(*directory.Client)
		pager := currentDirectoryClient.NewListFilesAndDirectoriesPager(nil)
		var marker *string // continuation token of the previous page, tracked only for debug logging
		for pager.More() {
			lResp, err := pager.NextPage(t.ctx)
			if err != nil {
				return fmt.Errorf("cannot list files due to reason %w", err)
			}
			for _, fileInfo := range lResp.Segment.Files {
				if invalidBlobOrWindowsName(*fileInfo.Name) {
					// Throw a warning on console and continue.
					WarnStdoutAndScanningLog(fmt.Sprintf(invalidNameErrorMsg, *fileInfo.Name))
					continue
				} else {
					if !t.trailingDot.IsEnabled() && strings.HasSuffix(*fileInfo.Name, ".") {
						// Guard added for consistency: other call sites nil-check the scanning logger.
						if azcopyScanningLogger != nil {
							azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf(trailingDotErrMsg, *fileInfo.Name))
						}
					}
					if !t.trailingDot.IsEnabled() && checkAllDots(*fileInfo.Name) {
						glcm.Error(fmt.Sprintf(allDotsErrorMsg, *fileInfo.Name))
					}
				}
				enqueueOutput(newAzFileFileEntity(currentDirectoryClient, fileInfo), nil)
			}
			for _, dirInfo := range lResp.Segment.Directories {
				if invalidBlobOrWindowsName(*dirInfo.Name) {
					// Throw a warning on console and continue.
					WarnStdoutAndScanningLog(fmt.Sprintf(invalidNameErrorMsg, *dirInfo.Name))
					continue
				} else {
					if !t.trailingDot.IsEnabled() && strings.HasSuffix(*dirInfo.Name, ".") {
						// Guard added for consistency: other call sites nil-check the scanning logger.
						if azcopyScanningLogger != nil {
							azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf(trailingDotErrMsg, *dirInfo.Name))
						}
					}
				}
				enqueueOutput(newAzFileSubdirectoryEntity(currentDirectoryClient, *dirInfo.Name), nil)
				if t.recursive {
					// If recursive is turned on, add sub directories to be processed.
					enqueueDir(currentDirectoryClient.NewSubdirectoryClient(*dirInfo.Name))
				}
			}

			// If debug mode is on, note down the result; this is not going to be fast.
			if azcopyScanningLogger != nil && azcopyScanningLogger.ShouldLog(common.LogDebug) {
				tokenValue := "NONE"
				if marker != nil {
					tokenValue = *marker
				}

				var dirListBuilder strings.Builder
				for _, dir := range lResp.Segment.Directories {
					fmt.Fprintf(&dirListBuilder, " %s,", *dir.Name)
				}
				var fileListBuilder strings.Builder
				for _, fileInfo := range lResp.Segment.Files {
					fmt.Fprintf(&fileListBuilder, " %s,", *fileInfo.Name)
				}

				fileURLParts, err := file.ParseURL(currentDirectoryClient.URL())
				if err != nil {
					return err
				}
				directoryName := fileURLParts.DirectoryOrFilePath
				msg := fmt.Sprintf("Enumerating %s with token %s. Sub-dirs:%s Files:%s", directoryName,
					tokenValue, dirListBuilder.String(), fileListBuilder.String())
				azcopyScanningLogger.Log(common.LogDebug, msg)
			}
			marker = lResp.NextMarker
		}
		return nil
	}

	// Run the actual enumeration.
	// First part is a parallel directory crawl.
	// Second part is parallel conversion of the directories and files to stored objects. This is necessary because the conversion to stored object may hit the network and therefore be slow if not parallelized.
	parallelism := EnumerationParallelism // for Azure Files we'll run two pools of this size, one for crawl and one for transform

	workerContext, cancelWorkers := context.WithCancel(t.ctx)

	cCrawled := parallel.Crawl(workerContext, directoryClient, enumerateOneDir, parallelism)

	cTransformed := parallel.Transform(workerContext, cCrawled, convertToStoredObject, parallelism)

	for x := range cTransformed {
		item, workerError := x.Item()
		if workerError != nil {
			relativePath := ""
			if item != nil {
				relativePath = item.(StoredObject).relativePath
			}
			if !t.trailingDot.IsEnabled() && checkAllDots(relativePath) {
				glcm.Error(fmt.Sprintf(allDotsErrorMsg, relativePath))
			}
			// Scanning errors are logged and skipped rather than aborting the whole transfer.
			glcm.Info("Failed to scan Directory/File " + relativePath + ". Logging errors in scanning logs.")
			if azcopyScanningLogger != nil {
				azcopyScanningLogger.Log(common.LogWarning, workerError.Error())
			}
			continue
		}
		processErr := processStoredObject(item.(StoredObject))
		if processErr != nil {
			cancelWorkers()
			return processErr
		}
	}

	cancelWorkers()
	return
}