in component/azstorage/block_blob.go [1495:1548]
// StageAndCommit stages every dirty block of bol to the blob identified by
// name and then commits the full list of block IDs, holding the per-blob lock
// for the whole sequence.
//
// A dirty block that is flagged as truncated is staged as a zero-filled
// buffer of EndIndex-StartIndex bytes instead of its Data. The block list is
// committed only when at least one block was staged or at least one non-dirty
// block reports Removed(); otherwise no commit request is issued.
//
// The dirty and truncated flags are cleared only after staging and commit
// have both succeeded. On any failure the flags are left untouched so that a
// later call stages the same payloads again and still issues the commit,
// instead of treating never-committed blocks as clean.
//
// It returns the first error reported by StageBlock or CommitBlockList, or
// nil on success.
func (bb *BlockBlob) StageAndCommit(name string, bol *common.BlockOffsetList) error {
	// lock on the blob name so that no stage and commit race condition occur causing failure
	blobMtx := bb.blockLocks.GetLock(name)
	blobMtx.Lock()
	defer blobMtx.Unlock()

	blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name))

	// Every block ID goes into the commit list, so the final length is known.
	blockIDList := make([]string, 0, len(bol.BlockList))
	needCommit := false

	for _, blk := range bol.BlockList {
		blockIDList = append(blockIDList, blk.Id)

		if !blk.Dirty() {
			// Nothing to upload for this block, but a removed block still
			// changes the block list and therefore forces a commit.
			if blk.Removed() {
				needCommit = true
			}
			continue
		}

		// Build the payload only for blocks that are actually uploaded.
		data := blk.Data
		if blk.Truncated() {
			data = make([]byte, blk.EndIndex-blk.StartIndex)
		}

		_, err := blobClient.StageBlock(context.Background(),
			blk.Id,
			streaming.NopCloser(bytes.NewReader(data)),
			&blockblob.StageBlockOptions{
				CPKInfo: bb.blobCPKOpt,
			})
		if err != nil {
			log.Err("BlockBlob::StageAndCommit : Failed to stage to blob %s with ID %s at block %v [%s]", name, blk.Id, blk.StartIndex, err.Error())
			return err
		}
		needCommit = true
	}

	if needCommit {
		_, err := blobClient.CommitBlockList(context.Background(),
			blockIDList,
			&blockblob.CommitBlockListOptions{
				HTTPHeaders: &blob.HTTPHeaders{
					BlobContentType: to.Ptr(getContentType(name)),
				},
				Tier:    bb.Config.defaultTier,
				CPKInfo: bb.blobCPKOpt,
				// AccessConditions: &blob.AccessConditions{ModifiedAccessConditions: &blob.ModifiedAccessConditions{IfMatch: bol.Etag}},
			})
		if err != nil {
			log.Err("BlockBlob::StageAndCommit : Failed to commit block list to blob %s [%s]", name, err.Error())
			return err
		}
		// update the etag
		// bol.Etag = resp.ETag()
	}

	// Everything succeeded: only now is it safe to forget which blocks were
	// truncated or dirty.
	for _, blk := range bol.BlockList {
		if blk.Truncated() {
			blk.Flags.Clear(common.TruncatedBlock)
		}
		if blk.Dirty() {
			blk.Flags.Clear(common.DirtyBlock)
		}
	}
	return nil
}