in pkg/discovery/content/reader/reader.go [101:184]
func (r *reader) doP2p(log zerolog.Logger, fileChunkKey string, start, end int64, o operation, buf []byte) (int64, error) {
if pcontext.IsRequestFromAPeer(r.context) {
log.Warn().Msg("refusing to propagate request from one peer to another")
return -1, errPeerNotFound
}
log.Debug().Msg(pcontext.PeerResolutionStartLog)
defer log.Debug().Msg(pcontext.PeerResolutionStopLog)
resolveCtx, cancel := context.WithTimeout(log.WithContext(r.context), r.resolveTimeout)
defer cancel()
startTime := time.Now()
peerCount := 0
peersCh, negCacheCallback, err := r.router.ResolveWithNegativeCacheCallback(resolveCtx, fileChunkKey, false, r.resolveRetries)
if err != nil {
//nolint:errcheck // ignore
log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
return -1, err
}
// Request a peer for this file.
peerLoop:
for {
select {
case <-resolveCtx.Done():
// Resolving mirror has timed out.
negCacheCallback()
log.Info().Msg(pcontext.PeerNotFoundLog)
break peerLoop
case peer, ok := <-peersCh:
// Channel closed means no more mirrors will be received and max retries has been reached.
if !ok {
negCacheCallback()
log.Info().Msg(pcontext.PeerResolutionExhaustedLog)
break peerLoop
}
if peerCount == 0 {
// Only report the time it took to discover the first peer.
r.metricsRecorder.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds())
peerCount++
}
peerReq, err := r.peerRequest(peer.HttpHost, start, end)
if err != nil {
log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
// try next peer
break
}
client := r.router.Net().HTTPClientFor(peer.ID)
var count int64
startTime = time.Now()
switch o {
case operationFstatRemote:
count, err = r.fstatRemote(log, peerReq, client)
case operationPreadRemote:
var c int
c, err = r.preadRemote(log, peerReq, client, buf)
count = int64(c)
default:
err = fmt.Errorf("unknown operation: %v", o)
}
if err != nil {
// try next peer
log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
} else {
op := "fstat"
if o == operationPreadRemote {
op = "pread"
}
r.metricsRecorder.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count)
return count, nil
}
}
}
return -1, errPeerNotFound
}