in e2etest/scenario_helpers.go [421:639]
// generateBlobsFromList creates one blob in options.containerClient for each entry of options.fs.
//
// For every entry it:
//   - fills in POSIX-style metadata according to the entity type (folder, symlink, or anything else);
//   - uploads the entry's body (or random data of the requested size when no body was supplied) once
//     per requested blob version, as a block, page or append blob depending on
//     creationProperties.blobType;
//   - applies creationProperties.adlsPermissionsACL, when set, through the dfs endpoint derived from
//     options.rawSASURL.
//
// The loop writes to the entry it is processing (body, name when options.compressToGZ is set,
// metadata, content headers). NOTE(review): callers only observe those writes if options.fs holds
// pointers, which the code below appears to assume -- confirm against the type declaration.
//
// Errors are reported through c.AssertNoErr. Before returning, the function sleeps for slightly
// more than one second so that the blobs' LMTs are in the past.
func (scenarioHelper) generateBlobsFromList(c asserter, options *generateBlobFromListOptions) {
	for _, b := range options.fs {
		// Step 1: derive body and metadata from the entity type.
		switch b.creationProperties.entityType {
		case common.EEntityType.Folder(): // it's fine to create folders even when we're not explicitly testing them, UNLESS we're testing CPK-- AzCopy can't properly pick that up!
			if options.cpkInfo != nil || b.name == "" {
				continue // can't write root, and can't handle dirs with CPK
			}

			if b.creationProperties.nameValueMetadata == nil {
				b.creationProperties.nameValueMetadata = map[string]*string{}
			}

			// Folders are represented as zero-length blobs carrying folder metadata.
			b.body = make([]byte, 0)
			b.creationProperties.nameValueMetadata[common.POSIXFolderMeta] = to.Ptr("true")
			mode := uint64(os.FileMode(common.DEFAULT_FILE_PERM) | os.ModeDir)
			b.creationProperties.nameValueMetadata[common.POSIXModeMeta] = to.Ptr(strconv.FormatUint(mode, 10))
			b.creationProperties.posixProperties.AddToMetadata(b.creationProperties.nameValueMetadata)
		case common.EEntityType.Symlink():
			if b.creationProperties.nameValueMetadata == nil {
				b.creationProperties.nameValueMetadata = map[string]*string{}
			}

			// The blob body of a symlink is the link target itself.
			b.body = []byte(*b.creationProperties.symlinkTarget)
			b.creationProperties.nameValueMetadata[common.POSIXSymlinkMeta] = to.Ptr("true")
			mode := uint64(os.FileMode(common.DEFAULT_FILE_PERM) | os.ModeSymlink)
			b.creationProperties.nameValueMetadata[common.POSIXModeMeta] = to.Ptr(strconv.FormatUint(mode, 10))
			b.creationProperties.posixProperties.AddToMetadata(b.creationProperties.nameValueMetadata)
		default:
			if b.creationProperties.nameValueMetadata == nil {
				b.creationProperties.nameValueMetadata = map[string]*string{}
			}
			b.creationProperties.posixProperties.AddToMetadata(b.creationProperties.nameValueMetadata)

			if b.creationProperties.posixProperties != nil && b.creationProperties.posixProperties.mode != nil {
				mode := *b.creationProperties.posixProperties.mode

				// Sockets and FIFOs get an empty body.
				// todo: support for device rep files may be difficult in a testing environment.
				if mode&common.S_IFSOCK == common.S_IFSOCK || mode&common.S_IFIFO == common.S_IFIFO {
					b.body = make([]byte, 0)
				}
			}
		}

		// Step 2: upload the content, once per requested version.
		// blobHadBody remembers whether a body existed before the loop: the loop itself assigns
		// b.body, and entries without an initial body must get fresh random data on every version.
		blobHadBody := b.body != nil
		versionsRequested := common.IffNotNil[uint](b.creationProperties.blobVersions, 1)
		versionsCreated := uint(0)

		for versionsCreated < versionsRequested {
			versionsCreated++

			ad := blobResourceAdapter{b}
			var reader io.ReadSeekCloser
			var size int
			var sourceData []byte

			if b.body != nil && blobHadBody {
				reader = streaming.NopCloser(bytes.NewReader(b.body))
				sourceData = b.body
				size = len(b.body)
			} else {
				reader, sourceData = getRandomDataAndReader(b.creationProperties.sizeBytes(c, options.defaultSize))
				b.body = sourceData // set body
				size = len(b.body)
			}

			if options.compressToGZ {
				// NOTE(review): reader and size were captured from the uncompressed data above and
				// are not refreshed here, so the upload below still sends the uncompressed bytes;
				// only b.body, b.name and the content-encoding header change. This branch also runs
				// once per version, so ".gz" is appended on every iteration. Confirm this is intended.
				var buff bytes.Buffer
				gz := gzip.NewWriter(&buff)
				if _, err := gz.Write([]byte(sourceData)); err != nil {
					c.AssertNoErr(err)
				}
				if err := gz.Close(); err != nil {
					c.AssertNoErr(err)
				}

				if ad.obj.creationProperties.contentHeaders == nil {
					ad.obj.creationProperties.contentHeaders = &contentHeaders{}
				}
				contentEncoding := "gzip"
				ad.obj.creationProperties.contentHeaders.contentEncoding = &contentEncoding

				b.body = buff.Bytes()
				b.name += ".gz"
			}

			// Setting content MD5
			if ad.obj.creationProperties.contentHeaders == nil {
				b.creationProperties.contentHeaders = &contentHeaders{}
			}
			// only set MD5 when we're on the last version
			// (the hash is taken over sourceData, i.e. the data as it was before any gzip step).
			if ad.obj.creationProperties.contentHeaders.contentMD5 == nil && versionsCreated == versionsRequested {
				contentMD5 := md5.Sum(sourceData)
				ad.obj.creationProperties.contentHeaders.contentMD5 = contentMD5[:]
			}

			tags := ad.obj.creationProperties.blobTags
			metadata := ad.obj.creationProperties.nameValueMetadata

			// Tags are not sent when the account is hierarchical-namespace enabled.
			if options.accountType == EAccountType.HierarchicalNamespaceEnabled() {
				tags = nil
			}

			headers := ad.toHeaders()

			var err error

			switch b.creationProperties.blobType {
			case common.EBlobType.BlockBlob(), common.EBlobType.Detect():
				bb := options.containerClient.NewBlockBlobClient(b.name)

				if size > 0 {
					// to prevent the service from erroring out with an improper MD5, we opt to commit a block, then the list.
					blockID := base64.StdEncoding.EncodeToString([]byte(uuid.NewString()))
					_, err = bb.StageBlock(ctx, blockID, reader,
						&blockblob.StageBlockOptions{
							CPKInfo:      options.cpkInfo,
							CPKScopeInfo: options.cpkScopeInfo,
						})
					c.AssertNoErr(err)

					// Commit block list will generate a new version.
					_, err = bb.CommitBlockList(ctx,
						[]string{blockID},
						&blockblob.CommitBlockListOptions{
							HTTPHeaders:  headers,
							Metadata:     metadata,
							Tier:         options.accessTier,
							Tags:         tags,
							CPKInfo:      options.cpkInfo,
							CPKScopeInfo: options.cpkScopeInfo,
						})
					c.AssertNoErr(err)
				} else { // todo: invalid MD5 on empty blob is impossible like this, but it's doubtful we'll need to support it.
					// handle empty blobs
					_, err = bb.Upload(ctx, reader,
						&blockblob.UploadOptions{
							HTTPHeaders:  headers,
							Metadata:     metadata,
							Tier:         options.accessTier,
							Tags:         tags,
							CPKInfo:      options.cpkInfo,
							CPKScopeInfo: options.cpkScopeInfo,
						})
					c.AssertNoErr(err)
				}
			case common.EBlobType.PageBlob():
				// A create call will generate a new version
				pb := options.containerClient.NewPageBlobClient(b.name)
				_, err = pb.Create(ctx, int64(size),
					&pageblob.CreateOptions{
						SequenceNumber: to.Ptr(int64(0)),
						HTTPHeaders:    headers,
						Metadata:       metadata,
						Tags:           tags,
						CPKInfo:        options.cpkInfo,
						CPKScopeInfo:   options.cpkScopeInfo,
					})
				c.AssertNoErr(err)

				_, err = pb.UploadPages(ctx, reader, blob.HTTPRange{Offset: 0, Count: int64(size)},
					&pageblob.UploadPagesOptions{
						CPKInfo:      options.cpkInfo,
						CPKScopeInfo: options.cpkScopeInfo,
					})
				c.AssertNoErr(err)
			case common.EBlobType.AppendBlob():
				// A create call will generate a new version
				ab := options.containerClient.NewAppendBlobClient(b.name)
				_, err = ab.Create(ctx,
					&appendblob.CreateOptions{
						HTTPHeaders:  headers,
						Metadata:     metadata,
						Tags:         tags,
						CPKInfo:      options.cpkInfo,
						CPKScopeInfo: options.cpkScopeInfo,
					})
				c.AssertNoErr(err)

				_, err = ab.AppendBlock(ctx, reader,
					&appendblob.AppendBlockOptions{
						CPKInfo:      options.cpkInfo,
						CPKScopeInfo: options.cpkScopeInfo,
					})
				c.AssertNoErr(err)
			}
		}

		// Step 3: apply the ACL, if one was requested, via the dfs endpoint of the same account.
		if b.creationProperties.adlsPermissionsACL != nil {
			bfsURLParts, err := azdatalake.ParseURL(options.rawSASURL.String())
			c.AssertNoErr(err)
			bfsURLParts.Host = strings.Replace(bfsURLParts.Host, ".blob", ".dfs", 1)

			fsc, err := filesystem.NewClientWithNoCredential(bfsURLParts.String(), nil)
			c.AssertNoErr(err)

			if b.isFolder() {
				dc := fsc.NewDirectoryClient(b.name)
				_, err = dc.SetAccessControl(ctx,
					&datalakedirectory.SetAccessControlOptions{ACL: b.creationProperties.adlsPermissionsACL})
			} else {
				d, f := path.Split(b.name)
				dc := fsc.NewDirectoryClient(d)

				// Use a separate name for this error: re-declaring err with := here would shadow the
				// outer err, and the SetAccessControl failure below would never reach the assert.
				fc, clientErr := dc.NewFileClient(f)
				c.AssertNoErr(clientErr)

				_, err = fc.SetAccessControl(ctx,
					&datalakefile.SetAccessControlOptions{ACL: b.creationProperties.adlsPermissionsACL})
			}
			c.AssertNoErr(err)
		}
	}

	// sleep a bit so that the blobs' lmts are guaranteed to be in the past
	// TODO: can we make it so that this sleeping only happens when we really need it to?
	time.Sleep(time.Millisecond * 1050)
}