origin/blobclient/cluster_client.go (234 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobclient
import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"sync"
"time"
"github.com/cenkalti/backoff"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/hostlist"
"github.com/uber/kraken/utils/errutil"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/log"
)
// Locations asks the cluster where d is located.
//
// A sample of the resolved cluster addresses (Sample(3)) is queried one
// address at a time, and the answer of the first address that responds
// without error is returned. If every sampled address fails, the result and
// error of the last attempt are returned. An empty sample yields a
// "cluster is empty" error.
func Locations(p Provider, cluster hostlist.List, d core.Digest) (locs []string, err error) {
	candidates := cluster.Resolve().Sample(3)
	if len(candidates) == 0 {
		return nil, errors.New("cluster is empty")
	}
	for origin := range candidates {
		if locs, err = p.Provide(origin).Locations(d); err == nil {
			// First successful answer wins; no need to ask anyone else.
			return locs, nil
		}
	}
	return locs, err
}
// ClientResolver resolves digests into Clients of origins.
//
// It is the seam through which clusterClient and Poll find the origins
// responsible for a blob.
type ClientResolver interface {
	// Resolve must return an ordered, stable, non-empty list of Clients for origins owning d.
	// Callers in this file iterate the list in order and, in CheckReadiness,
	// index into it, so implementations should report an error rather than
	// return an empty list.
	Resolve(d core.Digest) ([]Client, error)
}
// clientResolver is the ClientResolver implementation returned by
// NewClientResolver. It looks up blob locations in cluster and turns each
// location into a Client via provider.
type clientResolver struct {
	// provider creates a Client for a given origin address.
	provider Provider
	// cluster is the host list queried for blob locations.
	cluster hostlist.List
}
// NewClientResolver builds a ClientResolver which locates blobs by querying
// cluster and creates the resulting Clients with p.
func NewClientResolver(p Provider, cluster hostlist.List) ClientResolver {
	return &clientResolver{
		provider: p,
		cluster:  cluster,
	}
}
// Resolve looks up the locations of d in the cluster and returns one Client
// per location, in the order the locations were reported.
//
// It returns an error if the lookup fails or if the cluster reports no
// locations for d, so that callers never receive an empty client list (as
// required by the ClientResolver contract).
func (r *clientResolver) Resolve(d core.Digest) ([]Client, error) {
	locs, err := Locations(r.provider, r.cluster, d)
	if err != nil {
		return nil, err
	}
	if len(locs) == 0 {
		// An empty list would make callers silently do nothing (or panic when
		// they index into it), so surface it as an error here instead.
		return nil, errors.New("no origin locations found")
	}
	clients := make([]Client, 0, len(locs))
	for _, loc := range locs {
		clients = append(clients, r.provider.Provide(loc))
	}
	return clients, nil
}
// ClusterClient defines a top-level origin cluster client which handles blob
// location resolution and retries.
type ClusterClient interface {
	// CheckReadiness checks the readiness of the origin cluster.
	CheckReadiness() error
	// UploadBlob uploads blob, identified by d, to the origin cluster.
	UploadBlob(namespace string, d core.Digest, blob io.Reader) error
	// DownloadBlob pulls the blob identified by d from the origin cluster
	// and writes it to dst.
	DownloadBlob(namespace string, d core.Digest, dst io.Writer) error
	// GetMetaInfo returns the metainfo for d.
	GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)
	// Stat checks availability of the blob identified by d in the cluster.
	Stat(namespace string, d core.Digest) (*core.BlobInfo, error)
	// OverwriteMetaInfo overwrites existing metainfo for d with new metainfo
	// configured with pieceLength.
	OverwriteMetaInfo(d core.Digest, pieceLength int64) error
	// Owners returns the origin peers which own d.
	Owners(d core.Digest) ([]core.PeerContext, error)
	// ReplicateToRemote replicates d to the remote origin cluster at remoteDNS.
	ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error
}
// clusterClient is the ClusterClient implementation returned by
// NewClusterClient. Every operation first asks resolver for the origins
// owning the digest and then talks to those origins.
type clusterClient struct {
	// resolver maps a digest to the Clients of the origins owning it.
	resolver ClientResolver
}
// NewClusterClient builds a ClusterClient which uses r to find the origins
// responsible for each digest.
func NewClusterClient(r ClientResolver) ClusterClient {
	return &clusterClient{resolver: r}
}
// defaultPollBackOff builds the backoff policy handed to Poll by
// DownloadBlob and ReplicateToRemote: intervals start at one second, grow by
// a factor of 1.3 up to five seconds, and the policy gives up after fifteen
// minutes. A fresh value is returned on every call.
func (c *clusterClient) defaultPollBackOff() backoff.BackOff {
	policy := new(backoff.ExponentialBackOff)
	policy.InitialInterval = time.Second
	policy.RandomizationFactor = 0.05
	policy.Multiplier = 1.3
	policy.MaxInterval = 5 * time.Second
	policy.MaxElapsedTime = 15 * time.Minute
	policy.Clock = backoff.SystemClock
	return policy
}
// CheckReadiness resolves the origins responsible for
// backend.ReadinessCheckDigest, picks one of them at random and returns the
// result of its readiness check.
//
// An error is returned if the origins cannot be resolved or if the resolver
// yields no origins at all.
func (c *clusterClient) CheckReadiness() error {
	clients, err := c.resolver.Resolve(backend.ReadinessCheckDigest)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}
	if len(clients) == 0 {
		// rand.Intn panics when its argument is not positive, so an empty
		// client list must be rejected before picking an index.
		return errors.New("resolve clients: no origins found")
	}
	return clients[rand.Intn(len(clients))].CheckReadiness()
}
// UploadBlob uploads blob to origin cluster. See Client.UploadBlob for more details.
//
// Origins are tried in the order returned by the resolver. The upload moves on
// to the next origin only when the current one fails with a network or
// retryable error; any other outcome (including success) is returned as is.
//
// If blob also implements io.Seeker, it is rewound to its starting offset
// before every retry so that each origin receives the complete blob. A blob
// which cannot be rewound is retried as is, and a previous attempt may already
// have consumed part of it.
//
// An error is returned if the origins cannot be resolved, if the resolver
// yields no origins, or if rewinding blob for a retry fails.
func (c *clusterClient) UploadBlob(namespace string, d core.Digest, blob io.Reader) (err error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}
	if len(clients) == 0 {
		// Without this check the loop below would not run and the upload
		// would be reported as successful although nothing was sent.
		return errors.New("resolve clients: no origins found")
	}

	// Remember where the blob starts so that retries can replay it.
	var start int64
	seeker, rewindable := blob.(io.Seeker)
	if rewindable {
		pos, seekErr := seeker.Seek(0, io.SeekCurrent)
		if seekErr != nil {
			// Seeking is not actually supported; fall back to plain retries.
			rewindable = false
		}
		start = pos
	}

	// We prefer the origin with highest hashing score so the first origin will handle
	// replication to origins with lower score. This is because we want to reduce upload
	// conflicts between local replicas.
	for i, client := range clients {
		if i > 0 && rewindable {
			if _, seekErr := seeker.Seek(start, io.SeekStart); seekErr != nil {
				return fmt.Errorf("rewind blob for retry after %s: %s", err, seekErr)
			}
		}
		err = client.UploadBlob(namespace, d, blob)
		// Allow retry on another origin if the current upstream is temporarily
		// unavailable or under high load.
		if httputil.IsNetworkError(err) || httputil.IsRetryable(err) {
			continue
		}
		break
	}
	return err
}
// GetMetaInfo fetches the metainfo of d from the origin cluster. It performs
// no polling of its own.
//
// Origins are asked in resolver order. The first origin which either succeeds
// or answers with a 202 ("accepted") error settles the call, and its result is
// returned; any other error moves on to the next origin. If every origin
// fails, the result of the last attempt is returned.
func (c *clusterClient) GetMetaInfo(namespace string, d core.Digest) (mi *core.MetaInfo, err error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return nil, fmt.Errorf("resolve clients: %s", err)
	}
	for _, origin := range clients {
		mi, err = origin.GetMetaInfo(namespace, d)
		// A 202 is final for this call: do not try the next replica.
		if err == nil || httputil.IsAccepted(err) {
			break
		}
	}
	return mi, err
}
// Stat checks whether the blob d is available in the cluster.
//
// The resolved origins are visited in random order (presumably to spread
// requests across replicas) and the first successful answer is returned. If
// every origin fails, the result of the last attempt is returned.
func (c *clusterClient) Stat(namespace string, d core.Digest) (bi *core.BlobInfo, err error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return nil, fmt.Errorf("resolve clients: %s", err)
	}
	shuffle(clients)
	for _, origin := range clients {
		if bi, err = origin.Stat(namespace, d); err == nil {
			break
		}
	}
	return bi, err
}
// OverwriteMetaInfo replaces the existing metainfo of d with metainfo
// configured with pieceLength on every origin owning d. All origins are
// attempted even if some fail; the failures are collected, tagged with the
// origin address, and returned as one joined error. Primarily intended for
// benchmarking purposes.
func (c *clusterClient) OverwriteMetaInfo(d core.Digest, pieceLength int64) error {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}
	var failures []error
	for _, origin := range clients {
		overwriteErr := origin.OverwriteMetaInfo(d, pieceLength)
		if overwriteErr == nil {
			continue
		}
		failures = append(failures, fmt.Errorf("origin %s: %s", origin.Addr(), overwriteErr))
	}
	return errutil.Join(failures)
}
// DownloadBlob pulls the blob d from the origin cluster into dst, polling via
// Poll with the default poll backoff. A "not found" error from the cluster is
// translated into ErrBlobNotFound; every other result is returned unchanged.
func (c *clusterClient) DownloadBlob(namespace string, d core.Digest, dst io.Writer) error {
	download := func(origin Client) error {
		return origin.DownloadBlob(namespace, d, dst)
	}
	err := Poll(c.resolver, c.defaultPollBackOff(), d, download)
	if !httputil.IsNotFound(err) {
		return err
	}
	return ErrBlobNotFound
}
// Owners returns the peer contexts of the origins which own d.
//
// Every resolved origin is asked for its peer context concurrently. As long
// as at least one origin answers, the collected peers are returned (in the
// order the answers arrived) and failures of the other origins are only
// logged. If no origin answers, the joined failures are returned, or a
// "no origin peers found" error when there were no failures to report.
func (c *clusterClient) Owners(d core.Digest) ([]core.PeerContext, error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return nil, fmt.Errorf("resolve clients: %s", err)
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // Guards peers and failures.
		peers    []core.PeerContext
		failures []error
	)
	wg.Add(len(clients))
	for _, client := range clients {
		go func(origin Client) {
			defer wg.Done()
			pctx, peerErr := origin.GetPeerContext()
			mu.Lock()
			defer mu.Unlock()
			if peerErr != nil {
				failures = append(failures, peerErr)
				return
			}
			peers = append(peers, pctx)
		}(client)
	}
	wg.Wait()

	err = errutil.Join(failures)
	switch {
	case len(peers) > 0:
		if err != nil {
			// Partial failure: still usable, but worth recording.
			log.With("blob", d.Hex()).Errorf("Error getting all origin peers: %s", err)
		}
		return peers, nil
	case err != nil:
		return nil, err
	default:
		return nil, errors.New("no origin peers found")
	}
}
// ReplicateToRemote asks the origin cluster to replicate d to the remote
// origin cluster identified by remoteDNS, polling via Poll until an origin
// completes the request or all origins are exhausted.
func (c *clusterClient) ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
	replicate := func(origin Client) error {
		return origin.ReplicateToRemote(namespace, d, remoteDNS)
	}
	// Re-use download backoff since replicate may download blobs.
	return Poll(c.resolver, c.defaultPollBackOff(), d, replicate)
}
// shuffle randomly reorders cs in place. Walking the slice front to back,
// each element is swapped with an element chosen at random from the positions
// up to and including its own, using the package-level math/rand source.
func shuffle(cs []Client) {
	for i := 0; i < len(cs); i++ {
		if j := rand.Intn(i + 1); j != i {
			cs[i], cs[j] = cs[j], cs[i]
		}
	}
}
// Poll issues makeRequest against the origins owning d, for endpoints which
// require polling because the blob may still be getting fetched asynchronously
// from remote storage in the origin cluster.
//
// Origins are tried in the order returned by r. For each origin, b is reset
// and the request is repeated for as long as it fails with a 202 status,
// sleeping for b.NextBackOff() between attempts:
//   - a nil error from makeRequest ends the poll successfully;
//   - a status error below 500 (other than 202) is returned to the caller as is;
//   - any other error, or b giving up while still receiving 202s, is recorded
//     and the next origin is tried.
//
// If no origin succeeds, the recorded errors are joined into a single
// "all origins unavailable" error.
func Poll(
	r ClientResolver, b backoff.BackOff, d core.Digest, makeRequest func(Client) error) error {

	// By looping over clients in order, we will always prefer the same origin
	// for making requests to loosely guarantee that only one origin needs to
	// fetch the file from remote backend.
	clients, err := r.Resolve(d)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}

	var failures []error
ORIGINS:
	for _, origin := range clients {
		b.Reset()
		for {
			reqErr := makeRequest(origin)
			if reqErr == nil {
				return nil // Success!
			}
			statusErr, isStatus := reqErr.(httputil.StatusError)
			switch {
			case isStatus && statusErr.Status == http.StatusAccepted:
				// The origin is still working on it: wait and ask again.
				wait := b.NextBackOff()
				if wait == backoff.Stop {
					// Backoff timed out.
					failures = append(failures,
						fmt.Errorf("origin %s: backoff timed out on 202 responses", origin.Addr()))
					continue ORIGINS
				}
				time.Sleep(wait)
			case isStatus && statusErr.Status < 500:
				// Not a server-side failure, so another origin would not help.
				return reqErr
			default:
				failures = append(failures, fmt.Errorf("origin %s: %s", origin.Addr(), reqErr))
				continue ORIGINS
			}
		}
	}
	return fmt.Errorf("all origins unavailable: %s", errutil.Join(failures))
}