cmd/jobsResume.go
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"github.com/spf13/cobra"
)

// TODO the behavior of the resume command should be double-checked
// TODO figure out how to merge resume job with copy
// TODO the progress reporting code is almost the same as the copy command's; this copy-paste should be avoided
type resumeJobController struct {
// generated
jobID common.JobID
// variables used to calculate progress
// intervalStartTime holds the last time value when the progress summary was fetched
// the value of this variable is used to calculate the throughput
// it gets updated every time the progress summary is fetched
intervalStartTime time.Time
intervalBytesTransferred uint64
// used to calculate job summary
jobStartTime time.Time
}

// waitUntilJobCompletion wraps the call to the lifecycle manager that waits for the job to complete.
// If blocking is true, this method never returns; if blocking is false, a goroutine is spawned to wait out the job.
func (cca *resumeJobController) waitUntilJobCompletion(blocking bool) {
// print an initial message to indicate that the job is starting
// Output the log location if the log level is set to anything other than NONE
var logPathFolder string
if azcopyLogPathFolder != "" {
logPathFolder = fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)
}
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), logPathFolder, false, ""))
// initialize the times necessary to track progress
cca.jobStartTime = time.Now()
cca.intervalStartTime = time.Now()
cca.intervalBytesTransferred = 0
// hand over control to the lifecycle manager if blocking
if blocking {
glcm.InitiateProgressReporting(cca)
glcm.SurrenderControl()
} else {
// non-blocking: return after spawning a goroutine to watch the job
glcm.InitiateProgressReporting(cca)
}
}

func (cca *resumeJobController) Cancel(lcm common.LifecycleMgr) {
err := cookedCancelCmdArgs{jobID: cca.jobID}.process()
if err != nil {
lcm.Error("error occurred while cancelling the job " + cca.jobID.String() + ". Failed with error " + err.Error())
}
}

// TODO: can we combine this with the copy one (and the sync one?)
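// ReportProgressOrExit is invoked periodically by the lifecycle manager: it fetches the latest job summary,
// prints a progress line (or JSON), and exits the process with an appropriate exit code once the job is done.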
func (cca *resumeJobController) ReportProgressOrExit(lcm common.LifecycleMgr) (totalKnownCount uint32) {
// fetch a job status
var summary common.ListJobSummaryResponse
Rpc(common.ERpcCmd.ListJobSummary(), &cca.jobID, &summary)
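// swap the global lifecycle manager for this job's LCM wrapper exactly once,
// so subsequent output is routed through the job-scoped lifecycle manager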
glcmSwapOnce.Do(func() {
Rpc(common.ERpcCmd.GetJobLCMWrapper(), &cca.jobID, &glcm)
})
jobDone := summary.JobStatus.IsJobDone()
totalKnownCount = summary.TotalTransfers
// if json is not desired, and job is done, then we generate a special end message to conclude the job
duration := time.Since(cca.jobStartTime) // report the total run time of the job
var computeThroughput = func() float64 {
// compute the average throughput for the last time interval
bytesInMb := float64(float64(summary.BytesOverWire-cca.intervalBytesTransferred) / float64(base10Mega))
timeElapsed := time.Since(cca.intervalStartTime).Seconds()
// reset the interval timer and byte count
cca.intervalStartTime = time.Now()
cca.intervalBytesTransferred = summary.BytesOverWire
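// multiplying MB/s by 8 yields the throughput in megabits per second (Mb/s)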
return common.Iff(timeElapsed != 0, bytesInMb/timeElapsed, 0) * 8
}
glcm.Progress(func(format common.OutputFormat) string {
if format == common.EOutputFormat.Json() {
jsonOutput, err := json.Marshal(summary)
common.PanicIfErr(err)
return string(jsonOutput)
} else {
// if json is not needed, then we generate a message that goes nicely on the same line
// display a scanning keyword if the job is not completely ordered
var scanningString = " (scanning...)"
if summary.CompleteJobOrdered {
scanningString = ""
}
throughput := computeThroughput()
throughputString := fmt.Sprintf("2-sec Throughput (Mb/s): %v", jobsAdmin.ToFixed(throughput, 4))
if throughput == 0 {
// In some cases no bytes go over the wire from the local machine (e.g. a service-side copy); when throughput is 0, hide it.
throughputString = ""
}
// indicate whether constrained by disk or not
perfString, diskString := getPerfDisplayText(summary.PerfStrings, summary.PerfConstraint, duration, false)
return fmt.Sprintf("%.1f %%, %v Done, %v Failed, %v Pending, %v Skipped, %v Total%s, %s%s%s",
summary.PercentComplete,
summary.TransfersCompleted,
summary.TransfersFailed,
summary.TotalTransfers-(summary.TransfersCompleted+summary.TransfersFailed+summary.TransfersSkipped),
summary.TransfersSkipped, summary.TotalTransfers, scanningString, perfString, throughputString, diskString)
}
})
if jobDone {
exitCode := common.EExitCode.Success()
if summary.TransfersFailed > 0 {
exitCode = common.EExitCode.Error()
}
lcm.Exit(func(format common.OutputFormat) string {
if format == common.EOutputFormat.Json() {
jsonOutput, err := json.Marshal(summary)
common.PanicIfErr(err)
return string(jsonOutput)
} else {
return fmt.Sprintf(
`
Job %s summary
Elapsed Time (Minutes): %v
Number of File Transfers: %v
Number of Folder Property Transfers: %v
Number of Symlink Transfers: %v
Total Number of Transfers: %v
Number of File Transfers Completed: %v
Number of Folder Transfers Completed: %v
Number of File Transfers Failed: %v
Number of Folder Transfers Failed: %v
Number of File Transfers Skipped: %v
Number of Folder Transfers Skipped: %v
Total Number of Bytes Transferred: %v
Final Job Status: %v
`,
summary.JobID.String(),
jobsAdmin.ToFixed(duration.Minutes(), 4),
summary.FileTransfers,
summary.FolderPropertyTransfers,
summary.SymlinkTransfers,
summary.TotalTransfers,
summary.TransfersCompleted-summary.FoldersCompleted,
summary.FoldersCompleted,
summary.TransfersFailed-summary.FoldersFailed,
summary.FoldersFailed,
summary.TransfersSkipped-summary.FoldersSkipped,
summary.FoldersSkipped,
summary.TotalBytesTransferred,
summary.JobStatus)
}
}, exitCode)
}
return
}

func init() {
resumeCmdArgs := resumeCmdArgs{}
// resumeCmd represents the resume command
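// Example usage: azcopy jobs resume <jobID> --source-sas="<SAS>" --destination-sas="<SAS>"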
resumeCmd := &cobra.Command{
Use: "resume [jobID]",
SuggestFor: []string{"resme", "esume", "resue"},
Short: resumeJobsCmdShortDescription,
Long: resumeJobsCmdLongDescription,
Args: func(cmd *cobra.Command, args []string) error {
// the resume command requires exactly one argument: the jobID
// "resume <jobID>" resumes all parts of the existing job with the given jobID
// if no argument is passed, the command is not valid
if len(args) != 1 {
return errors.New("this command requires jobId to be passed as argument")
}
resumeCmdArgs.jobID = args[0]
glcm.EnableInputWatcher()
if cancelFromStdin {
glcm.EnableCancelFromStdIn()
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
err := resumeCmdArgs.process()
if err != nil {
glcm.Error(fmt.Sprintf("failed to perform resume command due to error: %s", err.Error()))
}
glcm.Exit(nil, common.EExitCode.Success())
},
}
jobsCmd.AddCommand(resumeCmd)
resumeCmd.PersistentFlags().StringVar(&resumeCmdArgs.includeTransfer, "include", "", "Filter: Include only these failed transfer(s) when resuming the job. "+
"Files should be separated by ';'.")
resumeCmd.PersistentFlags().StringVar(&resumeCmdArgs.excludeTransfer, "exclude", "", "Filter: Exclude these failed transfer(s) when resuming the job. "+
"Files should be separated by ';'.")
// oauth options
resumeCmd.PersistentFlags().StringVar(&resumeCmdArgs.SourceSAS, "source-sas", "", "Source SAS token of the source for a given Job ID.")
resumeCmd.PersistentFlags().StringVar(&resumeCmdArgs.DestinationSAS, "destination-sas", "", "Destination SAS token of the destination for a given Job ID.")
}

type resumeCmdArgs struct {
jobID string
includeTransfer string
excludeTransfer string
SourceSAS string
DestinationSAS string
}

func (rca resumeCmdArgs) getSourceAndDestinationServiceClients(
ctx context.Context,
fromTo common.FromTo,
source common.ResourceString,
destination common.ResourceString,
) (*common.ServiceClient, *common.ServiceClient, error) {
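// normalize the SAS tokens so that they start with '?' and can be appended directly to the resource URLs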
if len(rca.SourceSAS) > 0 && rca.SourceSAS[0] != '?' {
rca.SourceSAS = "?" + rca.SourceSAS
}
if len(rca.DestinationSAS) > 0 && rca.DestinationSAS[0] != '?' {
rca.DestinationSAS = "?" + rca.DestinationSAS
}
source.SAS = rca.SourceSAS
destination.SAS = rca.DestinationSAS
srcCredType, _, err := getCredentialTypeForLocation(ctx,
fromTo.From(),
source,
true,
common.CpkOptions{})
if err != nil {
return nil, nil, err
}
dstCredType, _, err := getCredentialTypeForLocation(ctx,
fromTo.To(),
destination,
false,
common.CpkOptions{})
if err != nil {
return nil, nil, err
}
var tc azcore.TokenCredential
if srcCredType.IsAzureOAuth() || dstCredType.IsAzureOAuth() {
uotm := GetUserOAuthTokenManagerInstance()
// Get token from env var or cache.
tokenInfo, err := uotm.GetTokenInfo(ctx)
if err != nil {
return nil, nil, err
}
tc, err = tokenInfo.GetTokenCredential()
if err != nil {
return nil, nil, err
}
}
var reauthTok *common.ScopedAuthenticator
if at, ok := tc.(common.AuthenticateToken); ok { // We don't need two different tokens here since it gets passed in just the same either way.
// This will cause a reauth with StorageScope, which is fine; that's the original Authenticate call as it stands.
reauthTok = (*common.ScopedAuthenticator)(common.NewScopedCredential(at, common.ECredentialType.OAuthToken()))
}
jobID, err := common.ParseJobID(rca.jobID)
if err != nil {
// Error for invalid JobId format
return nil, nil, fmt.Errorf("error parsing the jobId %s. Failed with error %w", rca.jobID, err)
}
// But we don't want to supply a reauth token if we're not using OAuth. That could cause problems if, say, a SAS is invalid.
options := createClientOptions(common.AzcopyCurrentJobLogger, nil, common.Iff(srcCredType.IsAzureOAuth(), reauthTok, nil))
var getJobDetailsResponse common.GetJobDetailsResponse
// Get job details from the STE
Rpc(common.ERpcCmd.GetJobDetails(),
&common.GetJobDetailsRequest{JobID: jobID},
&getJobDetailsResponse)
if getJobDetailsResponse.ErrorMsg != "" {
glcm.Error(getJobDetailsResponse.ErrorMsg)
}
var fileSrcClientOptions any
if fromTo.From() == common.ELocation.File() {
fileSrcClientOptions = &common.FileClientOptions{
AllowTrailingDot: getJobDetailsResponse.TrailingDot.IsEnabled(), // honor the trailing-dot setting recorded for the job
}
}
srcServiceClient, err := common.GetServiceClientForLocation(fromTo.From(), source, srcCredType, tc, &options, fileSrcClientOptions)
if err != nil {
return nil, nil, err
}
var srcCred *common.ScopedToken
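// for OAuth-authenticated service-to-service transfers, pass a scoped source token into the destination's client options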
if fromTo.IsS2S() && srcCredType.IsAzureOAuth() {
srcCred = common.NewScopedCredential(tc, srcCredType)
}
options = createClientOptions(common.AzcopyCurrentJobLogger, srcCred, common.Iff(dstCredType.IsAzureOAuth(), reauthTok, nil))
var fileClientOptions any
if fromTo.To() == common.ELocation.File() {
fileClientOptions = &common.FileClientOptions{
AllowSourceTrailingDot: getJobDetailsResponse.TrailingDot.IsEnabled() && fromTo.From() == common.ELocation.File(),
AllowTrailingDot: getJobDetailsResponse.TrailingDot.IsEnabled(),
}
}
dstServiceClient, err := common.GetServiceClientForLocation(fromTo.To(), destination, dstCredType, tc, &options, fileClientOptions)
if err != nil {
return nil, nil, err
}
return srcServiceClient, dstServiceClient, nil
}

// process processes the resume command and dispatches the resume-job order to the storage engine.
func (rca resumeCmdArgs) process() error {
// parse the given jobID to validate that its format is correct
jobID, err := common.ParseJobID(rca.jobID)
if err != nil {
// parsing failed, so the given jobID is not in a valid format
return fmt.Errorf("error parsing the jobId %s. Failed with error %w", rca.jobID, err)
}
// if no logging, set this empty so that we don't display the log location
if azcopyLogVerbosity == common.LogNone {
azcopyLogPathFolder = ""
}
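// the include/exclude filters are maps keyed by transfer path; the value is the entry's index in the user-supplied list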
includeTransfer := make(map[string]int)
excludeTransfer := make(map[string]int)
// If an include list of transfers was provided, parse it.
if len(rca.includeTransfer) > 0 {
// Split the Include Transfer using ';'
transfers := strings.Split(rca.includeTransfer, ";")
for index := range transfers {
if len(transfers[index]) == 0 {
// skip empty entries; this handles a misplaced ';'
continue
}
includeTransfer[transfers[index]] = index
}
}
// If an exclude list of transfers was provided, parse it.
if len(rca.excludeTransfer) > 0 {
// Split the Exclude Transfer using ';'
transfers := strings.Split(rca.excludeTransfer, ";")
for index := range transfers {
if len(transfers[index]) == 0 {
// skip empty entries; this handles a misplaced ';'
continue
}
excludeTransfer[transfers[index]] = index
}
}
// Get the fromTo info, so we can decide the proper credential type to use.
var getJobFromToResponse common.GetJobDetailsResponse
Rpc(common.ERpcCmd.GetJobDetails(),
&common.GetJobDetailsRequest{JobID: jobID},
&getJobFromToResponse)
if getJobFromToResponse.ErrorMsg != "" {
glcm.Error(getJobFromToResponse.ErrorMsg)
}
if getJobFromToResponse.FromTo.From() == common.ELocation.Benchmark() ||
getJobFromToResponse.FromTo.To() == common.ELocation.Benchmark() {
// Doesn't make sense to resume a benchmark job.
// It's not tested, wouldn't report progress correctly, and wouldn't clean up after itself properly
return errors.New("resuming benchmark jobs is not supported")
}
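// create a context carrying the default service API version override for downstream requests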
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
// Initialize credential info.
credentialInfo := common.CredentialInfo{}
// TODO: Replace context with root context
srcResourceString, err := SplitResourceString(getJobFromToResponse.Source, getJobFromToResponse.FromTo.From())
_ = err // todo
srcResourceString.SAS = rca.SourceSAS
dstResourceString, err := SplitResourceString(getJobFromToResponse.Destination, getJobFromToResponse.FromTo.To())
_ = err // todo
dstResourceString.SAS = rca.DestinationSAS
// we should stop using credentialInfo and use the clients instead. Until that is fixed,
// there will be repeated calls to get the credential type, for correctness.
if credentialInfo.CredentialType, err = getCredentialType(ctx, rawFromToInfo{
fromTo: getJobFromToResponse.FromTo,
source: srcResourceString,
destination: dstResourceString,
}, common.CpkOptions{}); err != nil {
return err
}
srcServiceClient, dstServiceClient, err := rca.getSourceAndDestinationServiceClients(
ctx, getJobFromToResponse.FromTo,
srcResourceString,
dstResourceString,
)
if err != nil {
return errors.New("could not create service clients " + err.Error())
}
// Send resume job request.
var resumeJobResponse common.CancelPauseResumeResponse
Rpc(common.ERpcCmd.ResumeJob(),
&common.ResumeJobRequest{
JobID: jobID,
SourceSAS: rca.SourceSAS,
DestinationSAS: rca.DestinationSAS,
SrcServiceClient: srcServiceClient,
DstServiceClient: dstServiceClient,
CredentialInfo: credentialInfo,
IncludeTransfer: includeTransfer,
ExcludeTransfer: excludeTransfer,
},
&resumeJobResponse)
if !resumeJobResponse.CancelledPauseResumed {
glcm.Error(resumeJobResponse.ErrorMsg)
}
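// block and report progress until the resumed job reaches a terminal state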
controller := resumeJobController{jobID: jobID}
controller.waitUntilJobCompletion(true)
return nil
}