in cmd/copyEnumeratorInit.go [48:329]
// initEnumerator builds the CopyEnumerator that drives a copy job. It:
//   - copies the user's transfer options into jobPartOrder,
//   - creates the source traverser and validates the source,
//   - rejects unsupported source/destination location-level combinations,
//   - makes a best-effort attempt to create destination containers for
//     service-to-service copies (failures are logged, not returned),
//   - wires up the per-object processor and the finalizer that dispatches the
//     last job part.
//
// Parameters:
//   - jobPartOrder: received by value; this local copy is populated here and is
//     the one the returned processor (via addTransfer) and finalizer (via
//     dispatchFinalPart) operate on.
//   - srcCredInfo: credentials passed to InitResourceTraverser for the source.
//   - ctx: context passed to the traverser, the destination-directory check and
//     container creation.
//
// Side effects on cca: for remote container-to-container copies this sets
// cca.asSubdir = false (when preserving permissions) or cca.StripTopDir = true.
//
// Returns an error if the traverser cannot be created, source validation fails,
// a location level cannot be determined, the location levels form an unsupported
// combination, container listing fails, or a container name cannot be parsed.
// An invalid --list-of-versions combination terminates the process via log.Fatalf.
func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrderRequest, srcCredInfo common.CredentialInfo, ctx context.Context) (*CopyEnumerator, error) {
	var traverser ResourceTraverser
	var err error

	jobPartOrder.FileAttributes = common.FileTransferAttributes{
		TrailingDot: cca.trailingDot,
	}
	jobPartOrder.CpkOptions = cca.CpkOptions
	jobPartOrder.PreserveSMBPermissions = cca.preservePermissions
	jobPartOrder.PreserveSMBInfo = cca.preserveSMBInfo
	// We set preservePOSIXProperties if the customer has explicitly asked for this in transfer or if it is just a Posix-property only transfer
	jobPartOrder.PreservePOSIXProperties = cca.preservePOSIXProperties || (cca.ForceWrite == common.EOverwriteOption.PosixProperties())

	// Decide whether the traverser itself should fetch per-object properties.
	// Infer on download so that we get LMT and MD5 on files download.
	// On S2S transfers the following rules apply:
	// If preserve properties is enabled, but get properties in backend is disabled, turn it on.
	// If source change validation is enabled on files to remote, turn it on (consider a separate flag entirely?).
	getRemoteProperties := cca.ForceWrite == common.EOverwriteOption.IfSourceNewer() ||
		// If it's a download, we still need LMT and MD5 from files.
		(cca.FromTo.From() == common.ELocation.File() && !cca.FromTo.To().IsRemote()) ||
		// If S2S from File to *, and sourceChangeValidation is enabled, we get properties so that we have LMTs.
		// Likewise, if we are using includeAfter or includeBefore, which require LMTs.
		(cca.FromTo.From() == common.ELocation.File() && cca.FromTo.To().IsRemote() && (cca.s2sSourceChangeValidation || cca.IncludeAfter != nil || cca.IncludeBefore != nil)) ||
		// If S2S and preserve properties AND get properties in backend is on, turn this off, as properties will be obtained in the backend.
		(cca.FromTo.From().IsRemote() && cca.FromTo.To().IsRemote() && cca.s2sPreserveProperties && !cca.s2sGetPropertiesInBackend)
	// Infer GetProperties if GetPropertiesInBackend is enabled.
	jobPartOrder.S2SGetPropertiesInBackend = cca.s2sPreserveProperties && !getRemoteProperties && cca.s2sGetPropertiesInBackend
	jobPartOrder.S2SSourceChangeValidation = cca.s2sSourceChangeValidation
	jobPartOrder.DestLengthValidation = cca.CheckLength
	jobPartOrder.S2SInvalidMetadataHandleOption = cca.s2sInvalidMetadataHandleOption
	jobPartOrder.S2SPreserveBlobTags = cca.S2sPreserveBlobTags

	dest := cca.FromTo.To()
	traverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &srcCredInfo, cca.SymlinkHandling, cca.ListOfFilesChannel, cca.Recursive, getRemoteProperties, cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, cca.S2sPreserveBlobTags, common.ESyncHashType.None(), cca.preservePermissions, azcopyLogVerbosity, cca.CpkOptions, nil, cca.StripTopDir, cca.trailingDot, &dest, cca.excludeContainer, false)
	if err != nil {
		return nil, err
	}

	if err = cca.validateSourceDir(traverser); err != nil {
		return nil, err
	}

	// Check if the destination is a directory to correctly decide where our files land
	isDestDir := cca.isDestDirectory(cca.Destination, &ctx)
	if cca.ListOfVersionIDs != nil && (!(cca.FromTo == common.EFromTo.BlobLocal() || cca.FromTo == common.EFromTo.BlobTrash()) || cca.IsSourceDir || !isDestDir) {
		// NOTE(review): log.Fatalf exits the process from a function that otherwise
		// reports failures through its error return, bypassing caller cleanup and
		// glcm output formatting. Consider returning an error instead.
		log.Fatalf("Either source is not a blob or destination is not a local folder")
	}

	srcLevel, err := DetermineLocationLevel(cca.Source.Value, cca.FromTo.From(), true)
	if err != nil {
		return nil, err
	}
	dstLevel, err := DetermineLocationLevel(cca.Destination.Value, cca.FromTo.To(), false)
	if err != nil {
		return nil, err
	}

	// Disallow list-of-files and include-path on service-level traversal due to a major bug
	// TODO: Fix the bug.
	// Two primary issues exist with the list-of-files implementation:
	// 1) Account name doesn't get trimmed from the path
	// 2) List-of-files is not considered an account traverser; therefore containers don't get made.
	// Resolve these two issues and service-level list-of-files/include-path will work
	if cca.ListOfFilesChannel != nil && srcLevel == ELocationLevel.Service() {
		return nil, errors.New("cannot combine list-of-files or include-path with account traversal")
	}
	if (srcLevel == ELocationLevel.Object() || cca.FromTo.From().IsLocal()) && dstLevel == ELocationLevel.Service() {
		return nil, errors.New("cannot transfer individual files/folders to the root of a service. Add a container or directory to the destination URL")
	}
	if srcLevel == ELocationLevel.Container() && dstLevel == ELocationLevel.Service() && !cca.asSubdir {
		return nil, errors.New("cannot use --as-subdir=false with a service level destination")
	}

	// When copying a container directly to a container, strip the top directory, unless we're attempting to persist permissions.
	if srcLevel == ELocationLevel.Container() && dstLevel == ELocationLevel.Container() && cca.FromTo.From().IsRemote() && cca.FromTo.To().IsRemote() {
		if cca.preservePermissions.IsTruthy() {
			// if we're preserving permissions, we need to keep the top directory, but with container->container, we don't need to add the container name to the path.
			// asSubdir is a better option than stripTopDir as stripTopDir disincludes the root.
			cca.asSubdir = false
		} else {
			cca.StripTopDir = true
		}
	}

	// Create a Remote resource resolver
	// Giving it nothing to work with as new names will be added as we traverse.
	var containerResolver BucketToContainerNameResolver
	containerResolver = NewS3BucketNameToAzureResourcesResolver(nil)
	if cca.FromTo == common.EFromTo.GCPBlob() {
		containerResolver = NewGCPBucketNameToAzureResourcesResolver(nil)
	}

	existingContainers := make(map[string]bool)
	var logDstContainerCreateFailureOnce sync.Once
	// Names we have already reported a failure for, so we don't log a million items just for one container.
	// The map is shared by destination-container creation failures (keyed by destination name)
	// and source-container name-resolution failures in the processor (keyed by source name).
	seenFailedContainers := make(map[string]bool)

	// reportDstContainerCreateFailure logs a failed attempt to create containerName,
	// at most once per name. If JobsAdmin is nil, we're probably in testing mode,
	// where container creation failures are expected as the SAS tokens aren't given
	// adequate permissions, so nothing is logged.
	reportDstContainerCreateFailure := func(containerName string, createErr error, jobLogMsg string) {
		if createErr == nil || jobsAdmin.JobsAdmin == nil || seenFailedContainers[containerName] {
			return
		}
		logDstContainerCreateFailureOnce.Do(func() {
			glcm.Warn("Failed to create one or more destination container(s). Your transfers may still succeed if the container already exists.")
		})
		jobsAdmin.JobsAdmin.LogToJobLog(jobLogMsg, common.LogWarning)
		jobsAdmin.JobsAdmin.LogToJobLog(fmt.Sprintf("Error %s", createErr), common.LogDebug)
		seenFailedContainers[containerName] = true
	}

	dstContainerName := ""
	// Extract the existing destination container name
	if cca.FromTo.To().IsRemote() {
		dstContainerName, err = GetContainerName(cca.Destination.Value, cca.FromTo.To())
		if err != nil {
			return nil, err
		}

		// only create the destination container in S2S scenarios
		if cca.FromTo.From().IsRemote() && dstContainerName != "" { // if the destination has a explicit container name
			// Attempt to create the container. If we fail, fail silently.
			createErr := cca.createDstContainer(dstContainerName, cca.Destination, ctx, existingContainers, common.ELogLevel.None())
			reportDstContainerCreateFailure(dstContainerName, createErr,
				fmt.Sprintf("Failed to create destination container %s. The transfer will continue if the container exists", dstContainerName))
		} else if cca.FromTo.From().IsRemote() { // if the destination has implicit container names
			if acctTraverser, ok := traverser.(AccountTraverser); ok && dstLevel == ELocationLevel.Service() {
				containers, listErr := acctTraverser.listContainers()
				if listErr != nil {
					return nil, fmt.Errorf("failed to list containers: %w", listErr)
				}

				// Resolve all container names up front.
				// If we were to resolve on-the-fly, then name order would affect the results inconsistently.
				if cca.FromTo == common.EFromTo.S3Blob() {
					containerResolver = NewS3BucketNameToAzureResourcesResolver(containers)
				} else if cca.FromTo == common.EFromTo.GCPBlob() {
					containerResolver = NewGCPBucketNameToAzureResourcesResolver(containers)
				}

				for _, srcContainer := range containers {
					bucketName, resolveErr := containerResolver.ResolveName(srcContainer)
					if resolveErr != nil {
						// Silently ignore the failure; it'll get logged later.
						continue
					}

					createErr := cca.createDstContainer(bucketName, cca.Destination, ctx, existingContainers, common.ELogLevel.None())
					reportDstContainerCreateFailure(bucketName, createErr,
						fmt.Sprintf("failed to initialize destination container %s; the transfer will continue (but be wary it may fail).", bucketName))
				}
			} else {
				cName, nameErr := GetContainerName(cca.Source.Value, cca.FromTo.From())
				if nameErr != nil || cName == "" {
					// this will probably never be reached
					return nil, errors.New("failed to get container name from source (is it formatted correctly?)")
				}

				if resName, resolveErr := containerResolver.ResolveName(cName); resolveErr == nil {
					createErr := cca.createDstContainer(resName, cca.Destination, ctx, existingContainers, common.ELogLevel.None())
					// De-duplicate on the container we actually tried to create
					// (previously keyed on dstContainerName, which is always "" here).
					reportDstContainerCreateFailure(resName, createErr,
						fmt.Sprintf("failed to initialize destination container %s; the transfer will continue (but be wary it may fail).", resName))
				}
			}
		}
	}

	filters := cca.InitModularFilters()

	// decide our folder transfer strategy
	var message string
	jobPartOrder.Fpo, message = NewFolderPropertyOption(cca.FromTo, cca.Recursive, cca.StripTopDir, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), cca.preservePOSIXProperties, strings.EqualFold(cca.Destination.Value, common.Dev_Null), cca.IncludeDirectoryStubs)
	if !cca.dryrunMode {
		glcm.Info(message)
	}
	if jobsAdmin.JobsAdmin != nil {
		jobsAdmin.JobsAdmin.LogToJobLog(message, common.LogInfo)
	}

	// processor turns one enumerated StoredObject into a transfer: it resolves the
	// destination container name, computes relative paths, and either prints a
	// dry-run line or appends the transfer to jobPartOrder.
	processor := func(object StoredObject) error {
		// Start by resolving the name and creating the container
		if object.ContainerName != "" {
			// if a destination container name is not specified OR copying service to container/folder, append the src container name.
			if dstContainerName == "" || (srcLevel == ELocationLevel.Service() && dstLevel > ELocationLevel.Service()) {
				// Use a local error so the closure never writes the enclosing function's err.
				resolvedName, resolveErr := containerResolver.ResolveName(object.ContainerName)
				if resolveErr != nil {
					if !seenFailedContainers[object.ContainerName] {
						WarnStdoutAndScanningLog(fmt.Sprintf("failed to add transfers from container %s as it has an invalid name. Please manually transfer from this container to one with a valid name.", object.ContainerName))
						seenFailedContainers[object.ContainerName] = true
					}
					return nil
				}
				object.DstContainerName = resolvedName
			}
		}

		// If above the service level, we already know the container name and don't need to supply it to makeEscapedRelativePath
		if srcLevel != ELocationLevel.Service() {
			object.ContainerName = ""

			// When copying directly TO a container or object from a container, don't drop under a sub directory
			if dstLevel >= ELocationLevel.Container() {
				object.DstContainerName = ""
			}
		}

		srcRelPath := cca.MakeEscapedRelativePath(true, isDestDir, cca.asSubdir, object)
		dstRelPath := cca.MakeEscapedRelativePath(false, isDestDir, cca.asSubdir, object)

		transfer, shouldSendToSte := object.ToNewCopyTransfer(cca.autoDecompress && cca.FromTo.IsDownload(), srcRelPath, dstRelPath, cca.s2sPreserveAccessTier, jobPartOrder.Fpo, cca.SymlinkHandling)
		if !cca.S2sPreserveBlobTags {
			transfer.BlobTags = cca.blobTags
		}

		if cca.dryrunMode && shouldSendToSte {
			glcm.Dryrun(func(format common.OutputFormat) string {
				src := common.GenerateFullPath(cca.Source.Value, srcRelPath)
				dst := common.GenerateFullPath(cca.Destination.Value, dstRelPath)

				switch format {
				case common.EOutputFormat.Json():
					tx := DryrunTransfer{
						EntityType:  transfer.EntityType,
						BlobType:    common.FromBlobType(transfer.BlobType),
						FromTo:      cca.FromTo,
						Source:      src,
						Destination: dst,
						SourceSize:  &transfer.SourceSize,
						HttpHeaders: blob.HTTPHeaders{
							BlobCacheControl:       &transfer.CacheControl,
							BlobContentDisposition: &transfer.ContentDisposition,
							BlobContentEncoding:    &transfer.ContentEncoding,
							BlobContentLanguage:    &transfer.ContentLanguage,
							BlobContentMD5:         transfer.ContentMD5,
							BlobContentType:        &transfer.ContentType,
						},
						Metadata:     transfer.Metadata,
						BlobTier:     &transfer.BlobTier,
						BlobVersion:  &transfer.BlobVersionID,
						BlobTags:     transfer.BlobTags,
						BlobSnapshot: &transfer.BlobSnapshotID,
					}
					// The marshal error is deliberately ignored: this output is
					// informational only, and on failure buf is nil so the
					// dry-run line is simply empty.
					buf, _ := json.Marshal(tx)
					return string(buf)
				default:
					return fmt.Sprintf("DRYRUN: copy %v to %v",
						src, dst)
				}
			})
			return nil
		}

		if shouldSendToSte {
			return addTransfer(&jobPartOrder, transfer, cca)
		}
		return nil
	}

	// finalizer dispatches whatever remains in jobPartOrder once enumeration ends.
	finalizer := func() error {
		return dispatchFinalPart(&jobPartOrder, cca)
	}

	return NewCopyEnumerator(traverser, filters, processor, finalizer), nil
}