ste/sender-blockBlob.go (349 lines of code) (raw):
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste
import (
"context"
"encoding/base64"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
var lowMemoryLimitAdvice sync.Once
type blockBlobSenderBase struct {
jptm IJobPartTransferMgr
sip ISourceInfoProvider
destBlockBlobClient *blockblob.Client
chunkSize int64
numChunks uint32
pacer pacer
blockIDs []string
destBlobTier *blob.AccessTier
// Headers and other info that we will apply to the destination object.
// 1. For S2S, these come from the source service.
// 2. When sending local data, they are computed based on the properties of the local file
headersToApply blob.HTTPHeaders
metadataToApply common.Metadata
blobTagsToApply common.BlobTags
atomicChunksWritten int32
atomicPutListIndicator int32
muBlockIDs *sync.Mutex
blockNamePrefix string
completedBlockList map[int]string
}
func getVerifiedChunkParams(transferInfo *TransferInfo, memLimit int64, strictMemLimit int64) (chunkSize int64, numChunks uint32, err error) {
chunkSize = transferInfo.BlockSize
putBlobSize := transferInfo.PutBlobSize
srcSize := transferInfo.SourceSize
numChunks = getNumChunks(srcSize, chunkSize, putBlobSize)
maxSize := int64(common.MaxBlockBlobBlockSize)
if srcSize < putBlobSize {
chunkSize = putBlobSize
maxSize = common.MaxPutBlobSize
}
toGiB := func(bytes int64) float64 {
return float64(bytes) / float64(1024*1024*1024)
}
if common.MinParallelChunkCountThreshold >= memLimit/chunkSize {
glcm := common.GetLifecycleMgr()
msg := fmt.Sprintf("Using a blocksize of %.2fGiB for file %s. AzCopy is limited to use %.2fGiB of memory."+
"Consider providing at least %.2fGiB to AzCopy, using environment variable %s.",
toGiB(chunkSize), transferInfo.Source, toGiB(memLimit),
toGiB(common.MinParallelChunkCountThreshold*chunkSize),
common.EEnvironmentVariable.BufferGB().Name)
lowMemoryLimitAdvice.Do(func() { glcm.Info(msg) })
}
if chunkSize >= memLimit {
err = fmt.Errorf("Cannot use a block size of %.2fGiB. AzCopy is limited to use only %.2fGiB of memory",
toGiB(chunkSize), toGiB(memLimit))
return
}
if chunkSize >= strictMemLimit {
err = fmt.Errorf("Cannot use a block size of %.2fGiB. AzCopy is limited to use only %.2fGiB of memory, and only %.2fGiB of these are available for chunks.",
toGiB(chunkSize), toGiB(memLimit), toGiB(strictMemLimit))
return
}
// sanity check
if chunkSize > maxSize { // Use relevant PutBlob or Block size max
// mercy, please
err = fmt.Errorf("block size of %.2fGiB for file %s of size %.2fGiB exceeds maximum allowed %.2fMiB block size for a BlockBlob",
toGiB(chunkSize), transferInfo.Source, toGiB(transferInfo.SourceSize), float64(maxSize/(1024*1024)))
return
}
if putBlobSize >= memLimit {
err = fmt.Errorf("Cannot use a put blob size of %.2fGiB. AzCopy is limited to use only %.2fGiB of memory",
toGiB(putBlobSize), toGiB(memLimit))
return
}
if putBlobSize >= strictMemLimit {
err = fmt.Errorf("Cannot use a put blob size of %.2fGiB. AzCopy is limited to use only %.2fGiB of memory, and only %.2fGiB of these are available for chunks.",
toGiB(putBlobSize), toGiB(memLimit), toGiB(strictMemLimit))
return
}
// sanity check
if putBlobSize > common.MaxPutBlobSize {
// mercy, please
err = fmt.Errorf("put blob size of %.2fGiB for file %s of size %.2fGiB exceeds maximum allowed put blob size for a BlockBlob",
toGiB(putBlobSize), transferInfo.Source, toGiB(transferInfo.SourceSize))
return
}
if numChunks > common.MaxNumberOfBlocksPerBlob {
err = fmt.Errorf("Block size %d for source of size %d is not correct. Number of blocks will exceed the limit", chunkSize, srcSize)
return
}
return
}
// Current size of block names in AzCopy is 48B. To be consistent with this,
// we have to generate a 36B string and then base64-encode this to conform
// to the same size. We generate prefix here.
// Block Names of blobs are of format noted below.
// <5B empty placeholder><16B GUID of AzCopy re-interpreted as string><5B PartNum><5B Index in the jobPart><5B blockNum>
func getBlockNamePrefix(jobID common.JobID, partNum uint32, transferIndex uint32) string {
jobIdStr := string((*[16]byte)(unsafe.Pointer(&jobID))[:])
placeHolderPrefix := "00000"
return fmt.Sprintf("%s%s%05d%05d", placeHolderPrefix, jobIdStr, partNum, transferIndex)
}
func newBlockBlobSenderBase(jptm IJobPartTransferMgr, pacer pacer, srcInfoProvider ISourceInfoProvider, inferredAccessTierType *blob.AccessTier) (*blockBlobSenderBase, error) {
// compute chunk count
chunkSize, numChunks, err := getVerifiedChunkParams(jptm.Info(), jptm.CacheLimiter().Limit(), jptm.CacheLimiter().StrictLimit())
if err != nil {
return nil, err
}
c, err := jptm.DstServiceClient().BlobServiceClient()
if err != nil {
return nil, err
}
destBlockBlobClient := c.NewContainerClient(jptm.Info().DstContainer).NewBlockBlobClient(jptm.Info().DstFilePath)
props, err := srcInfoProvider.Properties()
if err != nil {
return nil, err
}
// If user set blob tier explicitly, override any value that our caller
// may have guessed.
destBlobTier := inferredAccessTierType
blockBlobTierOverride, _ := jptm.BlobTiers()
if blockBlobTierOverride != common.EBlockBlobTier.None() {
t := blockBlobTierOverride.ToAccessTierType()
destBlobTier = &t
}
if (props.SrcMetadata["hdi_isfolder"] != nil && *props.SrcMetadata["hdi_isfolder"] == "true") ||
(props.SrcMetadata["Hdi_isfolder"] != nil && *props.SrcMetadata["Hdi_isfolder"] == "true") {
destBlobTier = nil
}
partNum, transferIndex := jptm.TransferIndex()
return &blockBlobSenderBase{
jptm: jptm,
sip: srcInfoProvider,
destBlockBlobClient: destBlockBlobClient,
chunkSize: chunkSize,
numChunks: numChunks,
pacer: pacer,
blockIDs: make([]string, numChunks),
headersToApply: props.SrcHTTPHeaders.ToBlobHTTPHeaders(),
metadataToApply: props.SrcMetadata,
blobTagsToApply: props.SrcBlobTags,
destBlobTier: destBlobTier,
muBlockIDs: &sync.Mutex{},
blockNamePrefix: getBlockNamePrefix(jptm.Info().JobID, partNum, transferIndex),
}, nil
}
func (s *blockBlobSenderBase) SendableEntityType() common.EntityType {
return common.EEntityType.File()
}
func (s *blockBlobSenderBase) ChunkSize() int64 {
return s.chunkSize
}
func (s *blockBlobSenderBase) NumChunks() uint32 {
return s.numChunks
}
func (s *blockBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
properties, err := s.destBlockBlobClient.GetProperties(s.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()})
return remoteObjectExists(blobPropertiesResponseAdapter{properties}, err)
}
func (s *blockBlobSenderBase) Prologue(ps common.PrologueState) (destinationModified bool) {
if s.jptm.RestartedTransfer() {
s.buildCommittedBlockMap()
}
if s.jptm.ShouldInferContentType() {
s.headersToApply.BlobContentType = ps.GetInferredContentType(s.jptm)
}
if s.jptm.DeleteDestinationFileIfNecessary() {
s.DeleteDstBlob()
}
return false
}
func (s *blockBlobSenderBase) Epilogue() {
jptm := s.jptm
s.muBlockIDs.Lock()
blockIDs := s.blockIDs
s.blockIDs = nil // so we know for sure that only this routine has access after we release the lock (nothing else should need it now, since we're in the epilogue. Nil-ing here is just being defensive)
s.muBlockIDs.Unlock()
shouldPutBlockList := getPutListNeed(&s.atomicPutListIndicator)
if shouldPutBlockList == putListNeedUnknown && !jptm.WasCanceled() {
panic(errors.New("'put list' need flag was never set"))
}
// TODO: finalize and wrap in functions whether 0 is included or excluded in status comparisons
// commit block list if necessary
if jptm.IsLive() && shouldPutBlockList == putListNeeded {
jptm.Log(common.LogDebug, fmt.Sprintf("Conclude Transfer with BlockList %s", blockIDs))
// commit the blocks.
if !ValidateTier(jptm, s.destBlobTier, s.destBlockBlobClient.BlobClient(), s.jptm.Context(), false) {
s.destBlobTier = nil
}
blobTags := s.blobTagsToApply
setTags := separateSetTagsRequired(blobTags)
if setTags || len(blobTags) == 0 {
blobTags = nil
}
// TODO: Remove this snippet once service starts supporting CPK with blob tier
destBlobTier := s.destBlobTier
if s.jptm.IsSourceEncrypted() {
destBlobTier = nil
}
_, err := s.destBlockBlobClient.CommitBlockList(jptm.Context(), blockIDs,
&blockblob.CommitBlockListOptions{
HTTPHeaders: &s.headersToApply,
Metadata: s.metadataToApply,
Tier: destBlobTier,
Tags: blobTags,
CPKInfo: s.jptm.CpkInfo(),
CPKScopeInfo: s.jptm.CpkScopeInfo(),
})
if err != nil {
jptm.FailActiveSend(common.Iff(blobTags != nil, "Committing block list (with tags)", "Committing block list"), err)
return
}
if setTags {
if _, err := s.destBlockBlobClient.SetTags(jptm.Context(), s.blobTagsToApply, nil); err != nil {
jptm.FailActiveSend("Setting tags", err)
}
}
}
// Upload ADLS Gen 2 ACLs
fromTo := jptm.FromTo()
if fromTo.From().SupportsHnsACLs() && fromTo.To().SupportsHnsACLs() && jptm.Info().PreserveSMBPermissions.IsTruthy() {
// We know for a fact our source is a "blob".
acl, err := s.sip.(*blobSourceInfoProvider).AccessControl()
if err != nil {
jptm.FailActiveSend("Grabbing source ACLs", err)
return
}
dsc, err := jptm.DstServiceClient().DatalakeServiceClient()
if err != nil {
jptm.FailActiveSend("Getting source client", err)
return
}
dstDatalakeClient := dsc.NewFileSystemClient(jptm.Info().DstContainer).NewFileClient(jptm.Info().DstFilePath)
_, err = dstDatalakeClient.SetAccessControl(jptm.Context(), &file.SetAccessControlOptions{ACL: acl})
if err != nil {
jptm.FailActiveSend("Putting ACLs", err)
return
}
}
}
func (s *blockBlobSenderBase) Cleanup() {
jptm := s.jptm
// Cleanup
if jptm.IsDeadInflight() && atomic.LoadInt32(&s.atomicChunksWritten) != 0 {
// there is a possibility that some uncommitted blocks will be there
// Delete the uncommitted blobs
deletionContext, cancelFn := context.WithTimeout(context.WithValue(context.Background(), ServiceAPIVersionOverride, DefaultServiceApiVersion), 30*time.Second)
defer cancelFn()
if jptm.WasCanceled() {
// If we cancelled, and the only blocks that exist are uncommitted, then clean them up.
// This prevents customer paying for their storage for a week until they get garbage collected, and it
// also prevents any issues with "too many uncommitted blocks" if user tries to upload the blob again in future.
// But if there are committed blocks, leave them there (since they still safely represent the state before our job even started)
blockList, err := s.destBlockBlobClient.GetBlockList(deletionContext, blockblob.BlockListTypeAll, nil)
hasUncommittedOnly := err == nil && len(blockList.CommittedBlocks) == 0 && len(blockList.UncommittedBlocks) > 0
if hasUncommittedOnly {
jptm.LogAtLevelForCurrentTransfer(common.LogDebug, "Deleting uncommitted destination blob due to cancellation")
// Delete can delete uncommitted blobs.
_, _ = s.destBlockBlobClient.Delete(deletionContext, nil)
}
} else {
// TODO: review (one last time) should we really do this? Or should we just give better error messages on "too many uncommitted blocks" errors
jptm.LogAtLevelForCurrentTransfer(common.LogDebug, "Deleting destination blob due to failure")
_, _ = s.destBlockBlobClient.Delete(deletionContext, nil)
}
}
}
// Currently we've common Metadata Copier across all senders for block blob.
func (s *blockBlobSenderBase) GenerateCopyMetadata(id common.ChunkID) chunkFunc {
return createChunkFunc(true, s.jptm, id, func() {
if unixSIP, ok := s.sip.(IUNIXPropertyBearingSourceInfoProvider); ok {
// Clone the metadata before we write to it, we shouldn't be writing to the same metadata as every other blob.
s.metadataToApply = s.metadataToApply.Clone()
statAdapter, err := unixSIP.GetUNIXProperties()
if err != nil {
s.jptm.FailActiveSend("GetUNIXProperties", err)
}
common.AddStatToBlobMetadata(statAdapter, s.metadataToApply)
}
_, err := s.destBlockBlobClient.SetMetadata(s.jptm.Context(), s.metadataToApply,
&blob.SetMetadataOptions{
CPKInfo: s.jptm.CpkInfo(),
CPKScopeInfo: s.jptm.CpkScopeInfo(),
})
if err != nil {
s.jptm.FailActiveSend("Setting Metadata", err)
return
}
})
}
func (s *blockBlobSenderBase) setBlockID(index int32, value string) {
s.muBlockIDs.Lock()
defer s.muBlockIDs.Unlock()
if len(s.blockIDs[index]) > 0 {
panic(errors.New("block id set twice for one block"))
}
s.blockIDs[index] = value
}
func (s *blockBlobSenderBase) generateEncodedBlockID(index int32) string {
return common.GenerateBlockBlobBlockID(s.blockNamePrefix, index)
}
func (s *blockBlobSenderBase) buildCommittedBlockMap() {
invalidAzCopyBlockNameMsg := "buildCommittedBlockMap: Found blocks which are not committed by AzCopy. Restarting whole file"
changedChunkSize := "buildCommittedBlockMap: Chunksize mismatch on uncommitted blocks"
list := make(map[int]string)
if common.GetEnvironmentVariable(common.EEnvironmentVariable.DisableBlobTransferResume()) == "true" {
return
}
blockList, err := s.destBlockBlobClient.GetBlockList(s.jptm.Context(), blockblob.BlockListTypeUncommitted, nil)
if err != nil {
s.jptm.LogAtLevelForCurrentTransfer(common.LogError, "Failed to get blocklist. Restarting whole file.")
return
}
if len(blockList.UncommittedBlocks) == 0 {
s.jptm.LogAtLevelForCurrentTransfer(common.LogDebug, "No uncommitted chunks found.")
return
}
// We return empty list if
// 1. We find chunks by a different actor
// 2. Chunk size differs
for _, block := range blockList.UncommittedBlocks {
name := common.IffNotNil(block.Name, "")
size := common.IffNotNil(block.Size, 0)
if len(name) != common.AZCOPY_BLOCKNAME_LENGTH {
s.jptm.LogAtLevelForCurrentTransfer(common.LogDebug, invalidAzCopyBlockNameMsg)
return
}
tmp, err := base64.StdEncoding.DecodeString(name)
decodedBlockName := string(tmp)
if err != nil || !strings.HasPrefix(decodedBlockName, s.blockNamePrefix) {
s.jptm.LogAtLevelForCurrentTransfer(common.LogDebug, invalidAzCopyBlockNameMsg)
return
}
index, err := strconv.Atoi(decodedBlockName[len(s.blockNamePrefix):])
if err != nil || index < 0 || index > int(s.numChunks) {
s.jptm.LogAtLevelForCurrentTransfer(common.LogDebug, invalidAzCopyBlockNameMsg)
return
}
// Last chunk may have different blockSize
if size != s.ChunkSize() && index != int(s.numChunks) {
s.jptm.LogAtLevelForCurrentTransfer(common.LogDebug, changedChunkSize)
return
}
list[index] = decodedBlockName
}
// We are here only if all the uncommitted blocks are uploaded by this job with same blockSize
s.completedBlockList = list
}
func (s *blockBlobSenderBase) ChunkAlreadyTransferred(index int32) bool {
if s.completedBlockList != nil {
return false
}
_, ok := s.completedBlockList[int(index)]
return ok
}
// GetDestinationLength gets the destination length.
func (s *blockBlobSenderBase) GetDestinationLength() (int64, error) {
prop, err := s.destBlockBlobClient.GetProperties(s.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()})
if err != nil {
return -1, err
}
if prop.ContentLength == nil {
return -1, fmt.Errorf("destination content length not returned")
}
return *prop.ContentLength, nil
}
func (s *blockBlobSenderBase) DeleteDstBlob() {
// Delete destination blob with uncommitted blocks, called in Prologue
resp, err := s.destBlockBlobClient.GetBlockList(s.jptm.Context(), blockblob.BlockListTypeUncommitted, nil)
if err != nil {
s.jptm.LogError(s.destBlockBlobClient.URL(), "GetBlockList with Uncommitted BlockListType failed ", err)
}
if len(resp.UncommittedBlocks) > 0 {
_, err := s.destBlockBlobClient.Delete(s.jptm.Context(), nil)
if err != nil {
s.jptm.LogError(s.destBlockBlobClient.URL(), "Deleting destination blob with uncommitted blocks failed ", err)
}
}
}