in ste/mgr-JobPartTransferMgr.go [271:441]
// Info returns the TransferInfo describing the transfer at jptm.transferIndex.
//
// The value is assembled from the job part plan on the first call and stored
// in jptm.transferInfo; later calls return that stored pointer, so every
// caller shares the same *TransferInfo.
//
// While assembling the result, Info:
//   - splits the container name from the path for a remote source and/or
//     destination;
//   - re-attaches the source and destination SAS tokens reported by the job
//     part manager to the respective URLs;
//   - appends the versionId and snapshot query parameters to the source URL
//     when the plan carries them;
//   - derives a block size when the plan does not specify one, and caps the
//     block size and put-blob size at their maxima (logging a warning);
//   - URL-unescapes the source blob tags.
//
// Info panics if a source/destination string cannot be split into container
// and path, or if a URL that needs a query appended cannot be parsed.
//
// NOTE(review): the store into jptm.transferInfo is not synchronized, just like
// the read at the top of the function. Confirm the first call completes before
// Info is used from multiple goroutines.
func (jptm *jobPartTransferMgr) Info() *TransferInfo {
	if jptm.transferInfo != nil {
		return jptm.transferInfo
	}

	plan := jptm.jobPartMgr.Plan()
	srcURI, dstURI, _ := plan.TransferSrcDstStrings(jptm.transferIndex)
	dstBlobData := plan.DstBlobData

	var err error
	var srcContainer, srcPath string
	if plan.FromTo.From().IsRemote() {
		srcContainer, srcPath, err = common.SplitContainerNameFromPath(srcURI)
		if err != nil {
			panic(err)
		}
	}
	var dstContainer, dstPath string
	if plan.FromTo.To().IsRemote() {
		dstContainer, dstPath, err = common.SplitContainerNameFromPath(dstURI)
		if err != nil {
			panic(err)
		}
	}

	srcHTTPHeaders, srcMetadata, srcBlobType, srcBlobTier, s2sGetPropertiesInBackend, destLengthValidation, s2sSourceChangeValidation, s2sInvalidMetadataHandleOption, entityType, versionID, snapshotID, blobTags :=
		plan.TransferSrcPropertiesAndMetadata(jptm.transferIndex)
	srcSAS, dstSAS := jptm.jobPartMgr.SAS()

	// appendQuery parses rawURL, appends query to its raw query string (joined
	// with "&" when a query is already present) and returns the re-serialized
	// URL. It panics when rawURL cannot be parsed.
	appendQuery := func(rawURL, query string) string {
		u, e := url.Parse(rawURL)
		if e != nil {
			panic(e)
		}
		if len(u.RawQuery) > 0 {
			u.RawQuery += "&" + query
		} else {
			u.RawQuery = query
		}
		return u.String()
	}

	// If the length of destination SAS is greater than 0
	// it means the destination is remote url and destination SAS
	// has been stripped from the destination before persisting it in
	// part plan file.
	// SAS needs to be appended before executing the transfer
	if len(dstSAS) > 0 {
		dstURI = appendQuery(dstURI, dstSAS)
	}
	// If the length of source SAS is greater than 0
	// it means the source is a remote url and source SAS
	// has been stripped from the source before persisting it in
	// part plan file.
	// SAS needs to be appended before executing the transfer
	if len(srcSAS) > 0 {
		srcURI = appendQuery(srcURI, srcSAS)
	}
	// A specific version and/or snapshot of the source is addressed through
	// query parameters on the source URL.
	if versionID != "" {
		srcURI = appendQuery(srcURI, "versionId="+versionID)
	}
	if snapshotID != "" {
		srcURI = appendQuery(srcURI, "snapshot="+snapshotID)
	}

	sourceSize := plan.Transfer(jptm.transferIndex).SourceSize
	blockSize := dstBlobData.BlockSize
	// If the blockSize is 0, then User didn't provide any blockSize
	// We need to set the blockSize in such way that number of blocks per blob
	// does not exceeds 50000 (max number of block per blob)
	if blockSize == 0 {
		blockSize = common.DefaultBlockBlobBlockSize
		// Keep doubling until the source fits in the maximum number of blocks.
		for ; sourceSize >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
			if blockSize > common.BlockSizeThreshold {
				/*
				 * For a RAM usage of 0.5G/core, we would have 4G memory on typical 8 core device, meaning at a blockSize of 256M,
				 * we can have 4 blocks in core, waiting for a disk or n/w operation. Any higher block size would *sort of*
				 * serialize n/w and disk operations, and is better avoided.
				 */
				// Past the threshold, stop doubling and use the smallest block
				// size that still fits the source: ceil(sourceSize / max blocks).
				blockSize = sourceSize / common.MaxNumberOfBlocksPerBlob
				if sourceSize%common.MaxNumberOfBlocksPerBlob != 0 {
					blockSize++
				}
				break
			}
		}
	}
	if blockSize > common.MaxBlockBlobBlockSize {
		jptm.Log(common.LogWarning, fmt.Sprintf("block-size %d is greater than maximum allowed size %d, setting it to maximum allowed size", blockSize, int64(common.MaxBlockBlobBlockSize)))
		blockSize = common.MaxBlockBlobBlockSize
	}

	// If the putBlobSize is 0, then the user didn't provide any putBlobSize, default to block size to default to no breaking changes (prior to this feature, we would use blockSize to determine the put blob size).
	putBlobSize := dstBlobData.PutBlobSize
	if putBlobSize == 0 {
		putBlobSize = blockSize
	}
	if putBlobSize > common.MaxPutBlobSize {
		jptm.Log(common.LogWarning, fmt.Sprintf("put-blob-size %d is greater than maximum allowed size %d, setting it to maximum allowed size", putBlobSize, int64(common.MaxPutBlobSize)))
		putBlobSize = common.MaxPutBlobSize
	}

	// A nil tag set from the plan stays nil; a non-nil one (even if empty) is
	// copied into a fresh map with unescaped keys and values.
	var srcBlobTags common.BlobTags
	if blobTags != nil {
		srcBlobTags = common.BlobTags{}
		for k, v := range blobTags {
			// On a malformed escape sequence keep the raw text rather than the
			// empty string QueryUnescape returns, so distinct tags do not
			// collapse onto a single "" key, and make the problem visible.
			key, keyErr := url.QueryUnescape(k)
			if keyErr != nil {
				jptm.Log(common.LogWarning, fmt.Sprintf("could not unescape blob tag key %q, using it as-is: %v", k, keyErr))
				key = k
			}
			value, valueErr := url.QueryUnescape(v)
			if valueErr != nil {
				jptm.Log(common.LogWarning, fmt.Sprintf("could not unescape blob tag value %q, using it as-is: %v", v, valueErr))
				value = v
			}
			srcBlobTags[key] = value
		}
	}

	// Store the result so the nil check at the top short-circuits later calls.
	jptm.transferInfo = &TransferInfo{
		JobID:                          plan.JobID,
		BlockSize:                      blockSize,
		PutBlobSize:                    putBlobSize,
		Source:                         srcURI,
		SourceSize:                     sourceSize,
		Destination:                    dstURI,
		SrcContainer:                   srcContainer,
		SrcFilePath:                    srcPath,
		DstContainer:                   dstContainer,
		DstFilePath:                    dstPath,
		EntityType:                     entityType,
		PreserveSMBPermissions:         plan.PreservePermissions,
		PreserveSMBInfo:                plan.PreserveSMBInfo,
		PreservePOSIXProperties:        plan.PreservePOSIXProperties,
		S2SGetPropertiesInBackend:      s2sGetPropertiesInBackend,
		S2SSourceChangeValidation:      s2sSourceChangeValidation,
		S2SInvalidMetadataHandleOption: s2sInvalidMetadataHandleOption,
		BlobFSRecursiveDelete:          plan.BlobFSRecursiveDelete,
		DestLengthValidation:           destLengthValidation,
		SrcProperties: SrcProperties{
			SrcHTTPHeaders: srcHTTPHeaders,
			SrcMetadata:    srcMetadata,
			SrcBlobTags:    srcBlobTags,
		},
		SrcBlobType:       srcBlobType,
		S2SSrcBlobTier:    srcBlobTier,
		RehydratePriority: plan.RehydratePriority.ToRehydratePriorityType(),
		VersionID:         versionID,
		SnapshotID:        snapshotID,
	}
	return jptm.transferInfo
}