origin/blobclient/client.go (209 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobclient

import (
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/utils/httputil"
	"github.com/uber/kraken/utils/memsize"
)

// Client provides a wrapper around all Server HTTP endpoints.
type Client interface {
	// Addr returns the address of the server the client is provisioned for.
	Addr() string

	// CheckReadiness returns an error if the server's readiness endpoint
	// cannot be fetched successfully.
	CheckReadiness() error

	// Locations returns the origin server addresses which d is sharded on.
	Locations(d core.Digest) ([]string, error)

	// DeleteBlob deletes the blob corresponding to d.
	DeleteBlob(d core.Digest) error

	// TransferBlob uploads blob to a single origin server without
	// replicating it.
	TransferBlob(d core.Digest, blob io.Reader) error

	// Stat returns blob info for d, or an error if the origin does not have
	// a blob for d.
	Stat(namespace string, d core.Digest) (*core.BlobInfo, error)

	// StatLocal returns blob info for d, or an error if the origin does not
	// have a blob for d locally.
	StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error)

	// GetMetaInfo returns the metainfo for d.
	GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)

	// OverwriteMetaInfo overwrites existing metainfo for d with new metainfo
	// configured with pieceLength.
	OverwriteMetaInfo(d core.Digest, pieceLength int64) error

	// UploadBlob uploads and replicates blob to the origin cluster.
	UploadBlob(namespace string, d core.Digest, blob io.Reader) error

	// DuplicateUploadBlob duplicates a blob upload request, which will
	// attempt to write-back at the given delay.
	DuplicateUploadBlob(namespace string, d core.Digest, blob io.Reader, delay time.Duration) error

	// DownloadBlob downloads the blob for d and writes it to dst.
	DownloadBlob(namespace string, d core.Digest, dst io.Writer) error

	// ReplicateToRemote replicates the blob of d to a remote origin cluster.
	ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error

	// GetPeerContext gets the PeerContext of the p2p client running
	// alongside the Server.
	GetPeerContext() (core.PeerContext, error)

	// ForceCleanup forces cache cleanup to run with the given ttl.
	ForceCleanup(ttl time.Duration) error
}

// HTTPClient defines the Client implementation.
type HTTPClient struct {
	// addr is the server address interpolated into every request URL
	// ("http://<addr>/...").
	addr string

	// chunkSize is the chunk size handed to the chunked upload helpers.
	chunkSize uint64

	// tls is the TLS configuration passed along with every request; it is
	// left nil unless set through an Option.
	tls *tls.Config
}

// Option allows setting optional HTTPClient parameters.
type Option func(*HTTPClient)

// WithChunkSize configures an HTTPClient with a custom chunk size for uploads.
func WithChunkSize(s uint64) Option {
	return func(c *HTTPClient) { c.chunkSize = s }
}

// WithTLS configures an HTTPClient with tls configuration.
func WithTLS(tls *tls.Config) Option {
	return func(c *HTTPClient) { c.tls = tls }
}

// New returns a new HTTPClient scoped to addr. The upload chunk size defaults
// to 32 MB and no TLS configuration is set; opts are applied in order on top
// of those defaults.
func New(addr string, opts ...Option) *HTTPClient {
	c := &HTTPClient{
		addr:      addr,
		chunkSize: 32 * memsize.MB,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// Addr returns the address of the server the client is provisioned for.
func (c *HTTPClient) Addr() string { return c.addr }

// CheckReadiness queries the server's readiness endpoint with a 5 second
// timeout. It returns an "origin not ready" error if the request fails.
func (c *HTTPClient) CheckReadiness() error {
	r, err := httputil.Get(
		fmt.Sprintf("http://%s/readiness", c.addr),
		httputil.SendTimeout(5*time.Second),
		httputil.SendTLS(c.tls))
	if err != nil {
		return fmt.Errorf("origin not ready: %v", err)
	}
	// Only the status matters here, but the body still has to be released,
	// as the other methods in this file do after a successful request.
	defer r.Body.Close()
	return nil
}

// Locations returns the origin server addresses which d is sharded on. The
// addresses are read from the comma-separated "Origin-Locations" response
// header; an error is returned if that header is missing or empty.
func (c *HTTPClient) Locations(d core.Digest) ([]string, error) {
	r, err := httputil.Get(
		fmt.Sprintf("http://%s/blobs/%s/locations", c.addr, d),
		httputil.SendTimeout(5*time.Second),
		httputil.SendTLS(c.tls))
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	// strings.Split never returns an empty slice for a non-empty separator
	// (splitting "" yields [""]), so emptiness must be checked on the raw
	// header value rather than on the length of the split result.
	hdr := r.Header.Get("Origin-Locations")
	if hdr == "" {
		return nil, errors.New("no locations found")
	}
	return strings.Split(hdr, ","), nil
}

// Stat returns blob info. It returns error if the origin does not have a blob
// for d.
func (c *HTTPClient) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
	return c.stat(namespace, d, false)
}

// StatLocal returns blob info. It returns error if the origin does not have a
// blob for d locally.
func (c *HTTPClient) StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error) { return c.stat(namespace, d, true) } func (c *HTTPClient) stat(namespace string, d core.Digest, local bool) (*core.BlobInfo, error) { u := fmt.Sprintf( "http://%s/internal/namespace/%s/blobs/%s", c.addr, url.PathEscape(namespace), d) if local { u += "?local=true" } r, err := httputil.Head( u, httputil.SendTimeout(15*time.Second), httputil.SendTLS(c.tls)) if err != nil { if httputil.IsNotFound(err) { return nil, ErrBlobNotFound } return nil, err } var size int64 hdr := r.Header.Get("Content-Length") if hdr != "" { size, err = strconv.ParseInt(hdr, 10, 64) if err != nil { return nil, err } } return core.NewBlobInfo(size), nil } // DeleteBlob deletes the blob corresponding to d. func (c *HTTPClient) DeleteBlob(d core.Digest) error { _, err := httputil.Delete( fmt.Sprintf("http://%s/internal/blobs/%s", c.addr, d), httputil.SendAcceptedCodes(http.StatusAccepted), httputil.SendTLS(c.tls)) return err } // TransferBlob uploads a blob to a single origin server. Unlike its cousin UploadBlob, // TransferBlob is an internal API which does not replicate the blob. func (c *HTTPClient) TransferBlob(d core.Digest, blob io.Reader) error { tc := newTransferClient(c.addr, c.tls) return runChunkedUpload(tc, d, blob, int64(c.chunkSize)) } // UploadBlob uploads and replicates blob to the origin cluster, asynchronously // backing the blob up to the remote storage configured for namespace. func (c *HTTPClient) UploadBlob(namespace string, d core.Digest, blob io.Reader) error { uc := newUploadClient(c.addr, namespace, _publicUpload, 0, c.tls) return runChunkedUpload(uc, d, blob, int64(c.chunkSize)) } // DuplicateUploadBlob duplicates an blob upload request, which will attempt to // write-back at the given delay. 
func (c *HTTPClient) DuplicateUploadBlob( namespace string, d core.Digest, blob io.Reader, delay time.Duration) error { uc := newUploadClient(c.addr, namespace, _duplicateUpload, delay, c.tls) return runChunkedUpload(uc, d, blob, int64(c.chunkSize)) } // DownloadBlob downloads blob for d. If the blob of d is not available yet // (i.e. still downloading), returns 202 httputil.StatusError, indicating that // the request shoudl be retried later. If not blob exists for d, returns a 404 // httputil.StatusError. func (c *HTTPClient) DownloadBlob(namespace string, d core.Digest, dst io.Writer) error { r, err := httputil.Get( fmt.Sprintf("http://%s/namespace/%s/blobs/%s", c.addr, url.PathEscape(namespace), d), httputil.SendTLS(c.tls)) if err != nil { return err } defer r.Body.Close() if _, err := io.Copy(dst, r.Body); err != nil { return fmt.Errorf("copy body: %s", err) } return nil } // ReplicateToRemote replicates the blob of d to a remote origin cluster. If the // blob of d is not available yet, returns 202 httputil.StatusError, indicating // that the request should be retried later. func (c *HTTPClient) ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error { _, err := httputil.Post( fmt.Sprintf("http://%s/namespace/%s/blobs/%s/remote/%s", c.addr, url.PathEscape(namespace), d, remoteDNS), httputil.SendTLS(c.tls)) return err } // GetMetaInfo returns metainfo for d. If the blob of d is not available yet // (i.e. still downloading), returns a 202 httputil.StatusError, indicating that // the request should be retried later. If no blob exists for d, returns a 404 // httputil.StatusError. 
func (c *HTTPClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) { r, err := httputil.Get( fmt.Sprintf("http://%s/internal/namespace/%s/blobs/%s/metainfo", c.addr, url.PathEscape(namespace), d), httputil.SendTimeout(15*time.Second), httputil.SendTLS(c.tls)) if err != nil { return nil, err } defer r.Body.Close() raw, err := ioutil.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("read body: %s", err) } mi, err := core.DeserializeMetaInfo(raw) if err != nil { return nil, fmt.Errorf("deserialize metainfo: %s", err) } return mi, nil } // OverwriteMetaInfo overwrites existing metainfo for d with new metainfo // configured with pieceLength. Primarily intended for benchmarking purposes. func (c *HTTPClient) OverwriteMetaInfo(d core.Digest, pieceLength int64) error { _, err := httputil.Post( fmt.Sprintf("http://%s/internal/blobs/%s/metainfo?piece_length=%d", c.addr, d, pieceLength), httputil.SendTLS(c.tls)) return err } // GetPeerContext gets the PeerContext of the p2p client running alongside the Server. func (c *HTTPClient) GetPeerContext() (core.PeerContext, error) { var pctx core.PeerContext r, err := httputil.Get( fmt.Sprintf("http://%s/internal/peercontext", c.addr), httputil.SendTimeout(5*time.Second), httputil.SendTLS(c.tls)) if err != nil { return pctx, err } defer r.Body.Close() if err := json.NewDecoder(r.Body).Decode(&pctx); err != nil { return pctx, err } return pctx, nil } // ForceCleanup forces cache cleanup to run. func (c *HTTPClient) ForceCleanup(ttl time.Duration) error { v := url.Values{} v.Add("ttl_hr", strconv.Itoa(int(math.Ceil(float64(ttl)/float64(time.Hour))))) _, err := httputil.Post( fmt.Sprintf("http://%s/forcecleanup?%s", c.addr, v.Encode()), httputil.SendTimeout(2*time.Minute), httputil.SendTLS(c.tls)) return err }