func()

in cmd/zc_traverser_file.go [136:417]


// Traverse enumerates the Azure Files location identified by t.rawURL and passes each
// discovered entity, converted to a StoredObject, through filters and on to processor.
//
// If the URL has a non-empty path and getPropertiesIfSingleFile reports a file, only that
// file is emitted. Otherwise the root directory is emitted first (when GetProperties on it
// succeeds, or when the URL is the share root), followed by the listed files and
// directories; subdirectories are descended into only when t.recursive is set.
// Listing (parallel.Crawl) and conversion (parallel.Transform) are both run with
// EnumerationParallelism.
//
// preprocessor is handed to newStoredObject for every emitted object.
//
// Names that invalidBlobOrWindowsName rejects are skipped with a warning (or, for the
// root path, cause common.EAzError.InvalidBlobOrWindowsName() to be returned). Failures
// while scanning an individual entry are logged and skipped; a non-nil error coming back
// from getProcessingError for a processed object stops the traversal and is returned.
func (t *fileTraverser) Traverse(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) (err error) {
	// invalidBlobOrWindowsName reports whether path must be rejected because the
	// destination cannot represent names that end with a period.
	invalidBlobOrWindowsName := func(path string) bool {
		if t.destination == nil {
			return false
		}
		destIsBlobLike := *t.destination == common.ELocation.Blob() || *t.destination == common.ELocation.BlobFS()

		// AllowToUnsafeDestination bypasses the check, but only when the destination is
		// neither Blob nor BlobFS: trailing dot files are not supported in Blob.
		// (The previous condition used "||" between the two "!=" comparisons, which is
		// always true and therefore let Blob/BlobFS destinations through as well.)
		if t.trailingDot == common.ETrailingDotOption.AllowToUnsafeDestination() && !destIsBlobLike {
			return false // Please let me shoot myself in the foot!
		}

		if (t.destination.IsLocal() && runtime.GOOS == "windows") || destIsBlobLike {
			// Blob or Windows object name is invalid if it ends with period or
			// one of (virtual) directories in path ends with period.
			// This list is not exhaustive.
			return strings.HasSuffix(path, ".") || strings.Contains(path, "./")
		}
		return false
	}

	targetURLParts, err := file.ParseURL(t.rawURL)
	if err != nil {
		return err
	}

	// checkAllDots reports whether path consists solely of '.' characters (or is empty).
	// We stop remove operations if a file/dir name is only dots.
	checkAllDots := func(path string) bool {
		return strings.Trim(path, ".") == ""
	}

	// If not pointing to a share, check if we are pointing to a single file.
	if targetURLParts.DirectoryOrFilePath != "" {
		if invalidBlobOrWindowsName(targetURLParts.DirectoryOrFilePath) {
			WarnStdoutAndScanningLog(fmt.Sprintf(invalidNameErrorMsg, targetURLParts.DirectoryOrFilePath))
			return common.EAzError.InvalidBlobOrWindowsName()
		}
		if !t.trailingDot.IsEnabled() && strings.HasSuffix(targetURLParts.DirectoryOrFilePath, ".") {
			// Guarded like every other use of the scanning logger in this function.
			if azcopyScanningLogger != nil {
				azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf(trailingDotErrMsg, getObjectNameOnly(targetURLParts.DirectoryOrFilePath)))
			}
		}

		// Abort remove operations for names made only of dots, e.g. a file named "dir/..."
		// with the trailing dot flag disabled. In that mode the dots are stripped and the
		// name is seen as the parent directory "dir/", so the other children of dir would
		// be wrongly deleted.
		// NOTE(review): this relies on glcm.Error not returning (presumably it terminates
		// the process) — confirm, since execution otherwise continues below.
		if !t.trailingDot.IsEnabled() && checkAllDots(getObjectNameOnly(targetURLParts.DirectoryOrFilePath)) {
			glcm.Error(fmt.Sprintf(allDotsErrorMsg, getObjectNameOnly(targetURLParts.DirectoryOrFilePath)))
		}

		// Check if the url points to a single file.
		fileProperties, isFile, err := t.getPropertiesIfSingleFile()
		if err != nil {
			return err
		}
		if isFile {
			if azcopyScanningLogger != nil {
				azcopyScanningLogger.Log(common.LogDebug, "Detected the root as a file.")
			}

			storedObject := newStoredObject(
				preprocessor,
				getObjectNameOnly(targetURLParts.DirectoryOrFilePath),
				"", // the single file is the root, so its relative path is empty
				common.EEntityType.File(),
				*fileProperties.LastModified,
				*fileProperties.ContentLength,
				shareFilePropertiesAdapter{fileProperties},
				noBlobProps,
				fileProperties.Metadata,
				targetURLParts.ShareName,
			)

			storedObject.smbLastModifiedTime = *fileProperties.FileLastWriteTime

			if t.incrementEnumerationCounter != nil {
				t.incrementEnumerationCounter(common.EEntityType.File())
			}
			procErr := processIfPassedFilters(filters, storedObject, processor)
			_, procErr = getProcessingError(procErr)

			return procErr
		}
	}

	// Else, it's not just one file.

	// convertToStoredObject turns a crawled azfileEntity into a StoredObject.
	// This func must be threadsafe/goroutine safe.
	convertToStoredObject := func(input parallel.InputObject) (parallel.OutputObject, error) {
		f := input.(azfileEntity)
		// Compute the relative path of the file with respect to the target directory.
		fileURLParts, err := file.ParseURL(f.url)
		if err != nil {
			return nil, err
		}
		targetPath := strings.TrimSuffix(targetURLParts.DirectoryOrFilePath, common.AZCOPY_PATH_SEPARATOR_STRING)
		relativePath := strings.TrimPrefix(fileURLParts.DirectoryOrFilePath, targetPath)
		relativePath = strings.TrimPrefix(relativePath, common.AZCOPY_PATH_SEPARATOR_STRING)

		size := f.contentLength
		// We need to omit some properties if we don't get properties.
		var lmt time.Time
		var smbLMT time.Time
		var contentProps contentPropsProvider = noContentProps
		var metadata common.Metadata

		// Only get the properties if we're told to.
		if t.getProperties {
			var fullProperties filePropsProvider
			fullProperties, err = f.propertyGetter(t.ctx)
			if err != nil {
				// Return the relative path alongside the error so the consumer loop
				// can report which entry failed.
				return StoredObject{
					relativePath: relativePath,
				}, err
			}
			lmt = fullProperties.LastModified()
			smbLMT = fullProperties.FileLastWriteTime()
			contentProps = fullProperties
			// Get an up-to-date size, because it's documented that the size returned by the listing might not be up-to-date,
			// if an SMB client has modified but not yet closed the file. (See https://docs.microsoft.com/en-us/rest/api/storageservices/list-directories-and-files)
			// Doing this here makes sure that our size is just as up-to-date as our LMT.
			// (If s2s-detect-source-changed is false, then this code won't run. If it's false, we don't check for modifications anyway,
			// so it's fair to assume that the size will stay equal to that returned by the listing operation.)
			size = fullProperties.ContentLength()
			metadata = fullProperties.Metadata()
		}
		obj := newStoredObject(
			preprocessor,
			getObjectNameOnly(f.name),
			relativePath,
			f.entityType,
			lmt,
			size,
			contentProps,
			noBlobProps,
			metadata,
			targetURLParts.ShareName,
		)

		obj.smbLastModifiedTime = smbLMT

		return obj, nil
	}

	// processStoredObject counts the object (when a counter is configured) and runs it
	// through the filters and the processor.
	processStoredObject := func(s StoredObject) error {
		if t.incrementEnumerationCounter != nil {
			t.incrementEnumerationCounter(s.entityType)
		}
		procErr := processIfPassedFilters(filters, s, processor)
		_, procErr = getProcessingError(procErr)
		return procErr
	}

	// Get the directory URL so that we can list the files.
	directoryClient, err := createDirectoryClientFromServiceClient(targetURLParts, t.serviceClient)
	if err != nil {
		return err
	}

	// Our rule is that enumerators of folder-aware sources should include the root folder's properties.
	// So include the root dir/share in the enumeration results, if it exists or is just the share root.
	// Note: this assigns the function's named result err; see the final return.
	_, err = directoryClient.GetProperties(t.ctx, nil)
	if err == nil || targetURLParts.DirectoryOrFilePath == "" {
		rootObj, convErr := convertToStoredObject(newAzFileRootDirectoryEntity(directoryClient, ""))
		if convErr != nil {
			return convErr
		}
		if procErr := processStoredObject(rootObj.(StoredObject)); procErr != nil {
			return procErr
		}
	}

	// Define how to enumerate a directory's contents.
	// This func must be threadsafe/goroutine safe.
	enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
		currentDirectoryClient := dir.(*directory.Client)
		pager := currentDirectoryClient.NewListFilesAndDirectoriesPager(nil)
		var marker *string // marker of the page being processed; only used for debug logging
		for pager.More() {
			lResp, err := pager.NextPage(t.ctx)
			if err != nil {
				return fmt.Errorf("cannot list files due to reason %w", err)
			}

			for _, fileInfo := range lResp.Segment.Files {
				if invalidBlobOrWindowsName(*fileInfo.Name) {
					// Throw a warning on console and continue.
					WarnStdoutAndScanningLog(fmt.Sprintf(invalidNameErrorMsg, *fileInfo.Name))
					continue
				}
				if !t.trailingDot.IsEnabled() && strings.HasSuffix(*fileInfo.Name, ".") && azcopyScanningLogger != nil {
					azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf(trailingDotErrMsg, *fileInfo.Name))
				}
				if !t.trailingDot.IsEnabled() && checkAllDots(*fileInfo.Name) {
					glcm.Error(fmt.Sprintf(allDotsErrorMsg, *fileInfo.Name))
				}
				enqueueOutput(newAzFileFileEntity(currentDirectoryClient, fileInfo), nil)
			}

			for _, dirInfo := range lResp.Segment.Directories {
				if invalidBlobOrWindowsName(*dirInfo.Name) {
					// Throw a warning on console and continue.
					WarnStdoutAndScanningLog(fmt.Sprintf(invalidNameErrorMsg, *dirInfo.Name))
					continue
				}
				if !t.trailingDot.IsEnabled() && strings.HasSuffix(*dirInfo.Name, ".") && azcopyScanningLogger != nil {
					azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf(trailingDotErrMsg, *dirInfo.Name))
				}
				enqueueOutput(newAzFileSubdirectoryEntity(currentDirectoryClient, *dirInfo.Name), nil)
				if t.recursive {
					// If recursive is turned on, add sub directories to be processed.
					enqueueDir(currentDirectoryClient.NewSubdirectoryClient(*dirInfo.Name))
				}
			}

			// If debug mode is on, note down the result; this is not going to be fast.
			if azcopyScanningLogger != nil && azcopyScanningLogger.ShouldLog(common.LogDebug) {
				tokenValue := "NONE"
				if marker != nil {
					tokenValue = *marker
				}

				var dirListBuilder strings.Builder
				for _, dirInfo := range lResp.Segment.Directories {
					fmt.Fprintf(&dirListBuilder, " %s,", *dirInfo.Name)
				}
				var fileListBuilder strings.Builder
				for _, fileInfo := range lResp.Segment.Files {
					fmt.Fprintf(&fileListBuilder, " %s,", *fileInfo.Name)
				}

				fileURLParts, parseErr := file.ParseURL(currentDirectoryClient.URL())
				if parseErr != nil {
					return parseErr
				}
				directoryName := fileURLParts.DirectoryOrFilePath
				msg := fmt.Sprintf("Enumerating %s with token %s. Sub-dirs:%s Files:%s", directoryName,
					tokenValue, dirListBuilder.String(), fileListBuilder.String())
				azcopyScanningLogger.Log(common.LogDebug, msg)
			}

			marker = lResp.NextMarker
		}
		return nil
	}

	// Run the actual enumeration.
	// First part is a parallel directory crawl.
	// Second part is parallel conversion of the directories and files to stored objects. This is necessary because the conversion to stored object may hit the network and therefore be slow if not parallelized.
	parallelism := EnumerationParallelism // for Azure Files we'll run two pools of this size, one for crawl and one for transform

	workerContext, cancelWorkers := context.WithCancel(t.ctx)
	// Cancel the workers on every exit path, including early returns and panics.
	defer cancelWorkers()

	cCrawled := parallel.Crawl(workerContext, directoryClient, enumerateOneDir, parallelism)

	cTransformed := parallel.Transform(workerContext, cCrawled, convertToStoredObject, parallelism)

	for x := range cTransformed {
		item, workerError := x.Item()
		if workerError != nil {
			// A failed entry is reported and skipped; it does not abort the traversal.
			relativePath := ""
			if item != nil {
				relativePath = item.(StoredObject).relativePath
			}
			if !t.trailingDot.IsEnabled() && checkAllDots(relativePath) {
				glcm.Error(fmt.Sprintf(allDotsErrorMsg, relativePath))
			}
			glcm.Info("Failed to scan Directory/File " + relativePath + ". Logging errors in scanning logs.")

			if azcopyScanningLogger != nil {
				azcopyScanningLogger.Log(common.LogWarning, workerError.Error())
			}
			continue
		}
		if processErr := processStoredObject(item.(StoredObject)); processErr != nil {
			return processErr
		}
	}

	// err is the named result and may still hold the failure from the root
	// GetProperties call above; it is returned unchanged to preserve the behavior of the
	// previous naked return.
	// NOTE(review): confirm that surfacing that GetProperties error here is intended.
	return err
}