func()

in cmd/zc_traverser_blob.go [341:498]


// parallelList enumerates the blobs of a container by crawling its virtual directories in
// parallel, starting at searchPrefix+extraSearchPrefix, and hands every discovered object that
// passes filters to processor.
//
// Relative paths of the produced stored objects are computed by trimming searchPrefix from the
// blob name. When t.recursive is set, every virtual directory (blob prefix) returned by a listing
// is queued for its own listing; when t.includeDirectoryStubs is also set, the blob standing in
// for the directory itself is looked up and emitted as a folder entity.
//
// It returns the first error reported by a listing worker, or the first error from the processor
// that is still non-nil after being passed through getProcessingError. The worker context is
// cancelled when this function returns.
func (t *blobTraverser) parallelList(containerClient *container.Client, containerName string, searchPrefix string,
	extraSearchPrefix string, preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error {
	// enqueueDirectoryStub looks up the blob that represents the virtual directory itself (it is
	// not part of BlobItems) and, if one carrying folder metadata exists, emits it as a folder.
	// It first tries the name without its trailing separator ("foo"), then with it ("foo/").
	// It only captures values that are never reassigned, so it can be shared between workers.
	enqueueDirectoryStub := func(virtualDirName string, enqueueOutput func(parallel.DirectoryEntry, error)) {
		dName := strings.TrimSuffix(virtualDirName, common.AZCOPY_PATH_SEPARATOR_STRING)
		blobClient := containerClient.NewBlobClient(dName)

		for {
			pResp, err := blobClient.GetProperties(t.ctx, nil)
			if err == nil && t.doesBlobRepresentAFolder(pResp.Metadata) {
				pbPropAdapter := blobPropertiesResponseAdapter{&pResp}
				folderRelativePath := strings.TrimPrefix(dName, searchPrefix)

				storedObject := newStoredObject(
					preprocessor,
					getObjectNameOnly(dName),
					folderRelativePath,
					common.EEntityType.Folder(),
					pbPropAdapter.LastModified(),
					pbPropAdapter.ContentLength(),
					pbPropAdapter,
					pbPropAdapter,
					pbPropAdapter.Metadata,
					containerName,
				)

				if t.s2sPreserveSourceTags {
					// Tags are fetched on a best-effort basis: a failure here must not fail the
					// enumeration, the folder is simply emitted without tags.
					tResp, tagsErr := blobClient.GetTags(t.ctx, nil)
					if tagsErr == nil {
						blobTagsMap := common.BlobTags{}
						for _, blobTag := range tResp.BlobTagSet {
							blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
						}
						storedObject.blobTags = blobTagsMap
					} else if azcopyScanningLogger != nil {
						azcopyScanningLogger.Log(common.LogDebug, fmt.Sprintf("Failed to get tags for directory stub %s: %s", dName, tagsErr))
					}
				}

				enqueueOutput(storedObject, nil)
				return
			}

			// Either nothing could be read under this name, or we picked up a file that is merely
			// *named* like the folder. We shouldn't add a blob that isn't a folder as a folder, so
			// once the "folder/" form has been tried as well, give up.
			if strings.HasSuffix(dName, "/") {
				return
			}

			// "foo" is a file (or absent), what about "foo/"? Tack on the path separator and retry.
			dName += common.AZCOPY_PATH_SEPARATOR_STRING
			blobClient = containerClient.NewBlobClient(dName)
		}
	}

	// Define how to enumerate its contents
	// This func must be thread safe/goroutine safe
	enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
		currentDirPath := dir.(string)

		pager := containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
			Prefix:  &currentDirPath,
			Include: container.ListBlobsInclude{Metadata: true, Tags: t.s2sPreserveSourceTags, Deleted: t.includeDeleted, Snapshots: t.includeSnapshot, Versions: t.includeVersion},
		})
		// marker holds the continuation token that produced the page currently being processed
		// (nil for the first page); it is only used for debug logging.
		var marker *string
		for pager.More() {
			lResp, err := pager.NextPage(t.ctx)
			if err != nil {
				// Wrap with %w so callers can still inspect the underlying error.
				return fmt.Errorf("cannot list files due to reason %w", err)
			}
			// queue up the sub virtual directories if recursive is true
			if t.recursive {
				for _, virtualDir := range lResp.Segment.BlobPrefixes {
					enqueueDir(*virtualDir.Name)
					if azcopyScanningLogger != nil {
						azcopyScanningLogger.Log(common.LogDebug, fmt.Sprintf("Enqueuing sub-directory %s for enumeration.", *virtualDir.Name))
					}

					if t.includeDirectoryStubs {
						// try to get properties on the directory itself, since it's not listed in BlobItems
						enqueueDirectoryStub(*virtualDir.Name, enqueueOutput)
					}
				}
			}

			// process the blobs returned in this result segment
			for _, blobInfo := range lResp.Segment.BlobItems {
				// if the blob represents a hdi folder, then skip it
				if t.doesBlobRepresentAFolder(blobInfo.Metadata) {
					continue
				}

				storedObject := t.createStoredObjectForBlob(preprocessor, blobInfo, strings.TrimPrefix(*blobInfo.Name, searchPrefix), containerName)

				// edge case, blob name happens to be the same as root and ends in /
				if storedObject.relativePath == "" && strings.HasSuffix(storedObject.name, "/") {
					storedObject.relativePath = "\x00" // Short circuit, letting the backend know we *really* meant root/.
				}

				if t.s2sPreserveSourceTags && blobInfo.BlobTags != nil {
					blobTagsMap := common.BlobTags{}
					for _, blobTag := range blobInfo.BlobTags.BlobTagSet {
						blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
					}
					storedObject.blobTags = blobTagsMap
				}

				enqueueOutput(storedObject, nil)
			}

			// if debug mode is on, note down the result, this is not going to be fast
			if azcopyScanningLogger != nil && azcopyScanningLogger.ShouldLog(common.LogDebug) {
				tokenValue := "NONE"
				if marker != nil {
					tokenValue = *marker
				}

				var vdirListBuilder strings.Builder
				for _, virtualDir := range lResp.Segment.BlobPrefixes {
					fmt.Fprintf(&vdirListBuilder, " %s,", *virtualDir.Name)
				}
				var fileListBuilder strings.Builder
				for _, blobInfo := range lResp.Segment.BlobItems {
					fmt.Fprintf(&fileListBuilder, " %s,", *blobInfo.Name)
				}
				msg := fmt.Sprintf("Enumerating %s with token %s. Sub-dirs:%s Files:%s", currentDirPath,
					tokenValue, vdirListBuilder.String(), fileListBuilder.String())
				azcopyScanningLogger.Log(common.LogDebug, msg)
			}
			marker = lResp.NextMarker
		}
		return nil
	}

	// initiate parallel scanning, starting at the root path
	workerContext, cancelWorkers := context.WithCancel(t.ctx)
	// Cancel the worker context on every return path, including the early error returns below.
	defer cancelWorkers()
	cCrawled := parallel.Crawl(workerContext, searchPrefix+extraSearchPrefix, enumerateOneDir, EnumerationParallelism)

	for x := range cCrawled {
		item, workerError := x.Item()
		if workerError != nil {
			return workerError
		}

		if t.incrementEnumerationCounter != nil {
			// NOTE(review): every crawled item is counted as a File, including the folder
			// entities emitted for directory stubs; confirm this is intended.
			t.incrementEnumerationCounter(common.EEntityType.File())
		}

		object := item.(StoredObject)
		processErr := processIfPassedFilters(filters, object, processor)
		_, processErr = getProcessingError(processErr)
		if processErr != nil {
			return processErr
		}
	}

	return nil
}