origin/blobclient/uploader.go (154 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package blobclient import ( "bytes" "crypto/tls" "encoding/json" "errors" "fmt" "io" "net/url" "time" "github.com/uber/kraken/core" "github.com/uber/kraken/utils/httputil" ) // uploader provides methods for executing a chunked upload. type uploader interface { start(d core.Digest) (uid string, err error) patch(d core.Digest, uid string, start, stop int64, chunk io.Reader) error commit(d core.Digest, uid string) error } func runChunkedUpload(u uploader, d core.Digest, blob io.Reader, chunkSize int64) error { if err := runChunkedUploadHelper(u, d, blob, chunkSize); err != nil && !httputil.IsConflict(err) { return err } return nil } func runChunkedUploadHelper(u uploader, d core.Digest, blob io.Reader, chunkSize int64) error { uid, err := u.start(d) if err != nil { return err } var pos int64 buf := make([]byte, chunkSize) for { n, err := blob.Read(buf) if err != nil { if err == io.EOF { break } return fmt.Errorf("read blob: %s", err) } chunk := bytes.NewReader(buf[:n]) stop := pos + int64(n) if err := u.patch(d, uid, pos, stop, chunk); err != nil { return err } pos = stop } return u.commit(d, uid) } // transferClient executes chunked uploads for internal blob transfers. type transferClient struct { addr string tls *tls.Config } func newTransferClient(addr string, tls *tls.Config) *transferClient { return &transferClient{addr, tls} } func (c *transferClient) start(d core.Digest) (uid string, err error) { r, err := httputil.Post( fmt.Sprintf("http://%s/internal/blobs/%s/uploads", c.addr, d), httputil.SendTLS(c.tls)) if err != nil { return "", err } uid = r.Header.Get("Location") if uid == "" { return "", errors.New("request succeeded, but Location header not set") } return uid, nil } func (c *transferClient) patch( d core.Digest, uid string, start, stop int64, chunk io.Reader) error { _, err := httputil.Patch( fmt.Sprintf("http://%s/internal/blobs/%s/uploads/%s", c.addr, d, uid), httputil.SendBody(chunk), httputil.SendHeaders(map[string]string{ "Content-Range": fmt.Sprintf("%d-%d", start, stop), }), httputil.SendTLS(c.tls)) return err } func (c *transferClient) commit(d core.Digest, uid string) error { _, err := httputil.Put( fmt.Sprintf("http://%s/internal/blobs/%s/uploads/%s", c.addr, d, uid), httputil.SendTimeout(15*time.Minute), httputil.SendTLS(c.tls)) return err } type uploadType int const ( _publicUpload = iota + 1 _duplicateUpload ) // uploadClient executes chunked uploads for external cluster upload operations. type uploadClient struct { addr string namespace string uploadType uploadType delay time.Duration tls *tls.Config } func newUploadClient( addr string, namespace string, t uploadType, delay time.Duration, tls *tls.Config) *uploadClient { return &uploadClient{addr, namespace, t, delay, tls} } func (c *uploadClient) start(d core.Digest) (uid string, err error) { r, err := httputil.Post( fmt.Sprintf("http://%s/namespace/%s/blobs/%s/uploads", c.addr, url.PathEscape(c.namespace), d), httputil.SendTLS(c.tls)) if err != nil { return "", err } uid = r.Header.Get("Location") if uid == "" { return "", errors.New("request succeeded, but Location header not set") } return uid, nil } func (c *uploadClient) patch( d core.Digest, uid string, start, stop int64, chunk io.Reader) error { _, err := httputil.Patch( fmt.Sprintf("http://%s/namespace/%s/blobs/%s/uploads/%s", c.addr, url.PathEscape(c.namespace), d, uid), httputil.SendBody(chunk), httputil.SendHeaders(map[string]string{ "Content-Range": fmt.Sprintf("%d-%d", start, stop), }), httputil.SendTLS(c.tls)) return err } // DuplicateCommitUploadRequest defines HTTP request body. type DuplicateCommitUploadRequest struct { Delay time.Duration `yaml:"delay"` } func (c *uploadClient) commit(d core.Digest, uid string) error { var template string var body io.Reader switch c.uploadType { case _publicUpload: template = "http://%s/namespace/%s/blobs/%s/uploads/%s" case _duplicateUpload: template = "http://%s/internal/duplicate/namespace/%s/blobs/%s/uploads/%s" b, err := json.Marshal(DuplicateCommitUploadRequest{c.delay}) if err != nil { return fmt.Errorf("json: %s", err) } body = bytes.NewBuffer(b) default: return fmt.Errorf("unknown upload type: %d", c.uploadType) } _, err := httputil.Put( fmt.Sprintf(template, c.addr, url.PathEscape(c.namespace), d, uid), httputil.SendTimeout(15*time.Minute), httputil.SendBody(body), httputil.SendTLS(c.tls)) return err }