ste/mgr-JobPartMgr.go
package ste
import (
"context"
"fmt"
"mime"
"net/http"
"net/url"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
var _ IJobPartMgr = &jobPartMgr{}
// debug knob
var DebugSkipFiles = make(map[string]bool)
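// Illustrative use (hypothetical path): keys are the relative source or
// destination paths computed in ScheduleTransfers, e.g.
//
//	DebugSkipFiles["dir/file.txt"] = true
//
// which cancels the matching transfer before it is scheduled.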
type IJobPartMgr interface {
Plan() *JobPartPlanHeader
ScheduleTransfers(jobCtx context.Context)
StartJobXfer(jptm IJobPartTransferMgr)
ReportTransferDone(status common.TransferStatus) uint32
GetOverwriteOption() common.OverwriteOption
GetForceIfReadOnly() bool
AutoDecompress() bool
ScheduleChunks(chunkFunc chunkFunc)
RescheduleTransfer(jptm IJobPartTransferMgr)
BlobTypeOverride() common.BlobType
BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier)
ShouldPutMd5() bool
DeleteDestinationFileIfNecessary() bool
SAS() (string, string)
// CancelJob()
Close()
// TODO: added for debugging purposes; remove later
OccupyAConnection()
// TODO: added for debugging purposes; remove later
ReleaseAConnection()
SlicePool() common.ByteSlicePooler
CacheLimiter() common.CacheLimiter
FileCountLimiter() common.CacheLimiter
ExclusiveDestinationMap() *common.ExclusiveStringMap
ChunkStatusLogger() common.ChunkStatusLogger
common.ILogger
// These functions return Container/fileshare clients.
// They must be type asserted before use. In cases where they don't
// make sense (say SrcServiceClient for upload) they are nil.
SrcServiceClient() *common.ServiceClient
DstServiceClient() *common.ServiceClient
SourceIsOAuth() bool
getOverwritePrompter() *overwritePrompter
getFolderCreationTracker() FolderCreationTracker
SecurityInfoPersistenceManager() *securityInfoPersistenceManager
FolderDeletionManager() common.FolderDeletionManager
CpkInfo() *blob.CPKInfo
CpkScopeInfo() *blob.CPKScopeInfo
IsSourceEncrypted() bool
/* Status Manager Updates */
SendXferDoneMsg(msg xferDoneMsg)
PropertiesToTransfer() common.SetPropertiesFlags
ResetFailedTransfersCount() // Resets number of failed transfers after a job is resumed
}
// NewAzcopyHTTPClient creates a new HTTP client.
// We must minimize use of this, and instead maximize reuse of the returned client object.
// Why? Because that makes our connection pooling more efficient, and prevents us exhausting the
// number of available network sockets on resource-constrained Linux systems. (E.g. when
// 'ulimit -Hn' is low).
func NewAzcopyHTTPClient(maxIdleConns int) *http.Client {
const concurrentDialsPerCpu = 10 // exact value doesn't matter too much, but too low will be too slow, and too high will reduce the beneficial effect on thread count
return &http.Client{
Transport: &http.Transport{
Proxy: common.GlobalProxyLookup,
MaxConnsPerHost: concurrentDialsPerCpu * runtime.NumCPU(),
MaxIdleConns: 0, // No limit
MaxIdleConnsPerHost: maxIdleConns,
IdleConnTimeout: 180 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: true, // must disable the auto-decompression of gzipped files, and just download the gzipped version. See https://github.com/Azure/azure-storage-azcopy/issues/374
MaxResponseHeaderBytes: 0,
// ResponseHeaderTimeout: time.Duration{},
// ExpectContinueTimeout: time.Duration{},
},
}
}
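// A minimal reuse sketch (illustrative, not part of this file's API):
//
//	var sharedClient = NewAzcopyHTTPClient(1000) // created once at startup
//
// *http.Client satisfies policy.Transporter (it has the required Do method),
// so sharedClient can be passed as the transport argument of NewClientOptions
// below, keeping one shared connection pool per process.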
func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptions, transport policy.Transporter, log LogOptions, srcCred *common.ScopedToken, dstCred *common.ScopedAuthenticator) azcore.ClientOptions {
// Pipeline will look like
// [includeResponsePolicy, newAPIVersionPolicy (ignored), NewTelemetryPolicy, perCall, NewRetryPolicy, perRetry, NewLogPolicy, httpHeaderPolicy, bodyDownloadPolicy]
perCallPolicies := []policy.Policy{azruntime.NewRequestIDPolicy(), NewVersionPolicy(), newFileUploadRangeFromURLFixPolicy()}
// TODO: the default logging policy is not equivalent to the old one (tracing the HTTP request)
perRetryPolicies := []policy.Policy{newRetryNotificationPolicy(), newLogPolicy(log), newStatsPolicy()}
if dstCred != nil {
perCallPolicies = append(perCallPolicies, NewDestReauthPolicy(dstCred))
}
if srcCred != nil {
perRetryPolicies = append(perRetryPolicies, NewSourceAuthPolicy(srcCred))
}
retry.ShouldRetry = GetShouldRetry(&log)
return azcore.ClientOptions{
//APIVersion: ,
//Cloud: ,
//Logging: ,
Retry: retry,
Telemetry: telemetry,
//TracingProvider: ,
Transport: transport,
PerCallPolicies: perCallPolicies,
PerRetryPolicies: perRetryPolicies,
}
}
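// Hedged wiring example; the retry/telemetry values here are assumptions,
// not taken from this file:
//
//	opts := NewClientOptions(
//		policy.RetryOptions{MaxRetries: 5},
//		policy.TelemetryOptions{ApplicationID: "azcopy-sketch"},
//		NewAzcopyHTTPClient(500),
//		LogOptions{},
//		nil, // srcCred: not needed when, say, the source is local
//		nil, // dstCred
//	)
//
// The returned azcore.ClientOptions can then seed an SDK client's options.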
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Holds the status of transfers in this job part
type jobPartProgressInfo struct {
transfersCompleted int
transfersSkipped int
transfersFailed int
completionChan chan struct{}
}
// jobPartMgr represents the runtime information for a Job's Part
type jobPartMgr struct {
// These fields represent the part's existence
jobMgr IJobMgr // Refers to this part's Job (for logging, cancelling, etc.)
jobMgrInitState *jobMgrInitState
filename JobPartPlanFileName
// sourceSAS defines the SAS token of the Job's source. If the source is a local Location, the SAS is empty.
// Since the SAS is not persisted in the JobPartPlan file, it is stripped from the source and stored in memory in the JobPart Manager
sourceSAS string
// destinationSAS defines the SAS token of the Job's destination. If the destination is a local Location, the SAS is empty.
// Since the SAS is not persisted in the JobPartPlan file, it is stripped from the destination and stored in memory in the JobPart Manager
destinationSAS string
// These fields hold the service clients for this jobPart, whichever are
// appropriate for the scenario. E.g. for Blob->File, srcServiceClient holds a
// Blob service client and dstServiceClient a File service client. For uploads,
// srcServiceClient is nil; for downloads, dstServiceClient is nil.
srcServiceClient *common.ServiceClient
dstServiceClient *common.ServiceClient
credInfo common.CredentialInfo
srcIsOAuth bool // true if source is authenticated via oauth
credOption *common.CredentialOpOptions
// When the part is scheduled to run (in progress), the fields below are used
planMMF *JobPartPlanMMF // This Job part plan's MMF
// Additional data shared by all of this Job Part's transfers; initialized when this jobPartMgr is created
httpHeaders common.ResourceHTTPHeaders
blockBlobTier common.BlockBlobTier
pageBlobTier common.PageBlobTier
putMd5 bool
deleteDestinationFileIfNecessary bool
metadata common.Metadata
blobTags common.BlobTags
blobTypeOverride common.BlobType // User specified blob type
preserveLastModifiedTime bool
newJobXfer newJobXfer // Method used to start the transfer
priority common.JobPriority
pacer pacer // Pacer is used to cap throughput
slicePool common.ByteSlicePooler
cacheLimiter common.CacheLimiter
fileCountLimiter common.CacheLimiter
exclusiveDestinationMap *common.ExclusiveStringMap
// atomicTransfersDone represents the number of this JobPartOrder's transfers
// that are either completed or failed; it determines when the JobPartOrder
// is finally done
atomicTransfersDone uint32
atomicTransfersCompleted uint32
atomicTransfersFailed uint32
atomicTransfersSkipped uint32
cpkOptions common.CpkOptions
closeOnCompletion chan struct{}
SetPropertiesFlags common.SetPropertiesFlags
RehydratePriority common.RehydratePriorityType
}
func (jpm *jobPartMgr) getOverwritePrompter() *overwritePrompter {
return jpm.jobMgr.getOverwritePrompter()
}
func (jpm *jobPartMgr) getFolderCreationTracker() FolderCreationTracker {
if jpm.jobMgrInitState == nil || jpm.jobMgrInitState.folderCreationTracker == nil {
panic("folderCreationTracker should have been initialized already")
}
return jpm.jobMgrInitState.folderCreationTracker
}
func (jpm *jobPartMgr) Plan() *JobPartPlanHeader {
return jpm.planMMF.Plan()
}
// ScheduleTransfers schedules this job part's transfers. It is called when a new job part is ordered & is also called to resume a paused Job
func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {
jobCtx = context.WithValue(jobCtx, ServiceAPIVersionOverride, DefaultServiceApiVersion)
jpm.atomicTransfersDone = 0 // Reset the # of transfers done back to 0
jpm.atomicTransfersFailed = 0 // Resets the # transfers failed back to 0 during resume operation
// partplan file is opened and mapped when job part is added
// jpm.planMMF = jpm.filename.Map() // Open the job part plan file & memory-map it in
plan := jpm.planMMF.Plan()
if plan.PartNum == 0 && plan.NumTransfers == 0 {
/* This will wind down the transfer and report summary */
plan.SetJobStatus(common.EJobStatus.Completed())
return
}
// get the list of include / exclude transfers
includeTransfer, excludeTransfer := jpm.jobMgr.IncludeExclude()
if len(includeTransfer) > 0 || len(excludeTransfer) > 0 {
panic("List of transfers is obsolete.")
}
// *** Open the job part: process any job part plan-setting used by all transfers ***
dstData := plan.DstBlobData
jpm.httpHeaders = common.ResourceHTTPHeaders{
ContentType: string(dstData.ContentType[:dstData.ContentTypeLength]),
ContentEncoding: string(dstData.ContentEncoding[:dstData.ContentEncodingLength]),
ContentDisposition: string(dstData.ContentDisposition[:dstData.ContentDispositionLength]),
ContentLanguage: string(dstData.ContentLanguage[:dstData.ContentLanguageLength]),
CacheControl: string(dstData.CacheControl[:dstData.CacheControlLength]),
}
jpm.putMd5 = dstData.PutMd5
jpm.blockBlobTier = dstData.BlockBlobTier
jpm.pageBlobTier = dstData.PageBlobTier
jpm.deleteDestinationFileIfNecessary = dstData.DeleteDestinationFileIfNecessary
// For this job part, split the metadata string apart and create a common.Metadata out of it
metadataString := string(dstData.Metadata[:dstData.MetadataLength])
jpm.metadata = common.Metadata{}
if len(metadataString) > 0 {
var err error
jpm.metadata, err = common.StringToMetadata(metadataString)
if err != nil {
panic("sanity check: metadata string should be valid at this point: " + metadataString)
}
}
blobTagsStr := string(dstData.BlobTags[:dstData.BlobTagsLength])
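// The persisted tag string is '&'/'='-separated with query-escaped keys and
// values; e.g. (illustrative) "proj%20x=alpha&stage=dev" parses to
// {"proj x": "alpha", "stage": "dev"}.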
jpm.blobTags = common.BlobTags{}
if len(blobTagsStr) > 0 {
for _, keyAndValue := range strings.Split(blobTagsStr, "&") { // key/value pairs are separated by '&'
kv := strings.Split(keyAndValue, "=") // key/value are separated by '='
key, _ := url.QueryUnescape(kv[0])
value, _ := url.QueryUnescape(kv[1])
jpm.blobTags[key] = value
}
}
jpm.cpkOptions = common.CpkOptions{
CpkInfo: dstData.CpkInfo,
CpkScopeInfo: string(dstData.CpkScopeInfo[:dstData.CpkScopeInfoLength]),
IsSourceEncrypted: dstData.IsSourceEncrypted,
}
jpm.SetPropertiesFlags = dstData.SetPropertiesFlags
jpm.RehydratePriority = plan.RehydratePriority
jpm.preserveLastModifiedTime = plan.DstLocalData.PreserveLastModifiedTime
jpm.blobTypeOverride = plan.DstBlobData.BlobType
jpm.newJobXfer = computeJobXfer(plan.FromTo, plan.DstBlobData.BlobType)
jpm.priority = plan.Priority
jpm.clientInfo()
// *** Schedule this job part's transfers ***
for t := uint32(0); t < plan.NumTransfers; t++ {
jppt := plan.Transfer(t)
ts := jppt.TransferStatus()
if ts == common.ETransferStatus.Success() {
jpm.ReportTransferDone(ts) // Don't schedule an already-completed transfer
continue
}
// If the transfer previously failed, mark it Restarted while rescheduling it.
if ts == common.ETransferStatus.Failed() {
jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
if failedCount := atomic.LoadUint32(&jpm.atomicTransfersFailed); failedCount > 0 {
atomic.AddUint32(&jpm.atomicTransfersFailed, ^uint32(0))
} // Adding uint32 max is effectively subtracting 1
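// (e.g. with atomicTransfersFailed == 3, adding ^uint32(0), i.e. MaxUint32,
// wraps modulo 2^32 and leaves 2; this is the standard sync/atomic idiom
// for decrementing an unsigned counter.)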
}
if _, dst, isFolder := plan.TransferSrcDstStrings(t); isFolder {
// register the folder!
if jpptFolderTracker, ok := jpm.getFolderCreationTracker().(JPPTCompatibleFolderCreationTracker); ok {
if plan.FromTo.To().IsRemote() {
uri, err := url.Parse(dst)
common.PanicIfErr(err)
uri.RawPath = ""
uri.RawQuery = ""
dst = uri.String()
}
jpptFolderTracker.RegisterPropertiesTransfer(dst, t)
}
}
// Each transfer gets its own context (so any chunk can cancel the whole transfer) based off the job's context
transferCtx, transferCancel := context.WithCancel(jobCtx)
// Add the pipeline network stats to the context. This will be manually unset for all sourceInfoProvider contexts.
transferCtx = withPipelineNetworkStats(transferCtx, jpm.jobMgr.PipelineNetworkStats())
// Initialize a job part transfer manager
jptm := &jobPartTransferMgr{
jobPartMgr: jpm,
jobPartPlanTransfer: jppt,
transferIndex: t,
ctx: transferCtx,
cancel: transferCancel,
// TODO: insert the factory func interface in jptm.
// numChunks will be set by the transfer's prologue method
}
// build transferInfo after we've set transferIndex
jptm.transferInfo = jptm.Info()
jpm.Log(common.LogDebug, fmt.Sprintf("scheduling JobID=%v, Part#=%d, Transfer#=%d, priority=%v", plan.JobID, plan.PartNum, t, plan.Priority))
// ===== TEST KNOB
relSrc, relDst := plan.TransferSrcDstRelatives(t)
var err error
if plan.FromTo.From().IsRemote() {
relSrc, err = url.PathUnescape(relSrc)
}
relSrc = strings.TrimPrefix(relSrc, common.AZCOPY_PATH_SEPARATOR_STRING)
common.PanicIfErr(err) // neither of these panics should happen; the error would already have surfaced cleanly
if plan.FromTo.To().IsRemote() {
relDst, err = url.PathUnescape(relDst)
}
relDst = strings.TrimPrefix(relDst, common.AZCOPY_PATH_SEPARATOR_STRING)
common.PanicIfErr(err)
_, srcOk := DebugSkipFiles[relSrc]
_, dstOk := DebugSkipFiles[relDst]
if srcOk || dstOk {
if jpm.ShouldLog(common.LogInfo) {
jpm.Log(common.LogInfo, fmt.Sprintf("Transfer %d cancelled: %s", jptm.transferIndex, relSrc))
}
// cancel the transfer
jptm.Cancel()
jptm.SetStatus(common.ETransferStatus.Cancelled())
} else {
if len(DebugSkipFiles) != 0 && jpm.ShouldLog(common.LogInfo) {
jpm.Log(common.LogInfo, fmt.Sprintf("Did not exclude: src: %s dst: %s", relSrc, relDst))
}
}
// ===== TEST KNOB
jpm.jobMgr.ScheduleTransfer(jpm.priority, jptm)
// This sets the atomic variable atomicAllTransfersScheduled to 1.
// atomicAllTransfersScheduled is used when a job is resumed: since iterating
// the JobParts and scheduling transfers are independent, a variable is needed
// to indicate whether the last part has been scheduled or not
if plan.IsFinalPart {
jpm.jobMgr.ConfirmAllTransfersScheduled()
}
}
if plan.IsFinalPart {
jpm.Log(common.LogInfo, "Final job part has been scheduled")
}
}
func (jpm *jobPartMgr) ScheduleChunks(chunkFunc chunkFunc) {
jpm.jobMgr.ScheduleChunk(jpm.priority, chunkFunc)
}
func (jpm *jobPartMgr) RescheduleTransfer(jptm IJobPartTransferMgr) {
jpm.jobMgr.ScheduleTransfer(jpm.priority, jptm)
}
func (jpm *jobPartMgr) clientInfo() {
jobState := jpm.jobMgr.getInMemoryTransitJobState()
// Destination credential
if jpm.credInfo.CredentialType == common.ECredentialType.Unknown() {
jpm.credInfo = jobState.CredentialInfo
}
jpm.credOption = &common.CredentialOpOptions{
LogInfo: func(str string) { jpm.Log(common.LogInfo, str) },
LogError: func(str string) { jpm.Log(common.LogError, str) },
Panic: jpm.Panic,
CallerID: fmt.Sprintf("JobID=%v, Part#=%d", jpm.Plan().JobID, jpm.Plan().PartNum),
Cancel: jpm.jobMgr.Cancel,
}
}
func (jpm *jobPartMgr) SlicePool() common.ByteSlicePooler {
return jpm.slicePool
}
func (jpm *jobPartMgr) CacheLimiter() common.CacheLimiter {
return jpm.cacheLimiter
}
func (jpm *jobPartMgr) FileCountLimiter() common.CacheLimiter {
return jpm.fileCountLimiter
}
func (jpm *jobPartMgr) ExclusiveDestinationMap() *common.ExclusiveStringMap {
return jpm.exclusiveDestinationMap
}
func (jpm *jobPartMgr) StartJobXfer(jptm IJobPartTransferMgr) {
jpm.newJobXfer(jptm, jpm.pacer)
}
func (jpm *jobPartMgr) GetOverwriteOption() common.OverwriteOption {
return jpm.Plan().ForceWrite
}
func (jpm *jobPartMgr) GetForceIfReadOnly() bool {
return jpm.Plan().ForceIfReadOnly
}
func (jpm *jobPartMgr) AutoDecompress() bool {
return jpm.Plan().AutoDecompress
}
func (jpm *jobPartMgr) resourceDstData(fullFilePath string, dataFileToXfer []byte) (headers common.ResourceHTTPHeaders,
metadata common.Metadata, blobTags common.BlobTags, cpkOptions common.CpkOptions) {
if jpm.planMMF.Plan().DstBlobData.NoGuessMimeType {
return jpm.httpHeaders, jpm.metadata, jpm.blobTags, jpm.cpkOptions
}
return common.ResourceHTTPHeaders{
ContentType: jpm.inferContentType(fullFilePath, dataFileToXfer),
ContentLanguage: jpm.httpHeaders.ContentLanguage,
ContentDisposition: jpm.httpHeaders.ContentDisposition,
ContentEncoding: jpm.httpHeaders.ContentEncoding,
CacheControl: jpm.httpHeaders.CacheControl,
}, jpm.metadata, jpm.blobTags, jpm.cpkOptions
}
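// For example (hypothetical input): with NoGuessMimeType unset and a file
// "index.html", the returned headers re-infer ContentType per file, while the
// user-specified language/encoding/disposition headers pass through unchanged.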
var EnvironmentMimeMap map[string]string
// TODO do we want these charset=utf-8?
var builtinTypes = map[string]string{
".css": "text/css",
".gif": "image/gif",
".htm": "text/html",
".html": "text/html",
".jpeg": "image/jpeg",
".jpg": "image/jpeg",
".js": "application/javascript",
".mjs": "application/javascript",
".pdf": "application/pdf",
".png": "image/png",
".svg": "image/svg+xml",
".wasm": "application/wasm",
".webp": "image/webp",
".xml": "text/xml",
}
func (jpm *jobPartMgr) inferContentType(fullFilePath string, dataFileToXfer []byte) string {
fileExtension := filepath.Ext(fullFilePath)
if contentType, ok := EnvironmentMimeMap[strings.ToLower(fileExtension)]; ok {
return contentType
}
// short-circuit for common static website files
// mime.TypeByExtension takes the registry into account, which is most often undesirable in practice
if override, ok := builtinTypes[strings.ToLower(fileExtension)]; ok {
return override
}
/*
* The functions below return utf-8 as the default charset for text files.
* Discard the charset if present; it is safer to omit it than to default
* to a wrong one.
*/
if guessedType := mime.TypeByExtension(fileExtension); guessedType != "" {
return strings.Split(guessedType, ";")[0]
}
return strings.Split(http.DetectContentType(dataFileToXfer), ";")[0]
}
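// Illustrative resolution for a hypothetical "report.pdf": EnvironmentMimeMap
// (if populated) wins; otherwise builtinTypes[".pdf"] yields "application/pdf";
// only unknown extensions fall through to mime.TypeByExtension and, last,
// to content sniffing via http.DetectContentType.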
func (jpm *jobPartMgr) BlobTypeOverride() common.BlobType {
return jpm.blobTypeOverride
}
func (jpm *jobPartMgr) BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier) {
return jpm.blockBlobTier, jpm.pageBlobTier
}
func (jpm *jobPartMgr) CpkInfo() *blob.CPKInfo {
return common.GetCpkInfo(jpm.cpkOptions.CpkInfo)
}
func (jpm *jobPartMgr) CpkScopeInfo() *blob.CPKScopeInfo {
return common.GetCpkScopeInfo(jpm.cpkOptions.CpkScopeInfo)
}
func (jpm *jobPartMgr) IsSourceEncrypted() bool {
return jpm.cpkOptions.IsSourceEncrypted
}
func (jpm *jobPartMgr) PropertiesToTransfer() common.SetPropertiesFlags {
return jpm.SetPropertiesFlags
}
func (jpm *jobPartMgr) ShouldPutMd5() bool {
return jpm.putMd5
}
func (jpm *jobPartMgr) DeleteDestinationFileIfNecessary() bool {
return jpm.deleteDestinationFileIfNecessary
}
func (jpm *jobPartMgr) SAS() (string, string) {
return jpm.sourceSAS, jpm.destinationSAS
}
func (jpm *jobPartMgr) SecurityInfoPersistenceManager() *securityInfoPersistenceManager {
if jpm.jobMgrInitState == nil || jpm.jobMgrInitState.securityInfoPersistenceManager == nil {
panic("SIPM should have been initialized already")
}
return jpm.jobMgrInitState.securityInfoPersistenceManager
}
func (jpm *jobPartMgr) FolderDeletionManager() common.FolderDeletionManager {
if jpm.jobMgrInitState == nil || jpm.jobMgrInitState.folderDeletionManager == nil {
panic("folder deletion manager should have been initialized already")
}
return jpm.jobMgrInitState.folderDeletionManager
}
func (jpm *jobPartMgr) localDstData() *JobPartPlanDstLocal {
return &jpm.Plan().DstLocalData
}
func (jpm *jobPartMgr) deleteSnapshotsOption() common.DeleteSnapshotsOption {
return jpm.Plan().DeleteSnapshotsOption
}
func (jpm *jobPartMgr) permanentDeleteOption() common.PermanentDeleteOption {
return jpm.Plan().PermanentDeleteOption
}
func (jpm *jobPartMgr) updateJobPartProgress(status common.TransferStatus) {
switch status {
case common.ETransferStatus.Success():
atomic.AddUint32(&jpm.atomicTransfersCompleted, 1)
case common.ETransferStatus.Failed(), common.ETransferStatus.BlobTierFailure():
atomic.AddUint32(&jpm.atomicTransfersFailed, 1)
case common.ETransferStatus.SkippedEntityAlreadyExists(), common.ETransferStatus.SkippedBlobHasSnapshots():
atomic.AddUint32(&jpm.atomicTransfersSkipped, 1)
case common.ETransferStatus.Restarted(): // When a job is resumed, the count of failed transfers should reset to 0
atomic.StoreUint32(&jpm.atomicTransfersFailed, 0)
case common.ETransferStatus.Cancelled():
default:
jpm.Log(common.LogError, fmt.Sprintf("Unexpected status: %v", status.String()))
}
}
// Call ReportTransferDone when a transfer has completed its epilogue; this method returns the number of transfers done so far
func (jpm *jobPartMgr) ReportTransferDone(status common.TransferStatus) (transfersDone uint32) {
transfersDone = atomic.AddUint32(&jpm.atomicTransfersDone, 1)
jpm.updateJobPartProgress(status)
if transfersDone == jpm.planMMF.Plan().NumTransfers {
jppi := jobPartProgressInfo{
transfersCompleted: int(atomic.LoadUint32(&jpm.atomicTransfersCompleted)),
transfersSkipped: int(atomic.LoadUint32(&jpm.atomicTransfersSkipped)),
transfersFailed: int(atomic.LoadUint32(&jpm.atomicTransfersFailed)),
completionChan: jpm.closeOnCompletion,
}
jpm.Plan().SetJobPartStatus(common.EJobStatus.EnhanceJobStatusInfo(jppi.transfersSkipped > 0,
jppi.transfersFailed > 0, jppi.transfersCompleted > 0))
jpm.jobMgr.ReportJobPartDone(jppi)
jpm.Log(common.LogInfo, fmt.Sprintf("JobID=%v, Part#=%d, TransfersDone=%d of %d",
jpm.planMMF.Plan().JobID, jpm.planMMF.Plan().PartNum, transfersDone,
jpm.planMMF.Plan().NumTransfers))
}
return transfersDone
}
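// Note: atomic.AddUint32 returns the post-increment value, so exactly one
// caller sees transfersDone == NumTransfers; the part-done report above
// therefore fires once even when many transfers finish concurrently.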
// func (jpm *jobPartMgr) Cancel() { jpm.jobMgr.Cancel() }
func (jpm *jobPartMgr) Close() {
jpm.planMMF.Unmap()
// Clear other fields to allow for GC
jpm.httpHeaders = common.ResourceHTTPHeaders{}
jpm.metadata = common.Metadata{}
jpm.preserveLastModifiedTime = false
// TODO: Delete file?
/*if err := os.Remove(jpm.planFile.Name()); err != nil {
jpm.Panic(fmt.Errorf("error removing Job Part Plan file %s. Error=%v", jpm.planFile.Name(), err))
}*/
}
// TODO: added for debugging purposes; remove later
// Adds 1 to the number of goroutines actively performing a transfer or executing a chunkFunc
func (jpm *jobPartMgr) OccupyAConnection() {
jpm.jobMgr.OccupyAConnection()
}
// Subtracts 1 from the number of goroutines actively performing a transfer or executing a chunkFunc
// TODO: added for debugging purposes; remove later
func (jpm *jobPartMgr) ReleaseAConnection() {
jpm.jobMgr.ReleaseAConnection()
}
func (jpm *jobPartMgr) ShouldLog(level common.LogLevel) bool { return jpm.jobMgr.ShouldLog(level) }
func (jpm *jobPartMgr) Log(level common.LogLevel, msg string) { jpm.jobMgr.Log(level, msg) }
func (jpm *jobPartMgr) Panic(err error) { jpm.jobMgr.Panic(err) }
func (jpm *jobPartMgr) ChunkStatusLogger() common.ChunkStatusLogger {
return jpm.jobMgr.ChunkStatusLogger()
}
func (jpm *jobPartMgr) SrcServiceClient() *common.ServiceClient {
return jpm.srcServiceClient
}
func (jpm *jobPartMgr) DstServiceClient() *common.ServiceClient {
return jpm.dstServiceClient
}
func (jpm *jobPartMgr) SourceIsOAuth() bool {
return jpm.srcIsOAuth
}
/* Status update messages should not fail */
func (jpm *jobPartMgr) SendXferDoneMsg(msg xferDoneMsg) {
jpm.jobMgr.SendXferDoneMsg(msg)
}
func (jpm *jobPartMgr) ResetFailedTransfersCount() {
atomic.StoreUint32(&jpm.atomicTransfersFailed, 0)
}
// TODO: Can we delete this method?
// numberOfTransfersDone returns the numberOfTransfersDone_doNotUse of JobPartPlanInfo
// instance in thread safe manner
// func (jpm *jobPartMgr) numberOfTransfersDone() uint32 { return atomic.LoadUint32(&jpm.numberOfTransfersDone_doNotUse)}
// setNumberOfTransfersDone sets the number of transfers done to a specific value
// in a thread safe manner
// func (jppi *jobPartPlanInfo) setNumberOfTransfersDone(val uint32) {
// atomic.StoreUint32(&jPartPlanInfo.numberOfTransfersDone_doNotUse, val)
// }