func ResumeJobOrder()

in jobsAdmin/init.go [230:397]


func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeResponse {
	// Strip '?' if present as first character of the source sas / destination sas
	if len(req.SourceSAS) > 0 && req.SourceSAS[0] == '?' {
		req.SourceSAS = req.SourceSAS[1:]
	}
	if len(req.DestinationSAS) > 0 && req.DestinationSAS[0] == '?' {
		req.DestinationSAS = req.DestinationSAS[1:]
	}
	// Always search the plan files in Azcopy folder,
	// and resurrect the Job with provided credentials, to ensure SAS and etc get updated.
	if !JobsAdmin.ResurrectJob(req.JobID, req.SourceSAS, req.DestinationSAS, req.SrcServiceClient, req.DstServiceClient, false) {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("no job with JobId %v exists", req.JobID),
		}
	}
	// If the job manager was not found, then Job was resurrected
	// Get the Job manager again for given JobId
	jm, _ := JobsAdmin.JobMgr(req.JobID)

	// Check whether Job has been completely ordered or not
	completeJobOrdered := func(jm ste.IJobMgr) bool {
		// completeJobOrdered determines whether final part for job with JobId has been ordered or not.
		completeJobOrdered := false
		for p := ste.PartNumber(0); true; p++ {
			jpm, found := jm.JobPartMgr(p)
			if !found {
				break
			}
			completeJobOrdered = completeJobOrdered || jpm.Plan().IsFinalPart
		}
		return completeJobOrdered
	}
	// If the job has not been ordered completely, then job cannot be resumed
	if !completeJobOrdered(jm) {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("cannot resume job with JobId %s . It hasn't been ordered completely", req.JobID),
		}
	}

	var jr common.CancelPauseResumeResponse
	jpm, found := jm.JobPartMgr(0)
	if !found {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("JobID=%v, Part#=0 not found", req.JobID),
		}
	}

	// If the credential type is is Anonymous, to resume the Job destinationSAS / sourceSAS needs to be provided
	// Depending on the FromType, sourceSAS or destinationSAS is checked.
	if req.CredentialInfo.CredentialType == common.ECredentialType.Anonymous() {
		var errorMsg = ""
		switch jpm.Plan().FromTo {
		case common.EFromTo.LocalBlob(),
			common.EFromTo.LocalFile(),
			common.EFromTo.S3Blob(),
			common.EFromTo.GCPBlob():
			if len(req.DestinationSAS) == 0 {
				errorMsg = "The destination-sas switch must be provided to resume the job"
			}
		case common.EFromTo.BlobLocal(),
			common.EFromTo.FileLocal(),
			common.EFromTo.BlobTrash(),
			common.EFromTo.FileTrash():
			if len(req.SourceSAS) == 0 {
				plan := jpm.Plan()
				if plan.FromTo.From() == common.ELocation.Blob() {
					src := string(plan.SourceRoot[:plan.SourceRootLength])
					if common.IsSourcePublicBlob(src, steCtx) {
						break
					}
				}

				errorMsg = "The source-sas switch must be provided to resume the job"
			}
		case common.EFromTo.BlobBlob(),
			common.EFromTo.FileBlob():
			if len(req.SourceSAS) == 0 ||
				len(req.DestinationSAS) == 0 {

				plan := jpm.Plan()
				if plan.FromTo.From() == common.ELocation.Blob() && len(req.DestinationSAS) != 0 {
					src := string(plan.SourceRoot[:plan.SourceRootLength])
					if common.IsSourcePublicBlob(src, steCtx) {
						break
					}
				}

				errorMsg = "Both the source-sas and destination-sas switches must be provided to resume the job"
			}
		}
		if len(errorMsg) != 0 {
			return common.CancelPauseResumeResponse{
				CancelledPauseResumed: false,
				ErrorMsg:              fmt.Sprintf("cannot resume job with JobId %s. %s", req.JobID, errorMsg),
			}
		}
	}

	// After creating the Job mgr, set the include / exclude list of transfer.
	jm.SetIncludeExclude(req.IncludeTransfer, req.ExcludeTransfer)
	jpp0 := jpm.Plan()
	switch jpp0.JobStatus() {
	// Cannot resume a Job which is in Cancelling state
	// Cancelling is an intermediary state. The reason we accept and process it here, rather than returning an error,
	// is in case a process was terminated while its job was in cancelling state.
	case common.EJobStatus.Cancelling():
		jpp0.SetJobStatus(common.EJobStatus.Cancelled())
		fallthrough

	// Resume all the failed / In Progress Transfers.
	case common.EJobStatus.InProgress(),
		common.EJobStatus.Completed(),
		common.EJobStatus.CompletedWithErrors(),
		common.EJobStatus.CompletedWithSkipped(),
		common.EJobStatus.CompletedWithErrorsAndSkipped(),
		common.EJobStatus.Cancelled(),
		common.EJobStatus.Paused():
		// go func() {
		// Navigate through transfers and schedule them independently
		// This is done to avoid FE to get blocked until all the transfers have been scheduled
		// Get credential info from RPC request, and set in InMemoryTransitJobState.
		jm.SetInMemoryTransitJobState(
			ste.InMemoryTransitJobState{
				CredentialInfo: req.CredentialInfo,
			})

		// Prevents previous number of failed transfers seeping into a new run
		jm.ResetFailedTransfersCount()

		jpp0.SetJobStatus(common.EJobStatus.InProgress())

		// Jank, force the jstm to recognize that it's also in progress
		summaryResp := jm.ListJobSummary()
		summaryResp.JobStatus = common.EJobStatus.InProgress()
		jm.ResurrectSummary(summaryResp)

		if jm.ShouldLog(common.LogInfo) {
			jm.Log(common.LogInfo, fmt.Sprintf("JobID=%v resumed", req.JobID))
		}

		// Iterate through all transfer of the Job Parts and reset the transfer status
		jm.IterateJobParts(true, func(partNum common.PartNumber, jpm ste.IJobPartMgr) {
			jpp := jpm.Plan()
			// Iterate through this job part's transfers
			for t := uint32(0); t < jpp.NumTransfers; t++ {
				// transferHeader represents the memory map transfer header of transfer at index position for given job and part number
				jppt := jpp.Transfer(t)
				// If the transfer status is less than -1, it means the transfer failed because of some reason.
				// Transfer Status needs to reset.
				if jppt.TransferStatus() <= common.ETransferStatus.Failed() {
					jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
					jppt.SetErrorCode(0, true)
				}
			}
		})

		jm.ResumeTransfers(steCtx) // Reschedule all job part's transfers
		// }()
		jr = common.CancelPauseResumeResponse{
			CancelledPauseResumed: true,
			ErrorMsg:              "",
		}
	}
	return jr
}