ste/sender-appendBlob.go (185 lines of code) (raw):
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"io"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"golang.org/x/sync/semaphore"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
// appendBlobSenderBase holds the state shared by all append-blob senders
// (local upload and S2S copy). Append blobs must be written strictly
// sequentially, which this type enforces via soleChunkFuncSemaphore.
type appendBlobSenderBase struct {
	jptm IJobPartTransferMgr

	// destAppendBlobClient is the SDK client for the destination blob.
	destAppendBlobClient *appendblob.Client
	// chunkSize is the size of each AppendBlock call, clamped to
	// common.MaxAppendBlobBlockSize in newAppendBlobSenderBase.
	chunkSize int64
	// numChunks is the total number of chunks scheduled for this transfer.
	numChunks uint32
	pacer     pacer

	// Headers and other info that we will apply to the destination
	// object. For S2S, these come from the source service.
	// When sending local data, they are computed based on
	// the properties of the local file
	headersToApply  blob.HTTPHeaders
	metadataToApply common.Metadata
	blobTagsToApply common.BlobTags

	sip ISourceInfoProvider

	// soleChunkFuncSemaphore guarantees at most one chunk func exists at a
	// time, because append-blob blocks must be appended strictly in order.
	soleChunkFuncSemaphore *semaphore.Weighted
}

// appendBlockFunc performs the actual append of one block to the destination.
type appendBlockFunc = func()
// newAppendBlobSenderBase builds the shared sender state: it clamps the job's
// block size to the service's per-append maximum, computes the chunk count,
// resolves the destination blob client, and captures the source properties
// that will later be applied to the destination.
func newAppendBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, srcInfoProvider ISourceInfoProvider) (*appendBlobSenderBase, error) {
	info := jptm.Info()

	// The service rejects appends larger than common.MaxAppendBlobBlockSize,
	// so clamp the configured block size before computing the chunk count.
	chunkSize := info.BlockSize
	if chunkSize > common.MaxAppendBlobBlockSize {
		chunkSize = common.MaxAppendBlobBlockSize
	}
	numChunks := getNumChunks(info.SourceSize, chunkSize, chunkSize)

	bsc, err := jptm.DstServiceClient().BlobServiceClient()
	if err != nil {
		return nil, err
	}
	destClient := bsc.NewContainerClient(info.DstContainer).NewAppendBlobClient(info.DstFilePath)

	props, err := srcInfoProvider.Properties()
	if err != nil {
		return nil, err
	}

	return &appendBlobSenderBase{
		jptm:                   jptm,
		destAppendBlobClient:   destClient,
		chunkSize:              chunkSize,
		numChunks:              numChunks,
		pacer:                  pacer,
		headersToApply:         props.SrcHTTPHeaders.ToBlobHTTPHeaders(),
		metadataToApply:        props.SrcMetadata,
		blobTagsToApply:        props.SrcBlobTags,
		sip:                    srcInfoProvider,
		soleChunkFuncSemaphore: semaphore.NewWeighted(1),
	}, nil
}
// SendableEntityType returns the entity type this sender can transfer: files only.
func (s *appendBlobSenderBase) SendableEntityType() common.EntityType {
	return common.EEntityType.File()
}
// ChunkSize returns the (clamped) block size used for each AppendBlock call.
func (s *appendBlobSenderBase) ChunkSize() int64 {
	return s.chunkSize
}
// NumChunks returns the number of chunks the source will be sent in.
func (s *appendBlobSenderBase) NumChunks() uint32 {
	return s.numChunks
}
// RemoteFileExists reports whether the destination blob already exists,
// along with its last-modified time when it does.
func (s *appendBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
	opts := &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()}
	props, err := s.destAppendBlobClient.GetProperties(s.jptm.Context(), opts)
	return remoteObjectExists(blobPropertiesResponseAdapter{props}, err)
}
// Returns a chunk-func for sending append blob to remote.
//
// Ordering matters here: the semaphore is acquired at chunk-func CREATION time
// and released only when the chunk func has finished RUNNING, so at most one
// chunk func for this blob exists at any moment. That is how sequential
// appends are enforced — do not reorder the acquire/release.
func (s *appendBlobSenderBase) generateAppendBlockToRemoteFunc(id common.ChunkID, appendBlock appendBlockFunc) chunkFunc {
	// Copy must be totally sequential for append blobs
	// The way we enforce that is simple: we won't even CREATE
	// a chunk func, until all previously-scheduled chunk funcs have completed
	// Here we block until there are no other chunkfuncs in existence for this blob
	err := s.soleChunkFuncSemaphore.Acquire(s.jptm.Context(), 1)
	if err != nil {
		// Acquire only fails when the context is done.
		// Must have been cancelled
		// We must still return a chunk func, so return a no-op one
		return createSendToRemoteChunkFunc(s.jptm, id, func() {})
	}

	return createSendToRemoteChunkFunc(s.jptm, id, func() {
		// Here, INSIDE the chunkfunc, we release the semaphore when we have finished running,
		// which unblocks creation of the next chunk func for this blob.
		defer s.soleChunkFuncSemaphore.Release(1)

		jptm := s.jptm

		if jptm.Info().SourceSize == 0 {
			// nothing to do, since this is a dummy chunk in a zero-size file, and the prologue will have done all the real work
			return
		}

		appendBlock()
	})
}
// Prologue creates the destination append blob with the headers, metadata,
// CPK settings and (where the service allows it) tags captured from the
// source. Tags that must be applied via a separate call are sent afterwards
// with SetTags. It reports whether the destination was modified.
func (s *appendBlobSenderBase) Prologue(ps common.PrologueState) (destinationModified bool) {
	if s.jptm.ShouldInferContentType() {
		// When reading local files we may know the content type better now
		// than we did when the transfer was scheduled.
		s.headersToApply.BlobContentType = ps.GetInferredContentType(s.jptm)
	}

	tags := s.blobTagsToApply
	needSeparateSetTags := separateSetTagsRequired(tags)
	if needSeparateSetTags || len(tags) == 0 {
		// Either the tags must go through a dedicated SetTags call, or there
		// are none at all — in both cases don't send them on Create.
		tags = nil
	}

	createOpts := &appendblob.CreateOptions{
		HTTPHeaders:  &s.headersToApply,
		Metadata:     s.metadataToApply,
		Tags:         tags,
		CPKInfo:      s.jptm.CpkInfo(),
		CPKScopeInfo: s.jptm.CpkScopeInfo(),
	}
	if _, err := s.destAppendBlobClient.Create(s.jptm.Context(), createOpts); err != nil {
		s.jptm.FailActiveSend(common.Iff(len(tags) > 0, "Creating blob (with tags)", "Creating blob"), err)
		return
	}
	destinationModified = true

	if needSeparateSetTags {
		if _, err := s.destAppendBlobClient.SetTags(s.jptm.Context(), s.blobTagsToApply, nil); err != nil {
			s.jptm.FailActiveSend("Set blob tags", err)
		}
	}
	return
}
// Epilogue is a no-op for append blobs: each successful AppendBlock is
// already committed, so there is no final commit step (unlike block blobs).
func (s *appendBlobSenderBase) Epilogue() {
	// Empty function because you don't have to commit on an append blob
}
// Cleanup deletes the destination blob when the transfer died while data was
// in flight, so we don't leave behind a blob whose committed length is
// ambiguous.
func (s *appendBlobSenderBase) Cleanup() {
	jptm := s.jptm
	if !jptm.IsDeadInflight() {
		return
	}
	// There is a possibility that some uncommitted blocks will be there
	// Delete the uncommitted blobs
	// TODO: particularly, given that this is an APPEND blob, do we really need to delete it? But if we don't delete it,
	// it will still be in an ambiguous situation with regard to how much has been added to it. Probably best to delete
	// to be consistent with other
	// Use a fresh background context with a short timeout: the transfer's own
	// context may already be cancelled.
	parent := context.WithValue(context.Background(), ServiceAPIVersionOverride, DefaultServiceApiVersion)
	deletionContext, cancelFunc := context.WithTimeout(parent, 30*time.Second)
	defer cancelFunc()
	if _, err := s.destAppendBlobClient.Delete(deletionContext, nil); err != nil {
		jptm.LogError(s.destAppendBlobClient.URL(), "Delete (incomplete) Append Blob ", err)
	}
}
// GetDestinationLength gets the destination length, returning -1 on failure
// or when the service omits the content length from the properties response.
func (s *appendBlobSenderBase) GetDestinationLength() (int64, error) {
	props, err := s.destAppendBlobClient.GetProperties(s.jptm.Context(), &blob.GetPropertiesOptions{CPKInfo: s.jptm.CpkInfo()})
	switch {
	case err != nil:
		return -1, err
	case props.ContentLength == nil:
		return -1, fmt.Errorf("destination content length not returned")
	default:
		return *props.ContentLength, nil
	}
}
// GetMD5 returns the MD5 hash of the destination blob range
// [offset, offset+count). For ranges small enough for the service to hash
// (count <= common.MaxRangeGetSize) it asks the service to compute the MD5;
// when the service does not return one, it downloads the range and hashes it
// locally (with retries on transient read failures).
func (s *appendBlobSenderBase) GetMD5(offset, count int64) ([]byte, error) {
	var rangeGetContentMD5 *bool
	if count <= common.MaxRangeGetSize {
		// The service only computes a range MD5 for sufficiently small ranges.
		rangeGetContentMD5 = to.Ptr(true)
	}
	response, err := s.destAppendBlobClient.DownloadStream(s.jptm.Context(),
		&blob.DownloadStreamOptions{
			Range:              blob.HTTPRange{Offset: offset, Count: count},
			RangeGetContentMD5: rangeGetContentMD5,
			CPKInfo:            s.jptm.CpkInfo(),
			CPKScopeInfo:       s.jptm.CpkScopeInfo(),
		})
	if err != nil {
		return nil, err
	}
	if len(response.ContentMD5) > 0 {
		// Service computed the hash for us; close the (unread) body so the
		// underlying connection can be reused. (Previously it was leaked.)
		if response.Body != nil {
			response.Body.Close()
		}
		return response.ContentMD5, nil
	}
	// No service-side hash: download the range and compute the MD5 locally.
	body := response.NewRetryReader(s.jptm.Context(), &blob.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody})
	defer body.Close()
	h := md5.New()
	if _, err := io.Copy(h, body); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}
// transformAppendConditionMismatchError handles a possible false negative:
// when a client-side timeout interrupts an append, the block may actually
// have committed on the service even though we never saw the response, so the
// retry fails its append-position precondition. To distinguish that from real
// data loss, we compare the MD5 of the just-appended destination range with
// the source's; if they match, the data landed and the error is suppressed.
// It returns extra text to append to a failure description, plus the
// (possibly cleared) error.
func (s *appendBlobSenderBase) transformAppendConditionMismatchError(timeoutFromCtx bool, offset, count int64, err error) (string, error) {
	// Only an append-position mismatch that followed a timeout is a candidate
	// for this treatment; anything else is passed through unchanged.
	if err == nil || !timeoutFromCtx || !bloberror.HasCode(err, bloberror.AppendPositionConditionNotMet) {
		return "", err
	}
	if _, ok := s.sip.(benchmarkSourceInfoProvider); ok {
		// If the source is a benchmark, then we don't need to check MD5 since the data is constantly changing. This is OK.
		return "", nil
	}
	// Download Range of last append
	destMD5, destErr := s.GetMD5(offset, count)
	if destErr != nil {
		return ", get destination md5 after timeout", destErr
	}
	sourceMD5, sourceErr := s.sip.GetMD5(offset, count)
	if sourceErr != nil {
		return ", get source md5 after timeout", sourceErr
	}
	// len(x) > 0 already rules out nil, so separate nil checks are redundant.
	if len(destMD5) > 0 && len(sourceMD5) > 0 && bytes.Equal(destMD5, sourceMD5) {
		// Hashes match: the earlier append really committed; swallow the error.
		return "", nil
	}
	return "", err
}