ste/sourceInfoProvider-S3.go (155 lines of code) (raw):

// Copyright © 2017 Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package ste import ( "crypto/md5" "fmt" "io" "net/url" "os" "time" "github.com/Azure/azure-storage-azcopy/v10/common" minio "github.com/minio/minio-go" ) // Source info provider for S3 type s3SourceInfoProvider struct { jptm IJobPartTransferMgr transferInfo *TransferInfo rawSourceURL *url.URL s3Client *minio.Client s3URLPart common.S3URLParts credType common.CredentialType } // By default presign expires after 7 days, which is considered enough for large amounts of files transfer. // This value could be further tuned, or exposed to user for customization, according to user feedback. const defaultPresignExpires = time.Hour * 7 * 24 var s3ClientFactory = common.NewS3ClientFactory() func newS3SourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, error) { var err error p := s3SourceInfoProvider{jptm: jptm, transferInfo: jptm.Info()} p.rawSourceURL, err = url.Parse(p.transferInfo.Source) if err != nil { return nil, err } p.s3URLPart, err = common.NewS3URLParts(*p.rawSourceURL) if err != nil { return nil, err } if os.Getenv("AWS_ACCESS_KEY_ID") == "" && os.Getenv("AWS_SECRET_ACCESS_KEY") == "" { p.credType = common.ECredentialType.S3PublicBucket() } else { p.credType = common.ECredentialType.S3AccessKey() } ctx := jptm.Context() ctx = withPipelineNetworkStats(ctx, nil) p.s3Client, err = s3ClientFactory.GetS3Client(ctx, common.CredentialInfo{ CredentialType: p.credType, S3CredentialInfo: common.S3CredentialInfo{ Endpoint: p.s3URLPart.Endpoint, Region: p.s3URLPart.Region, }, }, common.CredentialOpOptions{ LogInfo: func(str string) { p.jptm.Log(common.LogInfo, str) }, LogError: func(str string) { p.jptm.Log(common.LogError, str) }, Panic: func(err error) { panic(err) }, }, jptm) if err != nil { return nil, err } return &p, nil } func (p *s3SourceInfoProvider) PreSignedSourceURL() (string, error) { if p.credType == common.ECredentialType.S3PublicBucket() { return p.rawSourceURL.String(), nil } source, err := p.s3Client.PresignedGetObject(p.s3URLPart.BucketName, p.s3URLPart.ObjectKey, defaultPresignExpires, url.Values{}) if err != nil { return "", err } return source.String(), nil } func (p *s3SourceInfoProvider) Properties() (*SrcProperties, error) { srcProperties := SrcProperties{ SrcHTTPHeaders: p.transferInfo.SrcHTTPHeaders, SrcMetadata: p.transferInfo.SrcMetadata, SrcBlobTags: p.transferInfo.SrcBlobTags, } // Get properties in backend. if p.transferInfo.S2SGetPropertiesInBackend { objectInfo, err := p.s3Client.StatObject(p.s3URLPart.BucketName, p.s3URLPart.ObjectKey, minio.StatObjectOptions{}) if err != nil { return nil, err } oie := common.ObjectInfoExtension{ObjectInfo: objectInfo} srcProperties = SrcProperties{ SrcHTTPHeaders: common.ResourceHTTPHeaders{ ContentType: objectInfo.ContentType, ContentEncoding: oie.ContentEncoding(), ContentDisposition: oie.ContentDisposition(), ContentLanguage: oie.ContentLanguage(), CacheControl: oie.CacheControl(), ContentMD5: oie.ContentMD5(), }, SrcMetadata: oie.NewCommonMetadata(), } } // Handle invalid metadata. // Note: Only handle metadata's key, as metadata's value must conform to US-ASCII for both S3 and Azure. resolvedMetadata, err := p.handleInvalidMetadataKeys(srcProperties.SrcMetadata) if err != nil { return nil, err } srcProperties.SrcMetadata = resolvedMetadata return &srcProperties, nil } // handleInvalidMetadataKeys handles invalid metadata for S3 source. func (p *s3SourceInfoProvider) handleInvalidMetadataKeys(m common.Metadata) (common.Metadata, error) { if m == nil { return m, nil } switch p.transferInfo.S2SInvalidMetadataHandleOption { case common.EInvalidMetadataHandleOption.ExcludeIfInvalid(): retainedMetadata, excludedMetadata, invalidKeyExists := m.ExcludeInvalidKey() if invalidKeyExists && p.jptm.ShouldLog(common.LogWarning) { p.jptm.Log(common.LogWarning, fmt.Sprintf("METADATAWARNING: For source %q, invalid metadata with keys %s are excluded", p.transferInfo.Source, excludedMetadata.ConcatenatedKeys())) } return retainedMetadata, nil case common.EInvalidMetadataHandleOption.FailIfInvalid(): _, invalidMetdata, invalidKeyExists := m.ExcludeInvalidKey() if invalidKeyExists { return nil, fmt.Errorf("metadata with keys %s in source is invalid, and application parameters specify that error should be reported when invalid keys are found", invalidMetdata.ConcatenatedKeys()) } return m, nil case common.EInvalidMetadataHandleOption.RenameIfInvalid(): return m.ResolveInvalidKey() } return m, nil } func (p *s3SourceInfoProvider) SourceSize() int64 { return p.transferInfo.SourceSize } func (p *s3SourceInfoProvider) RawSource() string { return p.transferInfo.Source } func (p *s3SourceInfoProvider) IsLocal() bool { return false } func (p *s3SourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) { objectInfo, err := p.s3Client.StatObject(p.s3URLPart.BucketName, p.s3URLPart.ObjectKey, minio.StatObjectOptions{}) if err != nil { return time.Time{}, err } return objectInfo.LastModified, nil } func (p *s3SourceInfoProvider) EntityType() common.EntityType { return common.EEntityType.File() // no real folders exist in S3 } func (p *s3SourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) { options := minio.GetObjectOptions{} r := formatHTTPRange(offset, count) if r != nil { options.Set("Range", *r) } // s3 does not support getting range md5 body, err := p.s3Client.GetObject(p.s3URLPart.BucketName, p.s3URLPart.ObjectKey, options) if err != nil { return nil, err } // compute md5 defer body.Close() //nolint:staticcheck h := md5.New() if _, err = io.Copy(h, body); err != nil { return nil, err } return h.Sum(nil), nil }