func remoteToLocal_file()

in ste/xfer-remoteToLocal-file.go [52:340]


func remoteToLocal_file(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) {

	info := jptm.Info()

	// step 1: create downloader instance for this transfer
	// We are using a separate instance per transfer, in case some implementations need to hold per-transfer state
	dl, err := df(jptm)
	if err != nil {
		jptm.SetStatus(common.ETransferStatus.Failed())
		jptm.ReportTransferDone()
		return
	}
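
	// As the call above implies, a downloaderFactory is essentially a per-transfer constructor: it takes the
	// transfer manager and returns a downloader instance plus an error. Purely as an illustrative sketch
	// (not code from this repo, and the exact name of the returned interface is an assumption), a factory
	// for some hypothetical source type could look roughly like:
	//
	//   func newFooDownloaderFactory() downloaderFactory {
	//       return func(jptm IJobPartTransferMgr) (downloader, error) {
	//           return &fooDownloader{}, nil // fooDownloader is hypothetical
	//       }
	//   }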

	// step 2: get the source and destination info for the transfer.
	fileSize := int64(info.SourceSize)
	downloadChunkSize := info.BlockSize

	// step 3: Perform initial checks
	// If the transfer was cancelled, then report transfer as done
	// TODO Question: the above comment had this following text too: "and increasing the bytestransferred by the size of the source." what does it mean?
	if jptm.WasCanceled() {
		/* This is the earliest we detect that jptm has been cancelled before we schedule chunks */
		jptm.SetStatus(common.ETransferStatus.Cancelled())
		jptm.ReportTransferDone()
		return
	}
	// if the force-write (overwrite) option is anything other than true,
	// then check whether the file already exists at the local destination;
	// if it does, react accordingly
	if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
		dstProps, err := common.OSStat(info.Destination)
		if err == nil {
			// if the error is nil, then the file exists locally
			shouldOverwrite := false

			// if necessary, prompt to confirm user's intent
			if jptm.GetOverwriteOption() == common.EOverwriteOption.Prompt() {
				shouldOverwrite = jptm.GetOverwritePrompter().ShouldOverwrite(info.Destination, common.EEntityType.File())
			} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
				// only overwrite if the source LMT is newer than (i.e. after) the destination's
				if jptm.LastModifiedTime().After(dstProps.ModTime()) {
					shouldOverwrite = true
				}
			}

			if !shouldOverwrite {
				// logging as Warning so that it turns up even in compact logs, and because previously we used Error here
				jptm.LogAtLevelForCurrentTransfer(common.LogWarning, "File already exists, so will be skipped")
				jptm.SetStatus(common.ETransferStatus.SkippedEntityAlreadyExists())
				jptm.ReportTransferDone()
				return
			}
		}
	}

	if jptm.MD5ValidationOption() == common.EHashValidationOption.FailIfDifferentOrMissing() {
		// We can check early for MD5 existence and fail the transfer if it's not present.
		// This can save hours in the event a user has, say, a several-hundred-gigabyte file.
		if len(info.SrcHTTPHeaders.ContentMD5) == 0 {
			jptm.LogDownloadError(info.Source, info.Destination, errExpectedMd5Missing.Error(), 0)
			jptm.SetStatus(common.ETransferStatus.Failed())
			jptm.ReportTransferDone()
			return
		}
	}

	// step 4a: mark destination as modified before we take our first action there (which is to create the destination file)
	jptm.SetDestinationIsModified()

	writeThrough := false
	// TODO: consider cases where we might set it to true. It might give more predictable and understandable disk throughput.
	//    But can't be used in the cases shown in the if statement below (one of which is only pseudocode, at this stage)
	//      if fileSize <= 1*1024*1024 || jptm.JobHasLowFileCount() || <is a short-running job> {
	//        // but, for very small files, testing indicates that we may need it in at least some cases. (Presumably we just can't get enough queue depth to the physical disk without it.)
	//        // And for very low file counts, we need it too. Presumably for the same reasons of queue depth (given our sequential write strategy as at March 2019)
	//        // And for very short-running jobs, it looks and feels faster for the user to just let the OS cache flush out after the job appears to have finished.
	//        writeThrough = false
	//    }

	var dstFile io.WriteCloser
	if ctdl, ok := dl.(creationTimeDownloader); info.Destination != os.DevNull && ok { // ctdl never needs to handle devnull
		failFileCreation := func(err error) {
			jptm.LogDownloadError(info.Source, info.Destination, "File Creation Error "+err.Error(), 0)
			jptm.SetStatus(common.ETransferStatus.Failed())
			// use standard epilogue for consistency, but force release of file count (without an actual file) if necessary
			epilogueWithCleanupDownload(jptm, dl, nil, nil)
		}
		// block until we can safely use a file handle
		err := jptm.WaitUntilLockDestination(jptm.Context())
		if err != nil {
			failFileCreation(err)
			return
		}

		size := fileSize
		ct := common.ECompressionType.None()
		if jptm.ShouldDecompress() {
			size = 0                                  // we don't know what the final size will be, so we can't pre-size it
			ct, err = jptm.GetSourceCompressionType() // calls same decompression getter routine as the front-end does
			if err != nil {                           // check this, and return error, before we create any disk file, since if we return err, then no cleanup of file will be required
				failFileCreation(err)
				return
			}
			// Why get the decompression type again here, when we already looked at it at enumeration time?
			// Because we have better ability to report unsupported compression types here, with clear "transfer failed" handling,
			// and we still need to set size to zero here, so relying more on enumeration wouldn't simplify this code much, if at all.
		}

		// Normal scenario, create the destination file as expected
		// Use pseudo chunk id to allow our usual state tracking mechanism to keep count of how many
		// file creations are running at any given instant, for perf diagnostics
		//
		// We create the file at a temporary location, with the name .azcopy-<jobID>-<actualName>, and then move it
		// to the correct name.
		pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
		jptm.LogChunkStatus(pseudoId, common.EWaitReason.CreateLocalFile())
		var needChunks bool
		dstFile, needChunks, err = ctdl.CreateFile(jptm, info.getDownloadPath(), size, writeThrough, jptm.GetFolderCreationTracker())
		jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids
		if err != nil {
			failFileCreation(err)
			return
		}

		if !needChunks { // If no chunks need to be transferred (e.g. this is 0-bytes long, a symlink, etc.), treat it as a 0-byte transfer
			dl.Prologue(jptm)
			epilogueWithCleanupDownload(jptm, dl, nil, nil)
			return
		}

		if jptm.ShouldDecompress() { // Wrap the file in the decompressor if necessary
			jptm.LogAtLevelForCurrentTransfer(common.LogInfo, "will be decompressed from "+ct.String())

			// wrap for automatic decompression
			dstFile = common.NewDecompressingWriter(dstFile, ct)
			// why don't we just let Go's network stack automatically decompress for us? Because
			// 1. Then we can't check the MD5 hash (since logically, any stored hash should be the hash of the file that exists in Storage, i.e. the compressed one)
			// 2. Then we can't pre-plan a certain number of fixed-size chunks (which is required by the way our architecture currently works).
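			// In other words, the chunk pipeline keeps operating on the compressed bytes exactly as they were
			// downloaded (which is what any stored MD5 refers to), and decompression happens only at the final
			// write, as those bytes pass through this wrapper into the underlying file.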
		}
	} else {
		// step 4b: special handling for empty files
		if fileSize == 0 {
			if strings.EqualFold(info.Destination, common.Dev_Null) {
				// do nothing
			} else {
				err := jptm.WaitUntilLockDestination(jptm.Context())
				if err == nil {
					err = createEmptyFile(jptm, info.Destination)
				}
				if err != nil {
					jptm.LogDownloadError(info.Source, info.Destination, "Empty File Creation error "+err.Error(), 0)
					jptm.SetStatus(common.ETransferStatus.Failed())
				}
			}
			// Run the prologue anyway, as some downloaders (files) require this.
			// Note that this doesn't actually have adverse effects (at the moment).
			// For files, it just sets a few properties.
			// For blobs, it sets up a page blob pacer if it's a page blob.
			// For blobFS, it's a noop.
			dl.Prologue(jptm)
			epilogueWithCleanupDownload(jptm, dl, nil, nil) // need standard epilogue, rather than a quick exit, so we can preserve modification dates
			return
		}

		// step 4c: normal file creation when source has content

		failFileCreation := func(err error) {
			jptm.LogDownloadError(info.Source, info.Destination, "File Creation Error "+err.Error(), 0)
			jptm.SetStatus(common.ETransferStatus.Failed())
			// use standard epilogue for consistency, but force release of file count (without an actual file) if necessary
			epilogueWithCleanupDownload(jptm, dl, nil, nil)
		}
		// block until we can safely use a file handle
		err := jptm.WaitUntilLockDestination(jptm.Context())
		if err != nil {
			failFileCreation(err)
			return
		}

		if strings.EqualFold(info.Destination, common.Dev_Null) {
			// the user wants to discard the downloaded data
			dstFile = devNullWriter{}
		} else {
			// Normal scenario, create the destination file as expected
			// Use pseudo chunk id to allow our usual state tracking mechanism to keep count of how many
			// file creations are running at any given instant, for perf diagnostics
			//
			// We create the file at a temporary location, with the name .azcopy-<jobID>-<actualName>, and then move it
			// to the correct name.
			pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
			jptm.LogChunkStatus(pseudoId, common.EWaitReason.CreateLocalFile())
			dstFile, err = createDestinationFile(jptm, info.getDownloadPath(), fileSize, writeThrough)
			jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids
			if err != nil {
				failFileCreation(err)
				return
			}
		}
	}

	// TODO: Question: do we need to Stat the file, to check its size, after explicitly making it with the desired size?
	// That was what the old xfer-blobToLocal code used to do
	// I've commented it out to be more concise, but we'll put it back if someone knows why it needs to be here
	/*
		dstFileInfo, err := dstFile.Stat()
		if err != nil || (dstFileInfo.Size() != blobSize) {
			jptm.LogDownloadError(info.Source, info.Destination, "File Creation Error "+err.Error(), 0)
			jptm.SetStatus(common.ETransferStatus.Failed())
			// Since the transfer failed, the file created above should be deleted
			// If there was an error while opening / creating the file, delete will fail.
			// But delete is required when error occurred while truncating the file and
			// in this case file should be deleted.
			tryDeleteFile(info, jptm)
			jptm.ReportTransferDone()
			return
		}*/

	// step 5a: compute num chunks
	numChunks := uint32(0)
	if rem := fileSize % downloadChunkSize; rem == 0 {
		numChunks = uint32(fileSize / downloadChunkSize)
	} else {
		numChunks = uint32(fileSize/downloadChunkSize + 1)
	}
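	// e.g. with an 8 MiB block size, a 16 MiB file yields exactly 2 chunks, while a 17 MiB file yields 3
	// (two full 8 MiB chunks plus a final 1 MiB chunk)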

	// step 5b: create destination writer
	chunkLogger := jptm.ChunkStatusLogger()
	sourceMd5Exists := len(info.SrcHTTPHeaders.ContentMD5) > 0
	dstWriter := common.NewChunkedFileWriter(
		jptm.Context(),
		jptm.SlicePool(),
		jptm.CacheLimiter(),
		chunkLogger,
		dstFile,
		numChunks,
		MaxRetryPerDownloadBody,
		jptm.MD5ValidationOption(),
		sourceMd5Exists)
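	// (the chunked writer is what turns the concurrently downloaded chunks back into ordered, sequential
	// writes against dstFile; see the "sequential write strategy" note in the writeThrough TODO above)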

	// step 5c: run prologue in downloader (here it can, for example, create things that will require cleanup in the epilogue)
	common.GetLifecycleMgr().E2EAwaitAllowOpenFiles()
	dl.Prologue(jptm)

	// step 5d: tell jptm what to expect, and how to clean up at the end
	jptm.SetNumberOfChunks(numChunks)
	jptm.SetActionAfterLastChunk(func() { epilogueWithCleanupDownload(jptm, dl, dstFile, dstWriter) })

	// step 6: go through the blob range and schedule download chunk jobs
	// TODO: currently, the epilogue will only run if the number of completed chunks = numChunks.
	//     ...which means that we can't exit this loop early, if there is a cancellation or failure. Instead we
	//     ...must schedule the expected number of chunks, i.e. schedule all of them even if the transfer is already failed,
	//     ...so that the last of them will trigger the epilogue.
	//     ...Question: is that OK?
	// DECISION: 16 Jan, 2019: for now, we are leaving in place the above rule that the number of completed chunks must
	// eventually reach numChunks, since we have no better short-term alternative.

	chunkCount := uint32(0)
	for startIndex := int64(0); startIndex < fileSize; startIndex += downloadChunkSize {
		adjustedChunkSize := downloadChunkSize

		// compute exact size of the chunk
		if startIndex+downloadChunkSize > fileSize {
			adjustedChunkSize = fileSize - startIndex
		}
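		// (continuing the example above: for a 17 MiB file, the chunk starting at offset 16 MiB is trimmed to 1 MiB)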

		id := common.NewChunkID(info.Destination, startIndex, adjustedChunkSize) // TODO: stop using adjustedChunkSize, below, and use the size that's in the ID

		// Wait until it's OK to schedule it
		// To prevent excessive RAM consumption, we have a limit on the amount of scheduled-but-not-yet-saved data
		// TODO: as per comment above, currently, if there's an error here we must continue because we must schedule all chunks
		// TODO: ... Can we refactor/improve that?
		_ = dstWriter.WaitToScheduleChunk(jptm.Context(), id, adjustedChunkSize)

		// create a download func that is appropriate to the remote data source
		downloadFunc := dl.GenerateDownloadFunc(jptm, dstWriter, id, adjustedChunkSize, pacer)

		// schedule the download chunk job
		jptm.ScheduleChunks(downloadFunc)
		chunkCount++

		jptm.LogChunkStatus(id, common.EWaitReason.WorkerGR())
	}

	// sanity check to verify the number of chunks scheduled
	if chunkCount != numChunks {
		panic(fmt.Errorf("difference in the number of chunks calculated %v and actual chunks scheduled %v for src %s of size %v", numChunks, chunkCount, info.Source, fileSize))
	}

}