oracle/pkg/util/utils.go (140 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"strings"
"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)
const (
	// GSPrefix is the scheme prefix that every GCS URI accepted by
	// GCSUtilImpl.SplitURI must start with.
	GSPrefix = "gs://"

	// contentTypeGZ is the Content-Type given to objects uploaded to a path
	// ending in ".gz" (their data is gzip-compressed during upload).
	contentTypeGZ = "application/gzip"
)
// GCSUtil groups the Google Cloud Storage operations used by this package:
// parsing gs:// URIs and reading, uploading, and deleting objects.
type GCSUtil interface {
	// SplitURI breaks a GCS URI into its bucket and object name. It returns an
	// error when the URI lacks the gs:// scheme or does not name both a bucket
	// and an object.
	SplitURI(url string) (bucket, name string, err error)

	// Download opens the object at gcsPath and returns a reader over its
	// contents. The caller is responsible for closing the returned reader.
	Download(ctx context.Context, gcsPath string) (io.ReadCloser, error)

	// UploadFile copies the local file at filePath to gcsPath and sets the
	// object's content type to contentType. When gcsPath ends in ".gz", the
	// data is gzip-compressed on the way up and the content type is set to
	// application/gzip instead.
	UploadFile(ctx context.Context, gcsPath, filePath, contentType string) error

	// Delete removes all objects under gcsPath.
	Delete(ctx context.Context, gcsPath string) error
}
// GCSUtilImpl implements GCSUtil on top of the Cloud Storage client library.
// It carries no state: each network operation creates its own storage client.
type GCSUtilImpl struct{}
// Download opens the GCS object at gcsPath (a gs://BUCKET/OBJECT URI) for
// reading.
//
// The returned ReadCloser owns the storage client that was used to open the
// object: closing it closes the object reader and then that client. Callers
// must Close it once they are done reading.
func (g *GCSUtilImpl) Download(ctx context.Context, gcsPath string) (io.ReadCloser, error) {
	bucket, name, err := g.SplitURI(gcsPath)
	if err != nil {
		return nil, err
	}
	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to init GCS client: %w", err)
	}
	reader, err := client.Bucket(bucket).Object(name).NewReader(ctx)
	if err != nil {
		// The client is no longer needed. The read error is the one worth
		// reporting, so a Close failure here is deliberately ignored.
		_ = client.Close()
		return nil, fmt.Errorf("failed to read URL %s: %w", gcsPath, err)
	}
	// Do not close the client here. The reader is consumed after this
	// function returns, and closing the client first releases resources the
	// reader may still rely on. Ownership passes to the returned value instead.
	return &clientOwnedReader{Reader: reader, client: client}, nil
}

// clientOwnedReader is a GCS object reader that also releases the storage
// client it was opened with when it is closed.
type clientOwnedReader struct {
	*storage.Reader
	client *storage.Client
}

// Close closes the object reader, then the storage client, and returns the
// first error encountered.
func (r *clientOwnedReader) Close() error {
	readerErr := r.Reader.Close()
	clientErr := r.client.Close()
	if readerErr != nil {
		return readerErr
	}
	return clientErr
}
// UploadFile uploads the local file at filePath to gcsPath by calling
// uploadFile, retrying with retry.DefaultBackoff while the failure looks like
// a transient server-side 500.
func (g *GCSUtilImpl) UploadFile(ctx context.Context, gcsPath, filePath, contentType string) error {
	// Every failed attempt is logged. Only errors whose text contains
	// "compute: Received 500 " are retried. The returned error could not be
	// matched as a *googleapi.Error via errors.As, so the check falls back to
	// inspecting the message.
	shouldRetry := func(err error) bool {
		klog.ErrorS(err, "failed to upload a file")
		return err != nil && strings.Contains(err.Error(), "compute: Received 500 ")
	}
	attempt := func() error {
		return g.uploadFile(ctx, gcsPath, filePath, contentType)
	}
	return retry.OnError(retry.DefaultBackoff, shouldRetry, attempt)
}
// uploadFile performs one upload attempt of the local file at filePath to the
// GCS location gcsPath (a gs://BUCKET/OBJECT URI).
//
// The bucket's attributes are fetched first, so an error is returned if the
// bucket is missing or inaccessible. The object's content type is set to
// contentType. If gcsPath ends in ".gz", the data is gzip-compressed while
// streaming and the content type is contentTypeGZ instead.
//
// The object is committed only when copying, compressing, and finalizing all
// succeed. On a copy or compression error, the writer's context is cancelled
// so that no partial object is saved.
func (g *GCSUtilImpl) uploadFile(ctx context.Context, gcsPath, filePath, contentType string) error {
	bucket, name, err := g.SplitURI(gcsPath)
	if err != nil {
		return err
	}
	f, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer func() {
		if err := f.Close(); err != nil {
			klog.Warningf("failed to close %s: %v", filePath, err)
		}
	}()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to init GCS client: %w", err)
	}
	defer client.Close()
	b := client.Bucket(bucket)
	// check if bucket exists and it is accessible
	if _, err := b.Attrs(ctx); err != nil {
		return err
	}

	// Cancelling writerCtx is how the storage library discards an in-progress
	// Writer without saving the object.
	writerCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	gcsWriter := b.Object(name).NewWriter(writerCtx)
	gcsWriter.ContentType = contentType

	var dst io.Writer = gcsWriter
	var gz *gzip.Writer
	if strings.HasSuffix(gcsPath, ".gz") {
		gcsWriter.ContentType = contentTypeGZ
		gz = gzip.NewWriter(gcsWriter)
		dst = gz
	}

	// abort discards the upload. After cancellation the Close error carries no
	// extra information, so it is dropped in favour of the original failure.
	abort := func() {
		cancel()
		_ = gcsWriter.Close()
	}

	if _, err := io.Copy(dst, f); err != nil {
		abort()
		return fmt.Errorf("failed to write file %s to %s: %w", filePath, gcsPath, err)
	}
	// Closing the gzip writer flushes buffered data and the gzip trailer into
	// gcsWriter, so it must happen before gcsWriter is closed.
	if gz != nil {
		if err := gz.Close(); err != nil {
			abort()
			return fmt.Errorf("failed to compress file %s to %s: %w", filePath, gcsPath, err)
		}
	}
	// The GCS writer finalizes the upload in Close and reports upload failures
	// there, so its error must be returned rather than dropped in a defer.
	if err := gcsWriter.Close(); err != nil {
		return fmt.Errorf("failed to upload file %s to %s: %w", filePath, gcsPath, err)
	}
	return nil
}
// SplitURI splits a GCS URI of the form gs://BUCKET/OBJECT into its bucket and
// object name.
//
// It returns an error if url lacks the gs:// prefix, if the bucket part is
// shorter than two characters, or if no object name follows the bucket (for
// example "gs://bucket" or "gs://bucket/"). The object name is everything
// after the first "/" and may itself contain slashes.
func (g *GCSUtilImpl) SplitURI(url string) (bucket, name string, err error) {
	u := strings.TrimPrefix(url, GSPrefix)
	if u == url {
		return "", "", fmt.Errorf("URL %q is missing the %q prefix", url, GSPrefix)
	}
	// i is the bucket's length (-1 if there is no "/"). Real bucket names are
	// at least 3 characters; this only rejects clearly malformed ones. A "/" in
	// the last position means the object name is empty. Accepting that would
	// let callers such as Delete silently operate on the whole bucket.
	i := strings.Index(u, "/")
	if i < 2 || i == len(u)-1 {
		return "", "", fmt.Errorf("URL %q does not specify a bucket and a name", url)
	}
	return u[:i], u[i+1:], nil
}
// Delete removes every object in gcsPath's bucket whose name begins with the
// object part of gcsPath. It stops at the first listing or deletion error and
// returns it; objects already deleted at that point stay deleted.
func (g *GCSUtilImpl) Delete(ctx context.Context, gcsPath string) error {
	bucket, prefix, err := g.SplitURI(gcsPath)
	if err != nil {
		return err
	}
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to init GCS client: %v", err)
	}
	defer client.Close()

	handle := client.Bucket(bucket)
	objects := handle.Objects(ctx, &storage.Query{Prefix: prefix})
	for {
		attrs, iterErr := objects.Next()
		if iterErr == iterator.Done {
			return nil
		}
		if iterErr != nil {
			return fmt.Errorf("Bucket(%q).Objects(): %v", bucket, iterErr)
		}
		if delErr := handle.Object(attrs.Name).Delete(ctx); delErr != nil {
			return fmt.Errorf("failed to Delete object(%s): %v", attrs.Name, delErr)
		}
	}
}
// Contains reports whether elem occurs at least once in array.
func Contains(array []string, elem string) bool {
	for i := 0; i < len(array); i++ {
		if array[i] == elem {
			return true
		}
	}
	return false
}
// Filter returns a new slice holding, in their original order, every entry of
// slice that is not equal to element. The input slice is not modified, and the
// result is always non-nil (possibly empty).
func Filter(slice []string, element string) []string {
	// Build into a fresh backing array rather than filtering in place, so the
	// caller's slice is left untouched.
	kept := make([]string, 0, len(slice))
	for i := 0; i < len(slice); i++ {
		if slice[i] == element {
			continue
		}
		kept = append(kept, slice[i])
	}
	return kept
}