// Excerpt of ste/mgr-JobPartTransferMgr.go (original lines 271–441).

// appendToURLQuery parses rawURI and appends fragment (an already-encoded
// "key=value" or full query-string segment) to its raw query, inserting "&"
// when a query is already present. Panics if rawURI does not parse, matching
// the pre-existing behavior for malformed plan-file URLs.
func appendToURLQuery(rawURI, fragment string) string {
	u, err := url.Parse(rawURI)
	if err != nil {
		panic(err)
	}
	if len(u.RawQuery) > 0 {
		u.RawQuery += "&" + fragment
	} else {
		u.RawQuery = fragment
	}
	return u.String()
}

// Info assembles and returns the TransferInfo for this transfer, derived from
// the persisted job part plan: source/destination URIs (with SAS tokens and
// version/snapshot selectors re-applied), container/path components, block and
// put-blob sizes (clamped to service limits), and source properties/metadata.
// Returns the cached value when one has already been set on jptm.
func (jptm *jobPartTransferMgr) Info() *TransferInfo {
	if jptm.transferInfo != nil {
		return jptm.transferInfo
	}
	// NOTE(review): the computed TransferInfo below is returned but never
	// assigned to jptm.transferInfo here — confirm the caller caches it,
	// otherwise this guard never fires and the work repeats per call.

	plan := jptm.jobPartMgr.Plan()
	srcURI, dstURI, _ := plan.TransferSrcDstStrings(jptm.transferIndex)
	dstBlobData := plan.DstBlobData

	// Container/path components only exist for remote endpoints.
	var err error
	var srcContainer, srcPath string
	if plan.FromTo.From().IsRemote() {
		srcContainer, srcPath, err = common.SplitContainerNameFromPath(srcURI)
		if err != nil {
			panic(err)
		}
	}

	var dstContainer, dstPath string
	if plan.FromTo.To().IsRemote() {
		dstContainer, dstPath, err = common.SplitContainerNameFromPath(dstURI)
		if err != nil {
			panic(err)
		}
	}

	srcHTTPHeaders, srcMetadata, srcBlobType, srcBlobTier, s2sGetPropertiesInBackend, DestLengthValidation, s2sSourceChangeValidation, s2sInvalidMetadataHandleOption, entityType, versionID, snapshotID, blobTags :=
		plan.TransferSrcPropertiesAndMetadata(jptm.transferIndex)
	srcSAS, dstSAS := jptm.jobPartMgr.SAS()

	// A non-empty SAS means the corresponding endpoint is a remote URL whose
	// SAS was stripped before the URL was persisted in the part plan file;
	// it must be re-appended before executing the transfer.
	if len(dstSAS) > 0 {
		dstURI = appendToURLQuery(dstURI, dstSAS)
	}
	if len(srcSAS) > 0 {
		srcURI = appendToURLQuery(srcURI, srcSAS)
	}

	// Pin the source to the recorded blob version and/or snapshot, if any.
	if versionID != "" {
		srcURI = appendToURLQuery(srcURI, "versionId="+versionID)
	}
	if snapshotID != "" {
		srcURI = appendToURLQuery(srcURI, "snapshot="+snapshotID)
	}

	sourceSize := plan.Transfer(jptm.transferIndex).SourceSize
	blockSize := dstBlobData.BlockSize
	// If the blockSize is 0, then User didn't provide any blockSize.
	// We need to set the blockSize in such a way that the number of blocks per
	// blob does not exceed 50000 (max number of blocks per blob): double from
	// the default until the blob fits, and past the threshold jump straight to
	// the minimal size that fits.
	if blockSize == 0 {
		blockSize = common.DefaultBlockBlobBlockSize
		for ; sourceSize >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
			if blockSize > common.BlockSizeThreshold {
				/*
				 * For a RAM usage of 0.5G/core, we would have 4G memory on typical 8 core device, meaning at a blockSize of 256M,
				 * we can have 4 blocks in core, waiting for a disk or n/w operation. Any higher block size would *sort of*
				 * serialize n/w and disk operations, and is better avoided.
				 */
				// Smallest block size (rounded up) that keeps the block count
				// within the per-blob limit.
				if sourceSize%common.MaxNumberOfBlocksPerBlob == 0 {
					blockSize = sourceSize / common.MaxNumberOfBlocksPerBlob
				} else {
					blockSize = sourceSize/common.MaxNumberOfBlocksPerBlob + 1
				}
				break
			}
		}
	}
	// Clamp to the service maximum, warning when the requested size exceeds it.
	if blockSize > common.MaxBlockBlobBlockSize {
		jptm.Log(common.LogWarning, fmt.Sprintf("block-size %d is greater than maximum allowed size %d, setting it to maximum allowed size", blockSize, int64(common.MaxBlockBlobBlockSize)))
	}
	blockSize = common.Iff(blockSize > common.MaxBlockBlobBlockSize, common.MaxBlockBlobBlockSize, blockSize)

	// If the putBlobSize is 0, then the user didn't provide any putBlobSize; default to block size
	// to default to no breaking changes (prior to this feature, we would use blockSize to determine
	// the put blob size).
	putBlobSize := dstBlobData.PutBlobSize
	if putBlobSize == 0 {
		putBlobSize = blockSize
	}
	if putBlobSize > common.MaxPutBlobSize {
		jptm.Log(common.LogWarning, fmt.Sprintf("put-blob-size %d is greater than maximum allowed size %d, setting it to maximum allowed size", putBlobSize, int64(common.MaxPutBlobSize)))
	}
	putBlobSize = common.Iff(putBlobSize > common.MaxPutBlobSize, common.MaxPutBlobSize, putBlobSize)

	// Blob tags are persisted URL-encoded; decode both keys and values.
	var srcBlobTags common.BlobTags
	if blobTags != nil {
		srcBlobTags = common.BlobTags{}
		for k, v := range blobTags {
			key, _ := url.QueryUnescape(k)     // decode errors intentionally ignored: keep the raw key's best-effort decode
			value, _ := url.QueryUnescape(v)   // same for values
			srcBlobTags[key] = value
		}
	}

	return &TransferInfo{
		JobID:                          plan.JobID,
		BlockSize:                      blockSize,
		PutBlobSize:                    putBlobSize,
		Source:                         srcURI,
		SourceSize:                     sourceSize,
		Destination:                    dstURI,
		SrcContainer:                   srcContainer,
		SrcFilePath:                    srcPath,
		DstContainer:                   dstContainer,
		DstFilePath:                    dstPath,
		EntityType:                     entityType,
		PreserveSMBPermissions:         plan.PreservePermissions,
		PreserveSMBInfo:                plan.PreserveSMBInfo,
		PreservePOSIXProperties:        plan.PreservePOSIXProperties,
		S2SGetPropertiesInBackend:      s2sGetPropertiesInBackend,
		S2SSourceChangeValidation:      s2sSourceChangeValidation,
		S2SInvalidMetadataHandleOption: s2sInvalidMetadataHandleOption,
		BlobFSRecursiveDelete:          plan.BlobFSRecursiveDelete,
		DestLengthValidation:           DestLengthValidation,
		SrcProperties: SrcProperties{
			SrcHTTPHeaders: srcHTTPHeaders,
			SrcMetadata:    srcMetadata,
			SrcBlobTags:    srcBlobTags,
		},
		SrcBlobType:       srcBlobType,
		S2SSrcBlobTier:    srcBlobTier,
		RehydratePriority: plan.RehydratePriority.ToRehydratePriorityType(),
		VersionID:         versionID,
		SnapshotID:        snapshotID,
	}
}