func()

in e2etest/scenario_helpers.go [421:639]


func (scenarioHelper) generateBlobsFromList(c asserter, options *generateBlobFromListOptions) {
	for _, b := range options.fs {
		switch b.creationProperties.entityType {
		case common.EEntityType.Folder(): // it's fine to create folders even when we're not explicitly testing them, UNLESS we're testing CPK-- AzCopy can't properly pick that up!
			if options.cpkInfo != nil || b.name == "" {
				continue // can't write root, and can't handle dirs with CPK
			}

			if b.creationProperties.nameValueMetadata == nil {
				b.creationProperties.nameValueMetadata = map[string]*string{}
			}

			b.body = make([]byte, 0)
			b.creationProperties.nameValueMetadata[common.POSIXFolderMeta] = to.Ptr("true")
			mode := uint64(os.FileMode(common.DEFAULT_FILE_PERM) | os.ModeDir)
			b.creationProperties.nameValueMetadata[common.POSIXModeMeta] = to.Ptr(strconv.FormatUint(mode, 10))
			b.creationProperties.posixProperties.AddToMetadata(b.creationProperties.nameValueMetadata)
		case common.EEntityType.Symlink():
			if b.creationProperties.nameValueMetadata == nil {
				b.creationProperties.nameValueMetadata = map[string]*string{}
			}

			b.body = []byte(*b.creationProperties.symlinkTarget)
			b.creationProperties.nameValueMetadata[common.POSIXSymlinkMeta] = to.Ptr("true")
			mode := uint64(os.FileMode(common.DEFAULT_FILE_PERM) | os.ModeSymlink)
			b.creationProperties.nameValueMetadata[common.POSIXModeMeta] = to.Ptr(strconv.FormatUint(mode, 10))
			b.creationProperties.posixProperties.AddToMetadata(b.creationProperties.nameValueMetadata)
		default:
			if b.creationProperties.nameValueMetadata == nil {
				b.creationProperties.nameValueMetadata = map[string]*string{}
			}

			b.creationProperties.posixProperties.AddToMetadata(b.creationProperties.nameValueMetadata)

			if b.creationProperties.posixProperties != nil && b.creationProperties.posixProperties.mode != nil {
				mode := *b.creationProperties.posixProperties.mode

				// todo: support for device rep files may be difficult in a testing environment.
				if mode&common.S_IFSOCK == common.S_IFSOCK || mode&common.S_IFIFO == common.S_IFIFO {
					b.body = make([]byte, 0)
				}
			}
		}

		blobHadBody := b.body != nil
		versionsRequested := common.IffNotNil[uint](b.creationProperties.blobVersions, 1)
		versionsCreated := uint(0)

		for versionsCreated < versionsRequested {
			versionsCreated++

			ad := blobResourceAdapter{b}
			var reader io.ReadSeekCloser
			var size int
			var sourceData []byte
			if b.body != nil && blobHadBody {
				reader = streaming.NopCloser(bytes.NewReader(b.body))
				sourceData = b.body
				size = len(b.body)
			} else {
				reader, sourceData = getRandomDataAndReader(b.creationProperties.sizeBytes(c, options.defaultSize))
				b.body = sourceData // set body
				size = len(b.body)
			}

			if options.compressToGZ {
				var buff bytes.Buffer
				gz := gzip.NewWriter(&buff)
				if _, err := gz.Write([]byte(sourceData)); err != nil {
					c.AssertNoErr(err)
				}
				if err := gz.Close(); err != nil {
					c.AssertNoErr(err)
				}
				if ad.obj.creationProperties.contentHeaders == nil {
					ad.obj.creationProperties.contentHeaders = &contentHeaders{}
				}
				contentEncoding := "gzip"
				ad.obj.creationProperties.contentHeaders.contentEncoding = &contentEncoding
				b.body = buff.Bytes()
				b.name += ".gz"
			}

			// Setting content MD5
			if ad.obj.creationProperties.contentHeaders == nil {
				b.creationProperties.contentHeaders = &contentHeaders{}
			}
			// only set MD5 when we're on the last version
			if ad.obj.creationProperties.contentHeaders.contentMD5 == nil && versionsCreated == versionsRequested {
				contentMD5 := md5.Sum(sourceData)
				ad.obj.creationProperties.contentHeaders.contentMD5 = contentMD5[:]
			}

			tags := ad.obj.creationProperties.blobTags
			metadata := ad.obj.creationProperties.nameValueMetadata

			if options.accountType == EAccountType.HierarchicalNamespaceEnabled() {
				tags = nil
			}

			headers := ad.toHeaders()

			var err error

			switch b.creationProperties.blobType {
			case common.EBlobType.BlockBlob(), common.EBlobType.Detect():
				bb := options.containerClient.NewBlockBlobClient(b.name)

				if size > 0 {
					// to prevent the service from erroring out with an improper MD5, we opt to commit a block, then the list.
					blockID := base64.StdEncoding.EncodeToString([]byte(uuid.NewString()))
					_, err = bb.StageBlock(ctx, blockID, reader,
						&blockblob.StageBlockOptions{
							CPKInfo:      options.cpkInfo,
							CPKScopeInfo: options.cpkScopeInfo,
						})

					c.AssertNoErr(err)

					// Commit block list will generate a new version.
					_, err = bb.CommitBlockList(ctx,
						[]string{blockID},
						&blockblob.CommitBlockListOptions{
							HTTPHeaders:  headers,
							Metadata:     metadata,
							Tier:         options.accessTier,
							Tags:         tags,
							CPKInfo:      options.cpkInfo,
							CPKScopeInfo: options.cpkScopeInfo,
						})

					c.AssertNoErr(err)
				} else { // todo: invalid MD5 on empty blob is impossible like this, but it's doubtful we'll need to support it.
					// handle empty blobs
					_, err := bb.Upload(ctx, reader,
						&blockblob.UploadOptions{
							HTTPHeaders:  headers,
							Metadata:     metadata,
							Tier:         options.accessTier,
							Tags:         tags,
							CPKInfo:      options.cpkInfo,
							CPKScopeInfo: options.cpkScopeInfo,
						})

					c.AssertNoErr(err)
				}
			case common.EBlobType.PageBlob():
				// A create call will generate a new version
				pb := options.containerClient.NewPageBlobClient(b.name)
				_, err := pb.Create(ctx, int64(size),
					&pageblob.CreateOptions{
						SequenceNumber: to.Ptr(int64(0)),
						HTTPHeaders:    headers,
						Metadata:       metadata,
						Tags:           tags,
						CPKInfo:        options.cpkInfo,
						CPKScopeInfo:   options.cpkScopeInfo,
					})
				c.AssertNoErr(err)

				_, err = pb.UploadPages(ctx, reader, blob.HTTPRange{Offset: 0, Count: int64(size)},
					&pageblob.UploadPagesOptions{
						CPKInfo:      options.cpkInfo,
						CPKScopeInfo: options.cpkScopeInfo,
					})
				c.AssertNoErr(err)
			case common.EBlobType.AppendBlob():
				// A create call will generate a new version
				ab := options.containerClient.NewAppendBlobClient(b.name)
				_, err := ab.Create(ctx,
					&appendblob.CreateOptions{
						HTTPHeaders:  headers,
						Metadata:     metadata,
						Tags:         tags,
						CPKInfo:      options.cpkInfo,
						CPKScopeInfo: options.cpkScopeInfo,
					})
				c.AssertNoErr(err)

				_, err = ab.AppendBlock(ctx, reader,
					&appendblob.AppendBlockOptions{
						CPKInfo:      options.cpkInfo,
						CPKScopeInfo: options.cpkScopeInfo,
					})
				c.AssertNoErr(err)
			}
		}

		if b.creationProperties.adlsPermissionsACL != nil {
			bfsURLParts, err := azdatalake.ParseURL(options.rawSASURL.String())
			c.AssertNoErr(err)
			bfsURLParts.Host = strings.Replace(bfsURLParts.Host, ".blob", ".dfs", 1)

			fsc, err := filesystem.NewClientWithNoCredential(bfsURLParts.String(), nil)
			c.AssertNoErr(err)

			if b.isFolder() {
				dc := fsc.NewDirectoryClient(b.name)

				_, err = dc.SetAccessControl(ctx,
					&datalakedirectory.SetAccessControlOptions{ACL: b.creationProperties.adlsPermissionsACL})
			} else {
				d, f := path.Split(b.name)
				dc := fsc.NewDirectoryClient(d)
				fc, err := dc.NewFileClient(f)
				c.AssertNoErr(err)

				_, err = fc.SetAccessControl(ctx,
					&datalakefile.SetAccessControlOptions{ACL: b.creationProperties.adlsPermissionsACL})
			}

			c.AssertNoErr(err)
		}
	}

	// sleep a bit so that the blobs' lmts are guaranteed to be in the past
	// TODO: can we make it so that this sleeping only happens when we really need it to?
	time.Sleep(time.Millisecond * 1050)
}