in ste/xfer-remoteToLocal-file.go [52:340]
func remoteToLocal_file(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) {
info := jptm.Info()
// step 1: create downloader instance for this transfer
// We are using a separate instance per transfer, in case some implementations need to hold per-transfer state
dl, err := df(jptm)
if err != nil {
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
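// (if we can't construct a downloader for this transfer, we fail just this transfer; note that we still call ReportTransferDone so the job's transfer accounting stays correct)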
// step 2: get the source, destination info for the transfer.
fileSize := int64(info.SourceSize)
downloadChunkSize := info.BlockSize
// step 3: Perform initial checks
// If the transfer was cancelled, then report the transfer as done
// TODO Question: the above comment also used to include the text "and increasing the bytestransferred by the size of the source." - what does that mean?
if jptm.WasCanceled() {
/* This is the earliest we detect that jptm has been cancelled before we schedule chunks */
jptm.SetStatus(common.ETransferStatus.Cancelled())
jptm.ReportTransferDone()
return
}
// if the force-write (overwrite) option is not set to true (i.e. it is false, prompt, or if-source-newer)
// then check whether the file already exists at the local destination
// and, if it does, react accordingly
if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
dstProps, err := common.OSStat(info.Destination)
if err == nil {
// if the error is nil, then the file exists locally
shouldOverwrite := false
// if necessary, prompt to confirm user's intent
if jptm.GetOverwriteOption() == common.EOverwriteOption.Prompt() {
shouldOverwrite = jptm.GetOverwritePrompter().ShouldOverwrite(info.Destination, common.EEntityType.File())
} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
// only overwrite if the source's LMT is newer than (i.e. after) the destination's
if jptm.LastModifiedTime().After(dstProps.ModTime()) {
shouldOverwrite = true
}
}
if !shouldOverwrite {
// logging as Warning so that it turns up even in compact logs, and because previously we used Error here
jptm.LogAtLevelForCurrentTransfer(common.LogWarning, "File already exists, so will be skipped")
jptm.SetStatus(common.ETransferStatus.SkippedEntityAlreadyExists())
jptm.ReportTransferDone()
return
}
}
}
if jptm.MD5ValidationOption() == common.EHashValidationOption.FailIfDifferentOrMissing() {
// We can check early for the MD5's existence and fail the transfer if it's not present.
// This can save hours in the event a user has, say, a several-hundred-gigabyte file.
if len(info.SrcHTTPHeaders.ContentMD5) == 0 {
jptm.LogDownloadError(info.Source, info.Destination, errExpectedMd5Missing.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
}
// step 4a: mark destination as modified before we take our first action there (which is to create the destination file)
jptm.SetDestinationIsModified()
writeThrough := false
// TODO: consider cases where we might set it to true. It might give more predictable and understandable disk throughput.
// But can't be used in the cases shown in the if statement below (one of which is only pseudocode, at this stage)
// if fileSize <= 1*1024*1024 || jptm.JobHasLowFileCount() || <is a short-running job> {
// // but, for very small files, testing indicates that we may need it in at least some cases. (Presumably we just can't get enough queue depth to the physical disk without it.)
// // And for very low file counts, we also need it. Presumably for the same reasons of queue depth (given our sequential write strategy as of March 2019)
// // And for very short-running jobs, it looks and feels faster for the user to just let the OS cache flush out after the job appears to have finished.
// writeThrough = false
// }
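// For now writeThrough stays false in all cases, i.e. writes to the destination file go through the OS cache rather than being forced straight to disk.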
var dstFile io.WriteCloser
if ctdl, ok := dl.(creationTimeDownloader); info.Destination != os.DevNull && ok { // ctdl never needs to handle devnull
failFileCreation := func(err error) {
jptm.LogDownloadError(info.Source, info.Destination, "File Creation Error "+err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
// use standard epilogue for consistency, but force release of file count (without an actual file) if necessary
epilogueWithCleanupDownload(jptm, dl, nil, nil)
}
// block until we can safely use a file handle
err := jptm.WaitUntilLockDestination(jptm.Context())
if err != nil {
failFileCreation(err)
return
}
size := fileSize
ct := common.ECompressionType.None()
if jptm.ShouldDecompress() {
size = 0 // we don't know what the final size will be, so we can't pre-size it
ct, err = jptm.GetSourceCompressionType() // calls same decompression getter routine as the front-end does
if err != nil { // check this, and return error, before we create any disk file, since if we return err, then no cleanup of file will be required
failFileCreation(err)
return
}
// Why get the decompression type again here, when we already looked at it at enumeration time?
// Because we have better ability to report unsupported compression types here, with clear "transfer failed" handling,
// and we still need to set size to zero here, so relying more on enumeration wouldn't simplify this code much, if at all.
}
// Normal scenario, create the destination file as expected
// Use pseudo chunk id to allow our usual state tracking mechanism to keep count of how many
// file creations are running at any given instant, for perf diagnostics
//
// We create the file at a temporary location with the name .azcopy-<jobID>-<actualName> and then move it
// to the correct name.
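// For illustration (hypothetical names only): a download of report.pdf under job 8a2fb1c0 would first be
// written as .azcopy-8a2fb1c0-report.pdf, and only moved to report.pdf when the transfer is finalized.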
pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.CreateLocalFile())
var needChunks bool
dstFile, needChunks, err = ctdl.CreateFile(jptm, info.getDownloadPath(), size, writeThrough, jptm.GetFolderCreationTracker())
jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids
if err != nil {
failFileCreation(err)
return
}
if !needChunks { // If no chunks need to be transferred (e.g. this is 0-bytes long, a symlink, etc.), treat it as a 0-byte transfer
dl.Prologue(jptm)
epilogueWithCleanupDownload(jptm, dl, nil, nil)
return
}
if jptm.ShouldDecompress() { // Wrap the file in the decompressor if necessary
jptm.LogAtLevelForCurrentTransfer(common.LogInfo, "will be decompressed from "+ct.String())
// wrap for automatic decompression
dstFile = common.NewDecompressingWriter(dstFile, ct)
// why don't we just let Go's network stack automatically decompress for us? Because
// 1. Then we can't check the MD5 hash (since logically, any stored hash should be the hash of the file that exists in Storage, i.e. the compressed one)
// 2. Then we can't pre-plan a certain number of fixed-size chunks (which is required by the way our architecture currently works).
}
} else {
// step 4b: special handling for empty files
if fileSize == 0 {
if strings.EqualFold(info.Destination, common.Dev_Null) {
// do nothing
} else {
err := jptm.WaitUntilLockDestination(jptm.Context())
if err == nil {
err = createEmptyFile(jptm, info.Destination)
}
if err != nil {
jptm.LogDownloadError(info.Source, info.Destination, "Empty File Creation error "+err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
}
}
// Run the prologue anyway, as some downloaders (files) require this.
// Note that this doesn't actually have adverse effects (at the moment).
// For files, it just sets a few properties.
// For blobs, it sets up a page blob pacer if it's a page blob.
// For blobFS, it's a noop.
dl.Prologue(jptm)
epilogueWithCleanupDownload(jptm, dl, nil, nil) // need standard epilogue, rather than a quick exit, so we can preserve modification dates
return
}
// step 4c: normal file creation when source has content
failFileCreation := func(err error) {
jptm.LogDownloadError(info.Source, info.Destination, "File Creation Error "+err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
// use standard epilogue for consistency, but force release of file count (without an actual file) if necessary
epilogueWithCleanupDownload(jptm, dl, nil, nil)
}
// block until we can safely use a file handle
err := jptm.WaitUntilLockDestination(jptm.Context())
if err != nil {
failFileCreation(err)
return
}
if strings.EqualFold(info.Destination, common.Dev_Null) {
// the user wants to discard the downloaded data
dstFile = devNullWriter{}
} else {
// Normal scenario, create the destination file as expected
// Use pseudo chunk id to allow our usual state tracking mechanism to keep count of how many
// file creations are running at any given instant, for perf diagnostics
//
// We create the file at a temporary location with the name .azcopy-<jobID>-<actualName> and then move it
// to the correct name.
pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.CreateLocalFile())
dstFile, err = createDestinationFile(jptm, info.getDownloadPath(), fileSize, writeThrough)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids
if err != nil {
failFileCreation(err)
return
}
}
}
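// At this point dstFile is either the real destination file (possibly wrapped in a decompressing writer),
// or a devNull writer that discards the data; empty files have already returned above.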
// TODO: Question: do we need to Stat the file, to check its size, after explicitly making it with the desired size?
// That was what the old xfer-blobToLocal code used to do
// I've commented it out to be more concise, but we'll put it back if someone knows why it needs to be here
/*
dstFileInfo, err := dstFile.Stat()
if err != nil || (dstFileInfo.Size() != blobSize) {
jptm.LogDownloadError(info.Source, info.Destination, "File Creation Error "+err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
// Since the transfer failed, the file created above should be deleted
// If there was an error while opening / creating the file, delete will fail.
// But delete is required when error occurred while truncating the file and
// in this case file should be deleted.
tryDeleteFile(info, jptm)
jptm.ReportTransferDone()
return
}*/
// step 5a: compute num chunks
numChunks := uint32(0)
if rem := fileSize % downloadChunkSize; rem == 0 {
numChunks = uint32(fileSize / downloadChunkSize)
} else {
numChunks = uint32(fileSize/downloadChunkSize + 1)
}
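// e.g. (illustrative numbers) a 100 MiB file with an 8 MiB download chunk size gives 12 full chunks plus a 4 MiB remainder, so numChunks is 13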
// step 5b: create destination writer
chunkLogger := jptm.ChunkStatusLogger()
sourceMd5Exists := len(info.SrcHTTPHeaders.ContentMD5) > 0
dstWriter := common.NewChunkedFileWriter(
jptm.Context(),
jptm.SlicePool(),
jptm.CacheLimiter(),
chunkLogger,
dstFile,
numChunks,
MaxRetryPerDownloadBody,
jptm.MD5ValidationOption(),
sourceMd5Exists)
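// The chunked writer assembles the downloaded chunks into dstFile; its WaitToScheduleChunk call (used in the
// scheduling loop below) is what caps the amount of scheduled-but-not-yet-saved data, so RAM use stays bounded.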
// step 5c: run prologue in downloader (here it can, for example, create things that will require cleanup in the epilogue)
common.GetLifecycleMgr().E2EAwaitAllowOpenFiles()
dl.Prologue(jptm)
// step 5d: tell jptm what to expect, and how to clean up at the end
jptm.SetNumberOfChunks(numChunks)
jptm.SetActionAfterLastChunk(func() { epilogueWithCleanupDownload(jptm, dl, dstFile, dstWriter) })
// step 6: go through the source range and schedule download chunk jobs
// TODO: currently, the epilogue will only run if the number of completed chunks = numChunks.
// ...which means that we can't exit this loop early, if there is a cancellation or failure. Instead we
// ...must schedule the expected number of chunks, i.e. schedule all of them even if the transfer is already failed,
// ...so that the last of them will trigger the epilogue.
// ...Question: is that OK?
// DECISION: 16 Jan, 2019: for now, we are leaving in place the above rule that the number of completed chunks must
// eventually reach numChunks, since we have no better short-term alternative.
chunkCount := uint32(0)
for startIndex := int64(0); startIndex < fileSize; startIndex += downloadChunkSize {
adjustedChunkSize := downloadChunkSize
// compute exact size of the chunk
if startIndex+downloadChunkSize > fileSize {
adjustedChunkSize = fileSize - startIndex
}
id := common.NewChunkID(info.Destination, startIndex, adjustedChunkSize) // TODO: stop using adjustedChunkSize, below, and use the size that's in the ID
// Wait until it's OK to schedule it
// To prevent excessive RAM consumption, we have a limit on the amount of scheduled-but-not-yet-saved data
// TODO: as per comment above, currently, if there's an error here we must continue because we must schedule all chunks
// TODO: ... Can we refactor/improve that?
_ = dstWriter.WaitToScheduleChunk(jptm.Context(), id, adjustedChunkSize)
// create a download func that is appropriate to the remote data source
downloadFunc := dl.GenerateDownloadFunc(jptm, dstWriter, id, adjustedChunkSize, pacer)
// schedule the download chunk job
jptm.ScheduleChunks(downloadFunc)
chunkCount++
jptm.LogChunkStatus(id, common.EWaitReason.WorkerGR())
}
// sanity check to verify the number of chunks scheduled
if chunkCount != numChunks {
panic(fmt.Errorf("difference in the number of chunks calculated %v and actual chunks scheduled %v for src %s of size %v", numChunks, chunkCount, info.Source, fileSize))
}
}