jobsAdmin/init.go (502 lines of code) (raw):
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package jobsAdmin
import (
"context"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"os"
"sync"
"time"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
)
// Package-level state shared by the RPC handlers and job APIs in this file.
var (
	// steCtx is the root context handed to the STE: it is passed to
	// initJobsAdmin, to public-blob probes and to transfer rescheduling.
	steCtx = context.Background()

	// mu serialises the check-then-add of in-flight bytes onto
	// TotalBytesTransferred when a job summary is rebuilt.
	mu sync.Mutex
)

// EMPTY_SAS_STRING is passed to ResurrectJob when no source/destination SAS
// override is available.
const EMPTY_SAS_STRING = ""

// azCopyConfig mirrors the JSON document named by the MIME-mapping
// environment variable.
type azCopyConfig struct {
	// MIMETypeMapping is installed as ste.EnvironmentMimeMap; keys are
	// presumably file extensions — TODO confirm against the ste package.
	MIMETypeMapping map[string]string
}
// round returns num rounded to the nearest integer, with halves rounded away
// from zero (2.5 -> 3, -2.5 -> -3).
//
// math.Round is used instead of the classic int(num + 0.5*sign(num)) trick:
// in float64, 0.49999999999999994 + 0.5 rounds up to exactly 1.0, so the trick
// returned 1 (and -1 for the negative case) where the correct answer is 0.
func round(num float64) int {
	return int(math.Round(num))
}
// ToFixed returns num rounded to precision decimal places, with halves rounded
// away from zero. A negative precision rounds to tens, hundreds, and so on.
//
// The scaled value is rounded with math.Round and kept as a float64. The old
// implementation converted it to int first, which overflowed (yielding a
// meaningless result) once |num * 10^precision| exceeded the int range.
func ToFixed(num float64, precision int) float64 {
	scale := math.Pow(10, float64(precision))
	return math.Round(num*scale) / scale
}
// MainSTE initializes the Storage Transfer Engine (STE) in-process.
//
// It initializes the JobsAdmin (resurrecting job plan files), optionally loads
// a custom MIME mapping from the JSON file named by the MimeMapping environment
// variable, and registers the RPC handlers on http.DefaultServeMux. No
// listener is started here (see the commented-out ListenAndServe below).
//
// It returns an error only if the MIME mapping file cannot be read or parsed.
func MainSTE(concurrency ste.ConcurrencySettings, targetRateInMegaBitsPerSec float64, azcopyJobPlanFolder, azcopyLogPathFolder string, providePerfAdvice bool) error {
	// Initialize the JobsAdmin, resurrect Job plan files
	initJobsAdmin(steCtx, concurrency, targetRateInMegaBitsPerSec, azcopyJobPlanFolder, azcopyLogPathFolder, providePerfAdvice)
	// TODO: We may want to list listen first and terminate if there is already an instance listening

	// Install a custom MIME map if one has been configured.
	if path := common.GetEnvironmentVariable(common.EEnvironmentVariable.MimeMapping()); path != "" {
		data, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("reading MIME mapping file %q: %w", path, err)
		}
		var config azCopyConfig
		if err := json.Unmarshal(data, &config); err != nil {
			return fmt.Errorf("parsing MIME mapping file %q: %w", path, err)
		}
		ste.EnvironmentMimeMap = config.MIMETypeMapping
	}

	// deserialize reads the entire request body and decodes it as JSON into v.
	// An unreadable or malformed body is reported to the caller instead of
	// being ignored (decode errors) or panicking the engine (read errors).
	deserialize := func(request *http.Request, v interface{}) error {
		// TODO: Check the HTTP verb here?
		defer request.Body.Close()
		body, err := io.ReadAll(request.Body)
		if err != nil {
			return fmt.Errorf("reading HTTP request body: %w", err)
		}
		if err := json.Unmarshal(body, v); err != nil {
			return fmt.Errorf("deserializing HTTP request: %w", err)
		}
		return nil
	}

	// serialize writes v as the JSON body of a 202 Accepted response.
	serialize := func(v interface{}, response http.ResponseWriter) {
		// Marshal the payload itself; the ResponseWriter was previously
		// marshalled by mistake, so the result never reached the client.
		payload, err := json.Marshal(v)
		if err != nil {
			JobsAdmin.Panic(fmt.Errorf("error serializing HTTP response: %w", err))
		}
		response.Header().Set("Content-Type", "application/json")
		// sending successful response back to front end
		response.WriteHeader(http.StatusAccepted)
		_, _ = response.Write(payload) // best effort: nothing useful to do if the client went away
	}

	// badRequest rejects a request whose body could not be decoded.
	badRequest := func(writer http.ResponseWriter, err error) {
		http.Error(writer, err.Error(), http.StatusBadRequest)
	}

	http.HandleFunc(common.ERpcCmd.CopyJobPartOrder().Pattern(),
		func(writer http.ResponseWriter, request *http.Request) {
			var payload common.CopyJobPartOrderRequest
			if err := deserialize(request, &payload); err != nil {
				badRequest(writer, err)
				return
			}
			serialize(ExecuteNewCopyJobPartOrder(payload), writer)
		})
	http.HandleFunc(common.ERpcCmd.ListJobs().Pattern(),
		func(writer http.ResponseWriter, request *http.Request) {
			// var payload common.ListRequest
			// deserialize(request, &payload)
			serialize(ListJobs(common.EJobStatus.All()), writer)
		})
	http.HandleFunc(common.ERpcCmd.ListJobSummary().Pattern(),
		func(writer http.ResponseWriter, request *http.Request) {
			var payload common.JobID
			if err := deserialize(request, &payload); err != nil {
				badRequest(writer, err)
				return
			}
			serialize(GetJobSummary(payload), writer)
		})
	http.HandleFunc(common.ERpcCmd.ListJobTransfers().Pattern(),
		func(writer http.ResponseWriter, request *http.Request) {
			var payload common.ListJobTransfersRequest
			if err := deserialize(request, &payload); err != nil {
				badRequest(writer, err)
				return
			}
			serialize(ListJobTransfers(payload), writer) // TODO: make struct
		})
	/*
		http.HandleFunc(common.ERpcCmd.CancelJob().Pattern(),
			func(writer http.ResponseWriter, request *http.Request) {
				var payload common.JobID
				if err := deserialize(request, &payload); err != nil {
					badRequest(writer, err)
					return
				}
				serialize(CancelPauseJobOrder(payload, common.EJobStatus.Cancelling()), writer)
			})
		http.HandleFunc(common.ERpcCmd.PauseJob().Pattern(),
			func(writer http.ResponseWriter, request *http.Request) {
				var payload common.JobID
				if err := deserialize(request, &payload); err != nil {
					badRequest(writer, err)
					return
				}
				serialize(CancelPauseJobOrder(payload, common.EJobStatus.Paused()), writer)
			})
	*/
	http.HandleFunc(common.ERpcCmd.ResumeJob().Pattern(),
		func(writer http.ResponseWriter, request *http.Request) {
			var payload common.ResumeJobRequest
			if err := deserialize(request, &payload); err != nil {
				badRequest(writer, err)
				return
			}
			serialize(ResumeJobOrder(payload), writer)
		})
	http.HandleFunc(common.ERpcCmd.GetJobDetails().Pattern(),
		func(writer http.ResponseWriter, request *http.Request) {
			var payload common.GetJobDetailsRequest
			if err := deserialize(request, &payload); err != nil {
				badRequest(writer, err)
				return
			}
			serialize(GetJobDetails(payload), writer)
		})

	// Listen for front-end requests
	// if err := http.ListenAndServe("localhost:1337", nil); err != nil {
	// 	fmt.Print("Server already initialized")
	// 	return err
	// }
	return nil // TODO: don't return (like normal main)
}
// /////////////////////////////////////////////////////////////////////////////

// ExecuteNewCopyJobPartOrder persists a job part order as a plan file, attaches
// it to the job's manager (creating the manager if it does not exist yet),
// schedules its transfers and notifies the manager that the part was created.
// It always responds with JobStarted: true.
func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.CopyJobPartOrderResponse {
	// Convert the order into this part's plan file.
	planFile := JobsAdmin.NewJobPartPlanFileName(order.JobID, order.PartNum)
	planFile.Create(order)

	// Fetch this job's manager, creating it on first sight of the job.
	mgr := JobsAdmin.JobMgrEnsureExists(order.JobID, order.LogLevel, order.CommandString)

	transferCount := len(order.Transfers.List)
	if order.IsFinalPart && transferCount == 0 {
		// The final part carries no transfers at all; surface that in the job log.
		mgr.Log(common.LogWarning, "No transfers were scheduled.")
	}

	// Record the credential info carried by the RPC request in the manager's
	// in-memory transit state.
	mgr.SetInMemoryTransitJobState(ste.InMemoryTransitJobState{
		CredentialInfo:          order.CredentialInfo,
		S2SSourceCredentialType: order.S2SSourceCredentialType,
	})

	// No existing plan MMF is supplied; AddJobPart creates one on its own and
	// schedules the part's transfers.
	mgr.AddJobPart(&ste.AddJobPartArgs{
		PartNum:           order.PartNum,
		PlanFile:          planFile,
		ExistingPlanMMF:   nil,
		SrcClient:         order.SrcServiceClient,
		DstClient:         order.DstServiceClient,
		SrcIsOAuth:        order.S2SSourceCredentialType.IsAzureOAuth(),
		ScheduleTransfers: true,
	})

	// Report the new part's totals to the status manager.
	created := ste.JobPartCreatedMsg{
		TotalTransfers:       uint32(transferCount),
		IsFinalPart:          order.IsFinalPart,
		TotalBytesEnumerated: order.Transfers.TotalSizeInBytes,
		FileTransfers:        order.Transfers.FileTransferCount,
		SymlinkTransfers:     order.Transfers.SymlinkTransferCount,
		FolderTransfer:       order.Transfers.FolderTransferCount,
	}
	mgr.SendJobPartCreatedMsg(created)

	return common.CopyJobPartOrderResponse{JobStarted: true}
}
// CancelPauseJobOrder asks the manager of job jobID to move the job to
// desiredJobStatus (cancelling or paused). If no manager is loaded for the job,
// the job is first resurrected from its plan files without any SAS overrides.
//
// Whether the request is honoured is decided by the job manager, not here.
// Per the original design notes, a job cannot be cancelled/paused when:
//   - it has not been ordered completely,
//   - all of its transfers have already completed or failed, or
//   - it is already paused and a pause is requested again.
func CancelPauseJobOrder(jobID common.JobID, desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse {
	if loadedMgr, ok := JobsAdmin.JobMgr(jobID); ok {
		return loadedMgr.CancelPauseJobOrder(desiredJobStatus)
	}

	// Not loaded: try to bring the job back from its plan files.
	resurrected := JobsAdmin.ResurrectJob(jobID, EMPTY_SAS_STRING, EMPTY_SAS_STRING, nil, nil, false)
	if !resurrected {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("no active job with JobId %s exists", jobID.String()),
		}
	}

	revivedMgr, _ := JobsAdmin.JobMgr(jobID)
	return revivedMgr.CancelPauseJobOrder(desiredJobStatus)
}
// ResumeJobOrder resumes job req.JobID with the credentials supplied in req.
//
// Steps:
//  1. Strip a leading '?' from req.SourceSAS / req.DestinationSAS.
//  2. Always resurrect the job from its plan files with the supplied SAS tokens
//     and service clients, so updated credentials take effect.
//  3. Refuse if no part of the job is flagged as the final part (the job was
//     not ordered completely) or if part 0 cannot be found.
//  4. For Anonymous credentials, require the SAS token(s) that the job's
//     FromTo needs (a public source blob may stand in for a source SAS).
//  5. Apply the include/exclude transfer lists, then, for the statuses listed
//     in the switch below, reset failed transfers to Restarted and reschedule.
//
// NOTE(review): a part-0 job status not listed in the switch below makes this
// function return the zero-value jr (CancelledPauseResumed false with an empty
// ErrorMsg) — confirm that callers handle that case.
func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeResponse {
	// Strip '?' if present as first character of the source sas / destination sas
	if len(req.SourceSAS) > 0 && req.SourceSAS[0] == '?' {
		req.SourceSAS = req.SourceSAS[1:]
	}
	if len(req.DestinationSAS) > 0 && req.DestinationSAS[0] == '?' {
		req.DestinationSAS = req.DestinationSAS[1:]
	}
	// Always search the plan files in Azcopy folder,
	// and resurrect the Job with provided credentials, to ensure SAS and etc get updated.
	if !JobsAdmin.ResurrectJob(req.JobID, req.SourceSAS, req.DestinationSAS, req.SrcServiceClient, req.DstServiceClient, false) {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("no job with JobId %v exists", req.JobID),
		}
	}
	// ResurrectJob succeeded, so the job manager is now registered;
	// fetch it for the given JobId.
	jm, _ := JobsAdmin.JobMgr(req.JobID)
	// Check whether Job has been completely ordered or not
	completeJobOrdered := func(jm ste.IJobMgr) bool {
		// Walks parts 0, 1, 2, ... until a part is missing and reports whether
		// any of them is flagged as the final part of the job.
		completeJobOrdered := false
		for p := ste.PartNumber(0); true; p++ {
			jpm, found := jm.JobPartMgr(p)
			if !found {
				break
			}
			completeJobOrdered = completeJobOrdered || jpm.Plan().IsFinalPart
		}
		return completeJobOrdered
	}
	// If the job has not been ordered completely, then job cannot be resumed
	if !completeJobOrdered(jm) {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("cannot resume job with JobId %s . It hasn't been ordered completely", req.JobID),
		}
	}
	// jr stays the zero value unless the status switch below resumes the job.
	var jr common.CancelPauseResumeResponse
	// Part 0's plan supplies the job-wide FromTo, source root and job status.
	jpm, found := jm.JobPartMgr(0)
	if !found {
		return common.CancelPauseResumeResponse{
			CancelledPauseResumed: false,
			ErrorMsg:              fmt.Sprintf("JobID=%v, Part#=0 not found", req.JobID),
		}
	}
	// If the credential type is Anonymous, to resume the Job destinationSAS / sourceSAS needs to be provided
	// Depending on the FromType, sourceSAS or destinationSAS is checked.
	if req.CredentialInfo.CredentialType == common.ECredentialType.Anonymous() {
		var errorMsg = ""
		switch jpm.Plan().FromTo {
		// Uploads / S3 and GCP to Blob: only the destination needs a SAS.
		case common.EFromTo.LocalBlob(),
			common.EFromTo.LocalFile(),
			common.EFromTo.S3Blob(),
			common.EFromTo.GCPBlob():
			if len(req.DestinationSAS) == 0 {
				errorMsg = "The destination-sas switch must be provided to resume the job"
			}
		// Downloads and deletes: only the source needs a SAS, unless the
		// source is a publicly readable blob.
		case common.EFromTo.BlobLocal(),
			common.EFromTo.FileLocal(),
			common.EFromTo.BlobTrash(),
			common.EFromTo.FileTrash():
			if len(req.SourceSAS) == 0 {
				plan := jpm.Plan()
				if plan.FromTo.From() == common.ELocation.Blob() {
					src := string(plan.SourceRoot[:plan.SourceRootLength])
					if common.IsSourcePublicBlob(src, steCtx) {
						// Public source: no SAS required; leave errorMsg empty.
						break
					}
				}
				errorMsg = "The source-sas switch must be provided to resume the job"
			}
		// Service-to-service: both SAS tokens are needed, except that a public
		// Blob source may omit its SAS when a destination SAS is supplied.
		case common.EFromTo.BlobBlob(),
			common.EFromTo.FileBlob():
			if len(req.SourceSAS) == 0 ||
				len(req.DestinationSAS) == 0 {
				plan := jpm.Plan()
				if plan.FromTo.From() == common.ELocation.Blob() && len(req.DestinationSAS) != 0 {
					src := string(plan.SourceRoot[:plan.SourceRootLength])
					if common.IsSourcePublicBlob(src, steCtx) {
						// Public source with a destination SAS: accepted.
						break
					}
				}
				errorMsg = "Both the source-sas and destination-sas switches must be provided to resume the job"
			}
		}
		if len(errorMsg) != 0 {
			return common.CancelPauseResumeResponse{
				CancelledPauseResumed: false,
				ErrorMsg:              fmt.Sprintf("cannot resume job with JobId %s. %s", req.JobID, errorMsg),
			}
		}
	}
	// After creating the Job mgr, set the include / exclude list of transfer.
	jm.SetIncludeExclude(req.IncludeTransfer, req.ExcludeTransfer)
	jpp0 := jpm.Plan()
	switch jpp0.JobStatus() {
	// Cancelling is an intermediary state. The reason we accept and process it here, rather than returning an error,
	// is in case a process was terminated while its job was in cancelling state: the job is first marked
	// Cancelled and then resumed like any other resumable status below.
	case common.EJobStatus.Cancelling():
		jpp0.SetJobStatus(common.EJobStatus.Cancelled())
		fallthrough
	// Resume all the failed / In Progress Transfers.
	case common.EJobStatus.InProgress(),
		common.EJobStatus.Completed(),
		common.EJobStatus.CompletedWithErrors(),
		common.EJobStatus.CompletedWithSkipped(),
		common.EJobStatus.CompletedWithErrorsAndSkipped(),
		common.EJobStatus.Cancelled(),
		common.EJobStatus.Paused():
		// go func() {
		// Navigate through transfers and schedule them independently
		// This is done to avoid FE to get blocked until all the transfers have been scheduled
		// Get credential info from RPC request, and set in InMemoryTransitJobState.
		jm.SetInMemoryTransitJobState(
			ste.InMemoryTransitJobState{
				CredentialInfo: req.CredentialInfo,
			})
		// Prevents previous number of failed transfers seeping into a new run
		jm.ResetFailedTransfersCount()
		jpp0.SetJobStatus(common.EJobStatus.InProgress())
		// Jank, force the jstm to recognize that it's also in progress
		summaryResp := jm.ListJobSummary()
		summaryResp.JobStatus = common.EJobStatus.InProgress()
		jm.ResurrectSummary(summaryResp)
		if jm.ShouldLog(common.LogInfo) {
			jm.Log(common.LogInfo, fmt.Sprintf("JobID=%v resumed", req.JobID))
		}
		// Iterate through all transfer of the Job Parts and reset the transfer status
		jm.IterateJobParts(true, func(partNum common.PartNumber, jpm ste.IJobPartMgr) {
			jpp := jpm.Plan()
			// Iterate through this job part's transfers
			for t := uint32(0); t < jpp.NumTransfers; t++ {
				// transferHeader represents the memory map transfer header of transfer at index position for given job and part number
				jppt := jpp.Transfer(t)
				// Statuses at or below Failed() are treated as failures (ListJobTransfers
				// uses the same ordering); reset them to Restarted and clear the error code.
				// NOTE(review): assumes every failure status sorts at or below Failed() — confirm in common.
				if jppt.TransferStatus() <= common.ETransferStatus.Failed() {
					jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
					jppt.SetErrorCode(0, true)
				}
			}
		})
		jm.ResumeTransfers(steCtx) // Reschedule all job part's transfers
		// }()
		jr = common.CancelPauseResumeResponse{
			CancelledPauseResumed: true,
			ErrorMsg:              "",
		}
	}
	return jr
}
// GetJobSummary returns the progress summary of job jobID. If no manager is
// loaded for the job, it is first resurrected from its plan files without SAS
// overrides; if that fails, only ErrorMsg is set.
//
// Starting from the job manager's running summary, it fills in:
//   - Timestamp, JobID and an empty ErrorMsg;
//   - TotalBytesTransferred, topped up with the bytes already moved for files
//     still in flight (skipped if that would exceed TotalBytesExpected);
//   - PercentComplete, capped at 100 and reported as 100 when no bytes are
//     expected;
//   - CompleteJobOrdered, which also becomes true once all transfers have been
//     scheduled (so the front end keeps polling during a resume);
//   - BytesOverWire, ActiveConnections, PerfStrings, PerfConstraint and, when
//     available, the pipeline network statistics;
//   - JobStatus from part 0's plan when the job is cancelled, or when it is
//     completely ordered and part 0 reports done; PerformanceAdvice once done.
//
// If part 0 of the job cannot be found, only the first group is filled in.
func GetJobSummary(jobID common.JobID) common.ListJobSummaryResponse {
	jm, found := JobsAdmin.JobMgr(jobID)
	if !found {
		// Job with JobId is not loaded: search the plan files in the AzCopy
		// folder and resurrect the job.
		if !JobsAdmin.ResurrectJob(jobID, EMPTY_SAS_STRING, EMPTY_SAS_STRING, nil, nil, false) {
			return common.ListJobSummaryResponse{
				ErrorMsg: fmt.Sprintf("no job with JobId %v exists", jobID),
			}
		}
		// Resurrection succeeded, so the manager is now registered.
		jm, _ = JobsAdmin.JobMgr(jobID)
	}
	js := jm.ListJobSummary()
	js.Timestamp = time.Now().UTC()
	js.JobID = jm.JobID()
	js.ErrorMsg = ""
	part0, ok := jm.JobPartMgr(0)
	if !ok {
		return js
	}
	part0PlanStatus := part0.Plan().JobStatus()

	// Add on byte count from files in flight, to get a more accurate running
	// total, unless that would overshoot the expected total (double counting).
	// Read the in-flight figure once so the value that passes the check is the
	// value that gets added — it can presumably change between calls while
	// transfers are running.
	inFlight := jm.SuccessfulBytesInActiveFiles()
	if js.TotalBytesTransferred+inFlight <= js.TotalBytesExpected {
		js.TotalBytesTransferred += inFlight
	}

	if js.TotalBytesExpected == 0 {
		// No bytes expected: report done and avoid dividing by 0 (NaN).
		js.PercentComplete = 100
	} else {
		js.PercentComplete = 100 * (float32(js.TotalBytesTransferred) / float32(js.TotalBytesExpected))
	}
	if js.PercentComplete > 100 {
		js.PercentComplete = 100
	}

	// This lets the FE keep fetching the Job Progress Summary during a resume:
	// the job is already completely ordered then, so progress should be
	// fetched until all job parts have been iterated and scheduled.
	js.CompleteJobOrdered = js.CompleteJobOrdered || jm.AllTransfersScheduled()
	js.BytesOverWire = uint64(JobsAdmin.BytesOverWire())
	// Get the number of active go routines performing the transfer or executing the chunk Func
	// TODO: added for debugging purpose. remove later (is covered by GetPerfInfo now anyway)
	js.ActiveConnections = jm.ActiveConnections()
	js.PerfStrings, js.PerfConstraint = jm.GetPerfInfo()
	pipeStats := jm.PipelineNetworkStats()
	if pipeStats != nil {
		js.AverageIOPS = pipeStats.OperationsPerSecond()
		js.AverageE2EMilliseconds = pipeStats.AverageE2EMilliseconds()
		js.NetworkErrorPercentage = pipeStats.NetworkErrorPercentage()
		js.ServerBusyPercentage = pipeStats.TotalServerBusyPercentage()
	}

	dir := jm.TransferDirection()
	if part0PlanStatus == common.EJobStatus.Cancelled() {
		// A cancelled job is reported as such without checking
		// CompleteJobOrdered: the user must have consented to cancelling an
		// incomplete job if that is the case.
		js.JobStatus = part0PlanStatus
		js.PerformanceAdvice = JobsAdmin.TryGetPerformanceAdvice(js.TotalBytesExpected, js.TotalTransfers-js.TransfersSkipped, part0.Plan().FromTo, dir, pipeStats)
		return js
	}
	// The job is done only if its order is complete AND part 0 reports a done status.
	if js.CompleteJobOrdered && part0PlanStatus.IsJobDone() {
		js.JobStatus = part0PlanStatus
	}
	if js.JobStatus.IsJobDone() {
		js.PerformanceAdvice = JobsAdmin.TryGetPerformanceAdvice(js.TotalBytesExpected, js.TotalTransfers-js.TransfersSkipped, part0.Plan().FromTo, dir, pipeStats)
	}
	return js
}
// resurrectJobSummary rebuilds the progress summary of jm from scratch by
// walking every transfer recorded in every part plan, instead of using the
// manager's running summary.
//
// Every transfer contributes to TotalTransfers, TotalBytesEnumerated and the
// File/FolderProperty/Symlink counts. Then, by transfer status:
//   - not started, folder created, started, restarted, cancelled: its bytes
//     count towards TotalBytesExpected;
//   - success: counted as completed; its bytes count as expected and transferred;
//   - failed, tier-availability-check failure, blob tier failure: counted as
//     failed and listed in FailedTransfers (always reported with status Failed);
//   - skipped (entity already exists / blob has snapshots): counted as skipped
//     and listed in SkippedTransfers with its actual status.
//
// The remaining fields are derived as in GetJobSummary. It panics if part 0 of
// the job is not available.
func resurrectJobSummary(jm ste.IJobMgr) common.ListJobSummaryResponse {
	js := common.ListJobSummaryResponse{
		Timestamp:          time.Now().UTC(),
		JobID:              jm.JobID(),
		ErrorMsg:           "",
		JobStatus:          common.EJobStatus.InProgress(), // Default
		CompleteJobOrdered: false,                          // default to false; returns true if ALL job parts have been ordered
		FailedTransfers:    []common.TransferDetail{},
	}
	// To avoid race condition: get overall status BEFORE we get counts of completed files.
	// (If we get it afterwards, we can get a case where the counts haven't reached 100% done, but by the time we
	// get the status, the job IS finished - and so we report completion with a lower total file count than what the job really had.)
	// Better to check overall status first, and see it as uncompleted on this call (and completed on the next call).
	part0, ok := jm.JobPartMgr(0)
	if !ok {
		panic(fmt.Errorf("error getting the 0th part of Job %s", jm.JobID()))
	}
	part0PlanStatus := part0.Plan().JobStatus()

	// Now iterate and count things up, rebuilding the job summary by examining the current state of all transfers.
	jm.IterateJobParts(true, func(partNum common.PartNumber, jpm ste.IJobPartMgr) {
		jpp := jpm.Plan()
		js.CompleteJobOrdered = js.CompleteJobOrdered || jpp.IsFinalPart
		js.TotalTransfers += jpp.NumTransfers
		// Iterate through this job part's transfers
		for t := uint32(0); t < jpp.NumTransfers; t++ {
			// jppt is the memory-mapped header of transfer t in this part.
			jppt := jpp.Transfer(t)
			js.TotalBytesEnumerated += uint64(jppt.SourceSize)
			switch jppt.EntityType {
			case common.EEntityType.File():
				js.FileTransfers++
			case common.EEntityType.Folder():
				js.FolderPropertyTransfers++
			case common.EEntityType.Symlink():
				js.SymlinkTransfers++
			}
			// Tally by status to compute the progress percentage at the end.
			switch jppt.TransferStatus() {
			case common.ETransferStatus.NotStarted(),
				common.ETransferStatus.FolderCreated(),
				common.ETransferStatus.Started(),
				common.ETransferStatus.Restarted(),
				common.ETransferStatus.Cancelled():
				js.TotalBytesExpected += uint64(jppt.SourceSize)
			case common.ETransferStatus.Success():
				js.TransfersCompleted++
				js.TotalBytesTransferred += uint64(jppt.SourceSize)
				js.TotalBytesExpected += uint64(jppt.SourceSize)
			case common.ETransferStatus.Failed(),
				common.ETransferStatus.TierAvailabilityCheckFailure(),
				common.ETransferStatus.BlobTierFailure():
				js.TransfersFailed++
				// getting the source and destination for failed transfer at position - index
				src, dst, isFolder := jpp.TransferSrcDstStrings(t)
				// appending to list of failed transfer
				js.FailedTransfers = append(js.FailedTransfers,
					common.TransferDetail{
						Src:                src,
						Dst:                dst,
						IsFolderProperties: isFolder,
						TransferStatus:     common.ETransferStatus.Failed(),
						ErrorCode:          jppt.ErrorCode()}) // TODO: Optimize
			case common.ETransferStatus.SkippedEntityAlreadyExists(),
				common.ETransferStatus.SkippedBlobHasSnapshots():
				js.TransfersSkipped++
				// getting the source and destination for skipped transfer at position - index
				src, dst, isFolder := jpp.TransferSrcDstStrings(t)
				js.SkippedTransfers = append(js.SkippedTransfers,
					common.TransferDetail{
						Src:                src,
						Dst:                dst,
						IsFolderProperties: isFolder,
						TransferStatus:     jppt.TransferStatus(),
					})
			}
		}
	})

	// Add on byte count from files in flight, to get a more accurate running
	// total, unless that would overshoot the expected total (double counting).
	// The in-flight figure is read once so the value that passes the check is
	// the value that gets added; previously it was fetched separately for each.
	mu.Lock()
	inFlight := jm.SuccessfulBytesInActiveFiles()
	if js.TotalBytesTransferred+inFlight <= js.TotalBytesExpected {
		js.TotalBytesTransferred += inFlight
	}
	mu.Unlock()

	if js.TotalBytesExpected == 0 {
		// No bytes expected: report done and avoid dividing by 0 (NaN).
		js.PercentComplete = 100
	} else {
		js.PercentComplete = 100 * float32(js.TotalBytesTransferred) / float32(js.TotalBytesExpected)
	}
	if js.PercentComplete > 100 {
		js.PercentComplete = 100
	}

	// This lets the FE keep fetching the Job Progress Summary during a resume:
	// the job is already completely ordered then, so progress should be
	// fetched until all job parts have been iterated and scheduled.
	js.CompleteJobOrdered = js.CompleteJobOrdered || jm.AllTransfersScheduled()
	js.BytesOverWire = uint64(JobsAdmin.BytesOverWire())
	// Get the number of active go routines performing the transfer or executing the chunk Func
	// TODO: added for debugging purpose. remove later (is covered by GetPerfInfo now anyway)
	js.ActiveConnections = jm.ActiveConnections()
	js.PerfStrings, js.PerfConstraint = jm.GetPerfInfo()
	pipeStats := jm.PipelineNetworkStats()
	if pipeStats != nil {
		js.AverageIOPS = pipeStats.OperationsPerSecond()
		js.AverageE2EMilliseconds = pipeStats.AverageE2EMilliseconds()
		js.NetworkErrorPercentage = pipeStats.NetworkErrorPercentage()
		js.ServerBusyPercentage = pipeStats.TotalServerBusyPercentage()
	}

	dir := jm.TransferDirection()
	if part0PlanStatus == common.EJobStatus.Cancelled() {
		// A cancelled job is reported as such without checking
		// CompleteJobOrdered: the user must have consented to cancelling an
		// incomplete job if that is the case.
		js.JobStatus = part0PlanStatus
		js.PerformanceAdvice = JobsAdmin.TryGetPerformanceAdvice(js.TotalBytesExpected, js.TotalTransfers-js.TransfersSkipped, part0.Plan().FromTo, dir, pipeStats)
		return js
	}
	// The job is done only if its order is complete AND part 0 reports a done status.
	if js.CompleteJobOrdered && part0PlanStatus.IsJobDone() {
		js.JobStatus = part0PlanStatus
	}
	if js.JobStatus.IsJobDone() {
		js.PerformanceAdvice = JobsAdmin.TryGetPerformanceAdvice(js.TotalBytesExpected, js.TotalTransfers-js.TransfersSkipped, part0.Plan().FromTo, dir, pipeStats)
	}
	return js
}
// ListJobTransfers lists the transfers of job r.JobID whose status matches
// r.OfStatus. If no manager is loaded for the job, it is first resurrected from
// its plan files without SAS overrides; if that fails, only ErrorMsg is set.
//
// Matching rules:
//   - ETransferStatus.All() matches every transfer;
//   - ETransferStatus.Failed() matches every status <= Failed(), so failure
//     variants such as "BlobAlreadyExistsFailure" are included as well;
//   - any other status matches only transfers with exactly that status.
func ListJobTransfers(r common.ListJobTransfersRequest) common.ListJobTransfersResponse {
	mgr, loaded := JobsAdmin.JobMgr(r.JobID)
	if !loaded {
		revived := JobsAdmin.ResurrectJob(r.JobID, EMPTY_SAS_STRING, EMPTY_SAS_STRING, nil, nil, false)
		if !revived {
			return common.ListJobTransfersResponse{
				ErrorMsg: fmt.Sprintf("no job with JobId %v exists", r.JobID),
			}
		}
		mgr, _ = JobsAdmin.JobMgr(r.JobID)
	}

	result := common.ListJobTransfersResponse{
		JobID:   r.JobID,
		Details: []common.TransferDetail{},
	}

	wantAll := r.OfStatus == common.ETransferStatus.All()
	wantFailures := r.OfStatus == common.ETransferStatus.Failed()

	// Walk parts 0, 1, 2, ... until the manager has no such part.
	for part := ste.PartNumber(0); ; part++ {
		partMgr, exists := mgr.JobPartMgr(part)
		if !exists {
			break
		}
		plan := partMgr.Plan()
		for idx := uint32(0); idx < plan.NumTransfers; idx++ {
			entry := plan.Transfer(idx)
			status := entry.TransferStatus()
			include := wantAll || status == r.OfStatus ||
				(wantFailures && status <= common.ETransferStatus.Failed())
			if !include {
				continue
			}
			src, dst, isFolder := plan.TransferSrcDstStrings(idx)
			result.Details = append(result.Details, common.TransferDetail{
				Src:                src,
				Dst:                dst,
				IsFolderProperties: isFolder,
				TransferStatus:     status,
				ErrorCode:          entry.ErrorCode(),
			})
		}
	}
	return result
}
// GetJobLCMWrapper returns the process lifecycle manager. When job jobID is
// currently loaded, it is returned wrapped together with that job's manager in
// an ste.JobLogLCMWrapper. Unlike the other lookups in this file, it does not
// attempt to resurrect a job that is not loaded.
func GetJobLCMWrapper(jobID common.JobID) common.LifecycleMgr {
	jobMgr, loaded := JobsAdmin.JobMgr(jobID)
	lifecycle := common.GetLifecycleMgr()
	if loaded {
		return ste.JobLogLCMWrapper{
			JobManager:   jobMgr,
			LifecycleMgr: lifecycle,
		}
	}
	return lifecycle
}
// ListJobs reports the jobs known to this AzCopy instance for givenStatus
// (presumably a status filter — the ListJobs RPC handler passes
// EJobStatus.All()). It simply delegates to JobsAdmin.ListJobs.
func ListJobs(givenStatus common.JobStatus) common.ListJobsResponse {
	jobs := JobsAdmin.ListJobs(givenStatus)
	return jobs
}
// GetJobDetails reports the job's FromTo, a representative source/destination
// pair (taken from the first transfer of part 0) and part 0's destination
// TrailingDot setting. If no manager is loaded for the job, it is first
// resurrected from its plan files without SAS overrides. Every failure is
// reported through ErrorMsg rather than an error value.
func GetJobDetails(r common.GetJobDetailsRequest) common.GetJobDetailsResponse {
	jm, found := JobsAdmin.JobMgr(r.JobID)
	if !found {
		// Job with JobId is not loaded.
		// Search the plan files in Azcopy folder and resurrect the Job.
		if !JobsAdmin.ResurrectJob(r.JobID, EMPTY_SAS_STRING, EMPTY_SAS_STRING, nil, nil, false) {
			return common.GetJobDetailsResponse{
				ErrorMsg: fmt.Sprintf("Job with JobID %v does not exist or is invalid", r.JobID),
			}
		}
		jm, _ = JobsAdmin.JobMgr(r.JobID)
	}

	// Get zeroth part of the job part plan.
	jp0, ok := jm.JobPartMgr(0)
	if !ok {
		return common.GetJobDetailsResponse{
			ErrorMsg: fmt.Sprintf("error getting the job's FromTo with JobID %v", r.JobID),
		}
	}
	plan := jp0.Plan()

	// Part 0 may legitimately hold no transfers (ExecuteNewCopyJobPartOrder
	// accepts a final part with an empty transfer list), in which case there
	// is no transfer 0 to read a representative source/destination from.
	if plan.NumTransfers == 0 {
		return common.GetJobDetailsResponse{
			ErrorMsg: fmt.Sprintf("error getting the source/destination with JobID %v", r.JobID),
		}
	}

	// Use the first transfer's source/destination as representative.
	source, destination, _ := plan.TransferSrcDstStrings(0)
	if source == "" && destination == "" {
		return common.GetJobDetailsResponse{
			ErrorMsg: fmt.Sprintf("error getting the source/destination with JobID %v", r.JobID),
		}
	}

	return common.GetJobDetailsResponse{
		ErrorMsg:    "",
		FromTo:      plan.FromTo,
		Source:      source,
		Destination: destination,
		TrailingDot: plan.DstFileData.TrailingDot,
	}
}