in cmd/zc_traverser_blob.go [341:498]
func (t *blobTraverser) parallelList(containerClient *container.Client, containerName string, searchPrefix string,
	extraSearchPrefix string, preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error {
	// Define how to enumerate the contents of a single directory.
	// This func must be goroutine-safe, because the crawler invokes it concurrently from multiple workers.
	enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
		currentDirPath := dir.(string)
		pager := containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
			Prefix:  &currentDirPath,
			Include: container.ListBlobsInclude{Metadata: true, Tags: t.s2sPreserveSourceTags, Deleted: t.includeDeleted, Snapshots: t.includeSnapshot, Versions: t.includeVersion},
		})
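		// With the "/" delimiter, each page describes exactly one level of the virtual directory
		// tree: Segment.BlobPrefixes holds the immediate sub-directories under currentDirPath,
		// while Segment.BlobItems holds the blobs that live directly at this level.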
		var marker *string // retained only for the debug log below; the pager tracks continuation itself
		for pager.More() {
			lResp, err := pager.NextPage(t.ctx)
			if err != nil {
				return fmt.Errorf("cannot list files due to reason %s", err)
			}
			// queue up the sub virtual directories if recursive is true
			if t.recursive {
				for _, virtualDir := range lResp.Segment.BlobPrefixes {
					enqueueDir(*virtualDir.Name)
					if azcopyScanningLogger != nil {
						azcopyScanningLogger.Log(common.LogDebug, fmt.Sprintf("Enqueuing sub-directory %s for enumeration.", *virtualDir.Name))
					}
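					// BlobPrefixes are purely virtual, so a directory only carries properties/metadata
					// if a stub blob exists for it. When directory stubs are included, probe "dir" and
					// then "dir/" for such a stub and emit it as a folder object.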
					if t.includeDirectoryStubs {
						// try to get properties on the directory itself, since it's not listed in BlobItems
						dName := strings.TrimSuffix(*virtualDir.Name, common.AZCOPY_PATH_SEPARATOR_STRING)
						blobClient := containerClient.NewBlobClient(dName)

					altNameCheck:
						pResp, err := blobClient.GetProperties(t.ctx, nil)
						if err == nil {
							if !t.doesBlobRepresentAFolder(pResp.Metadata) { // We've picked up on a file *named* the folder, not the folder itself. Does folder/ exist?
								if !strings.HasSuffix(dName, "/") {
									blobClient = containerClient.NewBlobClient(dName + common.AZCOPY_PATH_SEPARATOR_STRING) // Tack on the path separator, check.
									dName += common.AZCOPY_PATH_SEPARATOR_STRING
									goto altNameCheck // "foo" is a file, what about "foo/"?
								}

								goto skipDirAdd // We shouldn't add a blob that isn't a folder as a folder. You either have the folder metadata, or you don't.
							}

							pbPropAdapter := blobPropertiesResponseAdapter{&pResp}
							folderRelativePath := strings.TrimPrefix(dName, searchPrefix)

							storedObject := newStoredObject(
								preprocessor,
								getObjectNameOnly(dName),
								folderRelativePath,
								common.EEntityType.Folder(),
								pbPropAdapter.LastModified(),
								pbPropAdapter.ContentLength(),
								pbPropAdapter,
								pbPropAdapter,
								pbPropAdapter.Metadata,
								containerName,
							)

							if t.s2sPreserveSourceTags {
								tResp, err := blobClient.GetTags(t.ctx, nil)
								if err == nil {
									blobTagsMap := common.BlobTags{}
									for _, blobTag := range tResp.BlobTagSet {
										blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
									}
									storedObject.blobTags = blobTagsMap
								}
							}

							enqueueOutput(storedObject, err)
						} else {
							// There was nothing there, but is there folder/?
							if !strings.HasSuffix(dName, "/") {
								blobClient = containerClient.NewBlobClient(dName + common.AZCOPY_PATH_SEPARATOR_STRING) // Tack on the path separator, check.
								dName += common.AZCOPY_PATH_SEPARATOR_STRING
								goto altNameCheck // "foo" is a file, what about "foo/"?
							}
						}
					skipDirAdd:
					}
				}
			}
			// process the blobs returned in this result segment
			for _, blobInfo := range lResp.Segment.BlobItems {
				// if the blob represents an hdi folder, then skip it
				if t.doesBlobRepresentAFolder(blobInfo.Metadata) {
					continue
				}

				storedObject := t.createStoredObjectForBlob(preprocessor, blobInfo, strings.TrimPrefix(*blobInfo.Name, searchPrefix), containerName)

				// edge case: the blob name happens to match the root and ends in /
				if storedObject.relativePath == "" && strings.HasSuffix(storedObject.name, "/") {
					storedObject.relativePath = "\x00" // Short circuit, letting the backend know we *really* meant root/.
				}

				if t.s2sPreserveSourceTags && blobInfo.BlobTags != nil {
					blobTagsMap := common.BlobTags{}
					for _, blobTag := range blobInfo.BlobTags.BlobTagSet {
						blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
					}
					storedObject.blobTags = blobTagsMap
				}

				enqueueOutput(storedObject, nil)
			}
			// if debug logging is enabled, record what this page returned; this is not going to be fast
			if azcopyScanningLogger != nil && azcopyScanningLogger.ShouldLog(common.LogDebug) {
				tokenValue := "NONE"
				if marker != nil {
					tokenValue = *marker
				}

				var vdirListBuilder strings.Builder
				for _, virtualDir := range lResp.Segment.BlobPrefixes {
					fmt.Fprintf(&vdirListBuilder, " %s,", *virtualDir.Name)
				}
				var fileListBuilder strings.Builder
				for _, blobInfo := range lResp.Segment.BlobItems {
					fmt.Fprintf(&fileListBuilder, " %s,", *blobInfo.Name)
				}

				msg := fmt.Sprintf("Enumerating %s with token %s. Sub-dirs:%s Files:%s", currentDirPath,
					tokenValue, vdirListBuilder.String(), fileListBuilder.String())
				azcopyScanningLogger.Log(common.LogDebug, msg)
			}

			marker = lResp.NextMarker
		}

		return nil
	}
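
	// parallel.Crawl seeds the work queue with the root prefix passed below, fans enumerateOneDir
	// out across parallel workers (bounded by EnumerationParallelism), and streams every enqueued
	// output (or failure) back to the caller over the returned channel.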
	// initiate parallel scanning, starting at the root path
	workerContext, cancelWorkers := context.WithCancel(t.ctx)
	defer cancelWorkers()

	cCrawled := parallel.Crawl(workerContext, searchPrefix+extraSearchPrefix, enumerateOneDir, EnumerationParallelism)

	for x := range cCrawled {
		item, workerError := x.Item()
		if workerError != nil {
			return workerError
		}

		if t.incrementEnumerationCounter != nil {
			t.incrementEnumerationCounter(common.EEntityType.File())
		}

		object := item.(StoredObject)
		processErr := processIfPassedFilters(filters, object, processor)
		_, processErr = getProcessingError(processErr)
		if processErr != nil {
			return processErr
		}
	}

	return nil
}
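
For orientation, here is a minimal, self-contained sketch of the crawl pattern the function above relies on. It is not AzCopy's parallel.Crawl; every name in it (crawl, crawlResult, enumerateFunc, the fake directory tree) is hypothetical, and it assumes only what the call sites above show: an enumerate callback that receives enqueueDir/enqueueOutput hooks, a bounded number of concurrent directory listings, and a results channel that the caller drains the way the cCrawled loop does.

package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// crawlResult stands in for the (StoredObject, error) pairs produced above.
type crawlResult struct {
	entry string
	err   error
}

// enumerateFunc mirrors the shape of enumerateOneDir: list one directory, enqueue
// sub-directories for later, and emit the files found at this level.
type enumerateFunc func(dir string, enqueueDir func(string), enqueueOutput func(string, error)) error

// crawl walks the tree starting at root, calling enumerate once per directory, with at
// most `parallelism` directories being listed concurrently. (Illustrative sketch only,
// not the implementation used by AzCopy.)
func crawl(ctx context.Context, root string, enumerate enumerateFunc, parallelism int) <-chan crawlResult {
	out := make(chan crawlResult)
	sem := make(chan struct{}, parallelism) // bounds concurrent enumerate calls
	var wg sync.WaitGroup

	var visit func(dir string)
	visit = func(dir string) {
		defer wg.Done()
		sem <- struct{}{}        // acquire a listing slot
		defer func() { <-sem }() // release it when this directory is done

		enqueueDir := func(sub string) { // a sub-directory was discovered: schedule it
			wg.Add(1)
			go visit(sub)
		}
		enqueueOutput := func(entry string, err error) { // a file (or an error) was discovered
			select {
			case out <- crawlResult{entry, err}:
			case <-ctx.Done():
			}
		}
		if err := enumerate(dir, enqueueDir, enqueueOutput); err != nil {
			enqueueOutput("", err)
		}
	}

	wg.Add(1)
	go visit(root)
	go func() { wg.Wait(); close(out) }() // close the results channel once every directory finished
	return out
}

func main() {
	// A fake "container": each virtual directory maps to its immediate children.
	// Names ending in "/" are sub-directories; everything else is a blob.
	tree := map[string][]string{
		"":     {"a/", "root.txt"},
		"a/":   {"a/b/", "a/1.txt"},
		"a/b/": {"a/b/2.txt"},
	}

	enumerate := func(dir string, enqueueDir func(string), enqueueOutput func(string, error)) error {
		for _, child := range tree[dir] {
			if child[len(child)-1] == '/' {
				enqueueDir(child)
			} else {
				enqueueOutput(child, nil)
			}
		}
		return nil
	}

	var files []string
	for r := range crawl(context.Background(), "", enumerate, 4) {
		if r.err != nil {
			fmt.Println("error:", r.err)
			continue
		}
		files = append(files, r.entry)
	}
	sort.Strings(files)
	fmt.Println(files) // [a/1.txt a/b/2.txt root.txt]
}

Spawning one goroutine per discovered directory and gating the actual listing with a semaphore keeps the sketch short; a production crawler like the one used above would typically prefer a fixed worker pool with an explicit queue, but the contract seen from parallelList (enqueueDir, enqueueOutput, a drained results channel) stays the same.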