lib/backend/shadowbackend/client.go (206 lines of code) (raw):

// Copyright (c) 2016-2020 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package shadowbackend import ( "errors" "fmt" "io" "github.com/uber-go/tally" "github.com/uber/kraken/core" "github.com/uber/kraken/lib/backend" "github.com/uber/kraken/lib/backend/backenderrors" "github.com/uber/kraken/lib/backend/hdfsbackend" "github.com/uber/kraken/lib/backend/s3backend" "github.com/uber/kraken/lib/backend/sqlbackend" "github.com/uber/kraken/lib/backend/testfs" "github.com/uber/kraken/utils/log" "go.uber.org/zap" "gopkg.in/yaml.v2" ) const _shadow = "shadow" func init() { backend.Register(_shadow, &factory{}) } type factory struct{} func (f *factory) Name() string { return _shadow } func (f *factory) Create( confRaw interface{}, masterAuthConfig backend.AuthConfig, stats tally.Scope, _ *zap.SugaredLogger) (backend.Client, error) { confBytes, err := yaml.Marshal(confRaw) if err != nil { return nil, fmt.Errorf("marshal shadow config: %v", err) } var config Config if err := yaml.Unmarshal(confBytes, &config); err != nil { return nil, fmt.Errorf("unmarshal shadow config: %v", err) } return NewClient(config, masterAuthConfig, stats) } // Client implements a backend.Client for shadow mode. See the README for full details on what shadow mode means. type Client struct { cfg Config active backend.Client shadow backend.Client stats tally.Scope } // Option allows setting optional Client parameters. type Option func(*Client) // NewClient creates a new shadow Client func NewClient(config Config, masterAuthConfig backend.AuthConfig, stats tally.Scope) (*Client, error) { aAuthConfig, sAuthConfig, err := extractAuthConfigs(config, masterAuthConfig) if err != nil { return nil, err } a, err := getBackendClient(config.ActiveClientConfig, aAuthConfig, stats) if err != nil { return nil, err } s, err := getBackendClient(config.ShadowClientConfig, sAuthConfig, stats) if err != nil { return nil, err } return &Client{ cfg: config, active: a, shadow: s, stats: stats, }, nil } func extractAuthConfigs(config Config, masterAuthConfig backend.AuthConfig) (interface{}, interface{}, error) { aName, sName, err := getBackendNames(config) if err != nil { return nil, nil, err } aAuth, ok := masterAuthConfig[aName] if !ok { return nil, nil, fmt.Errorf("active backend auth config missing") } sAuth, ok := masterAuthConfig[sName] if !ok { return nil, nil, fmt.Errorf("shadow backend auth config missing") } return aAuth, sAuth, nil } func getBackendNames(config Config) (string, string, error) { if len(config.ActiveClientConfig) != 1 { return "", "", fmt.Errorf("no active backend or more than one active backend configured") } if len(config.ShadowClientConfig) != 1 { return "", "", fmt.Errorf("no shadow backend or more than one shadow backend configured") } var aName string var sName string for aName = range config.ActiveClientConfig { // Map should have only 1 key/value } for sName = range config.ShadowClientConfig { // Map should have only 1 key/value } return aName, sName, nil } func getBackendClient(backendConfig map[string]interface{}, authConfRaw interface{}, stats tally.Scope) (backend.Client, error) { var name string var confRaw interface{} // TODO Re-implementing all the factory functions is bad form, but because backends.getFactory isn't public there // is no way to access them currently. Opened https://github.com/uber/kraken/issues/213 to address this. for name, confRaw = range backendConfig { switch name { case "sql": confBytes, err := yaml.Marshal(confRaw) if err != nil { return nil, fmt.Errorf("marshal sql config: %s", err) } var config sqlbackend.Config if err := yaml.Unmarshal(confBytes, &config); err != nil { return nil, fmt.Errorf("unmarshal sql config: %s", err) } authConfBytes, err := yaml.Marshal(authConfRaw) var userAuth sqlbackend.UserAuthConfig if err := yaml.Unmarshal(authConfBytes, &userAuth); err != nil { return nil, fmt.Errorf("unmarshal sql auth config: %s", err) } return sqlbackend.NewClient(config, userAuth, stats) case "hdfs": confBytes, err := yaml.Marshal(confRaw) if err != nil { return nil, fmt.Errorf("marshal hdfs config: %s", err) } var config hdfsbackend.Config if err := yaml.Unmarshal(confBytes, &config); err != nil { return nil, fmt.Errorf("unmarshal hdfs config: %s", err) } return hdfsbackend.NewClient(config, stats) case "s3": confBytes, err := yaml.Marshal(confRaw) if err != nil { return nil, fmt.Errorf("marshal s3 config: %s", err) } var config s3backend.Config if err := yaml.Unmarshal(confBytes, &config); err != nil { return nil, fmt.Errorf("unmarshal s3 config: %s", err) } authConfBytes, err := yaml.Marshal(authConfRaw) var userAuth s3backend.UserAuthConfig if err := yaml.Unmarshal(authConfBytes, &userAuth); err != nil { return nil, fmt.Errorf("unmarshal s3 auth config: %s", err) } return s3backend.NewClient(config, userAuth, stats) case "testfs": confBytes, err := yaml.Marshal(confRaw) if err != nil { return nil, fmt.Errorf("marshal testfs config: %s", err) } var config testfs.Config if err := yaml.Unmarshal(confBytes, &config); err != nil { return nil, fmt.Errorf("unmarshal testfs config: %s", err) } return testfs.NewClient(config, stats) default: return nil, fmt.Errorf("unsupported backend type '%s'", name) } } return nil, nil } func isNotFoundErr(err error) bool { return err != nil && err == backenderrors.ErrBlobNotFound } // Stat returns a non-nil core.BlobInfo struct if the data exists, an error otherwise. func (c *Client) Stat(namespace string, name string) (*core.BlobInfo, error) { // read from both, fail if error from either res, errA := c.active.Stat(namespace, name) _, errS := c.shadow.Stat(namespace, name) if isNotFoundErr(errA) && isNotFoundErr(errS) { return nil, backenderrors.ErrBlobNotFound } if errA != nil || errS != nil { if errA != nil && errS == nil { log.Errorf("[Stat] error getting %s for namespace '%s' from active backend: %v", name, namespace, errA) return nil, errA } if errS != nil && errA == nil { log.Errorf("[Stat] error getting %s for namespace '%s' from shadow backend: %v", name, namespace, errS) return nil, errS } return nil, fmt.Errorf("[Stat] error in both backends for %s in namespace '%s'. active: '%v', shadow: '%v'", name, namespace, errA, errS) } return res, nil } // Download gets the data from the backend and then writes it to the output writer. func (c *Client) Download(namespace string, name string, dst io.Writer) error { err := c.active.Download(namespace, name, dst) return err } // Upload upserts the data into the backend. func (c *Client) Upload(namespace string, name string, src io.Reader) error { rs, ok := src.(io.ReadSeeker) if !ok { return errors.New("refusing upload: src does not implement io.Seeker") } // write to both, fail if write fails for any err := c.active.Upload(namespace, name, rs) if err != nil { return err } // Need to rewind the ReadSeeker here before the second upload if _, err := rs.Seek(0, io.SeekStart); err != nil { return err } err = c.shadow.Upload(namespace, name, rs) if err != nil { return err } return nil } // List lists names with start with prefix. func (c *Client) List(prefix string, opts ...backend.ListOption) (*backend.ListResult, error) { res, err := c.active.List(prefix, opts...) if err != nil { return nil, err } return res, nil }