lib/backend/manager.go (128 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"errors"
"fmt"
"regexp"
"github.com/uber/kraken/lib/backend/backenderrors"
"github.com/uber/kraken/utils/bandwidth"
"github.com/uber/kraken/utils/log"
"github.com/uber-go/tally"
)
// Manager errors.
var (
ErrNamespaceNotFound = errors.New("no matches for namespace")
)
type backend struct {
regexp *regexp.Regexp
client Client
mustReady bool
}
func newBackend(namespace string, c Client, mustReady bool) (*backend, error) {
re, err := regexp.Compile(namespace)
if err != nil {
return nil, fmt.Errorf("regexp: %s", err)
}
return &backend{
regexp: re,
client: c,
mustReady: mustReady,
}, nil
}
// Manager manages backend clients for namespace regular expressions.
type Manager struct {
backends []*backend
}
// ManagerConfig is config for backend manager.
type ManagerConfig struct {
Log log.Config `yaml:"log"`
}
// NewManager creates a new backend Manager.
func NewManager(managerConfig ManagerConfig, configs []Config, auth AuthConfig, stats tally.Scope) (*Manager, error) {
logger, err := log.New(managerConfig.Log, nil)
if err != nil {
return nil, fmt.Errorf("log: %s", err)
}
slogger := logger.Sugar()
var backends []*backend
for _, config := range configs {
config = config.applyDefaults()
var c Client
if len(config.Backend) != 1 {
return nil, fmt.Errorf("no backend or more than one backend configured")
}
var backendName string
var backendConfig interface{}
for backendName, backendConfig = range config.Backend { // Pull the only key/value out of map
}
factory, err := getFactory(backendName)
if err != nil {
return nil, fmt.Errorf("get backend client factory: %s", err)
}
c, err = factory.Create(backendConfig, auth, stats, slogger)
if err != nil {
return nil, fmt.Errorf("create backend client: %s", err)
}
if config.Bandwidth.Enable {
l, err := bandwidth.NewLimiter(config.Bandwidth)
if err != nil {
return nil, fmt.Errorf("bandwidth: %s", err)
}
c = throttle(c, l)
}
b, err := newBackend(config.Namespace, c, config.MustReady)
if err != nil {
return nil, fmt.Errorf("new backend for namespace %s: %s", config.Namespace, err)
}
backends = append(backends, b)
}
return &Manager{backends}, nil
}
// AdjustBandwidth adjusts bandwidth limits across all throttled clients to the
// originally configured bandwidth divided by denominator.
func (m *Manager) AdjustBandwidth(denominator int) error {
for _, b := range m.backends {
tc, ok := b.client.(*ThrottledClient)
if !ok {
continue
}
if err := tc.adjustBandwidth(denominator); err != nil {
return err
}
log.With(
"namespace", b.regexp.String(),
"ingress", tc.IngressLimit(),
"egress", tc.EgressLimit(),
"denominator", denominator).Info("Adjusted backend bandwidth")
}
return nil
}
// Register dynamically registers a namespace with a provided client. Register
// should be primarily used for testing purposes -- normally, namespaces should
// be statically configured and provided upon construction of the Manager.
func (m *Manager) Register(namespace string, c Client, mustReady bool) error {
for _, b := range m.backends {
if b.regexp.String() == namespace {
return fmt.Errorf("namespace %s already exists", namespace)
}
}
b, err := newBackend(namespace, c, mustReady)
if err != nil {
return fmt.Errorf("new backend: %s", err)
}
m.backends = append(m.backends, b)
return nil
}
// GetClient matches namespace to the configured Client. Returns ErrNamespaceNotFound
// if no clients match namespace.
func (m *Manager) GetClient(namespace string) (Client, error) {
if namespace == NoopNamespace {
return NoopClient{}, nil
}
for _, b := range m.backends {
if b.regexp.MatchString(namespace) {
return b.client, nil
}
}
return nil, ErrNamespaceNotFound
}
// CheckReadiness returns whether the backends are ready (available).
// A backend must be explicitly configured as required for readiness to be checked.
func (m *Manager) CheckReadiness() error {
for _, b := range m.backends {
if !b.mustReady {
continue
}
_, err := b.client.Stat(ReadinessCheckNamespace, ReadinessCheckName)
if err != nil && err != backenderrors.ErrBlobNotFound {
return fmt.Errorf("backend for namespace '%s' not ready: %s", b.regexp.String(), err)
}
}
return nil
}