ste/sender-blockBlobFromURL.go (116 lines of code) (raw):
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste
import (
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"sync/atomic"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
type urlToBlockBlobCopier struct {
blockBlobSenderBase
srcURL string
}
func newURLToBlockBlobCopier(jptm IJobPartTransferMgr, pacer pacer, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) {
// Get blob tier, by default set none.
var destBlobTier *blob.AccessTier
// If the source is block blob, preserve source's blob tier.
if blobSrcInfoProvider, ok := srcInfoProvider.(IBlobSourceInfoProvider); ok {
if blobSrcInfoProvider.BlobType() == blob.BlobTypeBlockBlob {
destBlobTier = blobSrcInfoProvider.BlobTier()
}
}
senderBase, err := newBlockBlobSenderBase(jptm, pacer, srcInfoProvider, destBlobTier)
if err != nil {
return nil, err
}
srcURL, err := srcInfoProvider.PreSignedSourceURL()
if err != nil {
return nil, err
}
return &urlToBlockBlobCopier{
blockBlobSenderBase: *senderBase,
srcURL: srcURL}, nil
}
// Returns a chunk-func for blob copies
func (c *urlToBlockBlobCopier) GenerateCopyFunc(id common.ChunkID, blockIndex int32, adjustedChunkSize int64, chunkIsWholeFile bool) chunkFunc {
/*
* There was a optimization here to use PutBlob for zero-byte blobs instead of PutBlobFromURL.
* It was removed because of these reasons:
* 1. Both apis are different in some aspects. For put blob service verifies the content md5.
* This is not required if check-md5 is false. Using same calls helps us be consistent.
* 2. If the source only has list (and no read) permissions, we will still put the blob here
* While it is arguable that content can be inferred from size, it is better to fail transfer
* for blobs of all sizes.
*/
// Small blobs from all sources will be copied over to destination using PutBlobFromUrl
if c.NumChunks() == 1 && adjustedChunkSize <= int64(common.MaxPutBlobSize) {
/*
* siminsavani: FYI: For GCP, if the blob is the entirety of the file, GCP still returns
* invalid error from service due to PutBlockFromUrl.
*/
setPutListNeed(&c.atomicPutListIndicator, putListNotNeeded)
return c.generateStartPutBlobFromURL(id, blockIndex, adjustedChunkSize)
}
setPutListNeed(&c.atomicPutListIndicator, putListNeeded)
return c.generatePutBlockFromURL(id, blockIndex, adjustedChunkSize)
}
// generatePutBlockFromURL generates a func to copy the block of src data from given startIndex till the given chunkSize.
func (c *urlToBlockBlobCopier) generatePutBlockFromURL(id common.ChunkID, blockIndex int32, adjustedChunkSize int64) chunkFunc {
return createSendToRemoteChunkFunc(c.jptm, id, func() {
// step 1: generate block ID
encodedBlockID := c.generateEncodedBlockID(blockIndex)
// step 2: save the block ID into the list of block IDs
c.setBlockID(blockIndex, encodedBlockID)
if c.ChunkAlreadyTransferred(blockIndex) {
c.jptm.LogAtLevelForCurrentTransfer(common.LogDebug, fmt.Sprintf("Skipping chunk %d as it was already transferred.", blockIndex))
atomic.AddInt32(&c.atomicChunksWritten, 1)
return
}
// step 3: put block to remote
c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire())
if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil {
c.jptm.FailActiveUpload("Pacing block", err)
}
token, err := c.jptm.GetS2SSourceTokenCredential(c.jptm.Context())
if err != nil {
c.jptm.FailActiveS2SCopy("Getting source token credential", err)
return
}
_, err = c.destBlockBlobClient.StageBlockFromURL(c.jptm.Context(), encodedBlockID, c.srcURL,
&blockblob.StageBlockFromURLOptions{
Range: blob.HTTPRange{Offset: id.OffsetInFile(), Count: adjustedChunkSize},
CPKInfo: c.jptm.CpkInfo(),
CPKScopeInfo: c.jptm.CpkScopeInfo(),
CopySourceAuthorization: token,
})
if err != nil {
c.jptm.FailActiveSend("Staging block from URL", err)
return
}
atomic.AddInt32(&c.atomicChunksWritten, 1)
})
}
func (c *urlToBlockBlobCopier) generateStartPutBlobFromURL(id common.ChunkID, blockIndex int32, adjustedChunkSize int64) chunkFunc {
return createSendToRemoteChunkFunc(c.jptm, id, func() {
c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire())
// Create blob and finish.
if !ValidateTier(c.jptm, c.destBlobTier, c.destBlockBlobClient, c.jptm.Context(), false) {
c.destBlobTier = nil
}
blobTags := c.blobTagsToApply
setTags := separateSetTagsRequired(blobTags)
if setTags || len(blobTags) == 0 {
blobTags = nil
}
// TODO: Remove this snippet once service starts supporting CPK with blob tier
destBlobTier := c.destBlobTier
if c.jptm.IsSourceEncrypted() {
destBlobTier = nil
}
if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil {
c.jptm.FailActiveUpload("Pacing block", err)
}
token, err := c.jptm.GetS2SSourceTokenCredential(c.jptm.Context())
if err != nil {
c.jptm.FailActiveS2SCopy("Getting source token credential", err)
return
}
_, err = c.destBlockBlobClient.UploadBlobFromURL(c.jptm.Context(), c.srcURL,
&blockblob.UploadBlobFromURLOptions{
HTTPHeaders: &c.headersToApply,
Metadata: c.metadataToApply,
Tier: destBlobTier,
Tags: blobTags,
CPKInfo: c.jptm.CpkInfo(),
CPKScopeInfo: c.jptm.CpkScopeInfo(),
CopySourceAuthorization: token,
})
if err != nil {
c.jptm.FailActiveSend(common.Iff(len(blobTags) > 0, "Committing block list (with tags)", "Committing block list"), err)
return
}
atomic.AddInt32(&c.atomicChunksWritten, 1)
if setTags {
if _, err := c.destBlockBlobClient.SetTags(c.jptm.Context(), c.blobTagsToApply, nil); err != nil {
c.jptm.FailActiveSend("Set blob tags", err)
}
}
})
}