func()

in cmd/copyEnumeratorInit.go [48:329]


// initEnumerator builds the CopyEnumerator for a copy job.
//
// It performs, in order:
//  1. Fills in the transfer-wide options on jobPartOrder from the cooked command args.
//  2. Initializes the source traverser and validates the source directory.
//  3. Determines the source/destination location levels and rejects unsupported combinations.
//  4. For remote destinations, makes a best-effort attempt to create the destination
//     container(s) in service-to-service scenarios. Creation failures are logged, never returned.
//  5. Builds the processor (which turns each StoredObject into a transfer, or into dry-run
//     output) and the finalizer (which dispatches the final job part).
//
// jobPartOrder is received by value; the processor and finalizer closures keep working on this
// function's local copy. srcCredInfo and ctx are handed to the traverser by pointer.
//
// A nil enumerator and a non-nil error are returned when traverser initialization, source
// validation, location-level detection, destination container-name extraction or container
// listing fails, or when the source/destination level combination is not allowed.
func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrderRequest, srcCredInfo common.CredentialInfo, ctx context.Context) (*CopyEnumerator, error) {
	var traverser ResourceTraverser
	var err error
	jobPartOrder.FileAttributes = common.FileTransferAttributes{
		TrailingDot: cca.trailingDot,
	}
	jobPartOrder.CpkOptions = cca.CpkOptions
	jobPartOrder.PreserveSMBPermissions = cca.preservePermissions
	jobPartOrder.PreserveSMBInfo = cca.preserveSMBInfo
	// We set preservePOSIXProperties if the customer has explicitly asked for this in transfer or if it is just a Posix-property only transfer
	jobPartOrder.PreservePOSIXProperties = cca.preservePOSIXProperties || (cca.ForceWrite == common.EOverwriteOption.PosixProperties())

	// Decide whether the traverser must fetch remote properties during enumeration.
	// Any one of the following turns it on:
	//   - the overwrite option is "if source newer";
	//   - the source is Files and the destination is not remote (a download), since we still need LMT and MD5 from files;
	//   - the source is Files, the destination is remote, and either source change validation is enabled or
	//     includeAfter/includeBefore is in use (all of which require LMTs);
	//   - both ends are remote, preserve properties is enabled, and get-properties-in-backend is disabled
	//     (when it is enabled, properties are obtained in the backend instead).
	getRemoteProperties := cca.ForceWrite == common.EOverwriteOption.IfSourceNewer() ||
		(cca.FromTo.From() == common.ELocation.File() && !cca.FromTo.To().IsRemote()) ||
		(cca.FromTo.From() == common.ELocation.File() && cca.FromTo.To().IsRemote() && (cca.s2sSourceChangeValidation || cca.IncludeAfter != nil || cca.IncludeBefore != nil)) ||
		(cca.FromTo.From().IsRemote() && cca.FromTo.To().IsRemote() && cca.s2sPreserveProperties && !cca.s2sGetPropertiesInBackend)
	// Only defer property retrieval to the backend when it was requested, properties are being preserved,
	// and enumeration is not already fetching them.
	jobPartOrder.S2SGetPropertiesInBackend = cca.s2sPreserveProperties && !getRemoteProperties && cca.s2sGetPropertiesInBackend
	jobPartOrder.S2SSourceChangeValidation = cca.s2sSourceChangeValidation
	jobPartOrder.DestLengthValidation = cca.CheckLength
	jobPartOrder.S2SInvalidMetadataHandleOption = cca.s2sInvalidMetadataHandleOption
	jobPartOrder.S2SPreserveBlobTags = cca.S2sPreserveBlobTags

	dest := cca.FromTo.To()
	traverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &srcCredInfo, cca.SymlinkHandling, cca.ListOfFilesChannel, cca.Recursive, getRemoteProperties, cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, cca.S2sPreserveBlobTags, common.ESyncHashType.None(), cca.preservePermissions, azcopyLogVerbosity, cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, &dest, cca.excludeContainer, false)

	if err != nil {
		return nil, err
	}

	err = cca.validateSourceDir(traverser)
	if err != nil {
		return nil, err
	}

	// Check if the destination is a directory to correctly decide where our files land
	isDestDir := cca.isDestDirectory(cca.Destination, &ctx)
	// A list of version IDs is only accepted for BlobLocal/BlobTrash, with a non-directory source and a directory destination.
	// NOTE(review): log.Fatalf terminates the process instead of returning an error like the other checks below;
	// consider returning an error here once callers are confirmed to surface it.
	if cca.ListOfVersionIDs != nil && (!(cca.FromTo == common.EFromTo.BlobLocal() || cca.FromTo == common.EFromTo.BlobTrash()) || cca.IsSourceDir || !isDestDir) {
		log.Fatalf("Either source is not a blob or destination is not a local folder")
	}
	srcLevel, err := DetermineLocationLevel(cca.Source.Value, cca.FromTo.From(), true)

	if err != nil {
		return nil, err
	}

	dstLevel, err := DetermineLocationLevel(cca.Destination.Value, cca.FromTo.To(), false)

	if err != nil {
		return nil, err
	}

	// Disallow list-of-files and include-path on service-level traversal due to a major bug
	// TODO: Fix the bug.
	//       Two primary issues exist with the list-of-files implementation:
	//       1) Account name doesn't get trimmed from the path
	//       2) List-of-files is not considered an account traverser; therefore containers don't get made.
	//       Resolve these two issues and service-level list-of-files/include-path will work
	if cca.ListOfFilesChannel != nil && srcLevel == ELocationLevel.Service() {
		return nil, errors.New("cannot combine list-of-files or include-path with account traversal")
	}

	if (srcLevel == ELocationLevel.Object() || cca.FromTo.From().IsLocal()) && dstLevel == ELocationLevel.Service() {
		return nil, errors.New("cannot transfer individual files/folders to the root of a service. Add a container or directory to the destination URL")
	}

	if srcLevel == ELocationLevel.Container() && dstLevel == ELocationLevel.Service() && !cca.asSubdir {
		return nil, errors.New("cannot use --as-subdir=false with a service level destination")
	}

	// When copying a container directly to a container, strip the top directory, unless we're attempting to persist permissions.
	if srcLevel == ELocationLevel.Container() && dstLevel == ELocationLevel.Container() && cca.FromTo.From().IsRemote() && cca.FromTo.To().IsRemote() {
		if cca.preservePermissions.IsTruthy() {
			// if we're preserving permissions, we need to keep the top directory, but with container->container, we don't need to add the container name to the path.
			// asSubdir is a better option than stripTopDir as stripTopDir disincludes the root.
			cca.asSubdir = false
		} else {
			cca.StripTopDir = true
		}
	}

	// Create a Remote resource resolver
	// Giving it nothing to work with as new names will be added as we traverse.
	var containerResolver BucketToContainerNameResolver
	containerResolver = NewS3BucketNameToAzureResourcesResolver(nil)
	if cca.FromTo == common.EFromTo.GCPBlob() {
		containerResolver = NewGCPBucketNameToAzureResourcesResolver(nil)
	}
	existingContainers := make(map[string]bool)
	var logDstContainerCreateFailureOnce sync.Once
	seenFailedContainers := make(map[string]bool) // Create map of already failed container conversions so we don't log a million items just for one container.

	// Job-log message formats for a failed container creation: one for a container named explicitly
	// in the destination, one for a container name derived from the source.
	const (
		explicitContainerFailureFormat = "Failed to create destination container %s. The transfer will continue if the container exists"
		implicitContainerFailureFormat = "failed to initialize destination container %s; the transfer will continue (but be wary it may fail)."
	)

	// tryCreateDstContainer attempts to create containerName at the destination and, on failure,
	// records it without failing the job: a single console warning for the whole run, plus one
	// warning/debug pair in the job log per distinct container name.
	tryCreateDstContainer := func(containerName string, jobLogFormat string) {
		// Attempt to create the container. If we fail, fail silently.
		createErr := cca.createDstContainer(containerName, cca.Destination, ctx, existingContainers, common.ELogLevel.None())

		// if JobsAdmin is nil, we're probably in testing mode.
		// As a result, container creation failures are expected as we don't give the SAS tokens adequate permissions.
		if createErr == nil || jobsAdmin.JobsAdmin == nil {
			return
		}

		// check against seenFailedContainers so we don't spam the job log with initialization failed errors
		if _, alreadyLogged := seenFailedContainers[containerName]; alreadyLogged {
			return
		}

		logDstContainerCreateFailureOnce.Do(func() {
			glcm.Warn("Failed to create one or more destination container(s). Your transfers may still succeed if the container already exists.")
		})
		jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf(jobLogFormat, containerName), common.LogWarning)
		jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", createErr), common.LogDebug)
		seenFailedContainers[containerName] = true
	}

	dstContainerName := ""
	// Extract the existing destination container name
	if cca.FromTo.To().IsRemote() {
		dstContainerName, err = GetContainerName(cca.Destination.Value, cca.FromTo.To())

		if err != nil {
			return nil, err
		}

		// only create the destination container in S2S scenarios
		if cca.FromTo.From().IsRemote() && dstContainerName != "" { // if the destination has a explicit container name
			tryCreateDstContainer(dstContainerName, explicitContainerFailureFormat)
		} else if cca.FromTo.From().IsRemote() { // if the destination has implicit container names
			if acctTraverser, ok := traverser.(AccountTraverser); ok && dstLevel == ELocationLevel.Service() {
				containers, listErr := acctTraverser.listContainers()

				if listErr != nil {
					return nil, fmt.Errorf("failed to list containers: %w", listErr)
				}

				// Resolve all container names up front.
				// If we were to resolve on-the-fly, then name order would affect the results inconsistently.
				if cca.FromTo == common.EFromTo.S3Blob() {
					containerResolver = NewS3BucketNameToAzureResourcesResolver(containers)
				} else if cca.FromTo == common.EFromTo.GCPBlob() {
					containerResolver = NewGCPBucketNameToAzureResourcesResolver(containers)
				}

				for _, srcContainer := range containers {
					bucketName, resolveErr := containerResolver.ResolveName(srcContainer)

					if resolveErr != nil {
						// Silently ignore the failure; it'll get logged later.
						continue
					}

					tryCreateDstContainer(bucketName, implicitContainerFailureFormat)
				}
			} else {
				srcContainerName, nameErr := GetContainerName(cca.Source.Value, cca.FromTo.From())

				if nameErr != nil || srcContainerName == "" {
					// this will probably never be reached
					return nil, fmt.Errorf("failed to get container name from source (is it formatted correctly?)")
				}

				// A name that cannot be resolved is skipped here; the processor warns about it when objects arrive.
				if resName, resolveErr := containerResolver.ResolveName(srcContainerName); resolveErr == nil {
					// Failures are tracked under the resolved name: dstContainerName is always empty in this branch.
					tryCreateDstContainer(resName, implicitContainerFailureFormat)
				}
			}
		}
	}

	filters := cca.InitModularFilters()

	// decide our folder transfer strategy
	var message string
	jobPartOrder.Fpo, message = NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), cca.preservePOSIXProperties, strings.EqualFold(cca.Destination.Value, common.Dev_Null), cca.IncludeDirectoryStubs)
	if !cca.dryrunMode {
		glcm.Info(message)
	}
	if jobsAdmin.JobsAdmin != nil {
		jobsAdmin.JobsAdmin.LogToJobLog(message, common.LogInfo)
	}

	// processor converts one enumerated object into a transfer. It returns nil without scheduling
	// anything when the object's source container name cannot be resolved, when the object should
	// not be sent to the transfer engine, or when running in dry-run mode.
	processor := func(object StoredObject) error {
		// Start by resolving the name and creating the container
		if object.ContainerName != "" {
			// if a destination container name is not specified OR copying service to container/folder, append the src container name.
			if dstContainerName == "" || (srcLevel == ELocationLevel.Service() && dstLevel > ELocationLevel.Service()) {
				// Keep the resolution result local to this invocation rather than writing to
				// variables owned by the enclosing function.
				resolvedName, resolveErr := containerResolver.ResolveName(object.ContainerName)

				if resolveErr != nil {
					if _, ok := seenFailedContainers[object.ContainerName]; !ok {
						WarnStdoutAndScanningLog(fmt.Sprintf("failed to add transfers from container %s as it has an invalid name. Please manually transfer from this container to one with a valid name.", object.ContainerName))
						seenFailedContainers[object.ContainerName] = true
					}
					return nil
				}

				object.DstContainerName = resolvedName
			}
		}

		// If above the service level, we already know the container name and don't need to supply it to makeEscapedRelativePath
		if srcLevel != ELocationLevel.Service() {
			object.ContainerName = ""

			// When copying directly TO a container or object from a container, don't drop under a sub directory
			if dstLevel >= ELocationLevel.Container() {
				object.DstContainerName = ""
			}
		}

		srcRelPath := cca.MakeEscapedRelativePath(true, isDestDir, cca.asSubdir, object)
		dstRelPath := cca.MakeEscapedRelativePath(false, isDestDir, cca.asSubdir, object)

		transfer, shouldSendToSte := object.ToNewCopyTransfer(cca.autoDecompress && cca.FromTo.IsDownload(), srcRelPath, dstRelPath, cca.s2sPreserveAccessTier, jobPartOrder.Fpo, cca.SymlinkHandling)
		// Unless source blob tags are being preserved, apply the tags supplied on the command args.
		if !cca.S2sPreserveBlobTags {
			transfer.BlobTags = cca.blobTags
		}

		// In dry-run mode, describe the transfer instead of scheduling it.
		if cca.dryrunMode && shouldSendToSte {
			glcm.Dryrun(func(format common.OutputFormat) string {
				src := common.GenerateFullPath(cca.Source.Value, srcRelPath)
				dst := common.GenerateFullPath(cca.Destination.Value, dstRelPath)

				switch format {
				case common.EOutputFormat.Json():
					tx := DryrunTransfer{
						EntityType:  transfer.EntityType,
						BlobType:    common.FromBlobType(transfer.BlobType),
						FromTo:      cca.FromTo,
						Source:      src,
						Destination: dst,
						SourceSize:  &transfer.SourceSize,
						HttpHeaders: blob.HTTPHeaders{
							BlobCacheControl:       &transfer.CacheControl,
							BlobContentDisposition: &transfer.ContentDisposition,
							BlobContentEncoding:    &transfer.ContentEncoding,
							BlobContentLanguage:    &transfer.ContentLanguage,
							BlobContentMD5:         transfer.ContentMD5,
							BlobContentType:        &transfer.ContentType,
						},
						Metadata:     transfer.Metadata,
						BlobTier:     &transfer.BlobTier,
						BlobVersion:  &transfer.BlobVersionID,
						BlobTags:     transfer.BlobTags,
						BlobSnapshot: &transfer.BlobSnapshotID,
					}

					// NOTE(review): the marshal error is dropped; if marshalling fails an empty string is emitted.
					buf, _ := json.Marshal(tx)
					return string(buf)
				default:
					return fmt.Sprintf("DRYRUN: copy %v to %v",
						src, dst)
				}
			})
			return nil
		}

		if shouldSendToSte {
			return addTransfer(&jobPartOrder, transfer, cca)
		}
		return nil
	}
	// finalizer dispatches the last job part once enumeration has finished.
	finalizer := func() error {
		return dispatchFinalPart(&jobPartOrder, cca)
	}

	return NewCopyEnumerator(traverser, filters, processor, finalizer), nil
}