agent/agentserver/server.go (234 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agentserver
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
"os"
"strings"
"sync"
"time"
"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/containerruntime"
"github.com/uber/kraken/lib/middleware"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/torrent/scheduler"
"github.com/uber/kraken/tracker/announceclient"
"github.com/uber/kraken/utils/handler"
"github.com/uber/kraken/utils/httputil"
"github.com/go-chi/chi"
"github.com/uber-go/tally"
)
// Config defines Server configuration.
type Config struct {
	// How long a successful readiness check is valid for. If 0, disable
	// caching successful readiness.
	//
	// This field is unexported, and YAML decoders ignore unexported fields
	// even when they carry a yaml tag. Config.UnmarshalYAML below is what
	// actually populates it from the "readiness_cache_ttl" key.
	readinessCacheTTL time.Duration `yaml:"readiness_cache_ttl"`
}

// configYAML mirrors Config with exported fields so that a YAML decoder is
// able to fill them in.
type configYAML struct {
	ReadinessCacheTTL time.Duration `yaml:"readiness_cache_ttl"`
}

// UnmarshalYAML implements the yaml.v2-style Unmarshaler interface (also
// honored by yaml.v3) so that readiness_cache_ttl is not silently dropped
// because readinessCacheTTL is unexported.
//
// The mirror struct is seeded with the current value, so a document that
// omits the key leaves the existing setting unchanged, which matches how the
// decoder treats ordinary struct fields. On a decode error, Config is left
// untouched and the error is returned as-is.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	raw := configYAML{ReadinessCacheTTL: c.readinessCacheTTL}
	if err := unmarshal(&raw); err != nil {
		return err
	}
	c.readinessCacheTTL = raw.ReadinessCacheTTL
	return nil
}
// Server defines the agent HTTP server.
//
// A Server is built by New and exposes its routes through Handler.
type Server struct {
	config Config
	// Metrics scope; New tags it with module=agentserver.
	stats tally.Scope
	// Download store whose Cache() serves blob files looked up by digest hex.
	cads *store.CADownloadStore
	// Torrent scheduler used for blob downloads, torrent removal, health
	// probes, config reloads and blacklist snapshots.
	sched scheduler.ReloadableScheduler
	// Build-index client used for tag lookups and readiness checks.
	tags tagclient.Client
	// Tracker announce client; only its readiness is checked by this server.
	ac announceclient.Client
	// Provides the docker / containerd clients used by the preload endpoint.
	containerRuntime containerruntime.Factory
	// Time of the last successful readiness check; read and written by
	// readinessCheckHandler to serve cached readiness results.
	lastReady time.Time
}
// New creates a new Server from its dependencies. The supplied stats scope
// is tagged with module=agentserver before it is stored on the Server.
func New(
	config Config,
	stats tally.Scope,
	cads *store.CADownloadStore,
	sched scheduler.ReloadableScheduler,
	tags tagclient.Client,
	ac announceclient.Client,
	containerRuntime containerruntime.Factory) *Server {
	srv := &Server{
		config:           config,
		cads:             cads,
		sched:            sched,
		tags:             tags,
		ac:               ac,
		containerRuntime: containerRuntime,
	}
	srv.stats = stats.Tagged(map[string]string{"module": "agentserver"})
	return srv
}
// Handler returns the HTTP handler for the agent API.
//
// Every route is wrapped by status-counter and latency-timer middleware.
// Paths not registered below are delegated to http.DefaultServeMux, mounted
// at "/", which carries the /debug/pprof endpoints registered by the
// net/http/pprof import.
func (s *Server) Handler() http.Handler {
	wrap := handler.Wrap

	router := chi.NewRouter()

	// Middleware is registered ahead of all routes.
	router.Use(middleware.StatusCounter(s.stats))
	router.Use(middleware.LatencyTimer(s.stats))

	// Health and readiness probes.
	router.Get("/health", wrap(s.healthHandler))
	router.Get("/readiness", wrap(s.readinessCheckHandler))

	// Tag resolution and blob access.
	router.Get("/tags/{tag}", wrap(s.getTagHandler))
	router.Get("/namespace/{namespace}/blobs/{digest}", wrap(s.downloadBlobHandler))
	router.Delete("/blobs/{digest}", wrap(s.deleteBlobHandler))

	// Preheat/preload endpoints.
	router.Get("/preload/tags/{tag}", wrap(s.preloadTagHandler))

	// Dangerous endpoints for running experiments.
	router.Patch("/x/config/scheduler", wrap(s.patchSchedulerConfigHandler))
	router.Get("/x/blacklist", wrap(s.getBlacklistHandler))

	// Serves /debug/pprof endpoints.
	router.Mount("/", http.DefaultServeMux)

	return router
}
// getTagHandler proxies get tag requests to the build-index: it resolves the
// {tag} path parameter through the tag client and writes the resulting
// digest string as the response body. An unknown tag yields 404.
func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
	tag, err := httputil.ParseParam(r, "tag")
	if err != nil {
		return err
	}
	digest, lookupErr := s.tags.Get(tag)
	switch {
	case lookupErr == tagclient.ErrTagNotFound:
		return handler.ErrorStatus(http.StatusNotFound)
	case lookupErr != nil:
		return handler.Errorf("get tag: %s", lookupErr)
	}
	io.WriteString(w, digest.String())
	return nil
}
// downloadBlobHandler downloads a blob through p2p and streams it to the
// response.
//
// The blob is identified by the {digest} path parameter. If it is not in the
// local cache (not-exist), or a previous download of it is in an error state,
// it is first downloaded via the scheduler under the {namespace} path
// parameter and then re-opened from the cache. A torrent unknown to the
// scheduler yields 404.
func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	d, err := parseDigest(r)
	if err != nil {
		return err
	}
	f, err := s.cads.Cache().GetFileReader(d.Hex())
	if err != nil {
		if !os.IsNotExist(err) && !s.cads.InDownloadError(err) {
			return handler.Errorf("store: %s", err)
		}
		// Cache miss: fetch through p2p, then open the freshly cached file.
		if err := s.sched.Download(namespace, d); err != nil {
			if err == scheduler.ErrTorrentNotFound {
				return handler.ErrorStatus(http.StatusNotFound)
			}
			return handler.Errorf("download torrent: %s", err)
		}
		f, err = s.cads.Cache().GetFileReader(d.Hex())
		if err != nil {
			return handler.Errorf("store: %s", err)
		}
	}
	// The reader holds an open file handle; release it once the blob has been
	// streamed so each request does not leak a descriptor. The Close error is
	// ignored because the file was only read.
	defer f.Close()

	if _, err := io.Copy(w, f); err != nil {
		return fmt.Errorf("copy file: %s", err)
	}
	return nil
}
// deleteBlobHandler removes the torrent for the {digest} path parameter from
// the scheduler. Nothing is written to the body on success.
func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
	digest, parseErr := parseDigest(r)
	if parseErr != nil {
		return parseErr
	}
	if rmErr := s.sched.RemoveTorrent(digest); rmErr != nil {
		return handler.Errorf("remove torrent: %s", rmErr)
	}
	return nil
}
// preloadTagHandler triggers the local container runtime to download
// (pull) the image named by the {tag} path parameter.
//
// The parameter must have the form "repo:tag" with exactly one ':' and
// non-empty repo and tag parts. The "runtime" query argument selects the
// runtime: "docker" (the default) or "containerd". The "namespace" query
// argument (default "") is forwarded to containerd only.
//
// A malformed image reference or an unsupported runtime is the caller's
// mistake, so it is reported as 400 Bad Request. This is consistent with the
// other request-validation errors in this file.
func (s *Server) preloadTagHandler(w http.ResponseWriter, r *http.Request) error {
	ref, err := httputil.ParseParam(r, "tag")
	if err != nil {
		return err
	}
	parts := strings.Split(ref, ":")
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return handler.Errorf("failed to parse docker image tag").Status(http.StatusBadRequest)
	}
	repo, tag := parts[0], parts[1]
	rt := httputil.GetQueryArg(r, "runtime", "docker")
	ns := httputil.GetQueryArg(r, "namespace", "")

	// NOTE(review): pulls run under context.Background(), so they are not
	// cancelled if the client disconnects. This is presumably intentional for
	// preheating; confirm before switching to r.Context().
	switch rt {
	case "docker":
		if err := s.containerRuntime.DockerClient().
			PullImage(context.Background(), repo, tag); err != nil {
			return handler.Errorf("docker pull: %s", err)
		}
	case "containerd":
		if err := s.containerRuntime.ContainerdClient().
			PullImage(context.Background(), ns, repo, tag); err != nil {
			return handler.Errorf("containerd pull: %s", err)
		}
	default:
		return handler.Errorf("unsupported container runtime").Status(http.StatusBadRequest)
	}
	return nil
}
// healthHandler responds "OK" when the torrent scheduler's Probe succeeds,
// and returns the probe failure as an error otherwise.
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
	probeErr := s.sched.Probe()
	if probeErr != nil {
		return handler.Errorf("probe torrent client: %s", probeErr)
	}
	io.WriteString(w, "OK")
	return nil
}
// readinessMu guards Server.lastReady.
//
// readinessCheckHandler runs on concurrent HTTP request goroutines, so
// unsynchronized reads and writes of the multi-word time.Time field would be
// a data race. The lock is package-level so that this fix stays local to the
// handler. It is held only while copying or storing the timestamp, never
// across the readiness probes themselves.
var readinessMu sync.Mutex

// readinessCheckHandler reports whether the agent is ready to serve.
//
// It probes the torrent scheduler, the build-index (tag client) and the
// tracker (announce client) in parallel. If any probe fails, it returns 503
// with every failure message joined by newlines; otherwise it writes "OK".
// When Config.readinessCacheTTL is non-zero, a success is remembered for that
// long, and requests within the window answer "OK" without probing.
func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
	if ttl := s.config.readinessCacheTTL; ttl != 0 {
		readinessMu.Lock()
		lastReady := s.lastReady
		readinessMu.Unlock()
		if lastReady.Add(ttl).After(time.Now()) {
			io.WriteString(w, "OK")
			return nil
		}
	}

	var schedErr, buildIndexErr, trackerErr error
	var wg sync.WaitGroup
	wg.Add(3)
	go func() {
		defer wg.Done()
		schedErr = s.sched.Probe()
	}()
	go func() {
		defer wg.Done()
		buildIndexErr = s.tags.CheckReadiness()
	}()
	go func() {
		defer wg.Done()
		trackerErr = s.ac.CheckReadiness()
	}()
	wg.Wait()

	// TODO(akalpakchiev): Replace with errors.Join once upgraded to Go 1.20+.
	var errMsgs []string
	for _, err := range []error{schedErr, buildIndexErr, trackerErr} {
		if err != nil {
			errMsgs = append(errMsgs, err.Error())
		}
	}
	if len(errMsgs) != 0 {
		return handler.Errorf("agent not ready: %v", strings.Join(errMsgs, "\n")).Status(http.StatusServiceUnavailable)
	}

	readinessMu.Lock()
	s.lastReady = time.Now()
	readinessMu.Unlock()

	io.WriteString(w, "OK")
	return nil
}
// patchSchedulerConfigHandler restarts the agent torrent scheduler with the
// config in the request body. The body must decode as a JSON
// scheduler.Config; otherwise the request fails with 400 and the scheduler
// is left as-is.
func (s *Server) patchSchedulerConfigHandler(w http.ResponseWriter, r *http.Request) error {
	defer r.Body.Close()

	var newConfig scheduler.Config
	dec := json.NewDecoder(r.Body)
	if decodeErr := dec.Decode(&newConfig); decodeErr != nil {
		return handler.Errorf("json decode: %s", decodeErr).Status(http.StatusBadRequest)
	}

	s.sched.Reload(newConfig)
	return nil
}
// getBlacklistHandler writes the scheduler's current blacklist snapshot to
// the response body as JSON.
func (s *Server) getBlacklistHandler(w http.ResponseWriter, r *http.Request) error {
	snapshot, err := s.sched.BlacklistSnapshot()
	if err != nil {
		return handler.Errorf("blacklist snapshot: %s", err)
	}
	enc := json.NewEncoder(w)
	if err = enc.Encode(&snapshot); err != nil {
		return handler.Errorf("json encode: %s", err)
	}
	return nil
}
// parseDigest reads the {digest} path parameter and converts it to a
// core.Digest.
//
// A bare SHA256 hex string is tried first (core.NewSHA256DigestFromHex). If
// that fails, the value is parsed as a fully formed digest
// (core.ParseSHA256Digest). If both fail, the error from the second parser
// is returned with status 400.
func parseDigest(r *http.Request) (core.Digest, error) {
	raw, err := httputil.ParseParam(r, "digest")
	if err != nil {
		return core.Digest{}, err
	}
	// TODO(codyg): Accept only a fully formed digest.
	if fromHex, hexErr := core.NewSHA256DigestFromHex(raw); hexErr == nil {
		return fromHex, nil
	}
	full, parseErr := core.ParseSHA256Digest(raw)
	if parseErr != nil {
		return core.Digest{}, handler.Errorf("parse digest: %s", parseErr).Status(http.StatusBadRequest)
	}
	return full, nil
}