pkg/discovery/content/reader/reader.go (226 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package reader
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/metrics"
"github.com/rs/zerolog"
)
// operation selects which remote action doP2p carries out against a peer.
type operation int

const (
	// operationFstatRemote asks the peer for the size of the file.
	operationFstatRemote operation = iota
	// operationPreadRemote asks the peer for a byte range of the file.
	operationPreadRemote
)
// errPeerNotFound is returned by doP2p when the request already came from a
// peer (and so is not propagated further) or when no resolved peer could
// serve the request.
var errPeerNotFound = errors.New("peer not found")
// reader is a Reader implementation.
// It stats and reads a remote file over HTTP; reads are attempted against
// peers first and fall back to the origin.
type reader struct {
// context is the request context (a copy taken in NewReader). The file chunk
// key, the blob URL and the inbound request headers are read from it.
context pcontext.Context
// resolveTimeout is the timeout placed on the peer resolution context in doP2p.
resolveTimeout time.Duration
// router resolves peers for a key and supplies the HTTP client for each peer.
router routing.Router
// resolveRetries is forwarded to router.ResolveWithNegativeCacheCallback
// (presumably the maximum number of resolve retries; confirm against the router).
resolveRetries int
// defaultHttpClient is the client used for requests to the origin; NewReader
// obtains it from router.Net().HTTPClientFor("").
defaultHttpClient *http.Client
// metricsRecorder receives peer discovery, peer response and upstream
// response measurements.
metricsRecorder metrics.Metrics
}
// Compile-time assertion that *reader satisfies Reader.
var _ Reader = &reader{}
// Log builds a logger from this reader's context and returns a pointer to it.
func (r *reader) Log() *zerolog.Logger {
	ctxLogger := pcontext.Logger(r.context)
	return &ctxLogger
}
// PreadRemote is like pread but to a remote file.
//
// It requests the byte range [offset, offset+len(buf)-1] of the file identified
// by the context's file chunk key. Peers are tried first via doP2p; if that
// fails for any reason the range is requested from the origin instead, and the
// origin response is recorded as an upstream metric.
//
// It returns the number of bytes placed in buf. If the origin request cannot
// be constructed, -1 and the construction error are returned.
func (r *reader) PreadRemote(buf []byte, offset int64) (int, error) {
	key := r.context.GetString(pcontext.FileChunkCtxKey)
	first, last := offset, int64(len(buf))+offset-1

	logger := r.Log().With().
		Str("operation", "preadremote").
		Str("key", key).
		Int64("start", first).
		Int64("end", last).
		Logger()

	if fromPeer, p2pErr := r.doP2p(logger, key, first, last, operationPreadRemote, buf); p2pErr == nil {
		return int(fromPeer), nil
	}

	// No peer could serve this range, so go to the origin.
	began := time.Now()
	req, err := r.originRequest(first, last)
	if err != nil {
		return -1, err
	}

	var n int
	defer func() {
		// Runs after n has been assigned below, so the metric carries the real byte count.
		r.metricsRecorder.RecordUpstreamResponse(req.URL.Hostname(), key, "pread", time.Since(began).Seconds(), int64(n))
	}()

	n, err = r.preadRemote(logger, req, r.defaultHttpClient, buf)
	return n, err
}
// FstatRemote stats a remote file.
//
// It always asks the origin (peers are not consulted), requesting only the
// byte range 0-0, and returns the size reported by fstatRemote. The origin
// response is recorded as an upstream metric. If the origin request cannot be
// constructed, -1 and the construction error are returned.
func (r *reader) FstatRemote() (int64, error) {
	const first, last = int64(0), int64(0)

	key := r.context.GetString(pcontext.FileChunkCtxKey)
	logger := r.Log().With().
		Str("operation", "fstatremote").
		Int64("start", first).
		Int64("end", last).
		Str("key", key).
		Logger()

	began := time.Now()
	req, err := r.originRequest(first, last)
	if err != nil {
		return -1, err
	}

	var size int64
	defer func() {
		// Runs after size has been assigned below.
		r.metricsRecorder.RecordUpstreamResponse(req.URL.Hostname(), key, "fstat", time.Since(began).Seconds(), size)
	}()

	size, err = r.fstatRemote(logger, req, r.defaultHttpClient)
	return size, err
}
// doP2p tries to resolve fileChunkKey in the p2p network and, for each peer
// that is resolved, performs operation o against that peer until one succeeds.
//
// On success it returns the operation's result (file size for fstat, bytes
// read into buf for pread) and records a peer response metric. It returns -1
// and errPeerNotFound when the request itself came from a peer, when the
// resolve context times out, or when the peer channel is closed without any
// peer succeeding; in the latter two cases the router's negative-cache
// callback is invoked. A resolution start failure returns -1 and that error.
func (r *reader) doP2p(log zerolog.Logger, fileChunkKey string, start, end int64, o operation, buf []byte) (int64, error) {
	if pcontext.IsRequestFromAPeer(r.context) {
		log.Warn().Msg("refusing to propagate request from one peer to another")
		return -1, errPeerNotFound
	}

	log.Debug().Msg(pcontext.PeerResolutionStartLog)
	defer log.Debug().Msg(pcontext.PeerResolutionStopLog)

	resolveCtx, cancel := context.WithTimeout(log.WithContext(r.context), r.resolveTimeout)
	defer cancel()

	startTime := time.Now()
	discoveryRecorded := false

	peersCh, negCacheCallback, err := r.router.ResolveWithNegativeCacheCallback(resolveCtx, fileChunkKey, false, r.resolveRetries)
	if err != nil {
		log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
		return -1, err
	}

	// Ask each resolved peer in turn for this file.
	for {
		select {
		case <-resolveCtx.Done():
			// Resolution timed out before any peer succeeded.
			negCacheCallback()
			log.Info().Msg(pcontext.PeerNotFoundLog)
			return -1, errPeerNotFound

		case peer, open := <-peersCh:
			if !open {
				// The router closed the channel: no further peers will arrive.
				negCacheCallback()
				log.Info().Msg(pcontext.PeerResolutionExhaustedLog)
				return -1, errPeerNotFound
			}

			if !discoveryRecorded {
				// Only the time taken to discover the first peer is reported.
				r.metricsRecorder.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds())
				discoveryRecorded = true
			}

			peerReq, err := r.peerRequest(peer.HttpHost, start, end)
			if err != nil {
				log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
				continue // try next peer
			}

			client := r.router.Net().HTTPClientFor(peer.ID)

			var (
				count  int64
				opName string
			)
			// From here on startTime measures the peer request, not discovery.
			startTime = time.Now()
			switch o {
			case operationFstatRemote:
				opName = "fstat"
				count, err = r.fstatRemote(log, peerReq, client)
			case operationPreadRemote:
				opName = "pread"
				var n int
				n, err = r.preadRemote(log, peerReq, client, buf)
				count = int64(n)
			default:
				err = fmt.Errorf("unknown operation: %v", o)
			}

			if err != nil {
				log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
				continue // try next peer
			}

			r.metricsRecorder.RecordPeerResponse(peer.HttpHost, fileChunkKey, opName, time.Since(startTime).Seconds(), count)
			return count, nil
		}
	}
}
// fstatRemote stats the file by sending req through client and deriving the
// file size from the response.
//
//   - 200: the response's Content-Length is returned.
//   - 206: the complete length after the last '/' of the Content-Range header
//     is returned; if the header is absent or contains no '/', the
//     Content-Length is returned instead.
//
// A transport failure, any other status code, or a Content-Range whose
// complete length is not a non-negative integer (for example "*") yields 0
// and an Error wrapping the response and the cause.
func (r *reader) fstatRemote(log zerolog.Logger, req *http.Request, client *http.Client) (int64, error) {
	log.Debug().Str("url", req.URL.String()).Str("range", req.Header.Get("Range")).Msg("reader fstatRemote start")
	defer log.Debug().Msg("reader fstatRemote stop")

	resp, err := client.Do(req)
	if err != nil {
		log.Error().Err(err).Msg("reader fstatRemote error")
		return 0, Error{resp, err}
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Error().Err(closeErr).Msg("reader fstatRemote body close error")
		}
	}()

	switch resp.StatusCode {
	case http.StatusOK:
		return resp.ContentLength, nil

	case http.StatusPartialContent:
		contentRange := resp.Header.Get("Content-Range")
		slash := strings.LastIndexByte(contentRange, '/')
		if slash < 0 {
			// Header missing or malformed without a '/': fall back to Content-Length.
			return resp.ContentLength, nil
		}

		total, parseErr := strconv.ParseInt(contentRange[slash+1:], 10, 64)
		if parseErr == nil && total < 0 {
			parseErr = fmt.Errorf("negative length %d", total)
		}
		if parseErr != nil {
			// Previously the parse error was dropped and a size of 0 was
			// reported as success; surface it so callers do not treat the
			// file as empty.
			rangeErr := fmt.Errorf("parsing Content-Range %q: %w", contentRange, parseErr)
			log.Error().Err(rangeErr).Int("status", resp.StatusCode).Msg("reader fstatRemote error")
			return 0, Error{resp, rangeErr}
		}
		return total, nil
	}

	statusErr := fmt.Errorf("unexpected response code: %d", resp.StatusCode)
	log.Error().Err(statusErr).Int("status", resp.StatusCode).Msg("reader fstatRemote error")
	return 0, Error{resp, statusErr}
}
// preadRemote reads the file by sending req through client and filling buf
// from the response body.
//
// Only 200 and 206 responses are read. The body is consumed with io.ReadFull,
// so the returned count is the number of bytes copied into buf, and a body
// shorter than buf yields io.ErrUnexpectedEOF (or io.EOF when nothing was
// read) together with the partial count. A transport failure or any other
// status code yields 0 and an Error wrapping the response and the cause.
//
// NOTE(review): a 200 response is read from the start of its body; if a server
// ignores the Range header this would not be the requested offset. Confirm
// that the servers in use always honour Range.
func (r *reader) preadRemote(log zerolog.Logger, req *http.Request, client *http.Client, buf []byte) (int, error) {
	log.Debug().Str("url", req.URL.String()).Str("range", req.Header.Get("Range")).Msg("reader preadRemote start")

	// statusCode stays -1 when no response was received at all.
	statusCode := -1
	s := time.Now()
	defer func() {
		log.Debug().Int("status", statusCode).Dur("duration", time.Since(s)).Msg("reader preadRemote stop")
	}()

	resp, err := client.Do(req)
	if resp != nil {
		statusCode = resp.StatusCode
	}
	if err != nil {
		detailedErr := Error{resp, err}
		log.Error().Err(detailedErr).Str("url", req.URL.String()).Str("range", req.Header.Get("Range")).Msg("reader preadRemote error")
		return 0, detailedErr
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Error().Err(closeErr).Msg("reader preadRemote body close error")
		}
	}()

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
		// Build the error first so the log entry actually carries it; err is
		// nil at this point.
		statusErr := fmt.Errorf("unexpected response code: %d", resp.StatusCode)
		log.Error().Err(statusErr).Int("status", resp.StatusCode).Msg("reader preadRemote error")
		return 0, Error{resp, statusErr}
	}

	return io.ReadFull(resp.Body, buf)
}
// originRequest builds a ranged request for bytes start-end addressed to the
// blob URL stored in the reader's context.
func (r *reader) originRequest(start, end int64) (*http.Request, error) {
	blobURL := r.context.GetString(pcontext.BlobUrlCtxKey)
	return r.remoteRequest(blobURL, start, end)
}
// peerRequest builds a ranged request for bytes start-end addressed to a peer.
// The target is "<peer>/blobs/<blob URL>", where the blob URL comes from the
// reader's context.
func (r *reader) peerRequest(peer string, start, end int64) (*http.Request, error) {
	blobURL := r.context.GetString(pcontext.BlobUrlCtxKey)
	target := fmt.Sprintf("%v/blobs/%v", peer, blobURL)
	return r.remoteRequest(target, start, end)
}
// remoteRequest creates a new HTTP GET request for u.
//
// Every header of the inbound request held by the reader's context is copied
// onto the new request (values are duplicated, not shared), the Range header
// is then set to "bytes=start-end", and finally pcontext.SetOutboundHeaders is
// applied.
func (r *reader) remoteRequest(u string, start, end int64) (*http.Request, error) {
	out, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		return nil, err
	}

	for name, values := range r.context.Request.Header {
		// Copy into a fresh slice so the outbound header never aliases the inbound one.
		out.Header[name] = append(make([]string, 0, len(values)), values...)
	}

	byteRange := fmt.Sprintf("bytes=%d-%d", start, end)
	out.Header.Set("Range", byteRange)
	pcontext.SetOutboundHeaders(out, r.context)

	return out, nil
}
// NewReader creates a new remote reader.
//
// The reader keeps a copy of c, and uses the HTTP client that router returns
// for the empty peer ID as its client for origin requests.
func NewReader(c pcontext.Context, router routing.Router, resolveRetries int, resolveTimeout time.Duration, metricsRecorder metrics.Metrics) Reader {
	ctxCopy := c.Copy()
	originClient := router.Net().HTTPClientFor("")

	rd := &reader{
		context:           ctxCopy,
		router:            router,
		resolveRetries:    resolveRetries,
		resolveTimeout:    resolveTimeout,
		defaultHttpClient: originClient,
		metricsRecorder:   metricsRecorder,
	}
	return rd
}