e2etest/newe2e_resource_managers_blobfs.go (459 lines of code) (raw):

package e2etest

import (
	"bytes"
	"io"
	"path"
	"runtime"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/datalakeerror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
	datalakeSAS "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/sas"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
	"github.com/Azure/azure-storage-azcopy/v10/cmd"
	"github.com/Azure/azure-storage-azcopy/v10/common"
)

// Compile-time checks that the BlobFS managers satisfy the resource manager interfaces.
var (
	_ ServiceResourceManager   = (*BlobFSServiceResourceManager)(nil)
	_ ContainerResourceManager = (*BlobFSFileSystemResourceManager)(nil)
	_ ObjectResourceManager    = (*BlobFSPathResourceProvider)(nil)

	_ RemoteResourceManager = (*BlobFSServiceResourceManager)(nil)
	_ RemoteResourceManager = (*BlobFSFileSystemResourceManager)(nil)
	_ RemoteResourceManager = (*BlobFSPathResourceProvider)(nil)
)

// dfsStripSAS returns uri with its SAS query parameters removed.
// It panics (via common.PanicIfErr) if uri cannot be parsed.
func dfsStripSAS(uri string) string {
	parts, err := datalakeSAS.ParseURL(uri)
	common.PanicIfErr(err)

	parts.SAS = datalakeSAS.QueryParameters{} // remove SAS
	return parts.String()
}

// BlobFSServiceResourceManager is the service-level resource manager for the BlobFS (DFS) endpoint.
type BlobFSServiceResourceManager struct {
	InternalAccount *AzureAccountResourceManager
	InternalClient  *service.Client
}

// DefaultAuthType returns the SAS token credential type.
func (b *BlobFSServiceResourceManager) DefaultAuthType() ExplicitCredentialTypes {
	// This is what we primarily want to support, despite also supporting AcctKey.
	return EExplicitCredentialType.SASToken()
}

// WithSpecificAuthType builds an AzCopyTarget for this service using the requested credential type.
func (b *BlobFSServiceResourceManager) WithSpecificAuthType(cred ExplicitCredentialTypes, a Asserter, opts ...CreateAzCopyTargetOptions) AzCopyTarget {
	return CreateAzCopyTarget(b, cred, a, opts...)
}

// ValidAuthTypes returns the credential types usable with BlobFS: OAuth, SAS token and account key.
func (b *BlobFSServiceResourceManager) ValidAuthTypes() ExplicitCredentialTypes {
	return EExplicitCredentialType.With(EExplicitCredentialType.OAuth(), EExplicitCredentialType.SASToken(), EExplicitCredentialType.AcctKey())
}

// Canon returns the canonical identifier built by buildCanonForAzureResourceManager.
func (b *BlobFSServiceResourceManager) Canon() string {
	return buildCanonForAzureResourceManager(b)
}

// Parent returns nil; a service has no parent resource manager.
func (b *BlobFSServiceResourceManager) Parent() ResourceManager {
	return nil
}

// Account returns the account this service belongs to.
func (b *BlobFSServiceResourceManager) Account() AccountResourceManager {
	return b.InternalAccount
}

// Location returns common.ELocation.BlobFS().
func (b *BlobFSServiceResourceManager) Location() common.Location {
	return common.ELocation.BlobFS()
}

// Level returns the service location level.
func (b *BlobFSServiceResourceManager) Level() cmd.LocationLevel {
	return cmd.ELocationLevel.Service()
}

// URI returns the service's DFS URL with any client-embedded SAS stripped, then
// passed through the account's ApplySAS and addWildCard according to opts.
func (b *BlobFSServiceResourceManager) URI(opts ...GetURIOptions) string {
	base := dfsStripSAS(b.InternalClient.DFSURL())
	base = b.InternalAccount.ApplySAS(base, b.Location(), opts...)
	base = addWildCard(base, opts...)

	return base
}

// ResourceClient returns the underlying *service.Client.
func (b *BlobFSServiceResourceManager) ResourceClient() any {
	return b.InternalClient
}

// ListContainers returns the names of all filesystems listed by the service,
// skipping entries without a name.
func (b *BlobFSServiceResourceManager) ListContainers(a Asserter) []string {
	a.HelperMarker().Helper()
	pager := b.InternalClient.NewListFileSystemsPager(nil)

	out := make([]string, 0)
	for pager.More() {
		page, err := pager.NextPage(ctx)
		a.NoError("Get filesystems page", err)

		for _, v := range page.FileSystemItems {
			if v == nil || v.Name == nil {
				continue
			}

			out = append(out, *v.Name)
		}
	}

	return out
}

// GetContainer returns a filesystem resource manager for containerName. No service call is made here.
func (b *BlobFSServiceResourceManager) GetContainer(containerName string) ContainerResourceManager {
	return &BlobFSFileSystemResourceManager{
		internalAccount: b.InternalAccount,
		Service:         b,
		containerName:   containerName,
		internalClient:  b.InternalClient.NewFileSystemClient(containerName),
	}
}

// IsHierarchical always returns true for BlobFS.
func (b *BlobFSServiceResourceManager) IsHierarchical() bool {
	return true
}

// BlobFSFileSystemResourceManager is the container-level (filesystem) resource manager for BlobFS.
type BlobFSFileSystemResourceManager struct {
	internalAccount *AzureAccountResourceManager
	Service         *BlobFSServiceResourceManager

	containerName  string
	internalClient *filesystem.Client
}

// DefaultAuthType mirrors BlobFSServiceResourceManager.DefaultAuthType.
func (b *BlobFSFileSystemResourceManager) DefaultAuthType() ExplicitCredentialTypes {
	return (&BlobFSServiceResourceManager{}).DefaultAuthType()
}

// WithSpecificAuthType builds an AzCopyTarget for this filesystem using the requested credential type.
func (b *BlobFSFileSystemResourceManager) WithSpecificAuthType(cred ExplicitCredentialTypes, a Asserter, opts ...CreateAzCopyTargetOptions) AzCopyTarget {
	return CreateAzCopyTarget(b, cred, a, opts...)
}

// ValidAuthTypes mirrors BlobFSServiceResourceManager.ValidAuthTypes.
func (b *BlobFSFileSystemResourceManager) ValidAuthTypes() ExplicitCredentialTypes {
	return (&BlobFSServiceResourceManager{}).ValidAuthTypes()
}

// Canon returns the canonical identifier built by buildCanonForAzureResourceManager.
func (b *BlobFSFileSystemResourceManager) Canon() string {
	return buildCanonForAzureResourceManager(b)
}

// Exists reports false only when GetProperties fails with a filesystem-not-found,
// filesystem-being-deleted or resource-not-found code. Any other error is treated as "exists".
func (b *BlobFSFileSystemResourceManager) Exists() bool {
	_, err := b.internalClient.GetProperties(ctx, nil)

	return err == nil || !datalakeerror.HasCode(err, datalakeerror.FileSystemNotFound, datalakeerror.FileSystemBeingDeleted, datalakeerror.ResourceNotFound)
}

// Parent returns the owning service resource manager.
func (b *BlobFSFileSystemResourceManager) Parent() ResourceManager {
	return b.Service
}

// Account returns the account this filesystem belongs to.
func (b *BlobFSFileSystemResourceManager) Account() AccountResourceManager {
	return b.internalAccount
}

// ResourceClient returns the underlying *filesystem.Client.
func (b *BlobFSFileSystemResourceManager) ResourceClient() any {
	return b.internalClient
}

// Location returns the owning service's location.
func (b *BlobFSFileSystemResourceManager) Location() common.Location {
	return b.Service.Location()
}

// Level returns the container location level.
func (b *BlobFSFileSystemResourceManager) Level() cmd.LocationLevel {
	return cmd.ELocationLevel.Container()
}

// URI returns the filesystem's DFS URL with any client-embedded SAS stripped, then
// passed through the account's ApplySAS and addWildCard according to opts.
func (b *BlobFSFileSystemResourceManager) URI(opts ...GetURIOptions) string {
	base := dfsStripSAS(b.internalClient.DFSURL())
	base = b.internalAccount.ApplySAS(base, b.Location(), opts...)
	base = addWildCard(base, opts...)

	return base
}

// ContainerName returns the filesystem name.
func (b *BlobFSFileSystemResourceManager) ContainerName() string {
	return b.containerName
}

// Create creates the filesystem using the access level, metadata and CPK scope from props.
func (b *BlobFSFileSystemResourceManager) Create(a Asserter, props ContainerProperties) {
	a.HelperMarker().Helper()
	b.CreateWithOptions(a, &filesystem.CreateOptions{
		Access:       props.BlobContainerProperties.Access,
		Metadata:     props.Metadata,
		CPKScopeInfo: props.BlobContainerProperties.CPKScopeInfo,
	})
}

// GetProperties returns the container properties by delegating to the Blob
// service's container of the same name.
func (b *BlobFSFileSystemResourceManager) GetProperties(a Asserter) ContainerProperties {
	a.HelperMarker().Helper()

	// Same resource, same code. BlobFS SDK can't seem to return these props anyway.
	return b.Account().GetService(a, common.ELocation.Blob()).GetContainer(b.containerName).GetProperties(a)
}

// CreateWithOptions creates the filesystem with opts. An "already exists" response is
// tolerated; the resource is only tracked for cleanup when this call created it.
func (b *BlobFSFileSystemResourceManager) CreateWithOptions(a Asserter, opts *filesystem.CreateOptions) {
	a.HelperMarker().Helper()
	_, err := b.internalClient.Create(ctx, opts)

	created := true
	if datalakeerror.HasCode(err, datalakeerror.FileSystemAlreadyExists) {
		created = false
		err = nil
	}

	a.NoError("Create filesystem", err)

	if created {
		TrackResourceCreation(a, b)
	}
}

// Delete deletes the filesystem with default options.
func (b *BlobFSFileSystemResourceManager) Delete(a Asserter) {
	a.HelperMarker().Helper()
	b.DeleteWithOptions(a, nil)
}

// DeleteWithOptions deletes the filesystem with opts and asserts that no error occurred.
func (b *BlobFSFileSystemResourceManager) DeleteWithOptions(a Asserter, opts *filesystem.DeleteOptions) {
	a.HelperMarker().Helper()
	_, err := b.internalClient.Delete(ctx, opts)
	a.NoError("Delete filesystem", err)
}

// ListObjects lists paths under prefixOrDirectory, keyed by path name.
// Only the names are populated; every returned ObjectProperties is empty.
func (b *BlobFSFileSystemResourceManager) ListObjects(a Asserter, prefixOrDirectory string, recursive bool) map[string]ObjectProperties {
	a.HelperMarker().Helper()
	pager := b.internalClient.NewListPathsPager(recursive, &filesystem.ListPathsOptions{
		Prefix: &prefixOrDirectory,
	})

	out := make(map[string]ObjectProperties)

	for pager.More() {
		page, err := pager.NextPage(ctx)
		a.NoError("Get next page", err)

		for _, v := range page.Paths {
			// Guard against nameless entries, as ListContainers does, rather than panicking.
			if v == nil || v.Name == nil {
				continue
			}

			out[*v.Name] = ObjectProperties{
				EntityType:     0,
				HTTPHeaders:    contentHeaders{},
				Metadata:       nil,
				BlobProperties: BlobProperties{},
				FileProperties: FileProperties{},
			}
		}
	}

	return out
}

// GetObject returns a path resource manager for path with the given entity type.
// No service call is made here.
func (b *BlobFSFileSystemResourceManager) GetObject(a Asserter, path string, eType common.EntityType) ObjectResourceManager {
	return &BlobFSPathResourceProvider{
		internalAccount: b.internalAccount,
		Service:         b.Service,
		Container:       b,
		entityType:      eType,
		objectPath:      path,
	}
}

// BlobFSPathResourceProvider is the object-level resource manager for a BlobFS path
// (file, directory or symlink).
type BlobFSPathResourceProvider struct {
	internalAccount *AzureAccountResourceManager
	Service         *BlobFSServiceResourceManager
	Container       *BlobFSFileSystemResourceManager

	entityType common.EntityType
	objectPath string
}

// DefaultAuthType mirrors BlobFSServiceResourceManager.DefaultAuthType.
func (b *BlobFSPathResourceProvider) DefaultAuthType() ExplicitCredentialTypes {
	return (&BlobFSServiceResourceManager{}).DefaultAuthType()
}

// WithSpecificAuthType builds an AzCopyTarget for this path using the requested credential type.
func (b *BlobFSPathResourceProvider) WithSpecificAuthType(cred ExplicitCredentialTypes, a Asserter, opts ...CreateAzCopyTargetOptions) AzCopyTarget {
	a.HelperMarker().Helper()
	return CreateAzCopyTarget(b, cred, a, opts...)
}

// ValidAuthTypes mirrors BlobFSServiceResourceManager.ValidAuthTypes.
func (b *BlobFSPathResourceProvider) ValidAuthTypes() ExplicitCredentialTypes {
	return (&BlobFSServiceResourceManager{}).ValidAuthTypes()
}

// Canon returns the canonical identifier built by buildCanonForAzureResourceManager.
func (b *BlobFSPathResourceProvider) Canon() string {
	return buildCanonForAzureResourceManager(b)
}

// Parent returns the owning filesystem resource manager.
func (b *BlobFSPathResourceProvider) Parent() ResourceManager {
	return b.Container
}

// Account returns the account this path belongs to.
func (b *BlobFSPathResourceProvider) Account() AccountResourceManager {
	return b.internalAccount
}

// ResourceClient returns a *directory.Client for folders and a *file.Client for everything else.
func (b *BlobFSPathResourceProvider) ResourceClient() any {
	switch b.entityType {
	case common.EEntityType.Folder():
		return b.getDirClient()
	default: // lump files in with other types, because that's how they're implemented in azcopy
		return b.getFileClient()
	}
}

// Location returns the owning service's location.
func (b *BlobFSPathResourceProvider) Location() common.Location {
	return b.Service.Location()
}

// Level returns the object location level.
func (b *BlobFSPathResourceProvider) Level() cmd.LocationLevel {
	return cmd.ELocationLevel.Object()
}

// URI returns the path's DFS URL with any client-embedded SAS stripped, then
// passed through the account's ApplySAS and addWildCard according to opts.
func (b *BlobFSPathResourceProvider) URI(opts ...GetURIOptions) string {
	base := dfsStripSAS(b.getFileClient().DFSURL()) // obj type doesn't matter here, URL is the same under the hood
	base = b.internalAccount.ApplySAS(base, b.Location(), opts...)
	base = addWildCard(base, opts...)

	return base
}

// EntityType returns the entity type this provider was constructed with.
func (b *BlobFSPathResourceProvider) EntityType() common.EntityType {
	return b.entityType
}

// ContainerName returns the name of the owning filesystem.
func (b *BlobFSPathResourceProvider) ContainerName() string {
	return b.Container.ContainerName()
}

// ObjectName returns the path of this object within its filesystem.
func (b *BlobFSPathResourceProvider) ObjectName() string {
	return b.objectPath
}

// CreateParents ensures the filesystem and the immediate parent directory exist.
// Ancestors further up are created because the parent's Create calls CreateParents again.
func (b *BlobFSPathResourceProvider) CreateParents(a Asserter) {
	if !b.Container.Exists() {
		b.Container.Create(a, ContainerProperties{})
	}

	dir, _ := path.Split(b.objectPath)
	if dir != "" {
		obj := b.Container.GetObject(a, strings.TrimSuffix(dir, "/"), common.EEntityType.Folder()).(*BlobFSPathResourceProvider)
		// Create recursively calls this function.
		if !obj.Exists() {
			obj.Create(a, nil, ObjectProperties{})
		}
	}
}

// Create creates the path (and any missing parents), uploads body for files and symlinks,
// then applies access control, metadata, tags and access tier from properties.
// Symlinks and folders get their POSIX marker metadata key added to a copy of the metadata.
// body is only read for files and symlinks; folders may pass nil.
func (b *BlobFSPathResourceProvider) Create(a Asserter, body ObjectContentContainer, properties ObjectProperties) {
	a.HelperMarker().Helper()
	b.CreateParents(a)

	switch b.entityType {
	case common.EEntityType.Folder():
		_, err := b.getDirClient().Create(ctx, &directory.CreateOptions{
			HTTPHeaders: properties.HTTPHeaders.ToBlobFS(),
			Permissions: properties.BlobFSProperties.Permissions,
			Owner:       properties.BlobFSProperties.Owner,
			Group:       properties.BlobFSProperties.Group,
			ACL:         properties.BlobFSProperties.ACL,
		})
		a.NoError("Create directory", err)
	case common.EEntityType.File(), common.EEntityType.Symlink(): // Symlinks just need an extra metadata tag
		_, err := b.getFileClient().Create(ctx, &file.CreateOptions{
			HTTPHeaders: properties.HTTPHeaders.ToBlobFS(),
		})
		a.NoError("Create file", err)

		err = b.getFileClient().UploadStream(ctx, body.Reader(), &file.UploadStreamOptions{
			Concurrency: uint16(runtime.NumCPU()),
			HTTPHeaders: properties.HTTPHeaders.ToBlobFS(),
		})
		a.NoError("Upload stream", err)

		if properties.BlobFSProperties.Owner != nil || properties.BlobFSProperties.Group != nil || properties.BlobFSProperties.Permissions != nil || properties.BlobFSProperties.ACL != nil {
			// Set access control after we write to prevent locking ourselves out
			_, err = b.getFileClient().SetAccessControl(ctx, &file.SetAccessControlOptions{
				Permissions: properties.BlobFSProperties.Permissions,
				Owner:       properties.BlobFSProperties.Owner,
				Group:       properties.BlobFSProperties.Group,
				ACL:         properties.BlobFSProperties.ACL,
			})
			// Only assert when the call was made, so a stale upload error is not reported twice.
			a.NoError("Set access control", err)
		}
	}

	// Work on a copy when a marker key is added so the caller's metadata map is not modified.
	meta := properties.Metadata
	if b.entityType == common.EEntityType.Symlink() {
		meta = make(common.Metadata)

		for k, v := range properties.Metadata {
			meta[k] = v
		}

		meta[common.POSIXSymlinkMeta] = pointerTo("true")
	} else if b.entityType == common.EEntityType.Folder() {
		meta = make(common.Metadata)

		for k, v := range properties.Metadata {
			meta[k] = v
		}

		meta[common.POSIXFolderMeta] = pointerTo("true")
	}
	b.SetMetadata(a, meta)

	blobClient := b.getBlobClient(a)
	if properties.BlobProperties.Tags != nil {
		_, err := blobClient.SetTags(ctx, properties.BlobProperties.Tags, nil)
		a.NoError("Set tags", err)
	}

	if properties.BlobProperties.BlockBlobAccessTier != nil {
		_, err := blobClient.SetTier(ctx, *properties.BlobProperties.BlockBlobAccessTier, nil)
		a.NoError("Set tier", err)
	}

	TrackResourceCreation(a, b)
}

// Delete removes the path. Folders go through the directory client; every other entity
// type (including symlinks, which Create makes via the file client) goes through the
// file client. Not-found responses are tolerated.
func (b *BlobFSPathResourceProvider) Delete(a Asserter) {
	a.HelperMarker().Helper()
	var err error
	switch b.entityType {
	case common.EEntityType.Folder():
		_, err = b.getDirClient().Delete(ctx, nil)
	default: // lump files in with other types, matching ResourceClient and Create
		_, err = b.getFileClient().Delete(ctx, nil)
	}

	if datalakeerror.HasCode(err, datalakeerror.PathNotFound, datalakeerror.ResourceNotFound, datalakeerror.FileSystemNotFound) {
		err = nil
	}

	a.NoError("delete path", err)
}

// ListChildren lists the filesystem's paths using this object's path as the prefix.
func (b *BlobFSPathResourceProvider) ListChildren(a Asserter, recursive bool) map[string]ObjectProperties {
	a.HelperMarker().Helper()
	return b.Container.ListObjects(a, b.objectPath, recursive)
}

// GetProperties returns the path's properties using default options.
func (b *BlobFSPathResourceProvider) GetProperties(a Asserter) ObjectProperties {
	a.HelperMarker().Helper()
	return b.GetPropertiesWithOptions(a, nil)
}

// BlobFSPathGetPropertiesOptions carries the optional settings for GetPropertiesWithOptions.
type BlobFSPathGetPropertiesOptions struct {
	AccessConditions *file.AccessConditions
	CPKInfo          *file.CPKInfo
	UPN              *bool
}

// GetPropertiesWithOptions fetches HTTP headers, metadata, owner, group and permissions
// via GetProperties (skipped for the filesystem root) and the ACL via GetAccessControl.
// A nil options is treated as the zero value. EntityType in the result is always 0.
func (b *BlobFSPathResourceProvider) GetPropertiesWithOptions(a Asserter, options *BlobFSPathGetPropertiesOptions) ObjectProperties {
	a.HelperMarker().Helper()
	opts := DerefOrZero(options)

	var err error
	var resp file.GetPropertiesResponse

	// If we're talking about the root, there are no such properties on the blob endpoint. In this case, the only thing that would (or could) be present is access control.
	if !(b.objectPath == "" || b.objectPath == "/") {
		// As far as BlobFS (and its SDK) are concerned, the REST API call is the same for files and directories. Using the same call doesn't hurt.
		resp, err = b.getFileClient().GetProperties(ctx, &file.GetPropertiesOptions{
			AccessConditions: opts.AccessConditions,
			CPKInfo:          opts.CPKInfo,
		})
		a.NoError("Get properties", err)
	}

	// NOTE(review): this error has never been asserted; presumably the ACL lookup is
	// best-effort (on failure ACL is simply left nil). Confirm before making it fatal.
	permResp, _ := b.getFileClient().GetAccessControl(ctx, &file.GetAccessControlOptions{
		UPN:              opts.UPN,
		AccessConditions: opts.AccessConditions,
	})

	return ObjectProperties{
		EntityType: 0,
		HTTPHeaders: contentHeaders{
			cacheControl:       resp.CacheControl,
			contentDisposition: resp.ContentDisposition,
			contentEncoding:    resp.ContentEncoding,
			contentLanguage:    resp.ContentLanguage,
			contentType:        resp.ContentType,
			contentMD5:         resp.ContentMD5,
		},
		Metadata: resp.Metadata,
		BlobFSProperties: BlobFSProperties{
			Permissions: resp.Permissions,
			Owner:       resp.Owner,
			Group:       resp.Group,
			ACL:         permResp.ACL,
		},
	}
}

// SetHTTPHeaders replaces the path's HTTP headers with h.
func (b *BlobFSPathResourceProvider) SetHTTPHeaders(a Asserter, h contentHeaders) {
	a.HelperMarker().Helper()
	_, err := b.getFileClient().SetHTTPHeaders(ctx, DerefOrZero(h.ToBlobFS()), nil)
	a.NoError("Set HTTP headers", err)
}

// SetMetadata sets the path's metadata. If the service rejects the request with
// UnsupportedHeader, it retries once without the POSIX folder marker key.
// The caller's map is never modified.
func (b *BlobFSPathResourceProvider) SetMetadata(a Asserter, metadata common.Metadata) {
	a.HelperMarker().Helper()
	_, err := b.getFileClient().SetMetadata(ctx, metadata, nil)

	if datalakeerror.HasCode(err, datalakeerror.UnsupportedHeader) {
		// retry, removing hdi_isfolder
		retryMeta := metadata
		if _, hasMarker := metadata[common.POSIXFolderMeta]; hasMarker {
			// Copy instead of deleting in place so the caller's map is left intact.
			retryMeta = make(common.Metadata, len(metadata))
			for k, v := range metadata {
				if k != common.POSIXFolderMeta {
					retryMeta[k] = v
				}
			}
		}

		_, err = b.getFileClient().SetMetadata(ctx, retryMeta, nil)
	}

	a.NoError("Set metadata", err)
}

// SetObjectProperties applies HTTP headers, metadata and access control from props,
// then tags and access tier (through the blob endpoint) when those are set.
func (b *BlobFSPathResourceProvider) SetObjectProperties(a Asserter, props ObjectProperties) {
	a.HelperMarker().Helper()
	b.SetHTTPHeaders(a, props.HTTPHeaders)
	b.SetMetadata(a, props.Metadata)

	_, err := b.getFileClient().SetAccessControl(ctx, &file.SetAccessControlOptions{
		Owner:       props.BlobFSProperties.Owner,
		Group:       props.BlobFSProperties.Group,
		ACL:         props.BlobFSProperties.ACL,
		Permissions: props.BlobFSProperties.Permissions,
	})
	a.NoError("Set access control", err)

	blobClient := b.getBlobClient(a)
	if props.BlobProperties.Tags != nil {
		_, err := blobClient.SetTags(ctx, props.BlobProperties.Tags, nil)
		a.NoError("Set tags", err)
	}

	if props.BlobProperties.BlockBlobAccessTier != nil {
		_, err := blobClient.SetTier(ctx, *props.BlobProperties.BlockBlobAccessTier, nil)
		a.NoError("Set tier", err)
	}
}

// getDirClient returns a directory client for this object's path.
func (b *BlobFSPathResourceProvider) getDirClient() *directory.Client {
	return b.Container.internalClient.NewDirectoryClient(b.objectPath)
}

// getFileClient returns a file client for this object's path.
func (b *BlobFSPathResourceProvider) getFileClient() *file.Client {
	return b.Container.internalClient.NewFileClient(b.objectPath)
}

// getBlobClient returns a blob-endpoint client for the same container and path.
func (b *BlobFSPathResourceProvider) getBlobClient(a Asserter) *blob.Client {
	a.HelperMarker().Helper()
	blobService := b.internalAccount.GetService(a, common.ELocation.Blob()).(*BlobServiceResourceManager) // Blob and BlobFS are synonymous, so simply getting the same path is fine.
	container := blobService.InternalClient.NewContainerClient(b.Container.containerName)
	return container.NewBlobClient(b.objectPath) // Generic blob client for now, we can specialize if we want in the future.
}

// Download reads the whole file into memory and returns a seekable reader over it.
// It asserts that the entity type is File.
func (b *BlobFSPathResourceProvider) Download(a Asserter) io.ReadSeeker {
	a.HelperMarker().Helper()
	a.Assert("Object type must be file", Equal{}, common.EEntityType.File(), b.entityType)

	resp, err := b.getFileClient().DownloadStream(ctx, nil)
	a.NoError("Download stream", err)

	buf := &bytes.Buffer{}
	if resp.Body != nil {
		// Close the body once drained so the underlying connection is released.
		defer resp.Body.Close()

		_, err = io.Copy(buf, resp.Body)
		a.NoError("Read body", err)
	}

	return bytes.NewReader(buf.Bytes())
}

// Exists reports false only when GetProperties fails with a path-not-found,
// filesystem-not-found, filesystem-being-deleted or resource-not-found code.
// Any other error is treated as "exists".
func (b *BlobFSPathResourceProvider) Exists() bool {
	_, err := b.getFileClient().GetProperties(ctx, nil) // under the hood it's just a path, no special restype flag.

	return err == nil || !datalakeerror.HasCode(err, datalakeerror.PathNotFound, datalakeerror.FileSystemNotFound, datalakeerror.FileSystemBeingDeleted, datalakeerror.ResourceNotFound)
}