in e2etest/newe2e_resource_managers_blob.go [470:630]
func (b *BlobObjectResourceManager) CreateWithOptions(a Asserter, body ObjectContentContainer, properties ObjectProperties, options *BlobObjectCreateOptions) {
a.HelperMarker().Helper()
opts := DerefOrZero(options)
blobProps := properties.BlobProperties
copyMeta := func() common.Metadata {
out := make(common.Metadata)
for k, v := range properties.Metadata {
out[k] = pointerTo(*v) // deep copy props
}
return out
}
switch b.entityType {
case common.EEntityType.Folder():
// Override body; must be empty
body = NewZeroObjectContentContainer(0)
// Set folder meta
properties.Metadata = copyMeta()
properties.Metadata[common.POSIXFolderMeta] = pointerTo("true")
// Override blob type
properties.BlobProperties.Type = pointerTo(blob.BlobTypeBlockBlob)
case common.EEntityType.Symlink():
// body should already be path
// Set symlink meta
properties.Metadata = copyMeta()
properties.Metadata[common.POSIXSymlinkMeta] = pointerTo("true")
// Override blob type
properties.BlobProperties.Type = pointerTo(blob.BlobTypeBlockBlob)
case common.EEntityType.File(): // no-op
}
switch DerefOrZero(blobProps.Type) {
case "", blob.BlobTypeBlockBlob:
blockSize := DerefOrDefault(opts.BlockSize, common.DefaultBlockBlobBlockSize)
bodySize := body.Size()
if bodySize < blockSize*common.MaxNumberOfBlocksPerBlob {
// resize until fits
for ; bodySize >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
}
}
_, err := b.Container.InternalClient.NewBlockBlobClient(b.Path).UploadStream(ctx, body.Reader(), &blockblob.UploadStreamOptions{
BlockSize: blockSize,
Concurrency: runtime.NumCPU(),
TransactionalValidation: blob.TransferValidationTypeComputeCRC64(),
HTTPHeaders: properties.HTTPHeaders.ToBlob(),
Metadata: properties.Metadata,
AccessTier: blobProps.BlockBlobAccessTier,
Tags: blobProps.Tags,
CPKInfo: opts.CpkOptions.GetCPKInfo(),
CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
})
a.NoError("Block blob upload", err)
case blob.BlobTypePageBlob:
// TODO : Investigate bug in multistep uploader for PageBlob. (WI 28334208)
client := b.Container.InternalClient.NewPageBlobClient(b.Path)
blockSize := DerefOrDefault(opts.BlockSize, common.DefaultPageBlobChunkSize)
size := body.Size()
_, err := client.Create(
ctx,
size,
&pageblob.CreateOptions{
Tags: blobProps.Tags,
Metadata: properties.Metadata,
Tier: blobProps.PageBlobAccessTier,
HTTPHeaders: properties.HTTPHeaders.ToBlob(),
CPKInfo: opts.CpkOptions.GetCPKInfo(),
CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
})
a.NoError("Page blob create", err)
msu := &MultiStepUploader{BlockSize: blockSize}
blockCount := msu.GetBlockCount(size)
reader := body.Reader()
offset := int64(0)
blockIndex := int64(0)
for range blockCount {
buf := make([]byte, blockSize)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
a.Assert(fmt.Sprintf("failed to read content (offset %d (block %d/%d), total %d): %s", offset, blockIndex, blockCount, size, err.Error()), Equal{}, true)
}
buf = buf[:n] // reduce buffer size for block
_, err = client.UploadPages(
ctx,
streaming.NopCloser(bytes.NewReader(buf)),
blob.HTTPRange{Offset: offset, Count: int64(n)},
&pageblob.UploadPagesOptions{
TransactionalValidation: blob.TransferValidationTypeComputeCRC64(),
CPKInfo: opts.CpkOptions.GetCPKInfo(),
CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
})
a.NoError("Page blob upload", err)
offset += int64(n)
blockIndex++
}
case blob.BlobTypeAppendBlob:
// TODO : Investigate bug in multistep uploader for AppendBlob. (WI 28334208)
blockSize := DerefOrDefault(opts.BlockSize, common.DefaultBlockBlobBlockSize)
size := body.Size()
if size < blockSize*common.MaxNumberOfBlocksPerBlob {
// resize until fits
for ; size >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
}
}
client := b.Container.InternalClient.NewAppendBlobClient(b.Path)
_, err := client.Create(ctx, &appendblob.CreateOptions{
HTTPHeaders: properties.HTTPHeaders.ToBlob(),
CPKInfo: opts.CpkOptions.GetCPKInfo(),
CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
Tags: blobProps.Tags,
Metadata: properties.Metadata,
})
a.NoError("Append blob create", err)
msu := &MultiStepUploader{BlockSize: blockSize}
blockCount := msu.GetBlockCount(size)
reader := body.Reader()
offset := int64(0)
blockIndex := int64(0)
for range blockCount {
buf := make([]byte, blockSize)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
a.Assert(fmt.Sprintf("failed to read content (offset %d (block %d/%d), total %d): %s", offset, blockIndex, blockCount, size, err.Error()), Equal{}, true)
}
buf = buf[:n] // reduce buffer size for block
_, err = client.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(buf)), &appendblob.AppendBlockOptions{
TransactionalValidation: blob.TransferValidationTypeComputeCRC64(),
AppendPositionAccessConditions: &appendblob.AppendPositionAccessConditions{
AppendPosition: pointerTo(offset),
MaxSize: pointerTo(offset + int64(n)),
},
CPKInfo: opts.CpkOptions.GetCPKInfo(),
CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
})
a.NoError("Append blob upload", err)
offset += int64(n)
blockIndex++
}
}
TrackResourceCreation(a, b)
}