ste/sender-pageBlob.go (195 lines of code) (raw):

// Copyright © 2017 Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package ste import ( "context" "errors" "fmt" "net/url" "regexp" "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob" "github.com/Azure/azure-storage-azcopy/v10/common" ) type pageBlobSenderBase struct { jptm IJobPartTransferMgr destPageBlobClient *pageblob.Client srcSize int64 chunkSize int64 numChunks uint32 pacer pacer // Headers and other info that we will apply to the destination // object. For S2S, these come from the source service. // When sending local data, they are computed based on // the properties of the local file headersToApply blob.HTTPHeaders metadataToApply common.Metadata blobTagsToApply common.BlobTags destBlobTier *blob.AccessTier // filePacer is necessary because page blobs have per-blob throughput limits. The limits depend on // what type of page blob it is (e.g. premium) and can be significantly lower than the blob account limit. // Using a automatic pacer here lets us find the right rate for this particular page blob, at which // we won't be trying to move the faster than the Service wants us to. filePacer autopacer // destPageRangeOptimizer is necessary for managed disk imports, // as it helps us identify where we actually need to write all zeroes to. // Previously, if a page prefetched all zeroes, we'd ignore it. // In a edge-case scenario (where two different VHDs had been uploaded to the same md impexp URL), // there was a potential for us to not zero out 512b segments that we'd prefetched all zeroes for. // This only posed danger when there was already data in one of these segments. destPageRangeOptimizer *pageRangeOptimizer } const ( managedDiskImportExportAccountPrefix = "md-" // Start high(ish), because it auto-tunes downwards faster than it auto-tunes upwards pageBlobInitialBytesPerSecond = (4 * 1000 * 1000 * 1000) / 8 ) var ( md5NotSupportedInManagedDiskError = errors.New("the Content-MD5 hash is not supported for managed disk uploads") ) func newPageBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, srcInfoProvider ISourceInfoProvider, inferredAccessTierType *blob.AccessTier) (*pageBlobSenderBase, error) { transferInfo := jptm.Info() // compute chunk count chunkSize := transferInfo.BlockSize // If the given chunk Size for the Job is invalid for page blob or greater than maximum page size, // then set chunkSize as maximum pageSize. chunkSize = common.Iff( chunkSize > common.DefaultPageBlobChunkSize || (chunkSize%pageblob.PageBytes != 0), common.DefaultPageBlobChunkSize, chunkSize) srcSize := transferInfo.SourceSize numChunks := getNumChunks(srcSize, chunkSize, chunkSize) bsc, err := jptm.DstServiceClient().BlobServiceClient() if err != nil { return nil, err } destPageBlobClient := bsc.NewContainerClient(jptm.Info().DstContainer).NewPageBlobClient(jptm.Info().DstFilePath) // This is only necessary if our destination is a managed disk impexp account. // Read the in struct explanation if necessary. var destRangeOptimizer *pageRangeOptimizer if isInManagedDiskImportExportAccount(destination) { destRangeOptimizer = newPageRangeOptimizer(destPageBlobClient, jptm.Context()) } props, err := srcInfoProvider.Properties() if err != nil { return nil, err } // If user set blob tier explicitly, override any value that our caller // may have guessed. destBlobTier := inferredAccessTierType _, pageBlobTierOverride := jptm.BlobTiers() if pageBlobTierOverride != common.EPageBlobTier.None() { t := pageBlobTierOverride.ToAccessTierType() destBlobTier = &t } s := &pageBlobSenderBase{ jptm: jptm, destPageBlobClient: destPageBlobClient, srcSize: srcSize, chunkSize: chunkSize, numChunks: numChunks, pacer: pacer, headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(), metadataToApply: props.SrcMetadata, blobTagsToApply: props.SrcBlobTags, destBlobTier: destBlobTier, filePacer: NewNullAutoPacer(), // defer creation of real one to Prologue destPageRangeOptimizer: destRangeOptimizer, } if s.isInManagedDiskImportExportAccount() && jptm.ShouldPutMd5() { return nil, md5NotSupportedInManagedDiskError } return s, nil } // these accounts have special restrictions of which APIs operations they support func isInManagedDiskImportExportAccount(rawURL string) bool { u, err := url.Parse(rawURL) if err != nil { return false } return strings.HasPrefix(u.Host, managedDiskImportExportAccountPrefix) } func (s *pageBlobSenderBase) isInManagedDiskImportExportAccount() bool { return isInManagedDiskImportExportAccount(s.destPageBlobClient.URL()) } func (s *pageBlobSenderBase) SendableEntityType() common.EntityType { return common.EEntityType.File() } func (s *pageBlobSenderBase) ChunkSize() int64 { return s.chunkSize } func (s *pageBlobSenderBase) NumChunks() uint32 { return s.numChunks } func (s *pageBlobSenderBase) RemoteFileExists() (bool, time.Time, error) { properties, err := s.destPageBlobClient.GetProperties(s.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()}) return remoteObjectExists(blobPropertiesResponseAdapter{properties}, err) } var premiumPageBlobTierRegex = regexp.MustCompile(`P\d+`) func (s *pageBlobSenderBase) Prologue(ps common.PrologueState) (destinationModified bool) { // Create file pacer now. Safe to create now, because we know that if Prologue is called the Epilogue will be to // so we know that the pacer will be closed. // TODO: consider re-factor xfer-anyToRemote so that epilogue is always called if uploader is constructed, and move this to constructor s.filePacer = newPageBlobAutoPacer(pageBlobInitialBytesPerSecond, s.ChunkSize(), false, s.jptm.(common.ILogger)) if s.isInManagedDiskImportExportAccount() { // Target will already exist (and CANNOT be created through the REST API, because // managed-disk import-export accounts have restricted API surface) // Check its length, since it already has a size, and the upload will fail at the end if you what // upload to it is bigger than its existing size. (And, for big files, it may be hours until you discover that // difference if we don't check here). // // We use an equality check (rather than ensuring sourceSize <= dest), because customer should have declared the correct exact size when // making the disk in Azure. (And if we don't check equality here, by default we do check it after upload for all blobs, as of version 10.3) // // Note re types and sizes: // Currently (2019) only VHDs are supported for Azure managed disk upload. VHDXs (which have a different footer size, are not). // Azure requires VHD size to be a multiple of 1MB plus 512 bytes for the VHD footer. And the VHD must be fixed size. // E.g. these are the values reported by PowerShell's Get-VHD for a valid 1 GB VHD: // VhdFormat : VHD // VhdType : Fixed // FileSize : 1073742336 (equals our s.srcSize, i.e. the size of the disk file) // Size : 1073741824 p, err := s.destPageBlobClient.GetProperties(s.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()}) if err != nil { s.jptm.FailActiveSend("Checking size of managed disk blob", err) return } if p.ContentLength == nil { sizeErr := fmt.Errorf("destination content length not returned") s.jptm.FailActiveSend("Checking size of managed disk blob", sizeErr) } if s.srcSize != *p.ContentLength { sizeErr := fmt.Errorf("source file is not same size as the destination page blob. Source size is %d bytes but destination size is %d bytes. Re-create the destination with exactly the right size. E.g. see parameter UploadSizeInBytes in PowerShell's New-AzDiskConfig. Ensure the source is a fixed-size VHD", s.srcSize, *p.ContentLength) s.jptm.FailActiveSend("Checking size of managed disk blob", sizeErr) return } // Next, grab the page ranges on the destination. s.destPageRangeOptimizer.fetchPages() s.jptm.Log(common.LogInfo, "Blob is managed disk import/export blob, so no Create call is required") // the blob always already exists return } if s.jptm.ShouldInferContentType() { // sometimes, specifically when reading local files, we have more info // about the file type at this time than what we had before s.headersToApply.BlobContentType = ps.GetInferredContentType(s.jptm) } var destBlobTier *pageblob.PremiumPageBlobAccessTier if s.destBlobTier != nil { destBlobTier = to.Ptr(pageblob.PremiumPageBlobAccessTier(*s.destBlobTier)) } if !ValidateTier(s.jptm, s.destBlobTier, s.destPageBlobClient, s.jptm.Context(), false) { destBlobTier = nil } // TODO: Remove this snippet once service starts supporting CPK with blob tier if s.jptm.IsSourceEncrypted() { destBlobTier = nil } blobTags := s.blobTagsToApply setTags := separateSetTagsRequired(blobTags) if setTags || len(blobTags) == 0 { blobTags = nil } _, err := s.destPageBlobClient.Create(s.jptm.Context(), s.srcSize, &pageblob.CreateOptions{ SequenceNumber: to.Ptr(int64(0)), HTTPHeaders: &s.headersToApply, Metadata: s.metadataToApply, Tier: destBlobTier, Tags: blobTags, CPKInfo: s.jptm.CpkInfo(), CPKScopeInfo: s.jptm.CpkScopeInfo(), }) if err != nil { s.jptm.FailActiveSend(common.Iff(len(blobTags) > 0, "Creating blob (with tags)", "Creating blob"), err) return } destinationModified = true if setTags { if _, err := s.destPageBlobClient.SetTags(s.jptm.Context(), s.blobTagsToApply, nil); err != nil { s.jptm.FailActiveSend("Set blob tags", err) } } return } func (s *pageBlobSenderBase) Epilogue() { _ = s.filePacer.Close() // release resources } func (s *pageBlobSenderBase) Cleanup() { jptm := s.jptm // Cleanup if jptm.IsDeadInflight() { if s.isInManagedDiskImportExportAccount() { // no deletion is possible. User just has to upload it again. } else { deletionContext, cancelFunc := context.WithTimeout(context.WithValue(context.Background(), ServiceAPIVersionOverride, DefaultServiceApiVersion), 30*time.Second) defer cancelFunc() _, err := s.destPageBlobClient.Delete(deletionContext, nil) if err != nil { jptm.LogError(s.destPageBlobClient.URL(), "Delete (incomplete) Page Blob ", err) } } } } // GetDestinationLength gets the destination length. func (s *pageBlobSenderBase) GetDestinationLength() (int64, error) { prop, err := s.destPageBlobClient.GetProperties(s.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()}) if err != nil { return -1, err } if prop.ContentLength == nil { return -1, fmt.Errorf("destination content length not returned") } return *prop.ContentLength, nil }