in ste/mgr-JobPartMgr.go [233:404]
func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {
jobCtx = context.WithValue(jobCtx, ServiceAPIVersionOverride, DefaultServiceApiVersion)
jpm.atomicTransfersDone = 0 // Reset the # of transfers done back to 0
jpm.atomicTransfersFailed = 0 // Resets the # transfers failed back to 0 during resume operation
// partplan file is opened and mapped when job part is added
// jpm.planMMF = jpm.filename.Map() // Open the job part plan file & memory-map it in
plan := jpm.planMMF.Plan()
if plan.PartNum == 0 && plan.NumTransfers == 0 {
/* This will wind down the transfer and report summary */
plan.SetJobStatus(common.EJobStatus.Completed())
return
}
// get the list of include / exclude transfers
includeTransfer, excludeTransfer := jpm.jobMgr.IncludeExclude()
if len(includeTransfer) > 0 || len(excludeTransfer) > 0 {
panic("List of transfers is obsolete.")
}
// *** Open the job part: process any job part plan-setting used by all transfers ***
dstData := plan.DstBlobData
jpm.httpHeaders = common.ResourceHTTPHeaders{
ContentType: string(dstData.ContentType[:dstData.ContentTypeLength]),
ContentEncoding: string(dstData.ContentEncoding[:dstData.ContentEncodingLength]),
ContentDisposition: string(dstData.ContentDisposition[:dstData.ContentDispositionLength]),
ContentLanguage: string(dstData.ContentLanguage[:dstData.ContentLanguageLength]),
CacheControl: string(dstData.CacheControl[:dstData.CacheControlLength]),
}
jpm.putMd5 = dstData.PutMd5
jpm.blockBlobTier = dstData.BlockBlobTier
jpm.pageBlobTier = dstData.PageBlobTier
jpm.deleteDestinationFileIfNecessary = dstData.DeleteDestinationFileIfNecessary
// For this job part, split the metadata string apart and create an common.Metadata out of it
metadataString := string(dstData.Metadata[:dstData.MetadataLength])
jpm.metadata = common.Metadata{}
if len(metadataString) > 0 {
var err error
jpm.metadata, err = common.StringToMetadata(metadataString)
if err != nil {
panic("sanity check: metadata string should be valid at this point: " + metadataString)
}
}
blobTagsStr := string(dstData.BlobTags[:dstData.BlobTagsLength])
jpm.blobTags = common.BlobTags{}
if len(blobTagsStr) > 0 {
for _, keyAndValue := range strings.Split(blobTagsStr, "&") { // key/value pairs are separated by '&'
kv := strings.Split(keyAndValue, "=") // key/value are separated by '='
key, _ := url.QueryUnescape(kv[0])
value, _ := url.QueryUnescape(kv[1])
jpm.blobTags[key] = value
}
}
jpm.cpkOptions = common.CpkOptions{
CpkInfo: dstData.CpkInfo,
CpkScopeInfo: string(dstData.CpkScopeInfo[:dstData.CpkScopeInfoLength]),
IsSourceEncrypted: dstData.IsSourceEncrypted,
}
jpm.SetPropertiesFlags = dstData.SetPropertiesFlags
jpm.RehydratePriority = plan.RehydratePriority
jpm.preserveLastModifiedTime = plan.DstLocalData.PreserveLastModifiedTime
jpm.blobTypeOverride = plan.DstBlobData.BlobType
jpm.newJobXfer = computeJobXfer(plan.FromTo, plan.DstBlobData.BlobType)
jpm.priority = plan.Priority
jpm.clientInfo()
// *** Schedule this job part's transfers ***
for t := uint32(0); t < plan.NumTransfers; t++ {
jppt := plan.Transfer(t)
ts := jppt.TransferStatus()
if ts == common.ETransferStatus.Success() {
jpm.ReportTransferDone(ts) // Don't schedule an already-completed/failed transfer
continue
}
// If the transfer was failed, then while rescheduling the transfer marking it Started.
if ts == common.ETransferStatus.Failed() {
jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
if failedCount := atomic.LoadUint32(&jpm.atomicTransfersFailed); failedCount > 0 {
atomic.AddUint32(&jpm.atomicTransfersFailed, ^uint32(0))
} // Adding uint32 max is effectively subtracting 1
}
if _, dst, isFolder := plan.TransferSrcDstStrings(t); isFolder {
// register the folder!
if jpptFolderTracker, ok := jpm.getFolderCreationTracker().(JPPTCompatibleFolderCreationTracker); ok {
if plan.FromTo.To().IsRemote() {
uri, err := url.Parse(dst)
common.PanicIfErr(err)
uri.RawPath = ""
uri.RawQuery = ""
dst = uri.String()
}
jpptFolderTracker.RegisterPropertiesTransfer(dst, t)
}
}
// Each transfer gets its own context (so any chunk can cancel the whole transfer) based off the job's context
transferCtx, transferCancel := context.WithCancel(jobCtx)
// Add the pipeline network stats to the context. This will be manually unset for all sourceInfoProvider contexts.
transferCtx = withPipelineNetworkStats(transferCtx, jpm.jobMgr.PipelineNetworkStats())
// Initialize a job part transfer manager
jptm := &jobPartTransferMgr{
jobPartMgr: jpm,
jobPartPlanTransfer: jppt,
transferIndex: t,
ctx: transferCtx,
cancel: transferCancel,
// TODO: insert the factory func interface in jptm.
// numChunks will be set by the transfer's prologue method
}
//build transferInfo after we've set transferIndex
jptm.transferInfo = jptm.Info()
jpm.Log(common.LogDebug, fmt.Sprintf("scheduling JobID=%v, Part#=%d, Transfer#=%d, priority=%v", plan.JobID, plan.PartNum, t, plan.Priority))
// ===== TEST KNOB
relSrc, relDst := plan.TransferSrcDstRelatives(t)
var err error
if plan.FromTo.From().IsRemote() {
relSrc, err = url.PathUnescape(relSrc)
}
relSrc = strings.TrimPrefix(relSrc, common.AZCOPY_PATH_SEPARATOR_STRING)
common.PanicIfErr(err) // neither of these panics should happen, they already would have had a clean error.
if plan.FromTo.To().IsRemote() {
relDst, err = url.PathUnescape(relDst)
}
relDst = strings.TrimPrefix(relDst, common.AZCOPY_PATH_SEPARATOR_STRING)
common.PanicIfErr(err)
_, srcOk := DebugSkipFiles[relSrc]
_, dstOk := DebugSkipFiles[relDst]
if srcOk || dstOk {
if jpm.ShouldLog(common.LogInfo) {
jpm.Log(common.LogInfo, fmt.Sprintf("Transfer %d cancelled: %s", jptm.transferIndex, relSrc))
}
// cancel the transfer
jptm.Cancel()
jptm.SetStatus(common.ETransferStatus.Cancelled())
} else {
if len(DebugSkipFiles) != 0 && jpm.ShouldLog(common.LogInfo) {
jpm.Log(common.LogInfo, fmt.Sprintf("Did not exclude: src: %s dst: %s", relSrc, relDst))
}
}
// ===== TEST KNOB
jpm.jobMgr.ScheduleTransfer(jpm.priority, jptm)
// This sets the atomic variable atomicAllTransfersScheduled to 1
// atomicAllTransfersScheduled variables is used in case of resume job
// Since iterating the JobParts and scheduling transfer is independent
// a variable is required which defines whether last part is resumed or not
if plan.IsFinalPart {
jpm.jobMgr.ConfirmAllTransfersScheduled()
}
}
if plan.IsFinalPart {
jpm.Log(common.LogInfo, "Final job part has been scheduled")
}
}