func()

in internal/gcsx/multi_range_downloader_wrapper.go [188:255]


func (mrdWrapper *MultiRangeDownloaderWrapper) Read(ctx context.Context, buf []byte,
	startOffset int64, endOffset int64, timeout time.Duration, metricHandle common.MetricHandle) (bytesRead int, err error) {
	// Bidi Api with 0 as read_limit means no limit whereas we do not want to read anything with empty buffer.
	// Hence, handling it separately.
	if len(buf) == 0 {
		return 0, nil
	}

	err = mrdWrapper.ensureMultiRangeDownloader()
	if err != nil {
		err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: Error in creating MultiRangeDownloader:  %v", err)
		return
	}

	// We will only read what is requested by the client. Hence, capping end to the requested value.
	if endOffset > startOffset+int64(len(buf)) {
		endOffset = startOffset + int64(len(buf))
	}

	buffer := bytes.NewBuffer(buf)
	buffer.Reset()
	done := make(chan readResult, 1)

	mu := sync.Mutex{}
	defer func() {
		mu.Lock()
		close(done)
		done = nil
		mu.Unlock()
	}()

	requestId := uuid.New()
	logger.Tracef("%.13v <- MultiRangeDownloader::Add (%s, [%d, %d))", requestId, mrdWrapper.object.Name, startOffset, endOffset)
	start := time.Now()
	mrdWrapper.Wrapped.Add(buffer, startOffset, endOffset-startOffset, func(offsetAddCallback int64, bytesReadAddCallback int64, e error) {
		defer func() {
			mu.Lock()
			if done != nil {
				done <- readResult{bytesRead: int(bytesReadAddCallback), err: e}
			}
			mu.Unlock()
		}()

		if e != nil && e != io.EOF {
			e = fmt.Errorf("Error in Add Call: %w", e)
		}
	})

	select {
	case <-time.After(timeout):
		err = fmt.Errorf("Timeout")
	case <-ctx.Done():
		err = fmt.Errorf("Context Cancelled: %w", ctx.Err())
	case res := <-done:
		bytesRead = res.bytesRead
		err = res.err
	}
	duration := time.Since(start)
	monitor.CaptureMultiRangeDownloaderMetrics(ctx, metricHandle, "MultiRangeDownloader::Add", start)
	errDesc := "OK"
	if err != nil {
		errDesc = err.Error()
		err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: %w", err)
		logger.Errorf("%v", err)
	}
	logger.Tracef("%.13v -> MultiRangeDownloader::Add (%s, [%d, %d)) (%v): %v", requestId, mrdWrapper.object.Name, startOffset, endOffset, duration, errDesc)
	return
}