pkg/fs/remote/aws/s3.go (139 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package aws provides an AWS S3 file system implementation. package aws import ( "context" "fmt" "io" "path" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/apache/skywalking-banyandb/pkg/fs/remote" ) // todo: Maybe we can bring in minio, oss type s3FS struct { client *s3.Client bucket string basePath string checksumAlgorithm types.ChecksumAlgorithm storageClass types.StorageClass } // NewFS creates a new instance of the file system for accessing S3 storage. func NewFS(path string, userConfig *remote.FsConfig) (remote.FS, error) { bucket, basePath := extractBucketAndBase(path) if bucket == "" { return nil, fmt.Errorf("bucket name not provided") } if userConfig == nil { return nil, fmt.Errorf("userConfig is nil") } opts := buildAWSCfgOptions(userConfig) awsCfg, err := config.LoadDefaultConfig(context.TODO(), opts...) if err != nil { return nil, fmt.Errorf("load AWS config: %w", err) } client := s3.NewFromConfig(awsCfg) fs := &s3FS{ client: client, bucket: bucket, basePath: basePath, } if userConfig.S3ChecksumAlgorithm != "" { fs.checksumAlgorithm = types.ChecksumAlgorithm(userConfig.S3ChecksumAlgorithm) } if userConfig.S3StorageClass != "" { fs.storageClass = types.StorageClass(userConfig.S3StorageClass) } return fs, nil } func buildAWSCfgOptions(userConfig *remote.FsConfig) []func(*config.LoadOptions) error { opts := []func(*config.LoadOptions) error{ config.WithClientLogMode(aws.LogRetries), } if userConfig.S3ProfileName != "" { opts = append(opts, config.WithSharedConfigProfile(userConfig.S3ProfileName)) } if userConfig.S3ConfigFilePath != "" { opts = append(opts, config.WithSharedConfigFiles([]string{userConfig.S3ConfigFilePath})) } if userConfig.S3CredentialFilePath != "" { opts = append(opts, config.WithSharedCredentialsFiles([]string{userConfig.S3CredentialFilePath})) } return opts } func extractBucketAndBase(path string) (bucket, basePath string) { trimmedPath := strings.Trim(path, "/") if trimmedPath == "" { return "", "" } parts := strings.SplitN(trimmedPath, "/", 2) bucket = parts[0] if len(parts) > 1 { basePath = parts[1] } return } func (s *s3FS) getFullPath(p string) string { if s.basePath == "" { return p } return path.Join(s.basePath, p) } func (s *s3FS) Upload(ctx context.Context, path string, data io.Reader) error { key := s.getFullPath(path) _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(key), Body: data, ChecksumAlgorithm: s.checksumAlgorithm, StorageClass: s.storageClass, }) if err != nil { return err } return nil } func (s *s3FS) Download(ctx context.Context, path string) (io.ReadCloser, error) { key := s.getFullPath(path) resp, err := s.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(key), ChecksumMode: types.ChecksumModeEnabled, }) if err != nil { return nil, err } return resp.Body, nil } func (s *s3FS) List(ctx context.Context, prefix string) ([]string, error) { fullPrefix := s.getFullPath(prefix) var files []string paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{ Bucket: aws.String(s.bucket), Prefix: aws.String(fullPrefix), }) for paginator.HasMorePages() { page, err := paginator.NextPage(ctx) if err != nil { return nil, err } for _, obj := range page.Contents { key := *obj.Key if s.basePath != "" { key = strings.TrimPrefix(key, s.basePath+"/") } files = append(files, key) } } return files, nil } func (s *s3FS) Delete(ctx context.Context, path string) error { key := s.getFullPath(path) _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(key), }) return err } func (s *s3FS) Close() error { // No resources to close for S3 client return nil }