func anyToRemote_file()

in ste/xfer-anyToRemote-file.go [212:371]


// anyToRemote_file prepares the transfer of a single file-type entity to a
// remote destination and then schedules its chunks.
//
// The work done directly in this routine is, in order: a cancellation check,
// creation of the source info provider (via sipf) and the sender (via
// senderFactory), the overwrite check against the destination, opening the
// source file when the source is local, an optional pre-transfer
// last-modified-time check, and acquiring the destination lock. After that it
// tells jptm how many chunks to expect, registers the epilogue, and calls
// scheduleSendChunks.
//
// Every early exit sets a status on jptm and calls jptm.ReportTransferDone.
// Once the destination lock is held there are no more early exits: the
// epilogue registered with SetActionAfterLastChunk is relied on for cleanup.
//
// The function panics if the source info provider does not report the File
// entity type, or if the sender reports zero chunks.
func anyToRemote_file(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) {

	pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.XferStart())
	defer jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone())

	srcSize := info.SourceSize

	// failTransfer is the common "give up before any chunk is scheduled" path:
	// log the error, mark the transfer as failed, and report it as done.
	// It must only be used ABOVE the point where error-handling rules change (see below).
	failTransfer := func(msg string) {
		jptm.LogSendError(info.Source, info.Destination, msg, 0)
		jptm.SetStatus(common.ETransferStatus.Failed())
		jptm.ReportTransferDone()
	}

	// step 1. perform initial checks
	if jptm.WasCanceled() {
		/* This is the earliest we detect jptm has been cancelled before scheduling chunks */
		jptm.SetStatus(common.ETransferStatus.Cancelled())
		jptm.ReportTransferDone()
		return
	}

	// step 2a. Create sender
	srcInfoProvider, err := sipf(jptm)
	if err != nil {
		failTransfer(err.Error())
		return
	}
	if srcInfoProvider.EntityType() != common.EEntityType.File() {
		panic("configuration error. Source Info Provider does not have File entity type")
	}

	s, err := senderFactory(jptm, info.Destination, pacer, srcInfoProvider)
	if err != nil {
		failTransfer(err.Error())
		return
	}

	// step 2b. Read chunk size and count from the sender (since it may have applied its own
	// defaults and/or calculations to produce these values)
	numChunks := s.NumChunks()
	if jptm.ShouldLog(common.LogInfo) {
		jptm.LogTransferStart(info.Source, info.Destination, fmt.Sprintf("Specified chunk size %d", s.ChunkSize()))
	}
	if numChunks == 0 {
		panic("must always schedule one chunk, even if file is empty") // this keeps our code structure simpler, by using a dummy chunk for empty files
	}

	// step 3: check overwrite option
	// if the force Write flags is set to false or prompt
	// then check the file exists at the remote location
	// if it does, react accordingly
	if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
		exists, dstLmt, existenceErr := s.RemoteFileExists()
		if existenceErr != nil {
			// is a real failure, not just a SkippedFileAlreadyExists, in this case
			failTransfer("Could not check destination file existence. " + existenceErr.Error())
			return
		}
		if exists {
			shouldOverwrite := false

			// if necessary, prompt to confirm user's intent
			if jptm.GetOverwriteOption() == common.EOverwriteOption.Prompt() {
				// Remove the SAS (carried in the query string) before prompting the user.
				// url.Parse returns a nil URL on error, so never dereference the result
				// without checking; if parsing fails, fall back to cutting the string at
				// the first '?' so that the query is still never displayed.
				promptDst := info.Destination
				if parsed, parseErr := url.Parse(info.Destination); parseErr == nil {
					parsed.RawQuery = ""
					promptDst = parsed.String()
				} else if queryStart := strings.IndexByte(promptDst, '?'); queryStart >= 0 {
					promptDst = promptDst[:queryStart]
				}
				shouldOverwrite = jptm.GetOverwritePrompter().ShouldOverwrite(promptDst, common.EEntityType.File())
			} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
				// only overwrite if source lmt is newer (after) the destination
				if jptm.LastModifiedTime().After(dstLmt) {
					shouldOverwrite = true
				}
			}

			if !shouldOverwrite {
				// logging as Warning so that it turns up even in compact logs, and because previously we use Error here
				jptm.LogAtLevelForCurrentTransfer(common.LogWarning, "File already exists, so will be skipped")
				jptm.SetStatus(common.ETransferStatus.SkippedEntityAlreadyExists())
				jptm.ReportTransferDone()
				return
			}
		}
	}

	// step 4: Open the local Source File (if any)
	common.GetLifecycleMgr().E2EAwaitAllowOpenFiles()
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.OpenLocalSource())
	var sourceFileFactory func() (common.CloseableReaderAt, error)
	srcFile := (common.CloseableReaderAt)(nil)
	if srcInfoProvider.IsLocal() {
		sourceFileFactory = srcInfoProvider.(ILocalSourceInfoProvider).OpenSourceFile // all local providers must implement this interface
		srcFile, err = sourceFileFactory()
		if err != nil {
			suffix := ""
			if strings.Contains(err.Error(), "Access is denied") && runtime.GOOS == "windows" {
				suffix = " See --" + common.BackupModeFlagName + " flag if you need to read all files regardless of their permissions"
			}
			failTransfer("Couldn't open source. " + err.Error() + suffix)
			return
		}
		defer srcFile.Close() // we read all the chunks in this routine, so can close the file at the end
	}

	// We always do LMT verification after the transfer. Also do it here, before transfer, when:
	// 1) Source is local, and source's size is > 1 chunk.  (why not always?  Since getting LMT is not "free" at very small sizes)
	// 2) Source is remote, i.e. S2S copy case. And source's size is larger than one chunk. So verification can possibly save transfer's cost.
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.ModifiedTimeRefresh())
	if _, isS2SCopier := s.(s2sCopier); numChunks > 1 &&
		(srcInfoProvider.IsLocal() || isS2SCopier && info.S2SSourceChangeValidation) {
		lmt, err := srcInfoProvider.GetFreshFileLastModifiedTime()
		if err != nil {
			failTransfer("Couldn't get source's last modified time-" + err.Error())
			return
		}
		if !lmt.Equal(jptm.LastModifiedTime()) {
			failTransfer("File modified since transfer scheduled")
			return
		}
	}

	// step 5a: lock the destination
	// (is safe to do it relatively early here, before we run the prologue, because its just a internal lock, within the app)
	// But must be after all of the early returns that are above here (since
	// if we succeed here, we need to know the epilogue will definitely run to unlock later)
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.LockDestination())
	err = jptm.WaitUntilLockDestination(jptm.Context())
	if err != nil {
		failTransfer(err.Error())
		return
	}

	// *****
	// Error-handling rules change here.
	// ABOVE this point, we end the transfer using the code as shown above
	// BELOW this point, this routine always schedules the expected number
	// of chunks, even if it has seen a failure, and the
	// workers (the chunkfunc implementations) must use
	// jptm.FailActiveSend when there's an error)
	// TODO: are we comfortable with this approach?
	//   DECISION: 16 Jan, 2019: for now, we are leaving in place the above rule that the number of completed chunks must
	//   eventually reach numChunks, since we have no better short-term alternative.
	// ******

	// step 5b: tell jptm what to expect, and how to clean up at the end
	jptm.SetNumberOfChunks(numChunks)
	jptm.SetActionAfterLastChunk(func() { epilogueWithCleanupSendToRemote(jptm, s, srcInfoProvider) })

	// stop tracking pseudo id (since real chunk id's will be tracked from here on)
	jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone())

	// Step 6: Go through the file and schedule chunk messages to send each chunk
	scheduleSendChunks(jptm, info.Source, srcFile, srcSize, s, sourceFileFactory, srcInfoProvider)
}