internal/storage/storage_handle.go (249 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"
"cloud.google.com/go/storage"
control "cloud.google.com/go/storage/control/apiv2"
"cloud.google.com/go/storage/control/apiv2/controlpb"
"cloud.google.com/go/storage/experimental"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"golang.org/x/net/context"
option "google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
// Side effect to run grpc client with direct-path on gcp machine.
_ "google.golang.org/grpc/balancer/rls"
_ "google.golang.org/grpc/xds/googledirectpath"
)
const (
// Used to modify the hidden options in go-sdk for read stall retry.
// Ref: https://github.com/googleapis/google-cloud-go/blob/main/storage/option.go#L30
dynamicReadReqIncreaseRateEnv = "DYNAMIC_READ_REQ_INCREASE_RATE"
dynamicReadReqInitialTimeoutEnv = "DYNAMIC_READ_REQ_INITIAL_TIMEOUT"
zonalLocationType = "zone"
)
type StorageHandle interface {
// In case of non-empty billingProject, this project is set as user-project for
// all subsequent calls on the bucket. Calls with user-project will be billed
// to that project rather than to the bucket's owning project.
//
// A user-project is required for all operations on Requester Pays buckets.
BucketHandle(ctx context.Context, bucketName string, billingProject string) (bh *bucketHandle, err error)
}
type storageClient struct {
httpClient *storage.Client
grpcClient *storage.Client
grpcClientWithBidiConfig *storage.Client
clientConfig storageutil.StorageClientConfig
storageControlClient StorageControlClient
directPathDetector *gRPCDirectPathDetector
}
type gRPCDirectPathDetector struct {
clientOptions []option.ClientOption
}
// isDirectPathPossible checks if gRPC direct connectivity is available for a specific bucket
// from the environment where the client is running. A `nil` error represents Direct Connectivity was
// detected.
func (pd *gRPCDirectPathDetector) isDirectPathPossible(ctx context.Context, bucketName string) error {
return storage.CheckDirectConnectivitySupported(ctx, bucketName, pd.clientOptions...)
}
// Return clientOpts for both gRPC client and control client.
func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConfig, enableBidiConfig bool) (clientOpts []option.ClientOption, err error) {
// Add Custom endpoint option.
if clientConfig.CustomEndpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(storageutil.StripScheme(clientConfig.CustomEndpoint)))
// TODO(b/390799251): Check if this line can be merged with below anonymousAccess check.
if clientConfig.AnonymousAccess {
clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
}
}
if clientConfig.AnonymousAccess {
clientOpts = append(clientOpts, option.WithoutAuthentication())
} else {
tokenSrc, tokenCreationErr := storageutil.CreateTokenSource(clientConfig)
if tokenCreationErr != nil {
err = fmt.Errorf("while fetching tokenSource: %w", tokenCreationErr)
return
}
clientOpts = append(clientOpts, option.WithTokenSource(tokenSrc))
}
if enableBidiConfig {
clientOpts = append(clientOpts, experimental.WithGRPCBidiReads())
}
clientOpts = append(clientOpts, option.WithGRPCConnectionPool(clientConfig.GrpcConnPoolSize))
clientOpts = append(clientOpts, option.WithUserAgent(clientConfig.UserAgent))
// Turning off the go-sdk metrics exporter to prevent any problems.
// TODO (kislaykishore) - to revisit here for monitoring support.
clientOpts = append(clientOpts, storage.WithDisabledClientMetrics())
return
}
func setRetryConfig(sc *storage.Client, clientConfig *storageutil.StorageClientConfig) {
if sc == nil || clientConfig == nil {
logger.Fatal("setRetryConfig: Empty storage client or clientConfig")
return
}
// ShouldRetry function checks if an operation should be retried based on the
// response of operation (error.Code).
// RetryAlways causes all operations to be checked for retries using
// ShouldRetry function.
// Without RetryAlways, only those operations are checked for retries which
// are idempotent.
// https://github.com/googleapis/google-cloud-go/blob/main/storage/storage.go#L1953
retryOpts := []storage.RetryOption{storage.WithBackoff(gax.Backoff{
Max: clientConfig.MaxRetrySleep,
Multiplier: clientConfig.RetryMultiplier,
}),
storage.WithPolicy(storage.RetryAlways),
storage.WithErrorFunc(storageutil.ShouldRetry)}
sc.SetRetry(retryOpts...)
// The default MaxRetryAttempts value is 0 indicates no limit.
if clientConfig.MaxRetryAttempts != 0 {
sc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts))
}
}
// Followed https://pkg.go.dev/cloud.google.com/go/storage#hdr-Experimental_gRPC_API to create the gRPC client.
func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig, enableBidiConfig bool) (sc *storage.Client, err error) {
if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil {
logger.Fatal("error setting direct path env var: %v", err)
}
var clientOpts []option.ClientOption
clientOpts, err = createClientOptionForGRPCClient(clientConfig, enableBidiConfig)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}
sc, err = storage.NewGRPCClient(ctx, clientOpts...)
if err != nil {
err = fmt.Errorf("NewGRPCClient: %w", err)
} else {
setRetryConfig(sc, clientConfig)
}
// Unset the environment variable, since it's used only while creation of grpc client.
if err := os.Unsetenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"); err != nil {
logger.Fatal("error while unsetting direct path env var: %v", err)
}
return
}
func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig) (sc *storage.Client, err error) {
var clientOpts []option.ClientOption
// Add WithHttpClient option.
var httpClient *http.Client
httpClient, err = storageutil.CreateHttpClient(clientConfig)
if err != nil {
err = fmt.Errorf("while creating http endpoint: %w", err)
return
}
clientOpts = append(clientOpts, option.WithHTTPClient(httpClient))
if clientConfig.AnonymousAccess {
clientOpts = append(clientOpts, option.WithoutAuthentication())
}
// Create client with JSON read flow, if EnableJasonRead flag is set.
if clientConfig.ExperimentalEnableJsonRead {
clientOpts = append(clientOpts, storage.WithJSONReads())
}
// Add Custom endpoint option.
if clientConfig.CustomEndpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(clientConfig.CustomEndpoint))
}
if clientConfig.ReadStallRetryConfig.Enable {
// Hidden way to modify the increase rate for dynamic delay algorithm in go-sdk.
// Ref: https://github.com/googleapis/google-cloud-go/blob/main/storage/option.go#L47
// Temporarily we kept an option to change the increase-rate, will be removed
// once we get a good default.
err = os.Setenv(dynamicReadReqIncreaseRateEnv, strconv.FormatFloat(clientConfig.ReadStallRetryConfig.ReqIncreaseRate, 'f', -1, 64))
if err != nil {
logger.Warnf("Error while setting the env %s: %v", dynamicReadReqIncreaseRateEnv, err)
}
// Hidden way to modify the initial-timeout of the dynamic delay algorithm in go-sdk.
// Ref: https://github.com/googleapis/google-cloud-go/blob/main/storage/option.go#L62
// Temporarily we kept an option to change the initial-timeout, will be removed
// once we get a good default.
err = os.Setenv(dynamicReadReqInitialTimeoutEnv, clientConfig.ReadStallRetryConfig.InitialReqTimeout.String())
if err != nil {
logger.Warnf("Error while setting the env %s: %v", dynamicReadReqInitialTimeoutEnv, err)
}
clientOpts = append(clientOpts, experimental.WithReadStallTimeout(&experimental.ReadStallTimeoutConfig{
Min: clientConfig.ReadStallRetryConfig.MinReqTimeout,
TargetPercentile: clientConfig.ReadStallRetryConfig.ReqTargetPercentile,
}))
}
sc, err = storage.NewClient(ctx, clientOpts...)
if err != nil {
err = fmt.Errorf("go http storage client creation failed: %w", err)
return
}
setRetryConfig(sc, clientConfig)
return
}
func (sh *storageClient) lookupBucketType(bucketName string) (*gcs.BucketType, error) {
var nilControlClient *control.StorageControlClient = nil
if sh.storageControlClient == nilControlClient {
return &gcs.BucketType{}, nil // Assume defaults
}
startTime := time.Now()
logger.Infof("GetStorageLayout <- (%s)", bucketName)
storageLayout, err := sh.getStorageLayout(bucketName)
duration := time.Since(startTime)
if err != nil {
return nil, err
}
logger.Infof("GetStorageLayout -> (%s) %v msec", bucketName, duration.Milliseconds())
return &gcs.BucketType{
Hierarchical: storageLayout.GetHierarchicalNamespace().GetEnabled(),
Zonal: storageLayout.GetLocationType() == zonalLocationType,
}, nil
}
func (sh *storageClient) getStorageLayout(bucketName string) (*controlpb.StorageLayout, error) {
var callOptions []gax.CallOption
stoargeLayout, err := sh.storageControlClient.GetStorageLayout(context.Background(), &controlpb.GetStorageLayoutRequest{
Name: fmt.Sprintf("projects/_/buckets/%s/storageLayout", bucketName),
Prefix: "",
RequestId: "",
}, callOptions...)
return stoargeLayout, err
}
// NewStorageHandle creates control client and stores client config to allow dynamic
// creation of http or grpc client.
func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClientConfig) (sh StorageHandle, err error) {
// The default protocol for the Go Storage control client's folders API is gRPC.
// gcsfuse will initially mirror this behavior due to the client's lack of HTTP support.
var controlClient *control.StorageControlClient
var clientOpts []option.ClientOption
// Control-client is needed for folder APIs and for getting storage-layout of the bucket.
// GetStorageLayout API is not supported for storage-testbench, which are identified by custom-endpoint containing localhost.
if clientConfig.EnableHNS && !strings.Contains(clientConfig.CustomEndpoint, "localhost") {
clientOpts, err = createClientOptionForGRPCClient(&clientConfig, false)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}
controlClient, err = storageutil.CreateGRPCControlClient(ctx, clientOpts, &clientConfig)
if err != nil {
return nil, fmt.Errorf("could not create StorageControl Client: %w", err)
}
} else {
logger.Infof("Skipping storage control client creation because custom-endpoint %q was passed, which is assumed to be a storage testbench server because of 'localhost' in it.", clientConfig.CustomEndpoint)
}
sh = &storageClient{
storageControlClient: controlClient,
clientConfig: clientConfig,
directPathDetector: &gRPCDirectPathDetector{clientOptions: clientOpts},
}
return
}
func (sh *storageClient) getClient(ctx context.Context, isbucketZonal bool) (*storage.Client, error) {
var err error
if isbucketZonal {
if sh.grpcClientWithBidiConfig == nil {
sh.grpcClientWithBidiConfig, err = createGRPCClientHandle(ctx, &sh.clientConfig, true)
}
return sh.grpcClientWithBidiConfig, err
}
if sh.clientConfig.ClientProtocol == cfg.GRPC {
if sh.grpcClient == nil {
sh.grpcClient, err = createGRPCClientHandle(ctx, &sh.clientConfig, false)
}
return sh.grpcClient, err
}
if sh.clientConfig.ClientProtocol == cfg.HTTP1 || sh.clientConfig.ClientProtocol == cfg.HTTP2 {
if sh.httpClient == nil {
sh.httpClient, err = createHTTPClientHandle(ctx, &sh.clientConfig)
}
return sh.httpClient, err
}
return nil, fmt.Errorf("invalid client-protocol requested: %s", sh.clientConfig.ClientProtocol)
}
func (sh *storageClient) BucketHandle(ctx context.Context, bucketName string, billingProject string) (bh *bucketHandle, err error) {
var client *storage.Client
bucketType, err := sh.lookupBucketType(bucketName)
if err != nil {
return nil, fmt.Errorf("storageLayout call failed: %s", err)
}
client, err = sh.getClient(ctx, bucketType.Zonal)
if err != nil {
return nil, err
}
if bucketType.Zonal || sh.clientConfig.ClientProtocol == cfg.GRPC {
if sh.directPathDetector != nil {
if err := sh.directPathDetector.isDirectPathPossible(ctx, bucketName); err != nil {
logger.Warnf("Direct path connectivity unavailable for %s, reason: %v", bucketName, err)
} else {
logger.Infof("Successfully connected over gRPC DirectPath for %s", bucketName)
}
}
}
storageBucketHandle := client.Bucket(bucketName)
if billingProject != "" {
storageBucketHandle = storageBucketHandle.UserProject(billingProject)
}
bh = &bucketHandle{
bucket: storageBucketHandle,
bucketName: bucketName,
controlClient: sh.storageControlClient,
bucketType: bucketType,
}
return
}