func (r *ParallelReader) Read(p []byte) (int, error)

in sharedlibraries/storage/parallelreader.go [81:149]


func (r *ParallelReader) Read(p []byte) (int, error) {
	if r == nil {
		return 0, errors.New("no parallel reader defined")
	}
	if r.objectOffset >= r.objectSize {
		return 0, io.EOF
	}

	// On the first Read() call, kick off all the workers to download their respective first chunks in parallel.
	if r.objectOffset == 0 {
		log.Logger.Infow("Workers started", "parallelWorkers", len(r.workers), "offset", r.objectOffset)
		for i := 0; i < len(r.workers); i++ {
			if startByte := r.partSizeBytes * int64(i); startByte < r.objectSize {
				if r.workers[i] == nil {
					r.cancel()
				return 0, errors.New("no worker defined")
				}
				assignWorkersChunk(r, startByte, i)
			}
		}
	}

	// Wait until the worker downloading the required chunk is ready. Worker IDs arrive
	// on idleWorkersIDs as their downloads finish; each is marked copyReady until the
	// current worker's chunk is available.
	for !r.workers[r.currentWorkerID].copyReady {
		id := <-r.idleWorkersIDs
		select {
		case <-r.ctx.Done():
			log.Logger.Info("Parallel restore cancellation requested")
			return 0, r.ctx.Err()
		default:
			// If the worker fails to read from GCS, we must exit the loop.
			if r.workers[id].errReading != nil {
				log.Logger.Errorw("Failed to read from GCS", "err", r.workers[id].errReading)
				r.cancel()
				return 0, r.workers[id].errReading
			}
			r.workers[id].copyReady = true
		}
	}

	// Copy data from the worker's buffer to the provided p buffer.
	id := r.currentWorkerID
	bufOffset := r.workers[id].bufferOffset

	// Copy only the valid bytes of the chunk so we never read past the downloaded data.
	dataToCopy := r.workers[id].buffer[bufOffset:r.workers[id].chunkSize]
	n := copy(p, dataToCopy)

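	// Bookkeeping: objectOffset tracks the position in the overall object, while
	// bufferOffset and BytesRemain track how much of this worker's chunk has been
	// consumed (a short read by the caller leaves the rest for the next call).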
	r.objectOffset += int64(n)
	r.workers[id].bufferOffset += int64(n)
	r.workers[id].BytesRemain -= int64(n)

	// Reset and assign the next chunk to the worker only if the entire buffer has been consumed.
	if r.workers[id].BytesRemain == 0 {
		r.workers[id].copyReady = false

		// Check whether the current worker needs to be assigned a new chunk.
		// The next len(r.workers) - 1 chunks are already assigned to the workers for downloading.
		if startByte := r.objectOffset + int64(len(r.workers)-1)*r.partSizeBytes; startByte < r.objectSize {
			assignWorkersChunk(r, startByte, id)
		}

		// Advance currentWorkerID to the worker holding the next sequential chunk, wrapping around to the first worker.
		if r.currentWorkerID++; r.currentWorkerID == len(r.workers) {
			r.currentWorkerID = 0
		}
	}
	return n, nil
}
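
For reference, below is a minimal sketch of the state and helper this method leans on. The field and type names (the ParallelReader fields, downloadWorker, assignWorkersChunk) are inferred from their uses in Read above; the real definitions live elsewhere in parallelreader.go, and the download mechanics shown in the goroutine are an assumption rather than the package's actual implementation.


package storage

import (
	"context"
)

// Sketch of the state Read relies on. Field names mirror their uses in Read;
// the real struct may carry additional fields.
type ParallelReader struct {
	ctx             context.Context
	cancel          context.CancelFunc
	objectOffset    int64 // bytes of the object already returned to the caller
	objectSize      int64 // total size of the object being restored
	partSizeBytes   int64 // size of each chunk a worker downloads
	currentWorkerID int   // index of the worker holding the next sequential chunk
	workers         []*downloadWorker
	idleWorkersIDs  chan int // worker IDs arrive here when a download finishes
}

type downloadWorker struct {
	buffer       []byte // holds the downloaded chunk
	bufferOffset int64  // how far the caller has consumed this chunk
	chunkSize    int64  // number of valid bytes in buffer for the current chunk
	BytesRemain  int64  // chunkSize minus the bytes already copied out
	copyReady    bool   // chunk fully downloaded and ready to copy
	errReading   error  // set if the download failed
}

// assignWorkersChunk hands worker i the chunk starting at startByte and starts
// the download; the worker reports back on idleWorkersIDs when it is done.
// This only illustrates the contract Read depends on: the real implementation
// reads the byte range from GCS into the worker's buffer.
func assignWorkersChunk(r *ParallelReader, startByte int64, i int) {
	w := r.workers[i]
	w.bufferOffset = 0
	w.chunkSize = min64(r.partSizeBytes, r.objectSize-startByte)
	w.BytesRemain = w.chunkSize
	go func() {
		// ... download [startByte, startByte+chunkSize) into w.buffer ...
		r.idleWorkersIDs <- i
	}()
}

func min64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}


Because Read satisfies io.Reader, a caller can stream the whole object with io.Copy(dst, r): each call drains the current worker's chunk in order, reassigns the freed worker to the next undownloaded range, and keeps up to len(r.workers) chunk downloads in flight.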