registry/storage/driver/gcs/common.go (243 lines of code) (raw):
package gcs
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"net/http"
"os"
"reflect"
"regexp"
"runtime"
"strconv"
"time"
// nolint: revive,gosec // imports-blocklist
"cloud.google.com/go/storage"
"github.com/benbjohnson/clock"
"github.com/docker/distribution/registry/internal"
dstorage "github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/docker/distribution/registry/storage/driver/internal/parse"
"github.com/docker/distribution/registry/storage/internal/metrics"
"github.com/docker/distribution/version"
"github.com/sirupsen/logrus"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jwt"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)
// Driver-wide identifiers, defaults and limits.
const (
// driverName is the name under which init registers this driver's factory.
driverName = "gcs"
// uploadSessionContentType is the content type string associated with
// upload sessions. NOTE(review): not referenced in this section of the
// file; presumably set on in-progress upload objects — confirm against
// the writer implementation.
uploadSessionContentType = "application/x-docker-upload-session"
// minChunkSize (256 KiB) is the smallest accepted `chunksize`; configured
// chunk sizes must also be a multiple of it (see parseParameters).
minChunkSize int64 = 256 * 1024
// defaultChunkSize (20 * minChunkSize = 5 MiB) is used when no
// `chunksize` parameter is supplied.
defaultChunkSize = 20 * minChunkSize
// defaultMaxConcurrency and minConcurrency are handed to
// base.GetLimitFromParameter when resolving the `maxconcurrency`
// parameter.
defaultMaxConcurrency = 50
minConcurrency = 25
// maxDeleteConcurrency and maxWalkConcurrency are not referenced in this
// section of the file. NOTE(review): presumably upper bounds for
// concurrent delete and walk operations — confirm at the call sites.
maxDeleteConcurrency = 150
maxWalkConcurrency = 100
// maxTries is the maximum number of attempts retry makes for a request.
maxTries = 5
)
// The customGitlabGoogle...Param constants are the query parameter names
// appended to GCS signed redirect URLs. Each one is the target of a GitLab
// key in customParamKeys.
const (
// customGitlabGoogleNamespaceIdParam is mapped from dstorage.NamespaceIdKey.
customGitlabGoogleNamespaceIdParam = "x-goog-custom-audit-gitlab-namespace-id"
// customGitlabGoogleProjectIdParam is mapped from dstorage.ProjectIdKey.
customGitlabGoogleProjectIdParam = "x-goog-custom-audit-gitlab-project-id"
// customGitlabGoogleAuthTypeParam is mapped from dstorage.AuthTypeKey.
customGitlabGoogleAuthTypeParam = "x-goog-custom-audit-gitlab-auth-type"
// customGitlabGoogleObjectSizeParam is mapped from dstorage.SizeBytesKey.
customGitlabGoogleObjectSizeParam = "x-goog-custom-audit-gitlab-size-bytes"
)
// registryGCSDriverEnv is the name of the environment variable that selects
// the driver implementation in FromParameters: the value "next" selects the
// next-gen driver, anything else (including unset) selects the legacy one.
const registryGCSDriverEnv = "REGISTRY_GCS_DRIVER"
// rangeHeader matches a byte range of the form "bytes=<start>-<end>".
// Submatch 1 holds the complete start offset and submatch 2 the complete end
// offset. The quantifier must sit inside the capture group: with `([0-9])+`
// the group would only retain the last digit of the start offset.
var rangeHeader = regexp.MustCompile(`^bytes=([0-9]+)-([0-9]+)$`)
// customParamKeys maps GitLab keys (declared in the dstorage package) to the
// query parameter names used on GCS signed redirect URLs.
var customParamKeys = map[string]string{
dstorage.NamespaceIdKey: customGitlabGoogleNamespaceIdParam,
dstorage.ProjectIdKey: customGitlabGoogleProjectIdParam,
dstorage.AuthTypeKey: customGitlabGoogleAuthTypeParam,
dstorage.SizeBytesKey: customGitlabGoogleObjectSizeParam,
}
// systemClock is the package-level clock, initialized with clock.New(). It is
// a variable rather than a direct call for testing purposes (presumably so
// tests can substitute their own internal.Clock).
var systemClock internal.Clock = clock.New()
func init() {
factory.Register(driverName, &gcsDriverFactory{})
}
// driverParameters is a struct that encapsulates all the driver parameters after all values have been set
// (it is the result of parseParameters).
type driverParameters struct {
// bucket is the string form of the required `bucket` parameter.
bucket string
// email and privateKey are copied from the JWT config built from the
// `keyfile` or `credentials` parameter; they are left at their zero values
// when the default token source is used.
email string
privateKey []byte
// client is an OAuth2 HTTP client built from the resolved token source.
client *http.Client
// storageClient is the GCS SDK client built from the same token source.
storageClient *storage.Client
// rootDirectory is the string form of the optional `rootdirectory`
// parameter.
rootDirectory string
// chunkSize is the validated `chunksize` parameter, or defaultChunkSize
// when the parameter is absent.
chunkSize int64
// maxConcurrency limits the number of concurrent driver operations
// to GCS, which ultimately increases reliability of many simultaneous
// pushes by ensuring we aren't DoSing our own server with many
// connections.
maxConcurrency uint64
// parallelWalk enables or disables concurrently walking the filesystem.
parallelWalk bool
}
// gcsDriverFactory is the factory.StorageDriverFactory implementation that
// init registers for the GCS driver.
type gcsDriverFactory struct{}

// Create builds a storage driver from the raw parameters map by delegating to
// FromParameters.
func (*gcsDriverFactory) Create(parameters map[string]any) (storagedriver.StorageDriver, error) {
	drv, err := FromParameters(parameters)
	return drv, err
}
// Wrapper wraps `driver` with a throttler, ensuring that no more than N
// GCS actions can occur concurrently. N is resolved from the
// `maxconcurrency` parameter in parseParameters, using minConcurrency and
// defaultMaxConcurrency.
type Wrapper struct {
baseEmbed
}
// GCSBucketKey returns the GCS bucket key for the given storage driver path.
//
// This is currently used exclusively by the Google Cloud CDN middleware.
// During an online migration we have to maintain two separate storage
// drivers, each with a different root directory. Because of that we have
// no other option than hand over the object full path construction to the
// underlying GCS driver, instead of manually concatenating the CDN
// endpoint with the object path.
//
// If the wrapped driver is not a *base.Regulator around one of the GCS driver
// types, path is returned unchanged.
func (d *Wrapper) GCSBucketKey(path string) string {
	// Use a checked assertion so an unexpected wrapper degrades the same way
	// as an unexpected inner driver type below, instead of panicking.
	regulator, ok := d.StorageDriver.(*base.Regulator)
	if !ok {
		return path
	}

	// Type switch to handle both driver implementations.
	switch typedDriver := regulator.StorageDriver.(type) {
	case *driverNext:
		return typedDriver.pathToKey(path)
	case *driver:
		return typedDriver.pathToKey(path)
	default:
		// Handle unexpected driver type or fail gracefully. Depending on
		// requirements, we might want to log this or handle differently
		return path
	}
}
// baseEmbed embeds base.Base so that types embedding it (Wrapper) get its
// promoted fields and methods, such as the StorageDriver field read by
// GCSBucketKey.
type baseEmbed struct {
base.Base
}
// request is a unit of work that retry may invoke several times; it returns
// nil on success.
type request func() error
// retry invokes req up to maxTries times and returns nil as soon as an
// attempt succeeds.
//
// Only *googleapi.Error failures with status 429 (Too Many Requests) or a
// 5xx status are retried; any other error, including context cancellation or
// deadline expiry, is returned immediately. Every 429 response is reported
// through metrics.StorageRatelimit.
//
// Between attempts the function sleeps for (backoff - 1s) plus a random
// jitter in [0, 1000) ms, where backoff starts at one second and doubles
// after each failed attempt. No sleep happens after the final attempt, since
// there is nothing left to wait for.
//
// When all attempts fail, the last error is returned; if it was a 429 it is
// wrapped in storagedriver.TooManyRequestsError.
func retry(req request) error {
	// maxBackoff caps the exponential growth should maxTries ever be raised.
	const maxBackoff = 32 * time.Second

	backoff := time.Second
	var err error
	for i := 0; i < maxTries; i++ {
		err = req()
		if err == nil {
			return nil
		}

		// Context cancelation/expiry is fatal, do not try to retry:
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		var gErr *googleapi.Error
		if !errors.As(err, &gErr) {
			return err
		}
		rateLimited := gErr.Code == http.StatusTooManyRequests
		if !rateLimited && gErr.Code < http.StatusInternalServerError {
			return err
		}
		if rateLimited {
			metrics.StorageRatelimit()
		}

		// Last attempt failed: skip the pointless sleep and report the error.
		if i == maxTries-1 {
			break
		}

		// nolint:gosec // this is just a random number for retry backoff
		time.Sleep(backoff - time.Second + (time.Duration(rand.Int32N(1000)) * time.Millisecond))
		if backoff < maxBackoff {
			backoff *= 2
		}
	}

	var gerr *googleapi.Error
	if errors.As(err, &gerr) && gerr.Code == http.StatusTooManyRequests {
		return storagedriver.TooManyRequestsError{Cause: err}
	}
	return err
}
// FromParameters builds a storage driver from the given parameters map.
//
// The implementation is chosen by the environment variable named by
// registryGCSDriverEnv: the value "next" selects the next-gen driver, any
// other value the legacy driver. The choice is logged at warning level.
//
// Required parameters:
//   - bucket
func FromParameters(parameters map[string]any) (storagedriver.StorageDriver, error) {
	nextGen := os.Getenv(registryGCSDriverEnv) == "next"

	parsed, parseErr := parseParameters(parameters, nextGen)
	switch {
	case parseErr != nil:
		return nil, parseErr
	case nextGen:
		logrus.Warn("Using next-gen GCS driver")
		return NewNext(parsed)
	default:
		logrus.Warn("Using legacy GCS driver")
		return New(parsed)
	}
}
// parseParameters validates the raw driver configuration and builds the
// driverParameters used to construct a driver, including the OAuth2 HTTP
// client and the GCS storage client.
//
// Recognized keys in parameters:
//   - bucket (required): bucket name; missing, nil or empty is an error.
//   - rootdirectory: optional root prefix; missing or nil means "".
//   - chunksize: integer or decimal string; must be at least minChunkSize and
//     a multiple of it. Defaults to defaultChunkSize.
//   - keyfile: path to a JSON service account key.
//   - credentials: inline service account key as a map; only consulted when
//     keyfile is absent. When neither is given the default Google token
//     source is used.
//   - maxconcurrency: resolved via base.GetLimitFromParameter with
//     minConcurrency and defaultMaxConcurrency.
//   - useragent: only consulted when useNext is true.
//   - parallelwalk: boolean, defaults to false.
//
// useNext additionally enables JSON reads on the storage client.
//
// It returns an error when a parameter is invalid, when credentials cannot be
// loaded, or when the storage client cannot be created.
func parseParameters(parameters map[string]any, useNext bool) (*driverParameters, error) {
	// A key that is present but nil (e.g. an empty YAML value) must be
	// treated as missing: fmt.Sprint(nil) yields "<nil>", not "".
	bucket, ok := parameters["bucket"]
	if !ok || bucket == nil || fmt.Sprint(bucket) == "" {
		return nil, errors.New("no bucket parameter provided")
	}

	rootDirectory, ok := parameters["rootdirectory"]
	if !ok || rootDirectory == nil {
		rootDirectory = ""
	}

	chunkSize := defaultChunkSize
	if chunkSizeParam, ok := parameters["chunksize"]; ok {
		switch v := chunkSizeParam.(type) {
		case string:
			vv, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
			}
			chunkSize = vv
		case int, uint, int32, uint32, uint64, int64:
			// Unsigned values above the int64 range wrap to a negative
			// number here and are rejected by the minimum check below.
			chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
		default:
			return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
		}

		if chunkSize < minChunkSize {
			return nil, fmt.Errorf("the chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
		}
		if chunkSize%minChunkSize != 0 {
			return nil, fmt.Errorf("chunksize should be a multiple of %d", minChunkSize)
		}
	}

	// Resolve the token source. jwtConf stays an empty config when the
	// default token source is used, so email/privateKey end up zero-valued.
	var ts oauth2.TokenSource
	jwtConf := new(jwt.Config)
	if keyfile, ok := parameters["keyfile"]; ok {
		jsonKey, err := os.ReadFile(fmt.Sprint(keyfile))
		if err != nil {
			return nil, err
		}
		jwtConf, err = google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
		ts = jwtConf.TokenSource(context.Background())
	} else if credentials, ok := parameters["credentials"]; ok {
		credentialMap, ok := credentials.(map[any]any)
		if !ok {
			return nil, fmt.Errorf("the credentials were not specified in the correct format")
		}

		// encoding/json cannot marshal map[any]any, so copy the entries into
		// a string-keyed map first.
		stringMap := make(map[string]any, len(credentialMap))
		for k, v := range credentialMap {
			key, ok := k.(string)
			if !ok {
				return nil, fmt.Errorf("one of the credential keys was not a string: %s", fmt.Sprint(k))
			}
			stringMap[key] = v
		}

		data, err := json.Marshal(stringMap)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal gcs credentials to json: %w", err)
		}
		jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
		ts = jwtConf.TokenSource(context.Background())
	} else {
		var err error
		ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
	}

	maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
	if err != nil {
		return nil, fmt.Errorf("maxconcurrency config error: %w", err)
	}

	opts := []option.ClientOption{option.WithTokenSource(ts)}
	if useNext {
		// NOTE(prozlach): By default, reads are made using the Cloud Storage XML
		// API. GCS SDK recommends using the JSON API instead, which is done
		// here by setting WithJSONReads. This ensures consistency with other
		// client operations, which all use JSON. JSON will become the default
		// in a future release of GCS SDK. We only enable it for GCS next.
		// https://cloud.google.com/go/docs/reference/cloud.google.com/go/storage/latest#cloud_google_com_go_storage_WithJSONReads
		opts = append(opts, storage.WithJSONReads())

		// NOTE(review): the registry default user agent is only applied when
		// the "useragent" key is present but not a non-empty string; when the
		// key is absent no user agent option is set at all. Confirm that this
		// is intended.
		if userAgent, ok := parameters["useragent"]; ok {
			if ua, ok := userAgent.(string); ok && ua != "" {
				opts = append(opts, option.WithUserAgent(ua))
			} else {
				defaultUA := fmt.Sprintf("container-registry %s (%s)", version.Version, runtime.Version())
				opts = append(opts, option.WithUserAgent(defaultUA))
			}
		}
	}

	storageClient, err := storage.NewClient(context.Background(), opts...)
	if err != nil {
		return nil, fmt.Errorf("storage client error: %w", err)
	}

	parallelWalkBool, err := parse.Bool(parameters, "parallelwalk", false)
	if err != nil {
		return nil, err
	}

	return &driverParameters{
		bucket:         fmt.Sprint(bucket),
		rootDirectory:  fmt.Sprint(rootDirectory),
		email:          jwtConf.Email,
		privateKey:     jwtConf.PrivateKey,
		client:         oauth2.NewClient(context.Background(), ts),
		storageClient:  storageClient,
		chunkSize:      chunkSize,
		maxConcurrency: maxConcurrency,
		parallelWalk:   parallelWalkBool,
	}, nil
}