pkg/discovery/content/registry/mirror.go (106 lines of code) (raw):
// Initial Copyright (c) 2023 Xenit AB and 2024 The Spegel Authors.
// Portions Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package registry
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"time"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/metrics"
"github.com/azure/peerd/pkg/peernet"
)
// Package-level defaults for key resolution.
var (
	// ResolveTimeout is the maximum time allowed for resolving a key.
	ResolveTimeout = time.Second

	// ResolveRetries is how many times resolving a key is attempted before giving up.
	ResolveRetries = 3
)
// Mirror is the handler that serves requests made to this registry proxy.
type Mirror struct {
	// router resolves a digest or reference key to a stream of candidate peers.
	router routing.Router
	// n supplies the per-peer round tripper used when proxying a request.
	n peernet.Network
	// metricsRecorder records peer discovery and peer response measurements.
	metricsRecorder metrics.Metrics

	// resolveTimeout bounds how long a single request may spend resolving and trying peers.
	resolveTimeout time.Duration
	// resolveRetries is passed to the router as the retry count for a resolution.
	resolveRetries int
}
// Handle handles a request to this registry mirror.
//
// The lookup key is the digest stored on the request context or, when no digest is set,
// the reference. The key is resolved to peers through the router, and the request is
// reverse-proxied to each resolved peer in turn until one responds with 200 OK.
//
// The request is aborted with:
//   - 500 when neither a digest nor a reference is present, when resolution fails to start,
//     when a peer address cannot be parsed, or when the peer channel is closed before any
//     peer served the request;
//   - 404 when the resolve timeout elapses before any peer served the request.
func (m *Mirror) Handle(c pcontext.Context) {
	key := c.GetString(pcontext.DigestCtxKey)
	if key == "" {
		key = c.GetString(pcontext.ReferenceCtxKey)
	}

	l := pcontext.Logger(c).With().Str("handler", "mirror").Str("ref", key).Logger()
	l.Debug().Msg("mirror handler start")
	s := time.Now()
	defer func() {
		l.Debug().Dur("duration", time.Since(s)).Msg("mirror handler stop")
	}()

	if key == "" {
		// Stop here: there is nothing to resolve without a key.
		//nolint
		c.AbortWithError(http.StatusInternalServerError, errors.New("neither digest nor reference provided"))
		return
	}

	// Resolve mirror with the requested key.
	resolveCtx, cancel := context.WithTimeout(c, m.resolveTimeout)
	defer cancel()

	startTime := time.Now()
	peersChan, err := m.router.Resolve(resolveCtx, key, false, m.resolveRetries)
	if err != nil {
		// Stop here: otherwise the loop below would wait on the channel and abort again.
		//nolint
		c.AbortWithError(http.StatusInternalServerError, err)
		return
	}

	discoveryRecorded := false
	for {
		select {
		case <-resolveCtx.Done():
			// Resolving mirror has timed out.
			//nolint
			c.AbortWithError(http.StatusNotFound, errors.New(pcontext.PeerNotFoundLog))
			return

		case peer, ok := <-peersChan:
			// Channel closed means no more mirrors will be received and max retries has been reached.
			if !ok {
				//nolint
				c.AbortWithError(http.StatusInternalServerError, errors.New(pcontext.PeerResolutionExhaustedLog))
				return
			}

			if !discoveryRecorded {
				// Only report the time it took to discover the first peer.
				m.metricsRecorder.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds())
				discoveryRecorded = true
			}

			u, err := url.Parse(peer.HttpHost)
			if err != nil {
				//nolint
				c.AbortWithError(http.StatusInternalServerError, err)
				return
			}

			// succeeded and count are set by ModifyResponse when the peer answers 200 OK.
			succeeded := false
			count := int64(0)

			proxy := httputil.NewSingleHostReverseProxy(u)
			proxy.Director = func(r *http.Request) {
				// Send the incoming path and query to the peer's address.
				r.URL = u
				r.URL.Path = c.Request.URL.Path
				r.URL.RawQuery = c.Request.URL.RawQuery
				pcontext.SetOutboundHeaders(r, c)
			}
			proxy.ModifyResponse = func(resp *http.Response) error {
				if resp.StatusCode != http.StatusOK {
					return fmt.Errorf("expected peer to respond with 200, got: %s", resp.Status)
				}

				succeeded = true
				count = resp.ContentLength
				return nil
			}
			proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
				// Only log; nothing is written to w so the next peer can still serve the request.
				l.Error().Err(err).Msg("peer request failed, attempting next")
			}
			proxy.Transport = m.n.RoundTripperFor(peer.ID)

			proxy.ServeHTTP(c.Writer, c.Request)
			if !succeeded {
				// This peer did not serve the request; wait for the next peer or the timeout.
				continue
			}

			m.metricsRecorder.RecordPeerResponse(peer.HttpHost, key, "pull", time.Since(startTime).Seconds(), count)
			l.Info().Str("peer", u.Host).Int64("count", count).Msg("request served from peer")
			return
		}
	}
}
// New returns a Mirror that resolves peers through router, using the package-level
// ResolveTimeout and ResolveRetries values, the metrics recorder returned by
// metrics.FromContext(ctx), and the network returned by router.Net().
func New(ctx context.Context, router routing.Router) *Mirror {
	mirror := new(Mirror)

	mirror.metricsRecorder = metrics.FromContext(ctx)
	mirror.n = router.Net()
	mirror.router = router

	mirror.resolveTimeout = ResolveTimeout
	mirror.resolveRetries = ResolveRetries

	return mirror
}