ste/sender-blockBlobFromLocal.go (147 lines of code) (raw):
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste
import (
"bytes"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"sync/atomic"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
type blockBlobUploader struct {
blockBlobSenderBase
md5Channel chan []byte
}
func newBlockBlobUploader(jptm IJobPartTransferMgr, pacer pacer, sip ISourceInfoProvider) (sender, error) {
senderBase, err := newBlockBlobSenderBase(jptm, pacer, sip, nil)
if err != nil {
return nil, err
}
return &blockBlobUploader{blockBlobSenderBase: *senderBase, md5Channel: newMd5Channel()}, nil
}
func (s *blockBlobUploader) Prologue(ps common.PrologueState) (destinationModified bool) {
if s.jptm.Info().PreservePOSIXProperties {
if unixSIP, ok := s.sip.(IUNIXPropertyBearingSourceInfoProvider); ok {
// Clone the metadata before we write to it, we shouldn't be writing to the same metadata as every other blob.
s.metadataToApply = s.metadataToApply.Clone()
statAdapter, err := unixSIP.GetUNIXProperties()
if err != nil {
s.jptm.FailActiveSend("GetUNIXProperties", err)
}
common.AddStatToBlobMetadata(statAdapter, s.metadataToApply)
}
}
return s.blockBlobSenderBase.Prologue(ps)
}
func (u *blockBlobUploader) Md5Channel() chan<- []byte {
return u.md5Channel
}
// Returns a chunk-func for blob uploads
func (u *blockBlobUploader) GenerateUploadFunc(id common.ChunkID, blockIndex int32, reader common.SingleChunkReader, chunkIsWholeFile bool) chunkFunc {
if chunkIsWholeFile {
if blockIndex > 0 {
panic("chunk cannot be whole file where there is more than one chunk")
}
setPutListNeed(&u.atomicPutListIndicator, putListNotNeeded)
return u.generatePutWholeBlob(id, reader)
} else {
setPutListNeed(&u.atomicPutListIndicator, putListNeeded)
return u.generatePutBlock(id, blockIndex, reader)
}
}
// generatePutBlock generates a func to upload the block of src data from given startIndex till the given chunkSize.
func (u *blockBlobUploader) generatePutBlock(id common.ChunkID, blockIndex int32, reader common.SingleChunkReader) chunkFunc {
return createSendToRemoteChunkFunc(u.jptm, id, func() {
// step 1: generate block ID
encodedBlockID := u.generateEncodedBlockID(blockIndex)
if u.ChunkAlreadyTransferred(blockIndex) {
u.jptm.LogAtLevelForCurrentTransfer(common.LogDebug,
fmt.Sprintf("Skipping chunk %d as it was already transferred.", blockIndex))
atomic.AddInt32(&u.atomicChunksWritten, 1)
return
}
// step 2: save the block ID into the list of block IDs
u.setBlockID(blockIndex, encodedBlockID)
// step 3: put block to remote
u.jptm.LogChunkStatus(id, common.EWaitReason.Body())
body := newPacedRequestBody(u.jptm.Context(), reader, u.pacer)
_, err := u.destBlockBlobClient.StageBlock(u.jptm.Context(), encodedBlockID, body,
&blockblob.StageBlockOptions{
CPKInfo: u.jptm.CpkInfo(),
CPKScopeInfo: u.jptm.CpkScopeInfo(),
})
if err != nil {
u.jptm.FailActiveUpload("Staging block", err)
return
}
atomic.AddInt32(&u.atomicChunksWritten, 1)
})
}
// generates PUT Blob (for a blob that fits in a single put request)
func (u *blockBlobUploader) generatePutWholeBlob(id common.ChunkID, reader common.SingleChunkReader) chunkFunc {
return createSendToRemoteChunkFunc(u.jptm, id, func() {
jptm := u.jptm
// Upload the blob
jptm.LogChunkStatus(id, common.EWaitReason.Body())
var err error
if !ValidateTier(jptm, u.destBlobTier, u.destBlockBlobClient, u.jptm.Context(), false) {
u.destBlobTier = nil
}
blobTags := u.blobTagsToApply
setTags := separateSetTagsRequired(blobTags)
if setTags || len(blobTags) == 0 {
blobTags = nil
}
// TODO: Remove this snippet once service starts supporting CPK with blob tier
destBlobTier := u.destBlobTier
if u.jptm.IsSourceEncrypted() {
destBlobTier = nil
}
if jptm.Info().SourceSize == 0 {
_, err = u.destBlockBlobClient.Upload(jptm.Context(), streaming.NopCloser(bytes.NewReader(nil)),
&blockblob.UploadOptions{
HTTPHeaders: &u.headersToApply,
Metadata: u.metadataToApply,
Tier: destBlobTier,
Tags: blobTags,
CPKInfo: jptm.CpkInfo(),
CPKScopeInfo: jptm.CpkScopeInfo(),
})
} else {
// File with content
// Get the MD5 that was computed as we read the file
md5Hash, ok := <-u.md5Channel
if !ok {
jptm.FailActiveUpload("Getting hash", errNoHash)
return
}
if len(md5Hash) != 0 {
u.headersToApply.BlobContentMD5 = md5Hash
}
// Upload the file
body := newPacedRequestBody(jptm.Context(), reader, u.pacer)
_, err = u.destBlockBlobClient.Upload(jptm.Context(), body,
&blockblob.UploadOptions{
HTTPHeaders: &u.headersToApply,
Metadata: u.metadataToApply,
Tier: destBlobTier,
Tags: blobTags,
CPKInfo: jptm.CpkInfo(),
CPKScopeInfo: jptm.CpkScopeInfo(),
})
}
// if the put blob is a failure, update the transfer status to failed
if err != nil {
jptm.FailActiveSend(common.Iff(len(blobTags) > 0, "Committing block list (with tags)", "Committing block list"), err)
return
}
atomic.AddInt32(&u.atomicChunksWritten, 1)
if setTags {
if _, err := u.destBlockBlobClient.SetTags(jptm.Context(), u.blobTagsToApply, nil); err != nil {
jptm.FailActiveSend("Set blob tags", err)
}
}
})
}
func (u *blockBlobUploader) Epilogue() {
jptm := u.jptm
shouldPutBlockList := getPutListNeed(&u.atomicPutListIndicator)
if jptm.IsLive() && shouldPutBlockList == putListNeeded {
md5Hash, ok := <-u.md5Channel
if ok {
if len(md5Hash) != 0 {
u.headersToApply.BlobContentMD5 = md5Hash
}
} else {
jptm.FailActiveSend("Getting hash", errNoHash)
return
}
}
u.blockBlobSenderBase.Epilogue()
}