ste/sourceInfoProvider-GCP.go (159 lines of code) (raw):
package ste
import (
gcpUtils "cloud.google.com/go/storage"
"context"
"crypto/md5"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/common"
"golang.org/x/oauth2/google"
"io"
"os"
"net/url"
"time"
)
// gcpSourceInfoProvider is the source info provider returned by
// newGCPSourceInfoProvider for transfers whose source is a GCP object.
type gcpSourceInfoProvider struct {
// jptm is the job part transfer manager the provider was built for; it is
// also used for logging.
jptm IJobPartTransferMgr
// transferInfo is the value returned by jptm.Info() at construction time.
transferInfo *TransferInfo
// rawSourceURL is transferInfo.Source parsed as a URL.
rawSourceURL *url.URL
// gcpClient is the storage client obtained from gcpClientFactory.
gcpClient *gcpUtils.Client
// gcpURLParts is derived from rawSourceURL; its BucketName and ObjectKey
// identify the source object in every GCP call made by this provider.
gcpURLParts common.GCPURLParts
// ctx is the transfer context after wrapping with withPipelineNetworkStats;
// it is passed to the GCP client calls.
ctx context.Context
}
var (
	// gcpClientFactory hands out the GCP storage clients used by every
	// gcpSourceInfoProvider in this package.
	gcpClientFactory = common.NewGCPClientFactory()

	// jsonKey holds the raw bytes of the service-account key file. It is
	// filled in by newGCPSourceInfoProvider and consumed by PreSignedSourceURL.
	jsonKey []byte
)
// newGCPSourceInfoProvider builds the source info provider for a transfer
// whose source is a GCP object.
//
// It parses the transfer's source URL into its GCP parts, obtains a storage
// client from gcpClientFactory using Google application credentials, and loads
// the service-account key file named by the GoogleAppCredentials environment
// variable into the package-level jsonKey (used later for URL signing).
//
// An error is returned if the source URL cannot be parsed, the client cannot
// be created, or the key file cannot be read. In the last case the underlying
// I/O error is wrapped so callers can see why the read failed.
func newGCPSourceInfoProvider(jptm IJobPartTransferMgr) (ISourceInfoProvider, error) {
	var err error
	p := gcpSourceInfoProvider{jptm: jptm, transferInfo: jptm.Info()}

	p.rawSourceURL, err = url.Parse(p.transferInfo.Source)
	if err != nil {
		return nil, err
	}

	p.gcpURLParts, err = common.NewGCPURLParts(*p.rawSourceURL)
	if err != nil {
		return nil, err
	}

	ctx := jptm.Context()
	ctx = withPipelineNetworkStats(ctx, nil)
	p.ctx = ctx

	p.gcpClient, err = gcpClientFactory.GetGCPClient(
		p.ctx,
		common.CredentialInfo{
			CredentialType:    common.ECredentialType.GoogleAppCredentials(),
			GCPCredentialInfo: common.GCPCredentialInfo{},
		},
		common.CredentialOpOptions{
			LogInfo:  func(str string) { p.jptm.Log(common.LogInfo, str) },
			LogError: func(str string) { p.jptm.Log(common.LogError, str) },
			Panic:    func(err error) { panic(err) },
		})
	if err != nil {
		return nil, err
	}

	// Read into a local first: os.ReadFile may return nil or partial data
	// together with an error, and that must not replace a key that an earlier
	// successful call already stored in the shared jsonKey.
	key, err := os.ReadFile(common.GetEnvironmentVariable(common.EEnvironmentVariable.GoogleAppCredentials()))
	if err != nil {
		return nil, fmt.Errorf("Cannot read JSON key file. Please verify you have correctly set GOOGLE_APPLICATION_CREDENTIALS environment variable: %w", err)
	}
	// NOTE(review): jsonKey is package-level state written on every
	// construction without synchronization; if providers can be constructed
	// concurrently this should be guarded — confirm against callers.
	jsonKey = key

	return &p, nil
}
// PreSignedSourceURL returns a V4-signed GET URL for the source object.
//
// The signing identity (email and private key) is taken from the
// service-account JSON held in the package-level jsonKey, and the URL expires
// defaultPresignExpires after the moment of the call.
//
// Failures to parse the key or to sign the URL are returned with the
// underlying error wrapped, so callers can inspect it with errors.Is/As.
func (p *gcpSourceInfoProvider) PreSignedSourceURL() (string, error) {
	conf, err := google.JWTConfigFromJSON(jsonKey)
	if err != nil {
		return "", fmt.Errorf("Could not get config from json key. Error: %w", err)
	}

	opts := &gcpUtils.SignedURLOptions{
		Scheme:         gcpUtils.SigningSchemeV4,
		Method:         "GET",
		GoogleAccessID: conf.Email,
		PrivateKey:     conf.PrivateKey,
		Expires:        time.Now().Add(defaultPresignExpires),
	}

	u, err := gcpUtils.SignedURL(p.gcpURLParts.BucketName, p.gcpURLParts.ObjectKey, opts)
	if err != nil {
		return "", fmt.Errorf("Unable to Generate Signed URL for given GCP Object: %w", err)
	}
	return u, nil
}
// Properties returns the HTTP headers and metadata to carry over from the
// source object.
//
// The values recorded in the transfer info are used by default. When
// S2SGetPropertiesInBackend is set, the object's attributes are fetched from
// GCP and used instead. In both cases the metadata is then passed through
// handleInvalidMetadataKeys, whose error (if any) is returned as-is.
func (p *gcpSourceInfoProvider) Properties() (*SrcProperties, error) {
	props := SrcProperties{
		SrcHTTPHeaders: p.transferInfo.SrcHTTPHeaders,
		SrcMetadata:    p.transferInfo.SrcMetadata,
	}

	if p.transferInfo.S2SGetPropertiesInBackend {
		object := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey)
		attrs, err := object.Attrs(p.ctx)
		if err != nil {
			return nil, err
		}

		ext := common.GCPObjectInfoExtension{ObjectInfo: *attrs}
		headers := common.ResourceHTTPHeaders{
			ContentType:        attrs.ContentType,
			ContentEncoding:    ext.ContentEncoding(),
			ContentDisposition: ext.ContentDisposition(),
			ContentLanguage:    ext.ContentLanguage(),
			CacheControl:       ext.CacheControl(),
			ContentMD5:         ext.ContentMD5(),
		}
		props = SrcProperties{
			SrcHTTPHeaders: headers,
			SrcMetadata:    ext.NewCommonMetadata(),
		}
	}

	metadata, err := p.handleInvalidMetadataKeys(props.SrcMetadata)
	if err != nil {
		return nil, err
	}
	props.SrcMetadata = metadata

	return &props, nil
}
// handleInvalidMetadataKeys applies the transfer's
// S2SInvalidMetadataHandleOption to m and returns the resulting metadata.
//
//   - ExcludeIfInvalid: invalid keys are dropped; a warning naming them is
//     logged when warning-level logging is enabled.
//   - FailIfInvalid: an error is returned if any invalid key exists.
//   - RenameIfInvalid: the result of m.ResolveInvalidKey() is returned.
//
// Nil metadata and any other option value are returned unchanged.
func (p *gcpSourceInfoProvider) handleInvalidMetadataKeys(m common.Metadata) (common.Metadata, error) {
	if m == nil {
		return m, nil
	}

	option := p.transferInfo.S2SInvalidMetadataHandleOption
	switch option {
	case common.EInvalidMetadataHandleOption.ExcludeIfInvalid():
		kept, dropped, hasInvalid := m.ExcludeInvalidKey()
		if hasInvalid && p.jptm.ShouldLog(common.LogWarning) {
			warning := fmt.Sprintf("METADATAWARNING: For source %q, invalid metadata with keys %s are excluded", p.transferInfo.Source, dropped.ConcatenatedKeys())
			p.jptm.Log(common.LogWarning, warning)
		}
		return kept, nil

	case common.EInvalidMetadataHandleOption.FailIfInvalid():
		_, rejected, hasInvalid := m.ExcludeInvalidKey()
		if !hasInvalid {
			return m, nil
		}
		return nil, fmt.Errorf("metadata with keys %s in source is invalid, and application parameters specify that error should be reported when invalid keys are found", rejected.ConcatenatedKeys())

	case common.EInvalidMetadataHandleOption.RenameIfInvalid():
		return m.ResolveInvalidKey()

	default:
		return m, nil
	}
}
// SourceSize returns the source size recorded in the transfer info.
func (p *gcpSourceInfoProvider) SourceSize() int64 {
	info := p.transferInfo
	return info.SourceSize
}
// RawSource returns the source string recorded in the transfer info.
func (p *gcpSourceInfoProvider) RawSource() string {
	info := p.transferInfo
	return info.Source
}
// IsLocal always returns false for the GCP source info provider.
func (p *gcpSourceInfoProvider) IsLocal() bool {
return false
}
// GetFreshFileLastModifiedTime fetches the source object's attributes from GCP
// and returns their Updated timestamp. If the attributes cannot be fetched,
// the zero time and the error are returned.
func (p *gcpSourceInfoProvider) GetFreshFileLastModifiedTime() (time.Time, error) {
	object := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey)

	attrs, err := object.Attrs(p.ctx)
	if err != nil {
		return time.Time{}, err
	}
	return attrs.Updated, nil
}
// EntityType returns the entity type of the source, which for this provider is
// always common.EEntityType.File().
func (p *gcpSourceInfoProvider) EntityType() common.EntityType {
return common.EEntityType.File() // All folders are virtual in GCP and only files exist.
}
// GetMD5 returns the MD5 digest of the source object's bytes in the range
// described by offset and count.
//
// GCP does not support getting a range MD5, so the range is read through a
// range reader and hashed locally. Errors from opening or reading the range
// are returned unchanged.
func (p *gcpSourceInfoProvider) GetMD5(offset, count int64) ([]byte, error) {
	object := p.gcpClient.Bucket(p.gcpURLParts.BucketName).Object(p.gcpURLParts.ObjectKey)

	reader, err := object.NewRangeReader(p.ctx, offset, count)
	if err != nil {
		return nil, err
	}
	defer reader.Close() //nolint:staticcheck

	// Stream the range into the hash.
	hasher := md5.New()
	_, err = io.Copy(hasher, reader)
	if err != nil {
		return nil, err
	}
	return hasher.Sum(nil), nil
}