// common/util.go
package common
import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/url"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	blobservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
	datalake "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/directory"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
	fileservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
)
var AzcopyJobPlanFolder string
var AzcopyCurrentJobLogger ILoggerResetable
// isIPEndpointStyle checks whether the URL's host is an IP address. In that case the
// storage account endpoint is composed as:
// http(s)://IP(:port)/storageaccount/container/...
// Since host comes from the URL's Host property, it may be either "host" or "host:port".
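//
// Illustrative behavior (the example hosts are hypothetical, e.g. an Azurite-style endpoint):
//
//	isIPEndpointStyle("127.0.0.1:10000")               // true
//	isIPEndpointStyle("[2001:db8::1]:8080")            // true
//	isIPEndpointStyle("account.blob.core.windows.net") // false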
func isIPEndpointStyle(host string) bool {
if host == "" {
return false
}
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
	// For IPv6, SplitHostPort can fail because it cannot find a port.
	// In that case, strip the enclosing '[' and ']' from the host.
	// For details about IPv6 URLs, refer to https://tools.ietf.org/html/rfc2732
	if len(host) > 1 && host[0] == '[' && host[len(host)-1] == ']' {
		host = host[1 : len(host)-1]
	}
return net.ParseIP(host) != nil
}
// SplitContainerNameFromPath returns the container name and the blob/file/dir path
// within it. Ex. for input https://account1.blob.core.windows.net/container1/a/b/c/d,
// "container1" and "a/b/c/d" are returned.
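//
// A quick sketch for an IP-style (e.g. Azurite) URL, where the first path segment is
// the account name (values are illustrative):
//
//	container, path, err := SplitContainerNameFromPath("http://127.0.0.1:10000/account1/container1/a/b")
//	// container == "container1", path == "a/b", err == nil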
func SplitContainerNameFromPath(u string) (container string, filepath string, err error) {
uri, err := url.Parse(u)
if err != nil {
return "", "", err
}
if uri.Path == "" {
return "", "", nil
}
path := uri.Path
if path[0] == '/' {
path = path[1:]
}
if isIPEndpointStyle(uri.Host) {
if accountEndIndex := strings.Index(path, "/"); accountEndIndex == -1 {
// Slash not found; path has account name & no container name or blob
return "", "", nil
} else {
path = path[accountEndIndex+1:] // path refers to portion after the account name now (container & blob names)
}
}
containerEndIndex := strings.Index(path, "/") // Find the next slash (if it exists)
if containerEndIndex == -1 { // Slash not found; path has container name & no blob name
return path, "", nil
}
return path[:containerEndIndex], path[containerEndIndex+1:], nil
}
func VerifyIsURLResolvable(urlString string) error {
	/* This function is disabled, but we should still fix it after addressing the items below.
	 * We can take this up after the migration to the new SDK; the pipeline infra may not be the same then.
	 * 1. In some places we use the Blob SDK directly to create a pipeline - e.g. getBlobCredentialType().
	 *    We should create pipelines through the helper functions create<Blob/File/blobfs>pipeline, where we
	 *    handle errors appropriately.
	 * 2. We should do either an http.Get or a net.Dial instead of LookupIP. If we are behind a proxy, we may
	 *    not be able to resolve this IP. #2144
	 * 3. DNS errors may be temporary; we should retry for a minute before giving up.
	 */
	return nil
	/*
		url, err := url.Parse(urlString)
		if err != nil {
			return err
		}
		_, err = net.LookupIP(url.Host)
		return err
	*/
}
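// FileClientOptions carries the Azure Files-specific options that callers pass to
// GetServiceClientForLocation through locationSpecificOptions.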
type FileClientOptions struct {
AllowTrailingDot bool
AllowSourceTrailingDot bool
}
// GetServiceClientForLocation returns a service client for the resourceURL. It strips the
// container- and file-related details before creating the client. locationSpecificOptions
// is currently required only for Files.
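//
// A minimal usage sketch (resource here is a hypothetical ResourceString for a SAS or
// public URL, so no token credential is passed):
//
//	sc, err := GetServiceClientForLocation(ELocation.Blob(), resource, ECredentialType.Anonymous(), nil, nil, nil)
//	if err != nil {
//		// handle the error
//	}
//	bsc, err := sc.BlobServiceClient()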
func GetServiceClientForLocation(loc Location,
resource ResourceString,
credType CredentialType,
cred azcore.TokenCredential,
policyOptions *azcore.ClientOptions,
locationSpecificOptions any,
) (*ServiceClient, error) {
ret := &ServiceClient{}
resourceURL, err := resource.String()
if err != nil {
return nil, fmt.Errorf("failed to get resource string: %w", err)
}
switch loc {
	case ELocation.BlobFS(), ELocation.Blob(): // Since we may need to interact with DFS whenever we work with Blob, attach both clients.
datalakeURLParts, err := azdatalake.ParseURL(resourceURL)
if err != nil {
return nil, err
}
datalakeURLParts.FileSystemName = ""
datalakeURLParts.PathName = ""
resourceURL = datalakeURLParts.String()
var o *datalake.ClientOptions
var dsc *datalake.Client
if policyOptions != nil {
o = &datalake.ClientOptions{ClientOptions: *policyOptions}
}
if credType.IsAzureOAuth() {
dsc, err = datalake.NewClient(resourceURL, cred, o)
} else if credType.IsSharedKey() {
var sharedKeyCred *azdatalake.SharedKeyCredential
sharedKeyCred, err = GetDatalakeSharedKeyCredential()
if err != nil {
return nil, err
}
dsc, err = datalake.NewClientWithSharedKeyCredential(resourceURL, sharedKeyCred, o)
} else {
dsc, err = datalake.NewClientWithNoCredential(resourceURL, o)
}
if err != nil {
return nil, err
}
ret.dsc = dsc
blobURLParts, err := blob.ParseURL(resourceURL)
if err != nil {
return nil, err
}
blobURLParts.ContainerName = ""
blobURLParts.BlobName = ""
// In case we are creating a blob client for a datalake target, correct the endpoint
blobURLParts.Host = strings.Replace(blobURLParts.Host, ".dfs", ".blob", 1)
resourceURL = blobURLParts.String()
var bso *blobservice.ClientOptions
var bsc *blobservice.Client
if policyOptions != nil {
bso = &blobservice.ClientOptions{ClientOptions: *policyOptions}
}
if credType.IsAzureOAuth() {
bsc, err = blobservice.NewClient(resourceURL, cred, bso)
} else if credType.IsSharedKey() {
var sharedKeyCred *blob.SharedKeyCredential
sharedKeyCred, err = GetBlobSharedKeyCredential()
if err != nil {
return nil, err
}
bsc, err = blobservice.NewClientWithSharedKeyCredential(resourceURL, sharedKeyCred, bso)
} else {
bsc, err = blobservice.NewClientWithNoCredential(resourceURL, bso)
}
if err != nil {
return nil, err
}
ret.bsc = bsc
return ret, nil
case ELocation.File():
fileURLParts, err := file.ParseURL(resourceURL)
if err != nil {
return nil, err
}
fileURLParts.ShareName = ""
fileURLParts.DirectoryOrFilePath = ""
resourceURL = fileURLParts.String()
		var fsc *fileservice.Client
		// Allocate the options up front so the trailing-dot and request-intent
		// fields below can be set safely even when policyOptions is nil.
		o := &fileservice.ClientOptions{}
		if policyOptions != nil {
			o.ClientOptions = *policyOptions
		}
		if locationSpecificOptions != nil {
			o.AllowTrailingDot = &locationSpecificOptions.(*FileClientOptions).AllowTrailingDot
			o.AllowSourceTrailingDot = &locationSpecificOptions.(*FileClientOptions).AllowSourceTrailingDot
		}
		if cred != nil {
			o.FileRequestIntent = to.Ptr(fileservice.ShareTokenIntentBackup)
			fsc, err = fileservice.NewClient(resourceURL, cred, o)
		} else {
			fsc, err = fileservice.NewClientWithNoCredential(resourceURL, o)
		}
		if err != nil {
			return nil, err
		}
if err != nil {
return nil, err
}
ret.fsc = fsc
return ret, nil
	default:
		// Locations without a remote service client (e.g. Local) fall through here.
		return nil, nil
}
}
// NewScopedCredential takes a credential and its CredentialType, and returns a
// ScopedCredential if the credential type is either MDOAuthToken or OAuthToken.
// For anything else, nil is returned.
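//
// Sketch (cred is a hypothetical azcore.TokenCredential, e.g. built via azidentity):
//
//	scoped := NewScopedCredential(cred, ECredentialType.OAuthToken())
//	// scoped.GetToken(...) always requests StorageScope, with CAE enabled.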
func NewScopedCredential[T azcore.TokenCredential](cred T, credType CredentialType) *ScopedCredential[T] {
var scope string
if !credType.IsAzureOAuth() {
return nil
} else if credType == ECredentialType.MDOAuthToken() {
scope = ManagedDiskScope
} else if credType == ECredentialType.OAuthToken() {
scope = StorageScope
}
return &ScopedCredential[T]{cred: cred, scopes: []string{scope}}
}
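// ScopedCredential wraps a token credential and pins every token request to a fixed
// set of scopes, with Continuous Access Evaluation (CAE) enabled.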
type ScopedCredential[T azcore.TokenCredential] struct {
cred T
scopes []string
}
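// GetToken ignores the caller-supplied request options and always requests the
// pinned scopes, with CAE enabled.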
func (s *ScopedCredential[T]) GetToken(ctx context.Context, _ policy.TokenRequestOptions) (azcore.AccessToken, error) {
return s.cred.GetToken(ctx, policy.TokenRequestOptions{Scopes: s.scopes, EnableCAE: true})
}
type ScopedToken = ScopedCredential[azcore.TokenCredential]
type ScopedAuthenticator ScopedCredential[AuthenticateToken]
func (s *ScopedAuthenticator) GetToken(ctx context.Context, _ policy.TokenRequestOptions) (azcore.AccessToken, error) {
return s.cred.GetToken(ctx, policy.TokenRequestOptions{Scopes: s.scopes, EnableCAE: true})
}
func (s *ScopedAuthenticator) Authenticate(ctx context.Context, _ *policy.TokenRequestOptions) (azidentity.AuthenticationRecord, error) {
return s.cred.Authenticate(ctx, &policy.TokenRequestOptions{Scopes: s.scopes, EnableCAE: true})
}
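// ServiceClient bundles the per-service clients for a resource; only the clients
// relevant to the resource's location are populated.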
type ServiceClient struct {
fsc *fileservice.Client
bsc *blobservice.Client
dsc *datalake.Client
}
func (s *ServiceClient) BlobServiceClient() (*blobservice.Client, error) {
if s.bsc == nil {
return nil, ErrInvalidClient("Blob Service")
}
return s.bsc, nil
}
func (s *ServiceClient) FileServiceClient() (*fileservice.Client, error) {
if s.fsc == nil {
return nil, ErrInvalidClient("File Service")
}
return s.fsc, nil
}
func (s *ServiceClient) DatalakeServiceClient() (*datalake.Client, error) {
if s.dsc == nil {
return nil, ErrInvalidClient("Datalake Service")
}
return s.dsc, nil
}
// NewServiceClient constructs a ServiceClient from the given per-service clients.
// It is currently used only in test cases.
func NewServiceClient(bsc *blobservice.Client,
fsc *fileservice.Client,
dsc *datalake.Client) *ServiceClient {
return &ServiceClient{
bsc: bsc,
fsc: fsc,
dsc: dsc,
}
}
// Metadata utility functions to work around the metadata key capitalization
// introduced by Go's HTTP header canonicalization
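//
// Illustrative sketch (the keys and values are hypothetical):
//
//	m := Metadata{"Foo": to.Ptr("bar")}
//	TryAddMetadata(m, "foo", "baz")    // no-op: the capitalized key "Foo" already exists
//	v, ok := TryReadMetadata(m, "foo") // falls back to "Foo"; *v == "bar", ok == true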
func TryAddMetadata(metadata Metadata, key, value string) {
if _, ok := metadata[key]; ok {
return // Don't overwrite the user's metadata
}
if key != "" {
capitalizedKey := strings.ToUpper(string(key[0])) + key[1:]
if _, ok := metadata[capitalizedKey]; ok {
return
}
}
v := value
metadata[key] = &v
}
func TryReadMetadata(metadata Metadata, key string) (*string, bool) {
if v, ok := metadata[key]; ok {
return v, true
}
if key != "" {
capitalizedKey := strings.ToUpper(string(key[0])) + key[1:]
if v, ok := metadata[capitalizedKey]; ok {
return v, true
}
}
return nil, false
}
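// FileClientStub abstracts the Azure Files file and directory clients by the one
// method this package needs from both.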
type FileClientStub interface {
URL() string
}
// DoWithOverrideReadOnlyOnAzureFiles performs the given action, and forces it to
// happen even if the target is read-only.
// NOTE that all SMB attributes (and other headers?) on the target will be lost,
// so only use this if you don't need them any more
// (e.g. you are about to delete the resource, or you are going to reset the attributes/headers).
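//
// Usage sketch (fileClient is a hypothetical *file.Client for the target):
//
//	err := DoWithOverrideReadOnlyOnAzureFiles(ctx,
//		func() (interface{}, error) { return fileClient.Delete(ctx, nil) },
//		fileClient, true)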
func DoWithOverrideReadOnlyOnAzureFiles(ctx context.Context, action func() (interface{}, error), targetFileOrDir FileClientStub, enableForcing bool) error {
// try the action
_, err := action()
if fileerror.HasCode(err, fileerror.ParentNotFound, fileerror.ShareNotFound) {
return err
}
	failedAsReadOnly := fileerror.HasCode(err, fileerror.ReadOnlyAttribute)
if !failedAsReadOnly {
return err
}
// did fail as readonly, but forcing is not enabled
if !enableForcing {
return errors.New("target is readonly. To force the action to proceed, add --force-if-read-only to the command line")
}
// did fail as readonly, and forcing is enabled
if f, ok := targetFileOrDir.(*file.Client); ok {
h := file.HTTPHeaders{}
_, err = f.SetHTTPHeaders(ctx, &file.SetHTTPHeadersOptions{
HTTPHeaders: &h,
SMBProperties: &file.SMBProperties{
// clear the attributes
Attributes: &file.NTFSFileAttributes{None: true},
},
})
} else if d, ok := targetFileOrDir.(*directory.Client); ok {
// this code path probably isn't used, since ReadOnly (in Windows file systems at least)
// only applies to the files in a folder, not to the folder itself. But we'll leave the code here, for now.
_, err = d.SetProperties(ctx, &directory.SetPropertiesOptions{
FileSMBProperties: &file.SMBProperties{
// clear the attributes
Attributes: &file.NTFSFileAttributes{None: true},
},
})
} else {
err = errors.New("cannot remove read-only attribute from unknown target type")
}
if err != nil {
return err
}
// retry the action
_, err = action()
return err
}
// IsSystemContainer reports whether the given container name is a system container.
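//
// Illustrative results:
//
//	IsSystemContainer("$logs")             // true
//	IsSystemContainer("%24blobchangefeed") // true ("%24" decodes to "$")
//	IsSystemContainer("mydata")            // false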
func IsSystemContainer(containerName string) bool {
// Decode the container name in case it's URL-encoded
decodedName, err := url.QueryUnescape(containerName)
if err != nil {
// If decoding fails, it's unlikely the name matches a system container
return false
}
	// The known system containers
	systemContainers := []string{"$blobchangefeed", "$logs"}
for _, sys := range systemContainers {
if decodedName == sys {
return true
}
}
return false
}