func()

in pkg/discovery/content/reader/reader.go [101:184]


func (r *reader) doP2p(log zerolog.Logger, fileChunkKey string, start, end int64, o operation, buf []byte) (int64, error) {
	if pcontext.IsRequestFromAPeer(r.context) {
		log.Warn().Msg("refusing to propagate request from one peer to another")
		return -1, errPeerNotFound
	}

	log.Debug().Msg(pcontext.PeerResolutionStartLog)
	defer log.Debug().Msg(pcontext.PeerResolutionStopLog)

	resolveCtx, cancel := context.WithTimeout(log.WithContext(r.context), r.resolveTimeout)
	defer cancel()

	startTime := time.Now()
	peerCount := 0
	peersCh, negCacheCallback, err := r.router.ResolveWithNegativeCacheCallback(resolveCtx, fileChunkKey, false, r.resolveRetries)
	if err != nil {
		//nolint:errcheck // ignore
		log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
		return -1, err
	}

	// Request a peer for this file.
peerLoop:
	for {
		select {

		case <-resolveCtx.Done():
			// Resolving mirror has timed out.
			negCacheCallback()
			log.Info().Msg(pcontext.PeerNotFoundLog)
			break peerLoop

		case peer, ok := <-peersCh:
			// Channel closed means no more mirrors will be received and max retries has been reached.
			if !ok {
				negCacheCallback()
				log.Info().Msg(pcontext.PeerResolutionExhaustedLog)
				break peerLoop
			}

			if peerCount == 0 {
				// Only report the time it took to discover the first peer.
				r.metricsRecorder.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds())
				peerCount++
			}

			peerReq, err := r.peerRequest(peer.HttpHost, start, end)
			if err != nil {
				log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
				// try next peer
				break
			}

			client := r.router.Net().HTTPClientFor(peer.ID)

			var count int64
			startTime = time.Now()
			switch o {
			case operationFstatRemote:
				count, err = r.fstatRemote(log, peerReq, client)
			case operationPreadRemote:
				var c int
				c, err = r.preadRemote(log, peerReq, client, buf)
				count = int64(c)
			default:
				err = fmt.Errorf("unknown operation: %v", o)
			}

			if err != nil {
				// try next peer
				log.Error().Err(err).Msg(pcontext.PeerRequestErrorLog)
			} else {
				op := "fstat"
				if o == operationPreadRemote {
					op = "pread"
				}
				r.metricsRecorder.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count)
				return count, nil
			}
		}
	}

	return -1, errPeerNotFound
}