ste/sourceInfoProvider-File.go (271 lines of code) (raw):

// Copyright © 2017 Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package ste import ( "context" "crypto/md5" "fmt" "io" "sync" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/directory" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share" "github.com/Azure/azure-storage-azcopy/v10/common" ) type shareFilePropertyProvider interface { FileCreationTime() time.Time FileLastWriteTime() time.Time FileAttributes() (*file.NTFSFileAttributes, error) FilePermissionKey() string Metadata() map[string]*string LastModified() time.Time CacheControl() string ContentDisposition() string ContentEncoding() string ContentLanguage() string ContentType() string ContentMD5() []byte } type fileGetPropertiesAdapter struct { GetProperties file.GetPropertiesResponse } func (f fileGetPropertiesAdapter) CacheControl() string { return common.IffNotNil(f.GetProperties.CacheControl, "") } func (f fileGetPropertiesAdapter) ContentDisposition() string { return common.IffNotNil(f.GetProperties.ContentDisposition, "") } func (f fileGetPropertiesAdapter) ContentEncoding() string { return common.IffNotNil(f.GetProperties.ContentEncoding, "") } func (f fileGetPropertiesAdapter) ContentLanguage() string { return common.IffNotNil(f.GetProperties.ContentLanguage, "") } func (f fileGetPropertiesAdapter) ContentType() string { return common.IffNotNil(f.GetProperties.ContentType, "") } func (f fileGetPropertiesAdapter) ContentMD5() []byte { return f.GetProperties.ContentMD5 } func (f fileGetPropertiesAdapter) FileCreationTime() time.Time { return common.IffNotNil(f.GetProperties.FileCreationTime, time.Time{}) } func (f fileGetPropertiesAdapter) FileLastWriteTime() time.Time { return common.IffNotNil(f.GetProperties.FileLastWriteTime, time.Time{}) } func (f fileGetPropertiesAdapter) FileAttributes() (*file.NTFSFileAttributes, error) { return file.ParseNTFSFileAttributes(f.GetProperties.FileAttributes) } func (f fileGetPropertiesAdapter) FilePermissionKey() string { return common.IffNotNil(f.GetProperties.FilePermissionKey, "") } func (f fileGetPropertiesAdapter) Metadata() map[string]*string { return f.GetProperties.Metadata } func (f fileGetPropertiesAdapter) LastModified() time.Time { return common.IffNotNil(f.GetProperties.LastModified, time.Time{}) } type directoryGetPropertiesAdapter struct { GetProperties directory.GetPropertiesResponse } func (d directoryGetPropertiesAdapter) CacheControl() string { return "" } func (d directoryGetPropertiesAdapter) ContentDisposition() string { return "" } func (d directoryGetPropertiesAdapter) ContentEncoding() string { return "" } func (d directoryGetPropertiesAdapter) ContentLanguage() string { return "" } func (d directoryGetPropertiesAdapter) ContentType() string { return "" } func (d directoryGetPropertiesAdapter) ContentMD5() []byte { return make([]byte, 0) } func (d directoryGetPropertiesAdapter) FileCreationTime() time.Time { return common.IffNotNil(d.GetProperties.FileCreationTime, time.Time{}) } func (d directoryGetPropertiesAdapter) FileLastWriteTime() time.Time { return common.IffNotNil(d.GetProperties.FileLastWriteTime, time.Time{}) } func (d directoryGetPropertiesAdapter) FileAttributes() (*file.NTFSFileAttributes, error) { return file.ParseNTFSFileAttributes(d.GetProperties.FileAttributes) } func (d directoryGetPropertiesAdapter) FilePermissionKey() string { return common.IffNotNil(d.GetProperties.FilePermissionKey, "") } func (d directoryGetPropertiesAdapter) Metadata() map[string]*string { return d.GetProperties.Metadata } func (d directoryGetPropertiesAdapter) LastModified() time.Time { return common.IffNotNil(d.GetProperties.LastModified, time.Time{}) } // Source info provider for Azure blob type fileSourceInfoProvider struct { ctx context.Context cachedPermissionKey string cacheOnce *sync.Once cachedProperties shareFilePropertyProvider // use interface because may be file or directory properties sourceURL string srcShareClient *share.Client defaultRemoteSourceInfoProvider } func newFileSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, error) { base, err := newDefaultRemoteSourceInfoProvider(jptm) if err != nil { return nil, err } s, err := jptm.SrcServiceClient().FileServiceClient() if err != nil { return nil, err } sourceShare := s.NewShareClient(jptm.Info().SrcContainer) if jptm.Info().SnapshotID != "" { sourceShare, err = sourceShare.WithSnapshot(jptm.Info().SnapshotID) if err != nil { return nil, err } } source := sourceShare.NewRootDirectoryClient().NewFileClient(jptm.Info().SrcFilePath) // due to the REST parity feature added in 2019-02-02, the File APIs are no longer backward compatible // so we must use the latest SDK version to stay safe //TODO: Should we do that? ctx := jptm.Context() ctx = withPipelineNetworkStats(ctx, nil) return &fileSourceInfoProvider{ defaultRemoteSourceInfoProvider: *base, ctx: ctx, cacheOnce: &sync.Once{}, srcShareClient: s.NewShareClient(jptm.Info().SrcContainer), sourceURL: source.URL()}, nil } func (p *fileSourceInfoProvider) PreSignedSourceURL() (string, error) { return p.sourceURL, nil } func (p *fileSourceInfoProvider) RawSource() string { return p.sourceURL } func (p *fileSourceInfoProvider) getFreshProperties() (shareFilePropertyProvider, error) { fsc, err := p.jptm.SrcServiceClient().FileServiceClient() if err != nil { return nil, err } share := fsc.NewShareClient(p.transferInfo.SrcContainer) switch p.EntityType() { case common.EEntityType.File(): fileClient := share.NewRootDirectoryClient().NewFileClient(p.transferInfo.SrcFilePath) props, err := fileClient.GetProperties(p.ctx, nil) return &fileGetPropertiesAdapter{props}, err case common.EEntityType.Folder(): directoryClient := share.NewDirectoryClient(p.transferInfo.SrcFilePath) props, err := directoryClient.GetProperties(p.ctx, nil) return &directoryGetPropertiesAdapter{props}, err default: panic("unexpected case") } } // cached because we use it for both GetSMBProperties and GetSDDL, and in some cases (e.g. small files, // or enough transactions that transaction costs matter) saving IOPS matters func (p *fileSourceInfoProvider) getCachedProperties() (shareFilePropertyProvider, error) { var err error p.cacheOnce.Do(func() { p.cachedProperties, err = p.getFreshProperties() }) return p.cachedProperties, err } func (p *fileSourceInfoProvider) GetSMBProperties() (TypedSMBPropertyHolder, error) { return p.getCachedProperties() } func (p *fileSourceInfoProvider) GetSDDL() (string, error) { // Get the key for SIPM props, err := p.getCachedProperties() if err != nil { return "", err } key := props.FilePermissionKey() if key == "" { return "", nil } // Call into SIPM and grab our SDDL string. sipm := p.jptm.SecurityInfoPersistenceManager() sddlString, err := sipm.GetSDDLFromID(key, p.srcShareClient) return sddlString, err } func (p *fileSourceInfoProvider) Properties() (*SrcProperties, error) { srcProperties, err := p.defaultRemoteSourceInfoProvider.Properties() if err != nil { return nil, err } // Get properties in backend. if p.transferInfo.S2SGetPropertiesInBackend { properties, err := p.getCachedProperties() if err != nil { return nil, err } // TODO: is it OK that this does not get set if s2sGetPropertiesInBackend is false? Probably yes, because it's only a cached value, and getPropertiesInBackend is always false of AzFiles anyway at present (early 2020) p.cachedPermissionKey = properties.FilePermissionKey() // We cache this as getting the SDDL is a separate operation. switch p.EntityType() { case common.EEntityType.File(): srcProperties = &SrcProperties{ SrcHTTPHeaders: common.ResourceHTTPHeaders{ ContentType: properties.ContentType(), ContentEncoding: properties.ContentEncoding(), ContentDisposition: properties.ContentDisposition(), ContentLanguage: properties.ContentLanguage(), CacheControl: properties.CacheControl(), ContentMD5: properties.ContentMD5(), }, SrcMetadata: properties.Metadata(), } case common.EEntityType.Folder(): srcProperties = &SrcProperties{ SrcHTTPHeaders: common.ResourceHTTPHeaders{}, // no contentType etc for folders SrcMetadata: properties.Metadata(), } default: panic("unsupported entity type") } } return srcProperties, nil } func (p *fileSourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) { if p.EntityType() != common.EEntityType.File() { panic("unsupported. Cannot get modification time on non-file object") // nothing should ever call this for a non-file } properties, err := p.getFreshProperties() if err != nil { return time.Time{}, err } // We ignore smblastwrite because otherwise the tx will fail s2s return properties.LastModified(), nil } func (p *fileSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) { switch p.EntityType() { case common.EEntityType.File(): var rangeGetContentMD5 *bool if count <= common.MaxRangeGetSize { rangeGetContentMD5 = to.Ptr(true) } fsc, err := p.jptm.SrcServiceClient().FileServiceClient() if err != nil { return nil, err } shareClient := fsc.NewShareClient(p.transferInfo.SrcContainer) fileClient := shareClient.NewRootDirectoryClient().NewFileClient(p.transferInfo.SrcFilePath) response, err := fileClient.DownloadStream(p.ctx, &file.DownloadStreamOptions{ Range: file.HTTPRange{Offset: offset, Count: count}, RangeGetContentMD5: rangeGetContentMD5, }) if err != nil { return nil, err } if len(response.ContentMD5) > 0 { return response.ContentMD5, nil } else { // compute md5 body := response.NewRetryReader(p.ctx, &file.RetryReaderOptions{MaxRetries: MaxRetryPerDownloadBody}) defer body.Close() h := md5.New() if _, err = io.Copy(h, body); err != nil { return nil, err } return h.Sum(nil), nil } case common.EEntityType.Folder(): return nil, fmt.Errorf("cannot get body or md5 of a folder") default: panic("unexpected case") } }