in internal/gcsx/multi_range_downloader_wrapper.go [188:255]
// Read downloads the byte range [startOffset, endOffset) of the wrapped object
// into buf through the MultiRangeDownloader and waits for that range to finish.
//
// The range is capped so that at most len(buf) bytes are requested. An empty
// buf returns (0, nil) immediately without touching the downloader. Otherwise
// the call returns as soon as one of the following happens:
//   - the Add callback fires: bytesRead and err are taken from the callback
//     (callback errors other than io.EOF are wrapped with "Error in Add Call");
//   - timeout elapses: bytesRead is 0 and err reports "Timeout";
//   - ctx is done: bytesRead is 0 and err wraps ctx.Err().
//
// Every non-nil error, including io.EOF reported by the callback, is wrapped
// with the "MultiRangeDownloaderWrapper::Read" prefix (using %w) and logged.
func (mrdWrapper *MultiRangeDownloaderWrapper) Read(ctx context.Context, buf []byte,
	startOffset int64, endOffset int64, timeout time.Duration, metricHandle common.MetricHandle) (bytesRead int, err error) {
	// Bidi Api with 0 as read_limit means no limit whereas we do not want to read anything with empty buffer.
	// Hence, handling it separately.
	if len(buf) == 0 {
		return 0, nil
	}

	err = mrdWrapper.ensureMultiRangeDownloader()
	if err != nil {
		// %w (not %v) keeps the underlying cause reachable via errors.Is / errors.As,
		// consistent with the other wraps in this function.
		err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: Error in creating MultiRangeDownloader: %w", err)
		return
	}

	// We will only read what is requested by the client. Hence, capping end to the requested value.
	if endOffset > startOffset+int64(len(buf)) {
		endOffset = startOffset + int64(len(buf))
	}
	// NOTE(review): if endOffset <= startOffset the length handed to Add below is
	// <= 0; given that a 0 limit means "no limit" (see comment above), confirm
	// that callers never pass an empty or inverted range.

	// Wrap buf so the downloader writes straight into the caller's memory;
	// Reset drops the existing contents while keeping the backing array.
	buffer := bytes.NewBuffer(buf)
	buffer.Reset()

	// done carries the callback's result. It has capacity 1 so the callback's
	// send does not block when Read is still waiting.
	done := make(chan readResult, 1)
	// mu guards done: once Read returns, done is closed and set to nil so that a
	// late callback skips the send instead of panicking on a closed channel.
	mu := sync.Mutex{}
	defer func() {
		mu.Lock()
		close(done)
		done = nil
		mu.Unlock()
	}()

	requestId := uuid.New()
	logger.Tracef("%.13v <- MultiRangeDownloader::Add (%s, [%d, %d))", requestId, mrdWrapper.object.Name, startOffset, endOffset)
	start := time.Now()
	mrdWrapper.Wrapped.Add(buffer, startOffset, endOffset-startOffset, func(offsetAddCallback int64, bytesReadAddCallback int64, e error) {
		// The deferred send runs after the wrapping below, so it publishes the
		// already-wrapped error.
		defer func() {
			mu.Lock()
			if done != nil {
				done <- readResult{bytesRead: int(bytesReadAddCallback), err: e}
			}
			mu.Unlock()
		}()

		if e != nil && e != io.EOF {
			e = fmt.Errorf("Error in Add Call: %w", e)
		}
	})

	// Use an explicit timer instead of time.After so it is released as soon as
	// Read returns rather than lingering until the timeout expires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// NOTE(review): on the timeout / cancellation paths the Add request is still
	// in flight and holds buffer (backed by buf) — confirm callers do not reuse
	// buf while the downloader may still be writing to it.
	select {
	case <-timer.C:
		err = fmt.Errorf("Timeout")
	case <-ctx.Done():
		err = fmt.Errorf("Context Cancelled: %w", ctx.Err())
	case res := <-done:
		bytesRead = res.bytesRead
		err = res.err
	}

	duration := time.Since(start)
	monitor.CaptureMultiRangeDownloaderMetrics(ctx, metricHandle, "MultiRangeDownloader::Add", start)
	errDesc := "OK"
	if err != nil {
		errDesc = err.Error()
		err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: %w", err)
		logger.Errorf("%v", err)
	}
	logger.Tracef("%.13v -> MultiRangeDownloader::Add (%s, [%d, %d)) (%v): %v", requestId, mrdWrapper.object.Name, startOffset, endOffset, duration, errDesc)
	return
}