// ste/downloader-blob.go
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ste

import (
	"context"
	"io"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
	"github.com/Azure/azure-storage-azcopy/v10/common"
)

type blobDownloader struct {
	// filePacer is necessary because page blobs have per-blob throughput limits. The limits depend on
	// what type of page blob it is (e.g. premium) and can be significantly lower than the blob account limit.
	// Using an automatic pacer here lets us find the right rate for this particular page blob, so that
	// we won't try to move data faster than the service wants us to.
	filePacer autopacer

	// used to avoid downloading zero ranges of page blobs
	pageRangeOptimizer *pageRangeOptimizer

	source *blob.Client
	jptm   IJobPartTransferMgr
	txInfo *TransferInfo
}
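
// What follows is a minimal sketch (a hypothetical helper, not called anywhere
// in azcopy) of the pacer contract assumed above: ask for a byte allocation
// before moving each chunk, and let the automatic pacer converge on the rate
// this particular blob will sustain.
func pacedTransferSketch(ctx context.Context, p autopacer, chunkLengths []int64) error {
	defer p.Close() // sketch: Close's error is intentionally ignored
	for _, n := range chunkLengths {
		// Block until n bytes of throughput budget are granted (or ctx is cancelled).
		if err := p.RequestTrafficAllocation(ctx, n); err != nil {
			return err
		}
		// ... move n bytes of data here ...
	}
	return nil
}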

func (bd *blobDownloader) CreateSymlink(jptm IJobPartTransferMgr) error {
	sip, err := newBlobSourceInfoProvider(jptm)
	if err != nil {
		return err
	}
	symsip := sip.(ISymlinkBearingSourceInfoProvider) // blob always implements this

	symlinkInfo, err := symsip.ReadLink()
	if err != nil {
		return err
	}

	// create the link
	return os.Symlink(symlinkInfo, jptm.Info().Destination)
}

func newBlobDownloader(jptm IJobPartTransferMgr) (downloader, error) {
	s, err := jptm.SrcServiceClient().BlobServiceClient()
	if err != nil {
		return nil, err
	}
	blobClient := s.NewContainerClient(jptm.Info().SrcContainer).NewBlobClient(jptm.Info().SrcFilePath)
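
	// The client may be pinned to a point in time; note that VersionID takes
	// precedence over SnapshotID when both are present.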
	if jptm.Info().VersionID != "" {
		blobClient, err = blobClient.WithVersionID(jptm.Info().VersionID)
		if err != nil {
			return nil, err
		}
	} else if jptm.Info().SnapshotID != "" {
		blobClient, err = blobClient.WithSnapshot(jptm.Info().SnapshotID)
		if err != nil {
			return nil, err
		}
	}

	return &blobDownloader{
		filePacer: NewNullAutoPacer(), // defer creation of real one, if needed, to Prologue
		source:    blobClient,
	}, nil
}

func (bd *blobDownloader) Prologue(jptm IJobPartTransferMgr) {
	bd.txInfo = jptm.Info()
	bd.jptm = jptm

	if jptm.Info().SrcBlobType == blob.BlobTypePageBlob {
		// page blobs need a file-specific pacer
		// See comments in uploader-pageBlob for the reasons, since the same reasons apply and are explained there
		bd.filePacer = newPageBlobAutoPacer(pageBlobInitialBytesPerSecond, jptm.Info().BlockSize, false, jptm.(common.ILogger))

		// This is safe. We've already asserted that SrcServiceClient() is
		// a blob service client.
		s, _ := jptm.SrcServiceClient().BlobServiceClient()
		c := s.NewContainerClient(jptm.Info().SrcContainer)
		bd.pageRangeOptimizer = newPageRangeOptimizer(c.NewPageBlobClient(bd.txInfo.SrcFilePath), jptm.Context())
		bd.pageRangeOptimizer.fetchPages()
	}
}
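
// What follows is a minimal sketch, under the assumption that
// pageRangeOptimizer.fetchPages works roughly like this: list the blob's valid
// page ranges once up front so that doesRangeContainData can later answer from
// memory. Any range absent from the returned list reads back as zeros.
func listValidPageRangesSketch(ctx context.Context, pb *pageblob.Client) ([]pageblob.PageRange, error) {
	var valid []pageblob.PageRange
	pager := pb.NewGetPageRangesPager(nil)
	for pager.More() {
		resp, err := pager.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		for _, r := range resp.PageList.PageRange {
			valid = append(valid, *r)
		}
	}
	return valid, nil
}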

func (bd *blobDownloader) Epilogue() {
	if bd.jptm != nil && bd.jptm.IsLive() && bd.jptm.Info().PreservePOSIXProperties {
		bsip, err := newBlobSourceInfoProvider(bd.jptm)
		if err != nil {
			// Don't touch bsip on failure (it is nil); fall through so the pacer below still gets closed.
			bd.jptm.FailActiveDownload("get blob source info provider", err)
		} else {
			unixstat, _ := bsip.(IUNIXPropertyBearingSourceInfoProvider) // blob always implements this
			if ubd, ok := (interface{})(bd).(unixPropertyAwareDownloader); ok && unixstat.HasUNIXProperties() {
				adapter, err := unixstat.GetUNIXProperties()
				if err != nil {
					bd.jptm.FailActiveDownload("get unix properties", err)
				}

				stage, err := ubd.ApplyUnixProperties(adapter)
				if err != nil {
					bd.jptm.FailActiveDownload("set unix properties: "+stage, err)
				}
			}
		}
	}

	_ = bd.filePacer.Close()
}

// GenerateDownloadFunc returns a chunk-func for blob downloads.
func (bd *blobDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc {
	return createDownloadChunkFunc(jptm, id, func() {
		// If the range does not contain any data, write out empty data to disk without performing a download
		pageRange := pageblob.PageRange{Start: to.Ptr(id.OffsetInFile()), End: to.Ptr(id.OffsetInFile() + length - 1)}
		if bd.pageRangeOptimizer != nil && !bd.pageRangeOptimizer.doesRangeContainData(pageRange) {
			// queue an empty chunk
			err := destWriter.EnqueueChunk(jptm.Context(), id, length, dummyReader{}, false)
			if err != nil {
				jptm.FailActiveDownload("Enqueuing chunk", err)
			}
			return
		}

		// Control the rate of data movement (since page blobs can effectively have per-blob throughput limits).
		// Note that this level of control here is specific to the individual page blob, and is additional
		// to the application-wide pacing that we (optionally) do below when reading the response body.
		// Note also that the resulting throughput is somewhat ragged for downloads, and does not track the
		// pacer's target rate as closely as it does for uploads. Presumably this is just because it's
		// hard to accurately control throughput from the receiving end. I.e. not a pacer bug, but just
		// something inherent in the nature of REST downloads. So, as of March 2018, we are just living
		// with it as a known issue when downloading paced blobs.
		jptm.LogChunkStatus(id, common.EWaitReason.FilePacer())
		if err := bd.filePacer.RequestTrafficAllocation(jptm.Context(), length); err != nil {
			jptm.FailActiveDownload("Pacing block", err)
		}

		// download the blob from id.OffsetInFile() through id.OffsetInFile() + length - 1
		// TODO (gapra) : This can be removed after Access Conditions fix is released.
		// set access conditions, to protect against inconsistencies from changes-while-being-read
		lmt := jptm.LastModifiedTime().In(time.FixedZone("GMT", 0))
		accessConditions := &blob.AccessConditions{ModifiedAccessConditions: &blob.ModifiedAccessConditions{IfUnmodifiedSince: &lmt}}
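		// If the blob changes after the transfer was scheduled, the service answers this
		// conditional GET with 412 (Precondition Failed) rather than letting different
		// chunks observe different versions of the blob.
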
		if isInManagedDiskImportExportAccount(jptm.Info().Source) {
			// No access conditions (and therefore no if-modified checks) are supported on managed disk import/export (md-impexp).
			// They are also unsupported on old "md-"-style export URLs on the new (2019) large-size disks.
			// And in fact you can't have an md- URL in existence if the blob is mounted as a disk, so it won't be
			// getting changed anyway; we just treat all md- disks the same.
			accessConditions = nil
		}

		// At this point we create an HTTP(S) request for the desired portion of the blob, and
		// wait until we get the headers back... but we have not yet read its whole body.
		// The Download method encapsulates any retries that may be necessary to get to the point of receiving response headers.
		jptm.LogChunkStatus(id, common.EWaitReason.HeaderResponse())
		enrichedContext := withRetryNotification(jptm.Context(), bd.filePacer)
		get, err := bd.source.DownloadStream(enrichedContext, &blob.DownloadStreamOptions{
			Range:            blob.HTTPRange{Offset: id.OffsetInFile(), Count: length},
			AccessConditions: accessConditions,
			CPKInfo:          jptm.CpkInfo(),
			CPKScopeInfo:     jptm.CpkScopeInfo(),
		})
		if err != nil {
			jptm.FailActiveDownload("Downloading response body", err) // cancel entire transfer because this chunk has failed
			return
		}

		// Enqueue the response body to be written out to disk.
		// The retryReader encapsulates any retries that may be necessary while downloading the body.
		jptm.LogChunkStatus(id, common.EWaitReason.Body())
		retryReader := get.NewRetryReader(enrichedContext, &blob.RetryReaderOptions{
			MaxRetries:   int32(destWriter.MaxRetryPerDownloadBody()),
			OnFailedRead: common.NewBlobReadLogFunc(jptm, jptm.Info().Source),
		})
		defer retryReader.Close()
		err = destWriter.EnqueueChunk(jptm.Context(), id, length, newPacedResponseBody(jptm.Context(), retryReader, pacer), true)
		if err != nil {
			jptm.FailActiveDownload("Enqueuing chunk", err)
			return
		}
	})
}

// dummyReader is an io.Reader that reports len(p) bytes "read" without ever
// writing to p. GenerateDownloadFunc enqueues it to write out empty data for
// page-blob ranges that are known to contain no data, avoiding a download.
type dummyReader struct{}

func (dummyReader) Read(p []byte) (n int, err error) {
	return len(p), nil
}
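
// Compile-time check that dummyReader satisfies io.Reader.
var _ io.Reader = dummyReader{}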