build-index/tagclient/client.go (347 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tagclient import ( "bytes" "crypto/tls" "encoding/json" "errors" "fmt" "io" "io/ioutil" "net/url" "strconv" "time" "github.com/uber/kraken/build-index/tagmodels" "github.com/uber/kraken/core" "github.com/uber/kraken/lib/healthcheck" "github.com/uber/kraken/utils/httputil" ) // Client errors. var ( ErrTagNotFound = errors.New("tag not found") ) // Client wraps tagserver endpoints. type Client interface { CheckReadiness() error Put(tag string, d core.Digest) error PutAndReplicate(tag string, d core.Digest) error Get(tag string) (core.Digest, error) Has(tag string) (bool, error) List(prefix string) ([]string, error) ListWithPagination(prefix string, filter ListFilter) (tagmodels.ListResponse, error) ListRepository(repo string) ([]string, error) ListRepositoryWithPagination(repo string, filter ListFilter) (tagmodels.ListResponse, error) Replicate(tag string) error Origin() (string, error) DuplicateReplicate( tag string, d core.Digest, dependencies core.DigestList, delay time.Duration) error DuplicatePut(tag string, d core.Digest, delay time.Duration) error } type singleClient struct { addr string tls *tls.Config } // ListFilter contains filter request for list with pagination operations. type ListFilter struct { Offset string Limit int } // NewSingleClient returns a Client scoped to a single tagserver instance. func NewSingleClient(addr string, config *tls.Config) Client { return &singleClient{addr, config} } func (c *singleClient) CheckReadiness() error { _, err := httputil.Get( fmt.Sprintf("http://%s/readiness", c.addr), httputil.SendTimeout(5*time.Second), httputil.SendTLS(c.tls)) return err } func (c *singleClient) Put(tag string, d core.Digest) error { _, err := httputil.Put( fmt.Sprintf("http://%s/tags/%s/digest/%s", c.addr, url.PathEscape(tag), d.String()), httputil.SendTimeout(30*time.Second), httputil.SendTLS(c.tls)) return err } func (c *singleClient) PutAndReplicate(tag string, d core.Digest) error { _, err := httputil.Put( fmt.Sprintf("http://%s/tags/%s/digest/%s?replicate=true", c.addr, url.PathEscape(tag), d.String()), httputil.SendTimeout(30*time.Second), httputil.SendTLS(c.tls)) return err } func (c *singleClient) Get(tag string) (core.Digest, error) { resp, err := httputil.Get( fmt.Sprintf("http://%s/tags/%s", c.addr, url.PathEscape(tag)), httputil.SendTimeout(10*time.Second), httputil.SendTLS(c.tls)) if err != nil { if httputil.IsNotFound(err) { return core.Digest{}, ErrTagNotFound } return core.Digest{}, err } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { return core.Digest{}, fmt.Errorf("read body: %s", err) } d, err := core.ParseSHA256Digest(string(b)) if err != nil { return core.Digest{}, fmt.Errorf("new digest: %s", err) } return d, nil } func (c *singleClient) Has(tag string) (bool, error) { _, err := httputil.Head( fmt.Sprintf("http://%s/tags/%s", c.addr, url.PathEscape(tag)), httputil.SendTimeout(10*time.Second), httputil.SendTLS(c.tls)) if err != nil { if httputil.IsNotFound(err) { return false, nil } return false, err } return true, nil } func (c *singleClient) doListPaginated(urlFormat string, pathSub string, filter ListFilter) (tagmodels.ListResponse, error) { // Build query. reqVal := url.Values{} if filter.Offset != "" { reqVal.Add(tagmodels.OffsetQ, filter.Offset) } if filter.Limit != 0 { reqVal.Add(tagmodels.LimitQ, strconv.Itoa(filter.Limit)) } // Fetch list response from server. serverUrl := url.URL{ Scheme: "http", Host: c.addr, Path: fmt.Sprintf(urlFormat, pathSub), RawQuery: reqVal.Encode(), } var resp tagmodels.ListResponse httpResp, err := httputil.Get( serverUrl.String(), httputil.SendTimeout(60*time.Second), httputil.SendTLS(c.tls)) if err != nil { return resp, err } defer httpResp.Body.Close() if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil { return resp, fmt.Errorf("json decode: %s", err) } return resp, nil } func (c *singleClient) doList(pathSub string, fn func(pathSub string, filter ListFilter) (tagmodels.ListResponse, error)) ( []string, error) { var names []string offset := "" for ok := true; ok; ok = (offset != "") { filter := ListFilter{Offset: offset} resp, err := fn(pathSub, filter) if err != nil { return nil, err } offset, err = resp.GetOffset() if err != nil && err != io.EOF { return nil, err } names = append(names, resp.Result...) } return names, nil } func (c *singleClient) List(prefix string) ([]string, error) { return c.doList(prefix, func(prefix string, filter ListFilter) ( tagmodels.ListResponse, error) { return c.ListWithPagination(prefix, filter) }) } func (c *singleClient) ListWithPagination(prefix string, filter ListFilter) ( tagmodels.ListResponse, error) { return c.doListPaginated("list/%s", prefix, filter) } // XXX: Deprecated. Use List instead. func (c *singleClient) ListRepository(repo string) ([]string, error) { return c.doList(repo, func(repo string, filter ListFilter) ( tagmodels.ListResponse, error) { return c.ListRepositoryWithPagination(repo, filter) }) } func (c *singleClient) ListRepositoryWithPagination(repo string, filter ListFilter) (tagmodels.ListResponse, error) { return c.doListPaginated("repositories/%s/tags", url.PathEscape(repo), filter) } // ReplicateRequest defines a Replicate request body. type ReplicateRequest struct { Dependencies []core.Digest `json:"dependencies"` } func (c *singleClient) Replicate(tag string) error { _, err := httputil.Post( fmt.Sprintf("http://%s/remotes/tags/%s", c.addr, url.PathEscape(tag)), httputil.SendTimeout(15*time.Second), httputil.SendTLS(c.tls)) return err } // DuplicateReplicateRequest defines a DuplicateReplicate request body. type DuplicateReplicateRequest struct { Dependencies core.DigestList `json:"dependencies"` Delay time.Duration `json:"delay"` } func (c *singleClient) DuplicateReplicate( tag string, d core.Digest, dependencies core.DigestList, delay time.Duration) error { b, err := json.Marshal(DuplicateReplicateRequest{dependencies, delay}) if err != nil { return fmt.Errorf("json marshal: %s", err) } _, err = httputil.Post( fmt.Sprintf( "http://%s/internal/duplicate/remotes/tags/%s/digest/%s", c.addr, url.PathEscape(tag), d.String()), httputil.SendBody(bytes.NewReader(b)), httputil.SendTimeout(10*time.Second), httputil.SendRetry(), httputil.SendTLS(c.tls)) return err } // DuplicatePutRequest defines a DuplicatePut request body. type DuplicatePutRequest struct { Delay time.Duration `json:"delay"` } func (c *singleClient) DuplicatePut(tag string, d core.Digest, delay time.Duration) error { b, err := json.Marshal(DuplicatePutRequest{delay}) if err != nil { return fmt.Errorf("json marshal: %s", err) } _, err = httputil.Put( fmt.Sprintf( "http://%s/internal/duplicate/tags/%s/digest/%s", c.addr, url.PathEscape(tag), d.String()), httputil.SendBody(bytes.NewReader(b)), httputil.SendTimeout(10*time.Second), httputil.SendRetry(), httputil.SendTLS(c.tls)) return err } func (c *singleClient) Origin() (string, error) { resp, err := httputil.Get( fmt.Sprintf("http://%s/origin", c.addr), httputil.SendTimeout(5*time.Second), httputil.SendTLS(c.tls)) if err != nil { return "", err } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { return "", fmt.Errorf("read body: %s", err) } return string(b), nil } type clusterClient struct { hosts healthcheck.List tls *tls.Config } // NewClusterClient creates a Client which operates on tagserver instances as // a cluster. func NewClusterClient(hosts healthcheck.List, config *tls.Config) Client { return &clusterClient{hosts, config} } func (cc *clusterClient) do(request func(c Client) error) error { addrs := cc.hosts.Resolve().Sample(3) if len(addrs) == 0 { return errors.New("cluster client: no hosts could be resolved") } var err error for addr := range addrs { err = request(NewSingleClient(addr, cc.tls)) if httputil.IsNetworkError(err) { cc.hosts.Failed(addr) continue } break } return err } // doOnce tries the request on only one randomly chosen client without any retries if it fails. func (cc *clusterClient) doOnce(request func(c Client) error) error { addrs := cc.hosts.Resolve().Sample(1) if len(addrs) == 0 { return errors.New("cluster client: no hosts could be resolved") } // read the only sampled addr var addr string for addr = range addrs { } err := request(NewSingleClient(addr, cc.tls)) if httputil.IsNetworkError(err) { cc.hosts.Failed(addr) } return err } func (cc *clusterClient) CheckReadiness() error { return cc.doOnce(func(c Client) error { err := c.CheckReadiness() if err != nil { return fmt.Errorf("build index not ready: %v", err) } return nil }) } func (cc *clusterClient) Put(tag string, d core.Digest) error { return cc.do(func(c Client) error { return c.Put(tag, d) }) } func (cc *clusterClient) PutAndReplicate(tag string, d core.Digest) error { return cc.do(func(c Client) error { return c.PutAndReplicate(tag, d) }) } func (cc *clusterClient) Get(tag string) (d core.Digest, err error) { err = cc.do(func(c Client) error { d, err = c.Get(tag) return err }) return } func (cc *clusterClient) Has(tag string) (ok bool, err error) { err = cc.do(func(c Client) error { ok, err = c.Has(tag) return err }) return } func (cc *clusterClient) List(prefix string) (tags []string, err error) { err = cc.do(func(c Client) error { tags, err = c.List(prefix) return err }) return } func (cc *clusterClient) ListWithPagination(prefix string, filter ListFilter) ( resp tagmodels.ListResponse, err error) { err = cc.do(func(c Client) error { resp, err = c.ListWithPagination(prefix, filter) return err }) return } func (cc *clusterClient) ListRepository(repo string) (tags []string, err error) { err = cc.do(func(c Client) error { tags, err = c.ListRepository(repo) return err }) return } func (cc *clusterClient) ListRepositoryWithPagination(repo string, filter ListFilter) (resp tagmodels.ListResponse, err error) { err = cc.do(func(c Client) error { resp, err = c.ListRepositoryWithPagination(repo, filter) return err }) return } func (cc *clusterClient) Replicate(tag string) error { return cc.do(func(c Client) error { return c.Replicate(tag) }) } func (cc *clusterClient) Origin() (origin string, err error) { err = cc.do(func(c Client) error { origin, err = c.Origin() return err }) return } func (cc *clusterClient) DuplicateReplicate( tag string, d core.Digest, dependencies core.DigestList, delay time.Duration) error { return errors.New("duplicate replicate not supported on cluster client") } func (cc *clusterClient) DuplicatePut(tag string, d core.Digest, delay time.Duration) error { return errors.New("duplicate put not supported on cluster client") }