// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"reflect"
"runtime"
"sync"
"testing"
"time"
"cloud.google.com/go/storage"
"cloud.google.com/go/storage/experimental"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/setup"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
storagev1 "google.golang.org/api/storage/v1"
)
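
// CreateStorageClient creates a storage client suited to the current test run:
// a TPC-endpoint client when testing against TPC, a gRPC client with
// bidirectional reads for zonal-bucket runs, and a regular client otherwise.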
func CreateStorageClient(ctx context.Context) (client *storage.Client, err error) {
// Create new storage client.
if setup.TestOnTPCEndPoint() {
var ts oauth2.TokenSource
// Set up the TPC endpoint and provide a token source for authentication.
ts, err = getTokenSrc("/tmp/sa.key.json")
if err != nil {
return nil, fmt.Errorf("unable to fetch token-source for TPC: %w", err)
}
client, err = storage.NewClient(ctx, option.WithEndpoint("storage.apis-tpczero.goog:443"), option.WithTokenSource(ts))
} else {
if setup.IsZonalBucketRun() {
client, err = storage.NewGRPCClient(ctx, experimental.WithGRPCBidiReads())
} else {
client, err = storage.NewClient(ctx)
}
}
if err != nil {
return nil, fmt.Errorf("storage.NewClient: %w", err)
}
	// RetryAlways causes all operations to be retried when the service returns
	// a transient error, regardless of idempotency considerations. Since the
	// concurrent executions of our CI/CD tests (VMs, threads) don't share any
	// cloud-storage resources, it is safe to disregard idempotency.
client.SetRetry(
storage.WithBackoff(gax.Backoff{
Max: 30 * time.Second,
Multiplier: 2,
}),
storage.WithPolicy(storage.RetryAlways),
storage.WithErrorFunc(storageutil.ShouldRetry),
storage.WithMaxAttempts(5))
return client, nil
}
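
// getTokenSrc builds an oauth2 token source from the service-account key file
// at the given path, scoped for full control of storage resources.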
func getTokenSrc(path string) (tokenSrc oauth2.TokenSource, err error) {
contents, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("ReadFile(%q): %w", path, err)
}
	// Create a token source from the key file contents.
	ts, err := google.JWTAccessTokenSourceWithScope(contents, storagev1.DevstorageFullControlScope)
	if err != nil {
		return nil, fmt.Errorf("google.JWTAccessTokenSourceWithScope: %w", err)
}
	return ts, nil
}

// ReadObjectFromGCS downloads the object from GCS and returns the data.
func ReadObjectFromGCS(ctx context.Context, client *storage.Client, object string) (string, error) {
bucket, object := setup.GetBucketAndObjectBasedOnTypeOfMount(object)
// Create storage reader to read from GCS.
rc, err := client.Bucket(bucket).Object(object).NewReader(ctx)
if err != nil {
return "", fmt.Errorf("Object(%q).NewReader: %w", object, err)
}
defer rc.Close()
content, err := io.ReadAll(rc)
if err != nil {
return "", fmt.Errorf("io.ReadAll failed: %v", err)
}
return string(content), nil
}
// ReadChunkFromGCS downloads the object chunk from GCS and returns the data.
func ReadChunkFromGCS(ctx context.Context, client *storage.Client, object string,
offset, size int64) (string, error) {
bucket, object := setup.GetBucketAndObjectBasedOnTypeOfMount(object)
	// Create a storage range reader to read the chunk from GCS.
	rc, err := client.Bucket(bucket).Object(object).NewRangeReader(ctx, offset, size)
	if err != nil {
		return "", fmt.Errorf("Object(%q).NewRangeReader: %w", object, err)
}
defer rc.Close()
content, err := io.ReadAll(rc)
if err != nil {
return "", fmt.Errorf("io.ReadAll failed: %v", err)
}
return string(content), nil
}
// NewWriter is a wrapper over ObjectHandle.NewWriter which extends support to
// zonal buckets.
func NewWriter(ctx context.Context, o *storage.ObjectHandle, client *storage.Client) (wc *storage.Writer, err error) {
wc = o.NewWriter(ctx)
wc.FinalizeOnClose = true
	// Zonal (RAPID storage class) buckets require additional writer configuration.
var attrs *storage.BucketAttrs
attrs, err = client.Bucket(o.BucketName()).Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get attributes for bucket %q: %w", o.BucketName(), err)
}
if attrs.StorageClass == "RAPID" {
if setup.IsZonalBucketRun() {
			// Zonal-bucket writers require the append flag to be set.
wc.Append = true
} else {
return nil, fmt.Errorf("found zonal bucket %q in non-zonal e2e test run (--zonal=false)", o.BucketName())
}
}
	return wc, nil
}
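
// WriteToObject writes the given content to the GCS object, applying the
// supplied preconditions if they are non-empty.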
func WriteToObject(ctx context.Context, client *storage.Client, object, content string, precondition storage.Conditions) error {
bucket, object := setup.GetBucketAndObjectBasedOnTypeOfMount(object)
o := client.Bucket(bucket).Object(object)
if !reflect.DeepEqual(precondition, storage.Conditions{}) {
o = o.If(precondition)
}
// Upload an object with storage.Writer.
wc, err := NewWriter(ctx, o, client)
if err != nil {
return fmt.Errorf("Failed to open writer for object %q: %w", o.ObjectName(), err)
}
	if _, err := io.WriteString(wc, content); err != nil {
		return fmt.Errorf("io.WriteString: %w", err)
	}
	if err := wc.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	return nil
}

// CreateObjectOnGCS creates an object with given name and content on GCS.
func CreateObjectOnGCS(ctx context.Context, client *storage.Client, object, content string) error {
return WriteToObject(ctx, client, object, content, storage.Conditions{DoesNotExist: true})
}

// CreateStorageClientWithCancel creates a storage client backed by a cancelable
// context and returns a cleanup function that closes the client and cancels the
// context.
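//
// A minimal usage sketch (mirroring DownloadObjectFromGCS below):
//
//	ctx := context.Background()
//	var storageClient *storage.Client
//	closeStorageClient := CreateStorageClientWithCancel(&ctx, &storageClient)
//	defer func() {
//		if err := closeStorageClient(); err != nil {
//			log.Printf("closeStorageClient failed: %v", err)
//		}
//	}()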
func CreateStorageClientWithCancel(ctx *context.Context, storageClient **storage.Client) func() error {
var err error
var cancel context.CancelFunc
*ctx, cancel = context.WithCancel(*ctx)
*storageClient, err = CreateStorageClient(*ctx)
if err != nil {
log.Fatalf("client.CreateStorageClient: %v", err)
}
	// Return a func that closes the storage client and releases resources.
	return func() error {
		// Cancel the context even if Close fails; otherwise the context would
		// leak on the error path.
		defer cancel()
		if err := (*storageClient).Close(); err != nil {
			return fmt.Errorf("failed to close storage client: %w", err)
		}
		return nil
	}
}

// DownloadObjectFromGCS downloads an object to a local file.
func DownloadObjectFromGCS(gcsFile string, destFileName string, t *testing.T) error {
bucket, gcsFile := setup.GetBucketAndObjectBasedOnTypeOfMount(gcsFile)
ctx := context.Background()
var storageClient *storage.Client
closeStorageClient := CreateStorageClientWithCancel(&ctx, &storageClient)
defer func() {
err := closeStorageClient()
if err != nil {
t.Errorf("closeStorageClient failed: %v", err)
}
}()
f := operations.CreateFile(destFileName, setup.FilePermission_0600, t)
defer operations.CloseFileShouldNotThrowError(t, f)
rc, err := storageClient.Bucket(bucket).Object(gcsFile).NewReader(ctx)
if err != nil {
return fmt.Errorf("Object(%q).NewReader: %w", gcsFile, err)
}
defer rc.Close()
if _, err := io.Copy(f, rc); err != nil {
return fmt.Errorf("io.Copy: %w", err)
}
return nil
}
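
// DeleteObjectOnGCS deletes the named object from the bucket chosen for the
// current mount type.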
func DeleteObjectOnGCS(ctx context.Context, client *storage.Client, objectName string) error {
bucket, _ := setup.GetBucketAndObjectBasedOnTypeOfMount("")
	// Get a handle to the object and delete it.
	return client.Bucket(bucket).Object(objectName).Delete(ctx)
}

// DeleteAllObjectsWithPrefix deletes all objects with the specified prefix in a GCS bucket.
// It iterates through objects with the given prefix and deletes them
// concurrently, with concurrency bounded by the number of CPU cores.
func DeleteAllObjectsWithPrefix(ctx context.Context, client *storage.Client, prefix string) error {
bucket, _ := setup.GetBucketAndObjectBasedOnTypeOfMount("")
// Get an object iterator
query := &storage.Query{Prefix: prefix}
objectItr := client.Bucket(bucket).Objects(ctx, query)
	// Collect errors from the deletion goroutines. The channel is drained
	// concurrently so that senders never block, even if every deletion fails
	// (a bounded channel drained only after wg.Wait() could deadlock).
	errChan := make(chan error)
	var errs []error
	collectorDone := make(chan struct{})
	go func() {
		defer close(collectorDone)
		for err := range errChan {
			errs = append(errs, err)
		}
	}()
// Determine the number of concurrent goroutines using CPU cores
numCores := runtime.NumCPU()
sem := make(chan struct{}, numCores) // Semaphore to limit concurrency
var wg sync.WaitGroup
// Iterate through objects with the specified prefix
for {
attrs, err := objectItr.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("error iterating through objects: %w", err)
}
wg.Add(1)
sem <- struct{}{} // Acquire a semaphore slot
go func(attrs *storage.ObjectAttrs) {
defer func() {
<-sem // Release the semaphore slot
wg.Done()
}()
if err := DeleteObjectOnGCS(ctx, client, attrs.Name); err != nil {
errChan <- fmt.Errorf("error deleting object %s: %w", attrs.Name, err)
}
}(attrs)
}
	wg.Wait()
	close(errChan)
	<-collectorDone
	return errors.Join(errs...)
}
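
// StatObject returns the attributes of the given GCS object.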
func StatObject(ctx context.Context, client *storage.Client, object string) (*storage.ObjectAttrs, error) {
bucket, object := setup.GetBucketAndObjectBasedOnTypeOfMount(object)
attrs, err := client.Bucket(bucket).Object(object).Attrs(ctx)
if err != nil {
return nil, err
}
return attrs, nil
}

// UploadGcsObject uploads a local file to a specified GCS bucket and object.
// Handles gzip compression if requested.
func UploadGcsObject(ctx context.Context, client *storage.Client, localPath, bucketName, objectName string, uploadGzipEncoded bool) (err error) {
	// Create a writer to upload the object.
	obj := client.Bucket(bucketName).Object(objectName)
	w, err := NewWriter(ctx, obj, client)
	if err != nil {
		return fmt.Errorf("failed to open writer for GCS object gs://%s/%s: %w", bucketName, objectName, err)
	}
	// Writer.Close finalizes the upload, so a Close failure means the upload
	// failed and must be reported to the caller rather than only logged.
	defer func() {
		if closeErr := w.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("failed to close GCS object gs://%s/%s: %w", bucketName, objectName, closeErr)
		}
	}()
filePathToUpload := localPath
	// If gzip encoding is requested, upload a gzip-compressed temporary copy of
	// the file instead of the original.
	if uploadGzipEncoded {
data, err := os.ReadFile(localPath)
if err != nil {
return err
}
content := string(data)
if filePathToUpload, err = operations.CreateLocalTempFile(content, true); err != nil {
return fmt.Errorf("failed to create local gzip file from %s for upload to bucket: %w", localPath, err)
}
defer func() {
if removeErr := os.Remove(filePathToUpload); removeErr != nil {
log.Printf("Error removing temporary gzip file %s: %v", filePathToUpload, removeErr)
}
}()
}
// Open the local file for reading.
f, err := operations.OpenFileAsReadonly(filePathToUpload)
if err != nil {
return fmt.Errorf("failed to open local file %s: %w", filePathToUpload, err)
}
defer operations.CloseFile(f)
// Copy the file contents to the object writer.
if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("failed to copy file %s to gs://%s/%s: %w", localPath, bucketName, objectName, err)
}
return nil
}

// GetGcsObjectSize returns the size, in bytes, of the given GCS object.
func GetGcsObjectSize(ctx context.Context, client *storage.Client, object string) (int64, error) {
attrs, err := StatObject(ctx, client, object)
if err != nil {
return -1, err
}
return attrs.Size, nil
}

// ClearCacheControlOnGcsObject clears the cache-control attribute on the given
// GCS object. It fails if the object doesn't exist or permission to modify the
// object's metadata is not available.
func ClearCacheControlOnGcsObject(ctx context.Context, client *storage.Client, object string) error {
	bucket, object := setup.GetBucketAndObjectBasedOnTypeOfMount(object)
	// Setting CacheControl to the empty string via ObjectAttrsToUpdate clears
	// the attribute on the server; mutating locally fetched attrs has no effect.
	_, err := client.Bucket(bucket).Object(object).Update(ctx, storage.ObjectAttrsToUpdate{CacheControl: ""})
	return err
}
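
// CopyFileInBucket uploads the local file at srcfilePath to the given bucket
// as destFilePath, aborting the test run on failure.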
func CopyFileInBucket(ctx context.Context, storageClient *storage.Client, srcfilePath, destFilePath, bucket string) {
err := UploadGcsObject(ctx, storageClient, srcfilePath, bucket, destFilePath, false)
if err != nil {
log.Fatalf("Error while copying file %q to GCS object \"gs://%s/%s\" : %v", srcfilePath, bucket, destFilePath, err)
}
}
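
// DeleteBucket deletes every object in the bucket and then the bucket itself.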
func DeleteBucket(ctx context.Context, client *storage.Client, bucketName string) error {
bucket := client.Bucket(bucketName)
// Iterate through objects and delete them
query := &storage.Query{}
it := bucket.Objects(ctx, query)
	for {
		objAttrs, err := it.Next()
		if err == iterator.Done {
			break // No more objects.
		}
		if err != nil {
			return fmt.Errorf("error iterating through objects: %w", err)
		}
		if err := bucket.Object(objAttrs.Name).Delete(ctx); err != nil {
			return fmt.Errorf("failed to delete object %s: %w", objAttrs.Name, err)
		}
	}
if err := bucket.Delete(ctx); err != nil {
log.Printf("Bucket(%q).Delete: %v", bucketName, err)
return err
}
return nil
}
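
// AppendableWriter returns a writer for the given object, applying the supplied
// preconditions if non-empty. For zonal buckets the writer is opened in append
// mode (see NewWriter).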
func AppendableWriter(ctx context.Context, client *storage.Client, object string, precondition storage.Conditions) (*storage.Writer, error) {
bucket, object := setup.GetBucketAndObjectBasedOnTypeOfMount(object)
o := client.Bucket(bucket).Object(object)
if !reflect.DeepEqual(precondition, storage.Conditions{}) {
o = o.If(precondition)
}
	// Create the object writer; NewWriter enables append mode for zonal buckets.
wc, err := NewWriter(ctx, o, client)
if err != nil {
return nil, fmt.Errorf("failed to open writer for object %q: %w", o.ObjectName(), err)
}
return wc, nil
}