pkg/cloud_provider/storage/storage.go (210 lines of code) (raw):
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"context"
"errors"
"fmt"
"strings"
"time"
"cloud.google.com/go/iam"
"cloud.google.com/go/storage"
"golang.org/x/oauth2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
type ServiceBucket struct {
Project string
Name string
Location string
SizeBytes int64
Labels map[string]string
EnableUniformBucketLevelAccess bool
EnableHierarchicalNamespace bool
}
type Service interface {
CreateBucket(ctx context.Context, b *ServiceBucket) (*ServiceBucket, error)
GetBucket(ctx context.Context, b *ServiceBucket) (*ServiceBucket, error)
DeleteBucket(ctx context.Context, b *ServiceBucket) error
SetIAMPolicy(ctx context.Context, obj *ServiceBucket, member, roleName string) error
RemoveIAMPolicy(ctx context.Context, obj *ServiceBucket, member, roleName string) error
CheckBucketExists(ctx context.Context, obj *ServiceBucket) (bool, error)
Close()
}
type ServiceManager interface {
SetupService(ctx context.Context, ts oauth2.TokenSource) (Service, error)
SetupServiceWithDefaultCredential(ctx context.Context) (Service, error)
}
type gcsService struct {
storageClient *storage.Client
}
type gcsServiceManager struct{}
func NewGCSServiceManager() (ServiceManager, error) {
return &gcsServiceManager{}, nil
}
func (manager *gcsServiceManager) SetupService(ctx context.Context, ts oauth2.TokenSource) (Service, error) {
if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(context.Context) (bool, error) {
if _, err := ts.Token(); err != nil {
klog.Errorf("error fetching initial token: %v", err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
client := oauth2.NewClient(ctx, ts)
storageClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
if err != nil {
return nil, err
}
return &gcsService{storageClient: storageClient}, nil
}
func (manager *gcsServiceManager) SetupServiceWithDefaultCredential(ctx context.Context) (Service, error) {
storageClient, err := storage.NewClient(ctx)
if err != nil {
return nil, err
}
return &gcsService{storageClient: storageClient}, nil
}
func (service *gcsService) CreateBucket(ctx context.Context, obj *ServiceBucket) (*ServiceBucket, error) {
klog.V(4).Infof("Creating bucket %q: project %q, location %q", obj.Name, obj.Project, obj.Location)
// Create the bucket
bkt := service.storageClient.Bucket(obj.Name)
bktAttrs := &storage.BucketAttrs{
Location: obj.Location,
Labels: obj.Labels,
UniformBucketLevelAccess: storage.UniformBucketLevelAccess{Enabled: obj.EnableUniformBucketLevelAccess},
HierarchicalNamespace: &storage.HierarchicalNamespace{Enabled: obj.EnableHierarchicalNamespace},
}
if err := bkt.Create(ctx, obj.Project, bktAttrs); err != nil {
return nil, fmt.Errorf("CreateBucket operation failed for bucket %q: %w", obj.Name, err)
}
// Check that the bucket exists
bucket, err := service.GetBucket(ctx, obj)
if err != nil {
return nil, fmt.Errorf("failed to get bucket %q after creation: %w", obj.Name, err)
}
return bucket, nil
}
func (service *gcsService) DeleteBucket(ctx context.Context, obj *ServiceBucket) error {
// Check that the bucket exists
_, err := service.GetBucket(ctx, obj)
if err != nil {
if IsNotExistErr(err) {
return nil
}
return fmt.Errorf("failed to get bucket %q before deletion: %w", obj.Name, err)
}
// Delete all objects in the bucket first
bkt := service.storageClient.Bucket(obj.Name)
it := bkt.Objects(ctx, nil)
for {
attrs, err := it.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return fmt.Errorf("failed to iterate next object: %w", err)
}
if err := bkt.Object(attrs.Name).Delete(ctx); err != nil {
return fmt.Errorf("failed to delete object %q: %w", attrs.Name, err)
}
}
// Delete the bucket
err = bkt.Delete(ctx)
if err != nil {
return fmt.Errorf("failed to delete bucket %q: %w", obj.Name, err)
}
return nil
}
func (service *gcsService) GetBucket(ctx context.Context, obj *ServiceBucket) (*ServiceBucket, error) {
bkt := service.storageClient.Bucket(obj.Name)
attrs, err := bkt.Attrs(ctx)
if err != nil {
klog.Errorf("Failed to get bucket %q: %v", obj.Name, err)
// returns the original error because func IsNotExistErr relies on the error message
return nil, err
}
if attrs != nil {
return cloudBucketToServiceBucket(attrs)
}
return nil, fmt.Errorf("failed to get bucket %q: got empty attrs", obj.Name)
}
func (service *gcsService) CheckBucketExists(ctx context.Context, obj *ServiceBucket) (bool, error) {
bkt := service.storageClient.Bucket(obj.Name)
_, err := bkt.Objects(ctx, &storage.Query{Prefix: ""}).Next()
if err == nil || errors.Is(err, iterator.Done) {
return true, nil
}
return false, err
}
func (service *gcsService) SetIAMPolicy(ctx context.Context, obj *ServiceBucket, member, roleName string) error {
bkt := service.storageClient.Bucket(obj.Name)
policy, err := bkt.IAM().Policy(ctx)
if err != nil {
return fmt.Errorf("failed to get bucket %q IAM policy: %w", obj.Name, err)
}
policy.Add(member, iam.RoleName(roleName))
if err := bkt.IAM().SetPolicy(ctx, policy); err != nil {
return fmt.Errorf("failed to set bucket %q IAM policy: %w", obj.Name, err)
}
return nil
}
func (service *gcsService) RemoveIAMPolicy(ctx context.Context, obj *ServiceBucket, member, roleName string) error {
bkt := service.storageClient.Bucket(obj.Name)
policy, err := bkt.IAM().Policy(ctx)
if err != nil {
return fmt.Errorf("failed to get bucket %q IAM policy: %w", obj.Name, err)
}
policy.Remove(member, iam.RoleName(roleName))
if err := bkt.IAM().SetPolicy(ctx, policy); err != nil {
return fmt.Errorf("failed to set bucket %q IAM policy: %w", obj.Name, err)
}
return nil
}
func (service *gcsService) Close() {
service.storageClient.Close()
}
func cloudBucketToServiceBucket(attrs *storage.BucketAttrs) (*ServiceBucket, error) {
return &ServiceBucket{
Location: attrs.Location,
Name: attrs.Name,
Labels: attrs.Labels,
}, nil
}
func CompareBuckets(a, b *ServiceBucket) error {
mismatches := []string{}
if a.Name != b.Name {
mismatches = append(mismatches, "bucket name")
}
if a.Project != b.Project {
mismatches = append(mismatches, "bucket project")
}
if a.Location != b.Location {
mismatches = append(mismatches, "bucket location")
}
if a.SizeBytes != b.SizeBytes {
mismatches = append(mismatches, "bucket size")
}
if len(mismatches) > 0 {
return fmt.Errorf("bucket %q and bucket %q do not match: [%s]", a.Name, b.Name, strings.Join(mismatches, ", "))
}
return nil
}
func IsNotExistErr(err error) bool {
return errors.Is(err, storage.ErrBucketNotExist)
}
func isPermissionDeniedErr(err error) bool {
return strings.Contains(err.Error(), "googleapi: Error 403")
}
func isCanceledErr(err error) bool {
return strings.Contains(err.Error(), "context canceled") || strings.Contains(err.Error(), "context deadline exceeded")
}
// ParseErrCode parses error and returns a gRPC code.
func ParseErrCode(err error) codes.Code {
code := codes.Internal
if IsNotExistErr(err) {
code = codes.NotFound
}
if isPermissionDeniedErr(err) {
code = codes.PermissionDenied
}
if isCanceledErr(err) {
code = codes.Aborted
}
return code
}