in sharedlibraries/storage/parallelreader.go [81:149]
// Read implements io.Reader for ParallelReader. It serves bytes of the object
// strictly in order from the worker that owns the next sequential chunk,
// while the remaining workers download future chunks in parallel. It returns
// io.EOF once the whole object has been consumed, and the context's error if
// the restore is cancelled.
func (r *ParallelReader) Read(p []byte) (int, error) {
	if r == nil {
		return 0, errors.New("no parallel reader defined")
	}
	if r.objectOffset >= r.objectSize {
		return 0, io.EOF
	}
	// In the first Read() call, kickoff all the workers to download their respective first chunk in parallel.
	if r.objectOffset == 0 {
		log.Logger.Infow("Workers started", "parallelWorkers", len(r.workers), "offset", r.objectOffset)
		for i := 0; i < len(r.workers); i++ {
			// Only assign a chunk if it starts inside the object; a small
			// object may need fewer chunks than there are workers.
			if startByte := r.partSizeBytes * int64(i); startByte < r.objectSize {
				if r.workers[i] == nil {
					r.cancel()
					return 0, errors.New("no worker defined")
				}
				assignWorkersChunk(r, startByte, i)
			}
		}
	}
	// Wait for the worker downloading the required chunk to become ready.
	// The receive and the cancellation check share one select so that a
	// cancelled context unblocks Read() even if no worker ever reports idle
	// (previously the bare channel receive could block forever after cancel).
	for !r.workers[r.currentWorkerID].copyReady {
		select {
		case <-r.ctx.Done():
			log.Logger.Info("Parallel restore cancellation requested")
			return 0, r.ctx.Err()
		case id := <-r.idleWorkersIDs:
			// If the worker fails to read from GCS, we must exit the loop.
			if r.workers[id].errReading != nil {
				log.Logger.Errorw("Failed to read from GCS", "err", r.workers[id].errReading)
				r.cancel()
				return 0, r.workers[id].errReading
			}
			r.workers[id].copyReady = true
		}
	}
	// Copy data from the worker's buffer to the provided p buffer.
	id := r.currentWorkerID
	bufOffset := r.workers[id].bufferOffset
	// Bound the copy by chunkSize so only valid downloaded bytes are exposed,
	// avoiding issues with buffer overruns.
	dataToCopy := r.workers[id].buffer[bufOffset:r.workers[id].chunkSize]
	n := copy(p, dataToCopy)
	r.objectOffset += int64(n)
	r.workers[id].bufferOffset += int64(n)
	r.workers[id].BytesRemain -= int64(n)
	// Reset and assign the next chunk to the worker only if the entire buffer has been consumed.
	if r.workers[id].BytesRemain == 0 {
		r.workers[id].copyReady = false
		// Check if the current worker need to be assigned a new chunk.
		// The next len(r.workers) - 1 chunks are already assigned to the workers for downloading,
		// so this worker takes the chunk len(r.workers)-1 parts past the current offset.
		if startByte := r.objectOffset + int64(len(r.workers)-1)*r.partSizeBytes; startByte < r.objectSize {
			assignWorkersChunk(r, startByte, id)
		}
		// Update the currentWorkerID to the next worker's ID having the next chunk (round-robin).
		if r.currentWorkerID++; r.currentWorkerID == len(r.workers) {
			r.currentWorkerID = 0
		}
	}
	return n, nil
}