in jobsAdmin/init.go [230:397]
// ResumeJobOrder resumes the job identified by req.JobID.
//
// Steps:
//   - Strip a leading '?' from the source and destination SAS tokens.
//   - Resurrect the job from its plan files using the request's credentials, so
//     updated SAS tokens and service clients take effect.
//   - Check that the job's final part has been ordered.
//   - For anonymous credentials, check that the required SAS tokens were supplied.
//   - Reset failed transfers to Restarted and reschedule the job's transfers.
//
// The response has CancelledPauseResumed set to true on success. Otherwise it is
// false and ErrorMsg explains why the job could not be resumed.
func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeResponse {
	// reject builds the response returned whenever the job cannot be resumed.
	reject := func(msg string) common.CancelPauseResumeResponse {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              msg,
		}
	}

	// Strip '?' if present as the first character of the source / destination SAS.
	if len(req.SourceSAS) > 0 && req.SourceSAS[0] == '?' {
		req.SourceSAS = req.SourceSAS[1:]
	}
	if len(req.DestinationSAS) > 0 && req.DestinationSAS[0] == '?' {
		req.DestinationSAS = req.DestinationSAS[1:]
	}

	// Always search the plan files in the AzCopy folder and resurrect the job with the
	// provided credentials, so that SAS tokens etc. get updated.
	if !JobsAdmin.ResurrectJob(req.JobID, req.SourceSAS, req.DestinationSAS, req.SrcServiceClient, req.DstServiceClient, false) {
		return reject(fmt.Sprintf("no job with JobId %v exists", req.JobID))
	}

	// Fetch the (possibly just resurrected) job manager. Guard the lookup: a nil job
	// manager would panic on first use below.
	jm, found := JobsAdmin.JobMgr(req.JobID)
	if !found {
		return reject(fmt.Sprintf("no job with JobId %v exists", req.JobID))
	}

	// A job whose final part has not been ordered cannot be resumed.
	if !jobHasFinalPartOrdered(jm) {
		return reject(fmt.Sprintf("cannot resume job with JobId %s . It hasn't been ordered completely", req.JobID))
	}

	jpm, found := jm.JobPartMgr(0)
	if !found {
		return reject(fmt.Sprintf("JobID=%v, Part#=0 not found", req.JobID))
	}

	// With anonymous credentials, the SAS tokens are the only way to authenticate,
	// so the ones the job's FromTo needs must be present.
	if req.CredentialInfo.CredentialType == common.ECredentialType.Anonymous() {
		if errorMsg := anonymousResumeSASError(jpm, req.SourceSAS, req.DestinationSAS); errorMsg != "" {
			return reject(fmt.Sprintf("cannot resume job with JobId %s. %s", req.JobID, errorMsg))
		}
	}

	// After creating the job manager, set the include / exclude list of transfers.
	jm.SetIncludeExclude(req.IncludeTransfer, req.ExcludeTransfer)

	jpp0 := jpm.Plan()
	switch status := jpp0.JobStatus(); status {
	// Cancelling is an intermediary state. It is accepted here, rather than rejected,
	// in case a process was terminated while its job was cancelling: record it as
	// Cancelled and resume like any other terminal state.
	case common.EJobStatus.Cancelling():
		jpp0.SetJobStatus(common.EJobStatus.Cancelled())
		fallthrough

	// Resume all the failed / in-progress transfers.
	case common.EJobStatus.InProgress(),
		common.EJobStatus.Completed(),
		common.EJobStatus.CompletedWithErrors(),
		common.EJobStatus.CompletedWithSkipped(),
		common.EJobStatus.CompletedWithErrorsAndSkipped(),
		common.EJobStatus.Cancelled(),
		common.EJobStatus.Paused():
		// Carry the credential info from the request into the job's in-memory state.
		jm.SetInMemoryTransitJobState(
			ste.InMemoryTransitJobState{
				CredentialInfo: req.CredentialInfo,
			})

		// Prevent the previous run's failed-transfer count from leaking into this run.
		jm.ResetFailedTransfersCount()
		jpp0.SetJobStatus(common.EJobStatus.InProgress())

		// Also mark the job summary as InProgress, so the job-status tracking sees
		// the resumed state, not the status recorded by the previous run.
		summaryResp := jm.ListJobSummary()
		summaryResp.JobStatus = common.EJobStatus.InProgress()
		jm.ResurrectSummary(summaryResp)

		if jm.ShouldLog(common.LogInfo) {
			jm.Log(common.LogInfo, fmt.Sprintf("JobID=%v resumed", req.JobID))
		}

		// Reset every transfer whose status is Failed or lower (i.e. any failure
		// status) to Restarted, and clear its error code, so it is retried.
		jm.IterateJobParts(true, func(_ common.PartNumber, partMgr ste.IJobPartMgr) {
			partPlan := partMgr.Plan()
			for t := uint32(0); t < partPlan.NumTransfers; t++ {
				transfer := partPlan.Transfer(t)
				if transfer.TransferStatus() <= common.ETransferStatus.Failed() {
					transfer.SetTransferStatus(common.ETransferStatus.Restarted(), true)
					transfer.SetErrorCode(0, true)
				}
			}
		})

		// Reschedule all of the job parts' transfers.
		jm.ResumeTransfers(steCtx)

		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: true,
			ErrorMsg:              "",
		}

	default:
		// Previously this path returned an empty ErrorMsg, leaving the user with a
		// blank failure; report the offending status instead.
		return reject(fmt.Sprintf("cannot resume job with JobId %s. Job status %v cannot be resumed", req.JobID, status))
	}
}

// jobHasFinalPartOrdered reports whether any of jm's job parts is marked as the final
// part. Parts are looked up consecutively from part 0 until a part number is not found.
func jobHasFinalPartOrdered(jm ste.IJobMgr) bool {
	for p := ste.PartNumber(0); ; p++ {
		jpm, found := jm.JobPartMgr(p)
		if !found {
			return false
		}
		if jpm.Plan().IsFinalPart {
			return true
		}
	}
}

// anonymousResumeSASError checks, for a job resumed with anonymous credentials, that the
// SAS tokens its FromTo requires were supplied. It returns a message naming the missing
// switch(es), or "" if the supplied tokens suffice.
//
// A missing source SAS is excused when the source location is Blob and
// common.IsSourcePublicBlob reports the source root as publicly accessible.
// FromTo values not listed in the switch impose no SAS requirement here.
func anonymousResumeSASError(jpm ste.IJobPartMgr, sourceSAS, destinationSAS string) string {
	plan := jpm.Plan()

	// sourceIsPublicBlob checks the location first, so the public-access probe only
	// runs for Blob sources.
	sourceIsPublicBlob := func() bool {
		if plan.FromTo.From() != common.ELocation.Blob() {
			return false
		}
		src := string(plan.SourceRoot[:plan.SourceRootLength])
		return common.IsSourcePublicBlob(src, steCtx)
	}

	switch plan.FromTo {
	case common.EFromTo.LocalBlob(),
		common.EFromTo.LocalFile(),
		common.EFromTo.S3Blob(),
		common.EFromTo.GCPBlob():
		if len(destinationSAS) == 0 {
			return "The destination-sas switch must be provided to resume the job"
		}
	case common.EFromTo.BlobLocal(),
		common.EFromTo.FileLocal(),
		common.EFromTo.BlobTrash(),
		common.EFromTo.FileTrash():
		if len(sourceSAS) == 0 && !sourceIsPublicBlob() {
			return "The source-sas switch must be provided to resume the job"
		}
	case common.EFromTo.BlobBlob(),
		common.EFromTo.FileBlob():
		if len(sourceSAS) == 0 || len(destinationSAS) == 0 {
			// Only a missing source SAS can be excused (public source blob); a
			// missing destination SAS is always an error.
			if len(destinationSAS) != 0 && sourceIsPublicBlob() {
				return ""
			}
			return "Both the source-sas and destination-sas switches must be provided to resume the job"
		}
	}
	return ""
}