func()

in ste/mgr-JobPartMgr.go [233:404]


// ScheduleTransfers copies the job-part-wide settings stored in the
// memory-mapped plan onto jpm and then hands every transfer of the plan that
// has not already succeeded to the job manager for scheduling.
//
// jobCtx is the parent of every per-transfer context created here; it is first
// tagged with the default service API version override.
//
// The method panics on plan contents it treats as invariant violations: a
// non-empty include/exclude list, a metadata string that cannot be parsed, a
// blob tag that is not a query-escaped "key=value" pair, or a destination /
// relative path that cannot be parsed or unescaped.
func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {
	jobCtx = context.WithValue(jobCtx, ServiceAPIVersionOverride, DefaultServiceApiVersion)
	jpm.atomicTransfersDone = 0   // Reset the # of transfers done back to 0
	jpm.atomicTransfersFailed = 0 // Resets the # transfers failed back to 0 during resume operation
	// partplan file is opened and mapped when job part is added
	// jpm.planMMF = jpm.filename.Map() // Open the job part plan file & memory-map it in
	plan := jpm.planMMF.Plan()
	if plan.PartNum == 0 && plan.NumTransfers == 0 {
		/* This will wind down the transfer and report summary */
		plan.SetJobStatus(common.EJobStatus.Completed())
		return
	}

	// get the list of include / exclude transfers
	includeTransfer, excludeTransfer := jpm.jobMgr.IncludeExclude()
	if len(includeTransfer) > 0 || len(excludeTransfer) > 0 {
		panic("List of transfers is obsolete.")
	}

	// *** Open the job part: process any job part plan-setting used by all transfers ***
	dstData := plan.DstBlobData

	// Each header is stored in the plan as a fixed buffer plus a length; slice to the used part.
	jpm.httpHeaders = common.ResourceHTTPHeaders{
		ContentType:        string(dstData.ContentType[:dstData.ContentTypeLength]),
		ContentEncoding:    string(dstData.ContentEncoding[:dstData.ContentEncodingLength]),
		ContentDisposition: string(dstData.ContentDisposition[:dstData.ContentDispositionLength]),
		ContentLanguage:    string(dstData.ContentLanguage[:dstData.ContentLanguageLength]),
		CacheControl:       string(dstData.CacheControl[:dstData.CacheControlLength]),
	}

	jpm.putMd5 = dstData.PutMd5
	jpm.blockBlobTier = dstData.BlockBlobTier
	jpm.pageBlobTier = dstData.PageBlobTier
	jpm.deleteDestinationFileIfNecessary = dstData.DeleteDestinationFileIfNecessary

	// For this job part, split the metadata string apart and create an common.Metadata out of it
	metadataString := string(dstData.Metadata[:dstData.MetadataLength])
	jpm.metadata = common.Metadata{}
	if len(metadataString) > 0 {
		var err error
		jpm.metadata, err = common.StringToMetadata(metadataString)
		if err != nil {
			panic("sanity check: metadata string should be valid at this point: " + metadataString)
		}
	}

	// Blob tags are stored as query-escaped "key=value" pairs joined by '&'.
	blobTagsStr := string(dstData.BlobTags[:dstData.BlobTagsLength])
	jpm.blobTags = common.BlobTags{}
	if len(blobTagsStr) > 0 {
		for _, keyAndValue := range strings.Split(blobTagsStr, "&") { // key/value pairs are separated by '&'
			// Split on the first '=' only; a pair without '=' is malformed and is
			// reported explicitly instead of failing with an index-out-of-range panic.
			kv := strings.SplitN(keyAndValue, "=", 2)
			if len(kv) != 2 {
				panic("sanity check: blob tags string should be valid at this point: " + blobTagsStr)
			}
			// Unescape failures are surfaced the same way as the other URL
			// decoding errors in this function rather than being dropped.
			key, err := url.QueryUnescape(kv[0])
			common.PanicIfErr(err)
			value, err := url.QueryUnescape(kv[1])
			common.PanicIfErr(err)
			jpm.blobTags[key] = value
		}
	}

	jpm.cpkOptions = common.CpkOptions{
		CpkInfo:           dstData.CpkInfo,
		CpkScopeInfo:      string(dstData.CpkScopeInfo[:dstData.CpkScopeInfoLength]),
		IsSourceEncrypted: dstData.IsSourceEncrypted,
	}

	jpm.SetPropertiesFlags = dstData.SetPropertiesFlags
	jpm.RehydratePriority = plan.RehydratePriority

	jpm.preserveLastModifiedTime = plan.DstLocalData.PreserveLastModifiedTime

	jpm.blobTypeOverride = plan.DstBlobData.BlobType
	jpm.newJobXfer = computeJobXfer(plan.FromTo, plan.DstBlobData.BlobType)

	jpm.priority = plan.Priority

	jpm.clientInfo()

	// *** Schedule this job part's transfers ***
	for t := uint32(0); t < plan.NumTransfers; t++ {
		jppt := plan.Transfer(t)
		ts := jppt.TransferStatus()
		if ts == common.ETransferStatus.Success() {
			jpm.ReportTransferDone(ts) // Don't schedule a transfer that already succeeded
			continue
		}

		// If the transfer previously failed, mark it Restarted while rescheduling it.
		if ts == common.ETransferStatus.Failed() {
			jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
			if failedCount := atomic.LoadUint32(&jpm.atomicTransfersFailed); failedCount > 0 {
				atomic.AddUint32(&jpm.atomicTransfersFailed, ^uint32(0))
			} // Adding uint32 max is effectively subtracting 1
		}

		if _, dst, isFolder := plan.TransferSrcDstStrings(t); isFolder {
			// register the folder!
			if jpptFolderTracker, ok := jpm.getFolderCreationTracker().(JPPTCompatibleFolderCreationTracker); ok {
				if plan.FromTo.To().IsRemote() {
					// For remote destinations, register the URL without its raw path and query string.
					uri, err := url.Parse(dst)
					common.PanicIfErr(err)
					uri.RawPath = ""
					uri.RawQuery = ""

					dst = uri.String()
				}

				jpptFolderTracker.RegisterPropertiesTransfer(dst, t)
			}
		}

		// Each transfer gets its own context (so any chunk can cancel the whole transfer) based off the job's context
		transferCtx, transferCancel := context.WithCancel(jobCtx)
		// Add the pipeline network stats to the context. This will be manually unset for all sourceInfoProvider contexts.
		transferCtx = withPipelineNetworkStats(transferCtx, jpm.jobMgr.PipelineNetworkStats())
		// Initialize a job part transfer manager
		jptm := &jobPartTransferMgr{
			jobPartMgr:          jpm,
			jobPartPlanTransfer: jppt,
			transferIndex:       t,
			ctx:                 transferCtx,
			cancel:              transferCancel,
			// TODO: insert the factory func interface in jptm.
			// numChunks will be set by the transfer's prologue method
		}

		//build transferInfo after we've set transferIndex
		jptm.transferInfo = jptm.Info()
		jpm.Log(common.LogDebug, fmt.Sprintf("scheduling JobID=%v, Part#=%d, Transfer#=%d, priority=%v", plan.JobID, plan.PartNum, t, plan.Priority))

		// ===== TEST KNOB
		// Transfers whose relative source or destination path is a key of
		// DebugSkipFiles are cancelled before being handed to the scheduler.
		relSrc, relDst := plan.TransferSrcDstRelatives(t)

		// Remote paths are stored escaped; neither of these panics should happen,
		// they already would have had a clean error.
		if plan.FromTo.From().IsRemote() {
			var err error
			relSrc, err = url.PathUnescape(relSrc)
			common.PanicIfErr(err)
		}
		relSrc = strings.TrimPrefix(relSrc, common.AZCOPY_PATH_SEPARATOR_STRING)
		if plan.FromTo.To().IsRemote() {
			var err error
			relDst, err = url.PathUnescape(relDst)
			common.PanicIfErr(err)
		}
		relDst = strings.TrimPrefix(relDst, common.AZCOPY_PATH_SEPARATOR_STRING)

		_, srcOk := DebugSkipFiles[relSrc]
		_, dstOk := DebugSkipFiles[relDst]
		if srcOk || dstOk {
			if jpm.ShouldLog(common.LogInfo) {
				jpm.Log(common.LogInfo, fmt.Sprintf("Transfer %d cancelled: %s", jptm.transferIndex, relSrc))
			}

			// cancel the transfer
			jptm.Cancel()
			jptm.SetStatus(common.ETransferStatus.Cancelled())
		} else if len(DebugSkipFiles) != 0 && jpm.ShouldLog(common.LogInfo) {
			jpm.Log(common.LogInfo, fmt.Sprintf("Did not exclude: src: %s dst: %s", relSrc, relDst))
		}
		// ===== TEST KNOB
		// The transfer is handed to the scheduler in both cases, including when it was just cancelled.
		jpm.jobMgr.ScheduleTransfer(jpm.priority, jptm)

		// This sets the atomic variable atomicAllTransfersScheduled to 1
		// atomicAllTransfersScheduled variables is used in case of resume job
		// Since iterating the JobParts and scheduling transfer is independent
		// a variable is required which defines whether last part is resumed or not
		// NOTE(review): this sits inside the loop, so for the final part it runs after
		// every scheduled transfer (starting with the first) and never when the part
		// has no unfinished transfers. Left unchanged; confirm this is intended.
		if plan.IsFinalPart {
			jpm.jobMgr.ConfirmAllTransfersScheduled()
		}
	}

	if plan.IsFinalPart {
		jpm.Log(common.LogInfo, "Final job part has been scheduled")
	}
}