// cli_tools/common/utils/storage/storage_client.go

// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"regexp"
	"strings"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"

	daisy "github.com/GoogleCloudPlatform/compute-daisy"
	"github.com/GoogleCloudPlatform/compute-image-tools/cli_tools/common/domain"
	"github.com/GoogleCloudPlatform/compute-image-tools/cli_tools/common/utils/logging"
)

var (
	bucketNameRegex = `[a-z0-9][-_.a-z0-9]*`
	gsPathRegex     = regexp.MustCompile(fmt.Sprintf(`^gs://(%s)(/.*)?$`, bucketNameRegex))
)

// Client implements domain.StorageClientInterface. It provides the main
// Cloud Storage operations used by the image import features.
type Client struct {
	StorageClient *storage.Client
	Logger        logging.Logger
	Ctx           context.Context
	Oic           domain.ObjectIteratorCreatorInterface
	Soc           domain.StorageObjectCreatorInterface
}

// NewStorageClient creates a Client. Any provided client options are passed
// through to the underlying storage.Client.
func NewStorageClient(ctx context.Context,
	logger logging.Logger, opts ...option.ClientOption) (*Client, error) {

	client, err := storage.NewClient(ctx, opts...)
	if err != nil {
		return nil, daisy.Errf("error creating storage client: %v", err)
	}
	sc := &Client{
		StorageClient: client,
		Ctx:           ctx,
		Oic:           &ObjectIteratorCreator{ctx: ctx, sc: client},
		Logger:        logger,
	}
	sc.Soc = &storageObjectCreator{ctx: ctx, sc: client}
	return sc, nil
}
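
// exampleNewClient is a hedged usage sketch, not part of the original file:
// it shows the typical lifecycle of a Client. The logger is assumed to be
// supplied by the caller; "my-project" and the bucket name are hypothetical.
func exampleNewClient(logger logging.Logger) error {
	ctx := context.Background()
	client, err := NewStorageClient(ctx, logger)
	if err != nil {
		return err
	}
	// Close releases the client's underlying connections when done.
	defer client.Close()
	return client.CreateBucket("my-example-bucket", "my-project", nil)
}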

// CreateBucket creates a GCS bucket.
func (sc *Client) CreateBucket(
	bucketName string, project string, attrs *storage.BucketAttrs) error {
	if err := sc.StorageClient.Bucket(bucketName).Create(sc.Ctx, project, attrs); err != nil {
		return daisy.Errf("Error creating bucket `%v` in project `%v`: %v", bucketName, project, err)
	}
	return nil
}
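
// exampleCreateRegionalBucket is a hedged sketch, not part of the original
// file: it shows bucket attributes being passed through to CreateBucket. The
// bucket name, project and location are hypothetical.
func exampleCreateRegionalBucket(sc *Client) error {
	attrs := &storage.BucketAttrs{
		Location:     "us-central1",
		StorageClass: "STANDARD",
	}
	return sc.CreateBucket("my-scratch-bucket", "my-project", attrs)
}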

// Buckets returns a bucket iterator for all buckets within a project.
func (sc *Client) Buckets(projectID string) *storage.BucketIterator {
	return sc.StorageClient.Buckets(sc.Ctx, projectID)
}
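
// exampleListBucketNames is a hedged sketch, not part of the original file:
// the iterator returned by Buckets is driven by calling Next until it
// reports iterator.Done. "my-project" is a hypothetical project ID.
func exampleListBucketNames(sc *Client) ([]string, error) {
	var names []string
	it := sc.Buckets("my-project")
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		names = append(names, attrs.Name)
	}
	return names, nil
}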

// GetBucket returns a BucketHandle, which provides operations on the named bucket.
func (sc *Client) GetBucket(bucket string) *storage.BucketHandle {
	return sc.StorageClient.Bucket(bucket)
}

// GetBucketAttrs returns the attributes of the given bucket.
func (sc *Client) GetBucketAttrs(bucket string) (*storage.BucketAttrs, error) {
	bucketAttrs, err := sc.StorageClient.Bucket(bucket).Attrs(sc.Ctx)
	if err != nil {
		return nil, daisy.Errf("Error getting bucket attributes for bucket `%v`: %v", bucket, err)
	}
	return bucketAttrs, nil
}

// GetObject returns the storage object for the given bucket and path.
func (sc *Client) GetObject(bucket string, objectPath string) domain.StorageObject {
	return sc.Soc.GetObject(bucket, objectPath)
}

// GetObjects returns an object iterator for the given bucket and path.
func (sc *Client) GetObjects(bucket string, objectPath string) domain.ObjectIteratorInterface {
	return sc.Oic.CreateObjectIterator(bucket, objectPath)
}

// GetObjectAttrs returns the attributes of the given storage object.
func (sc *Client) GetObjectAttrs(bucket string, objectPath string) (*storage.ObjectAttrs, error) {
	objectAttrs, err := sc.StorageClient.Bucket(bucket).Object(objectPath).Attrs(sc.Ctx)
	if err != nil {
		return nil, daisy.Errf("Error getting object attributes for object `%v/%v`: %v", bucket, objectPath, err)
	}
	return objectAttrs, nil
}

// DeleteGcsPath deletes a GCS path, including all objects under it.
func (sc *Client) DeleteGcsPath(gcsPath string) error {
	bucketName, objectPath, err := SplitGCSPath(gcsPath)
	if err != nil {
		return err
	}
	log.Printf("Deleting content of: %v", gcsPath)
	it := sc.GetObjects(bucketName, objectPath)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return daisy.Errf("Error deleting Cloud Storage path `%v`: %v", gcsPath, err)
		}
		sc.Logger.User(fmt.Sprintf("Deleting gs://%v/%v", bucketName, attrs.Name))
		if err := sc.GetObject(bucketName, attrs.Name).Delete(); err != nil {
			return daisy.Errf("Error deleting Cloud Storage object `%v` in bucket `%v`: %v", attrs.Name, bucketName, err)
		}
	}
	return nil
}
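
// exampleCleanupScratch is a hedged sketch, not part of the original file:
// after an import, everything under a temporary prefix can be removed with a
// single DeleteGcsPath call. The path is hypothetical.
func exampleCleanupScratch(sc *Client) error {
	return sc.DeleteGcsPath("gs://my-scratch-bucket/tmp-import-123")
}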

// DeleteObject deletes the object with the path `gcsPath`.
func (sc *Client) DeleteObject(gcsPath string) error {
	bucketName, objectPath, err := SplitGCSPath(gcsPath)
	if err != nil {
		return daisy.Errf("Error deleting `%v`: `%v`", gcsPath, err)
	}
	if err := sc.GetObject(bucketName, objectPath).Delete(); err != nil {
		return daisy.Errf("Error deleting `%v`: `%v`", gcsPath, err)
	}
	return nil
}

// FindGcsFile recursively searches a GCS directory path for a file whose name
// ends with fileExtension. fileExtension may also be a complete file name.
func (sc *Client) FindGcsFile(gcsDirectoryPath string, fileExtension string) (*storage.ObjectHandle, error) {
	return sc.FindGcsFileDepthLimited(gcsDirectoryPath, fileExtension, -1)
}

// FindGcsFileDepthLimited searches a GCS directory path, up to lookupDepth
// levels deep, for a file whose name ends with fileExtension. A lookupDepth
// of 0 restricts the search to files directly in gcsDirectoryPath; a
// lookupDepth of -1 searches recursively with no depth limit.
// fileExtension may also be a complete file name.
func (sc *Client) FindGcsFileDepthLimited(gcsDirectoryPath string, fileExtension string, lookupDepth int) (*storage.ObjectHandle, error) {
	bucketName, lookupPath, err := SplitGCSPath(gcsDirectoryPath)
	if err != nil {
		return nil, err
	}
	it := sc.GetObjects(bucketName, lookupPath)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, daisy.Errf("Error finding file with extension `%v` in Cloud Storage directory `%v`: %v", fileExtension, gcsDirectoryPath, err)
		}
		if !isDepthValid(lookupDepth, lookupPath, attrs.Name) {
			continue
		}
		if !strings.HasSuffix(attrs.Name, fileExtension) {
			continue
		}
		sc.Logger.User(fmt.Sprintf("Found gs://%v/%v", bucketName, attrs.Name))
		return sc.GetBucket(bucketName).Object(attrs.Name), nil
	}
	return nil, daisy.Errf(
		"path %v doesn't contain a file with %v extension", gcsDirectoryPath, fileExtension)
}
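
// exampleFindOvfDescriptor is a hedged sketch, not part of the original file:
// it looks for an OVF descriptor directly under an export directory first
// (lookupDepth 0), then falls back to an unlimited recursive search. The path
// is hypothetical.
func exampleFindOvfDescriptor(sc *Client) (*storage.ObjectHandle, error) {
	ovf, err := sc.FindGcsFileDepthLimited("gs://my-bucket/exports/vm1/", ".ovf", 0)
	if err == nil {
		return ovf, nil
	}
	return sc.FindGcsFile("gs://my-bucket/exports/vm1/", ".ovf")
}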

func isDepthValid(lookupDepth int, lookupPath, objectPath string) bool {
	if lookupDepth <= -1 {
		return true
	}
	if strings.HasSuffix(lookupPath, "/") {
		lookupPath = lookupPath[:len(lookupPath)-1]
	}
	lookupPathDepth := 0
	if len(lookupPath) > 0 {
		// lookupPath is a "folder", so every element counts as one level, thus the +1.
		lookupPathDepth = 1 + getSlashCount(lookupPath)
	}
	// objectPath refers to an object, so its depth is one less than if it were a folder, thus no +1.
	objectDepth := getSlashCount(objectPath)
	return objectDepth-lookupPathDepth <= lookupDepth
}
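
// A worked example of the depth arithmetic above: for lookupPath "a/b",
// lookupPathDepth is 2 (one slash, plus one for the folder itself), so:
//
//	isDepthValid(0, "a/b", "a/b/file.txt")   == true  (object depth 2)
//	isDepthValid(0, "a/b", "a/b/c/file.txt") == false (object depth 3)
//	isDepthValid(1, "a/b", "a/b/c/file.txt") == true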

func getSlashCount(path string) int {
	return strings.Count(path, "/")
}

// GetGcsFileContent returns the content of a GCS object as a byte slice.
func (sc *Client) GetGcsFileContent(gcsObject *storage.ObjectHandle) ([]byte, error) {
	reader, err := gcsObject.NewReader(sc.Ctx)
	if err != nil {
		return nil, daisy.Errf("Error getting Cloud Storage file content: %v", err)
	}
	// Release the reader once the content has been read.
	defer reader.Close()
	return io.ReadAll(reader)
}
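
// exampleReadManifest is a hedged sketch, not part of the original file:
// FindGcsFile and GetGcsFileContent can be chained to read a small control
// file into memory. The path and file name are hypothetical; large objects
// should be streamed instead of read whole.
func exampleReadManifest(sc *Client) ([]byte, error) {
	obj, err := sc.FindGcsFile("gs://my-bucket/exports/", "manifest.json")
	if err != nil {
		return nil, err
	}
	return sc.GetGcsFileContent(obj)
}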

// WriteToGCS writes content from a reader to the destination bucket and path.
func (sc *Client) WriteToGCS(
	destinationBucketName string, destinationObjectPath string, reader io.Reader) error {
	destinationBucket := sc.GetBucket(destinationBucketName)
	fileWriter := destinationBucket.Object(destinationObjectPath).NewWriter(sc.Ctx)
	if _, err := io.Copy(fileWriter, reader); err != nil {
		return daisy.Errf("Error writing to Cloud Storage file path `%v` in bucket `%v`: %v", destinationObjectPath, destinationBucketName, err)
	}
	// The object is only committed once Close returns successfully.
	return fileWriter.Close()
}
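
// exampleWriteStatusFile is a hedged sketch, not part of the original file:
// WriteToGCS accepts any io.Reader, so an in-memory string can be uploaded
// via strings.NewReader. The bucket and object names are hypothetical.
func exampleWriteStatusFile(sc *Client) error {
	return sc.WriteToGCS("my-bucket", "status/done.txt", strings.NewReader("import finished\n"))
}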

// Close closes the Client.
//
// Close need not be called at program exit.
func (sc *Client) Close() error {
	if err := sc.StorageClient.Close(); err != nil {
		return daisy.Errf("Error closing storage client: %v", err)
	}
	return nil
}

// SplitGCSPath splits a GCS path into its bucket and object path portions.
func SplitGCSPath(p string) (string, string, error) {
	matches := gsPathRegex.FindStringSubmatch(p)
	if matches != nil {
		return matches[1], strings.TrimLeft(matches[2], "/"), nil
	}
	return "", "", daisy.Errf("%q is not a valid Cloud Storage path", p)
}
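
// For example:
//
//	SplitGCSPath("gs://my-bucket/dir/file.vmdk") -> ("my-bucket", "dir/file.vmdk", nil)
//	SplitGCSPath("gs://my-bucket")               -> ("my-bucket", "", nil)
//	SplitGCSPath("my-bucket/file.vmdk")          -> ("", "", error)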

// ConcatGCSPath concatenates multiple path elements into a GCS path,
// inserting a "/" separator after each element that doesn't already end
// with one.
func ConcatGCSPath(pathElements ...string) string {
	path := ""
	for i, pathElement := range pathElements {
		path += pathElement
		if i != len(pathElements)-1 && !strings.HasSuffix(pathElement, "/") {
			path += "/"
		}
	}
	return path
}
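
// For example, separators are not duplicated when an element already ends
// with a slash:
//
//	ConcatGCSPath("gs://my-bucket/", "dir", "file.vmdk") == "gs://my-bucket/dir/file.vmdk"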

// GetGCSObjectPathElements returns the bucket name and the object path within
// the bucket for a valid object path, and an error otherwise.
func GetGCSObjectPathElements(p string) (string, string, error) {
	bucket, object, err := SplitGCSPath(p)
	if err != nil || bucket == "" || object == "" {
		return "", "", daisy.Errf("%q is not a valid Cloud Storage object path", p)
	}
	return bucket, object, nil
}

// GetBucketNameFromGCSPath returns the bucket name from a GCS path.
func GetBucketNameFromGCSPath(p string) (string, error) {
	bucket, _, err := SplitGCSPath(p)
	return bucket, err
}

// HTTPClient implements domain.HTTPClientInterface, which abstracts the HTTP
// functionality used by the image import features.
type HTTPClient struct {
	httpClient *http.Client
}

// Get executes an HTTP GET request for the given URL.
func (hc *HTTPClient) Get(url string) (resp *http.Response, err error) {
	return hc.httpClient.Get(url)
}