in ste/xfer-anyToRemote-file.go [212:371]
// anyToRemote_file performs the whole upload/S2S-copy workflow for a single file transfer:
// pre-flight checks, sender creation, overwrite handling, source open, LMT verification,
// destination locking, and finally scheduling of the chunk funcs that move the data.
func anyToRemote_file(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) {
	pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.XferStart())
	defer jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone())

	srcSize := info.SourceSize

	// failTransfer logs msg, marks the transfer Failed, and reports it done.
	// Call sites must still `return` immediately afterwards.
	failTransfer := func(msg string) {
		jptm.LogSendError(info.Source, info.Destination, msg, 0)
		jptm.SetStatus(common.ETransferStatus.Failed())
		jptm.ReportTransferDone()
	}

	// step 1. perform initial checks
	if jptm.WasCanceled() {
		/* This is the earliest we detect jptm has been cancelled before scheduling chunks */
		jptm.SetStatus(common.ETransferStatus.Cancelled())
		jptm.ReportTransferDone()
		return
	}

	// step 2a. Create sender
	srcInfoProvider, err := sipf(jptm)
	if err != nil {
		failTransfer(err.Error())
		return
	}
	if srcInfoProvider.EntityType() != common.EEntityType.File() {
		panic("configuration error. Source Info Provider does not have File entity type")
	}

	s, err := senderFactory(jptm, info.Destination, pacer, srcInfoProvider)
	if err != nil {
		failTransfer(err.Error())
		return
	}

	// step 2b. Read chunk size and count from the sender (since it may have applied its own defaults and/or calculations to produce these values
	numChunks := s.NumChunks()
	if jptm.ShouldLog(common.LogInfo) {
		jptm.LogTransferStart(info.Source, info.Destination, fmt.Sprintf("Specified chunk size %d", s.ChunkSize()))
	}
	if numChunks == 0 {
		panic("must always schedule one chunk, even if file is empty") // this keeps our code structure simpler, by using a dummy chunk for empty files
	}

	// step 3: check overwrite option
	// if the force Write flags is set to false or prompt
	// then check the file exists at the remote location
	// if it does, react accordingly
	if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
		exists, dstLmt, existenceErr := s.RemoteFileExists()
		if existenceErr != nil {
			// is a real failure, not just a SkippedFileAlreadyExists, in this case
			failTransfer("Could not check destination file existence. " + existenceErr.Error())
			return
		}
		if exists {
			shouldOverwrite := false

			// if necessary, prompt to confirm user's intent
			if jptm.GetOverwriteOption() == common.EOverwriteOption.Prompt() {
				// remove the SAS before prompting the user
				parsed, _ := url.Parse(info.Destination)
				parsed.RawQuery = ""
				shouldOverwrite = jptm.GetOverwritePrompter().ShouldOverwrite(parsed.String(), common.EEntityType.File())
			} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
				// only overwrite if source lmt is newer (after) the destination
				if jptm.LastModifiedTime().After(dstLmt) {
					shouldOverwrite = true
				}
			}

			if !shouldOverwrite {
				// logging as Warning so that it turns up even in compact logs, and because previously we use Error here
				jptm.LogAtLevelForCurrentTransfer(common.LogWarning, "File already exists, so will be skipped")
				jptm.SetStatus(common.ETransferStatus.SkippedEntityAlreadyExists())
				jptm.ReportTransferDone()
				return
			}
		}
	}

	// step 4: Open the local Source File (if any)
	common.GetLifecycleMgr().E2EAwaitAllowOpenFiles()
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.OpenLocalSource())
	var sourceFileFactory func() (common.CloseableReaderAt, error)
	srcFile := (common.CloseableReaderAt)(nil)
	if srcInfoProvider.IsLocal() {
		sourceFileFactory = srcInfoProvider.(ILocalSourceInfoProvider).OpenSourceFile // all local providers must implement this interface
		srcFile, err = sourceFileFactory()
		if err != nil {
			suffix := ""
			if strings.Contains(err.Error(), "Access is denied") && runtime.GOOS == "windows" {
				suffix = " See --" + common.BackupModeFlagName + " flag if you need to read all files regardless of their permissions"
			}
			failTransfer("Couldn't open source. " + err.Error() + suffix)
			return
		}
		defer srcFile.Close() // we read all the chunks in this routine, so can close the file at the end
	}

	// We always do LMT verification after the transfer. Also do it here, before transfer, when:
	// 1) Source is local, and source's size is > 1 chunk. (why not always? Since getting LMT is not "free" at very small sizes)
	// 2) Source is remote, i.e. S2S copy case. And source's size is larger than one chunk. So verification can possibly save transfer's cost.
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.ModifiedTimeRefresh())
	if _, isS2SCopier := s.(s2sCopier); numChunks > 1 &&
		(srcInfoProvider.IsLocal() || (isS2SCopier && info.S2SSourceChangeValidation)) {
		lmt, lmtErr := srcInfoProvider.GetFreshFileLastModifiedTime()
		if lmtErr != nil {
			failTransfer("Couldn't get source's last modified time-" + lmtErr.Error())
			return
		}
		if !lmt.Equal(jptm.LastModifiedTime()) {
			failTransfer("File modified since transfer scheduled")
			return
		}
	}

	// step 5a: lock the destination
	// (is safe to do it relatively early here, before we run the prologue, because it's just an internal lock, within the app)
	// But must be after all of the early returns that are above here (since
	// if we succeed here, we need to know the epilogue will definitely run to unlock later)
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.LockDestination())
	err = jptm.WaitUntilLockDestination(jptm.Context())
	if err != nil {
		failTransfer(err.Error())
		return
	}

	// *****
	// Error-handling rules change here.
	// ABOVE this point, we end the transfer using the code as shown above
	// BELOW this point, this routine always schedules the expected number
	// of chunks, even if it has seen a failure, and the
	// workers (the chunkfunc implementations) must use
	// jptm.FailActiveSend when there's an error)
	// TODO: are we comfortable with this approach?
	// DECISION: 16 Jan, 2019: for now, we are leaving in place the above rule that the number of completed chunks must
	// eventually reach numChunks, since we have no better short-term alternative.
	// ******

	// step 5b: tell jptm what to expect, and how to clean up at the end
	jptm.SetNumberOfChunks(numChunks)
	jptm.SetActionAfterLastChunk(func() { epilogueWithCleanupSendToRemote(jptm, s, srcInfoProvider) })

	// stop tracking pseudo id (since real chunk id's will be tracked from here on)
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone())

	// Step 6: Go through the file and schedule chunk messages to send each chunk
	scheduleSendChunks(jptm, info.Source, srcFile, srcSize, s, sourceFileFactory, srcInfoProvider)
}