cmd/copyEnumeratorHelper.go
package cmd

import (
    "fmt"
    "math/rand"

    "github.com/Azure/azure-storage-azcopy/v10/common"
    "github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
)

var EnumerationParallelism = 1
var EnumerationParallelStatFiles = false
// addTransfer accepts a new transfer; once the threshold is reached, it dispatches a job part order.
func addTransfer(e *common.CopyJobPartOrderRequest, transfer common.CopyTransfer, cca *CookedCopyCmdArgs) error {
    // Source and destination paths are, and must be, relative paths.
    // Dispatch the transfers once the count reaches NumOfFilesPerDispatchJobPart.
    // We do this so that, for a large transfer, the transfer engine can get started
    // while the frontend is still gathering more transfers.
    if len(e.Transfers.List) == NumOfFilesPerDispatchJobPart {
        shuffleTransfers(e.Transfers.List)
        resp := common.CopyJobPartOrderResponse{}
        Rpc(common.ERpcCmd.CopyJobPartOrder(), (*common.CopyJobPartOrderRequest)(e), &resp)

        if !resp.JobStarted {
            return fmt.Errorf("copy job part order with JobId %s and part number %d failed because %s", e.JobID, e.PartNum, resp.ErrorMsg)
        }

        // if the part order just sent to the engine is part 0, start fetching the job progress summary
        if e.PartNum == 0 {
            cca.waitUntilJobCompletion(false)
        }

        e.Transfers = common.Transfers{}
        e.PartNum++
    }

    // Only append the transfer after we've checked and dispatched a part,
    // so that there is at least one transfer for the final part.
    {
        // Should this block be a function?
        e.Transfers.List = append(e.Transfers.List, transfer)
        e.Transfers.TotalSizeInBytes += uint64(transfer.SourceSize)
        switch transfer.EntityType {
        case common.EEntityType.File():
            e.Transfers.FileTransferCount++
        case common.EEntityType.Folder():
            e.Transfers.FolderTransferCount++
        case common.EEntityType.Symlink():
            e.Transfers.SymlinkTransferCount++
        }
    }

    return nil
}
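// Illustrative sketch, assuming a hypothetical enumerateSource helper that yields the transfers
// to copy; the real enumerators in this package are considerably more involved. It only shows
// the intended call pattern: addTransfer once per enumerated object, then dispatchFinalPart
// exactly once when enumeration finishes.
func exampleEnumerationLoop(e *common.CopyJobPartOrderRequest, cca *CookedCopyCmdArgs,
    enumerateSource func() ([]common.CopyTransfer, error)) error {
    transfers, err := enumerateSource() // hypothetical enumeration step
    if err != nil {
        return err
    }
    for _, t := range transfers {
        // may dispatch a full part order once NumOfFilesPerDispatchJobPart transfers have accumulated
        if err := addTransfer(e, t, cca); err != nil {
            return err
        }
    }
    // flush whatever remains and mark the part as final
    return dispatchFinalPart(e, cca)
}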
// shuffleTransfers shuffles the transfers before they are dispatched.
// This is done to avoid hitting the same partition continuously in an append-only pattern.
// TODO: this should probably be removed after the high-throughput block blob feature is implemented on the service side.
func shuffleTransfers(transfers []common.CopyTransfer) {
    rand.Shuffle(len(transfers), func(i, j int) { transfers[i], transfers[j] = transfers[j], transfers[i] })
}
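// Illustrative variant (a sketch, not used elsewhere in this file): the same Fisher-Yates
// shuffle, but driven by a locally seeded *rand.Rand so that a test could reproduce a given
// dispatch order. The seed parameter is purely hypothetical.
func shuffleTransfersSeeded(transfers []common.CopyTransfer, seed int64) {
    r := rand.New(rand.NewSource(seed)) // deterministic source for reproducibility
    r.Shuffle(len(transfers), func(i, j int) { transfers[i], transfers[j] = transfers[j], transfers[i] })
}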
// dispatchFinalPart sends a last part with isFinalPart set to true, along with whatever transfers still haven't been sent.
func dispatchFinalPart(e *common.CopyJobPartOrderRequest, cca *CookedCopyCmdArgs) error {
    shuffleTransfers(e.Transfers.List)
    e.IsFinalPart = true
    var resp common.CopyJobPartOrderResponse
    Rpc(common.ERpcCmd.CopyJobPartOrder(), (*common.CopyJobPartOrderRequest)(e), &resp)

    if !resp.JobStarted {
        // Output the log location if the log-level is set to something other than NONE
        var logPathFolder string
        if azcopyLogPathFolder != "" {
            logPathFolder = fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)
        }
        glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), logPathFolder, cca.isCleanupJob, cca.cleanupJobMessage))

        if cca.dryrunMode {
            return nil
        }

        if resp.ErrorMsg == common.ECopyJobPartOrderErrorType.NoTransfersScheduledErr() {
            return NothingScheduledError
        }

        return fmt.Errorf("copy job part order with JobId %s and part number %d failed because %s", e.JobID, e.PartNum, resp.ErrorMsg)
    }

    if jobsAdmin.JobsAdmin != nil {
        jobsAdmin.JobsAdmin.LogToJobLog(FinalPartCreatedMessage, common.LogInfo)
    }

    // set the flag on cca to indicate that enumeration is done
    cca.isEnumerationComplete = true

    // if the part order just sent to the engine is part 0, start fetching the job progress summary
    if e.PartNum == 0 {
        cca.waitUntilJobCompletion(false)
    }

    return nil
}
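// A small arithmetic sketch (illustrative only, with a hypothetical threshold parameter standing
// in for NumOfFilesPerDispatchJobPart): addTransfer dispatches a full part each time the
// in-progress part reaches the threshold, and dispatchFinalPart sends the remainder, which is
// never empty when n > 0 because addTransfer appends only after a possible dispatch. So for
// n > 0 transfers the job is split into ceil(n / threshold) parts.
func expectedPartCount(n, threshold int) int {
    if n == 0 {
        return 1 // dispatchFinalPart still sends a single, empty final part
    }
    return (n + threshold - 1) / threshold // integer ceiling division
}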