proxy/proxyserver/preheat.go (105 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxyserver
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
"regexp"
"time"
"github.com/docker/distribution"
"github.com/uber/kraken/core"
"github.com/uber/kraken/origin/blobclient"
"github.com/uber/kraken/utils/dockerutil"
"github.com/uber/kraken/utils/handler"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/log"
)
// _manifestRegexp matches media types that start with a Docker distribution
// manifest type: "application/vnd.docker.distribution.manifest.v<digit>+json"
// or "...+prettyjws". The pattern is anchored at the start only, so trailing
// text after the encoding suffix is still accepted.
//
// The dots are escaped so that they match a literal '.' instead of an
// arbitrary character.
var _manifestRegexp = regexp.MustCompile(`^application/vnd\.docker\.distribution\.manifest\.v\d\+(json|prettyjws)`)
// PreheatHandler handles push notifications by asking the origin cluster to
// cache the blobs referenced by each pushed image manifest.
type PreheatHandler struct {
// clusterClient is the origin cluster client used both to download the
// pushed manifest and to request metainfo for the blobs it references.
clusterClient blobclient.ClusterClient
}
// NewPreheatHandler builds a PreheatHandler that reaches the origin cluster
// through the given client.
func NewPreheatHandler(client blobclient.ClusterClient) *PreheatHandler {
	return &PreheatHandler{
		clusterClient: client,
	}
}
// Handle notifies origins to cache the blob related to the image.
func (ph *PreheatHandler) Handle(w http.ResponseWriter, r *http.Request) error {
var notification Notification
if err := json.NewDecoder(r.Body).Decode(¬ification); err != nil {
return handler.Errorf("decode body: %s", err)
}
events := filterEvents(¬ification)
for _, event := range events {
repo := event.Target.Repository
digest := event.Target.Digest
log.With("repo", repo, "digest", digest).Infof("deal push image event")
err := ph.process(repo, digest)
if err != nil {
log.With("repo", repo, "digest", digest).Errorf("handle preheat: %s", err)
}
}
return nil
}
// process fetches the manifest identified by digest in repo and, for every
// blob the manifest references, starts a goroutine that requests the blob's
// metainfo from the origin cluster in order to trigger origin caching.
//
// Only a failure to fetch the manifest is returned. A reference whose digest
// cannot be parsed is logged and skipped. The goroutines are not waited for;
// their errors are logged unless httputil.IsAccepted reports the error as
// accepted.
func (ph *PreheatHandler) process(repo, digest string) error {
	manifest, err := ph.fetchManifest(repo, digest)
	if err != nil {
		return err
	}

	for _, desc := range manifest.References() {
		d, err := core.ParseSHA256Digest(string(desc.Digest))
		if err != nil {
			log.With("repo", repo, "digest", string(desc.Digest)).Errorf("parse digest: %s", err)
			continue
		}

		go func() {
			log.With("repo", repo).Debugf("trigger origin cache: %+v", d)
			// err is declared inside the goroutine so the background work never
			// writes to a variable owned by the enclosing loop.
			if _, err := ph.clusterClient.GetMetaInfo(repo, d); err != nil && !httputil.IsAccepted(err) {
				log.With("repo", repo, "digest", digest).Errorf("notify origin cache: %s", err)
			}
		}()
	}
	return nil
}
// fetchManifest downloads the blob identified by digest from repo through the
// cluster client and parses it as a manifest.
//
// The download is attempted up to four times. Before every retry it sleeps,
// starting at 100ms and doubling each time. Only blobclient.ErrBlobNotFound is
// retried; any other download error is returned immediately. If nothing was
// downloaded after all attempts, a "manifest not found" error is returned.
func (ph *PreheatHandler) fetchManifest(repo, digest string) (distribution.Manifest, error) {
	parsed, err := core.ParseSHA256Digest(digest)
	if err != nil {
		return nil, fmt.Errorf("Error parse digest: %s ", err)
	}

	// The registry may send the notification before the manifest upload has
	// finished, so a "not found" result is retried with a growing delay.
	// See https://github.com/docker/distribution/issues/2625.
	const maxAttempts = 4
	var payload bytes.Buffer
	backoff := 100 * time.Millisecond
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			time.Sleep(backoff)
			backoff *= 2
		}

		downloadErr := ph.clusterClient.DownloadBlob(repo, parsed, &payload)
		if downloadErr == nil {
			break
		}
		if downloadErr != blobclient.ErrBlobNotFound {
			return nil, fmt.Errorf("download manifest: %s", downloadErr)
		}
	}

	// An empty buffer means no attempt produced any manifest data.
	if payload.Len() == 0 {
		return nil, fmt.Errorf("manifest not found")
	}

	manifest, _, err := dockerutil.ParseManifest(&payload)
	if err != nil {
		return nil, fmt.Errorf("parse manifest: %s", err)
	}
	return manifest, nil
}
// filterEvents picks out the manifest push events from notification: events
// whose target media type matches _manifestRegexp and whose action is "push".
// The events are returned in their original order; the result is empty (not
// nil) when nothing matches.
//
// Each returned pointer refers to its own copy of the event. Taking the
// address of the range variable directly would, before Go 1.22, make every
// returned pointer alias the same variable and therefore the last event
// iterated.
func filterEvents(notification *Notification) []*Event {
	events := []*Event{}
	for _, event := range notification.Events {
		if !_manifestRegexp.MatchString(event.Target.MediaType) {
			continue
		}
		if event.Action != "push" {
			continue
		}
		event := event // per-iteration copy; see the function comment.
		events = append(events, &event)
	}
	return events
}