in oss/downloader.go [364:518]
func (d *downloaderDelegate) download() (*DownloadResult, error) {
var (
wg sync.WaitGroup
errValue atomic.Value
cpCh chan downloadedChunk
cpWg sync.WaitGroup
cpChunks downloadedChunks
tracker bool = d.calcCRC || d.checkpoint != nil
tCRC64 uint64 = 0
)
saveErrFn := func(e error) {
errValue.Store(e)
}
getErrFn := func() error {
v := errValue.Load()
if v == nil {
return nil
}
e, _ := v.(error)
return e
}
// writeChunkFn runs in worker goroutines to pull chunks off of the ch channel
writeChunkFn := func(ch chan downloaderChunk) {
defer wg.Done()
var hash hash.Hash64
if d.calcCRC {
hash = NewCRC64(0)
}
for {
chunk, ok := <-ch
if !ok {
break
}
if getErrFn() != nil {
continue
}
dchunk, derr := d.downloadChunk(chunk, hash)
if derr != nil && derr != io.EOF {
saveErrFn(derr)
} else {
// update tracker info
if tracker {
cpCh <- dchunk
}
}
}
}
// trackerFn runs in worker goroutines to update checkpoint info or calc downloaded crc
trackerFn := func(ch chan downloadedChunk) {
defer cpWg.Done()
var (
tOffset int64 = 0
)
if d.checkpoint != nil {
tOffset = d.checkpoint.Info.Data.DownloadInfo.Offset
tCRC64 = d.checkpoint.Info.Data.DownloadInfo.CRC64
}
for {
chunk, ok := <-ch
if !ok {
break
}
cpChunks = append(cpChunks, chunk)
sort.Sort(cpChunks)
newOffset := tOffset
i := 0
for ii := range cpChunks {
if cpChunks[ii].start == newOffset {
newOffset += cpChunks[ii].size
i++
} else {
break
}
}
if newOffset != tOffset {
//remove updated chunk in cpChunks
if d.calcCRC {
tCRC64 = d.combineCRC(tCRC64, cpChunks[0:i])
}
tOffset = newOffset
cpChunks = cpChunks[i:]
if d.checkpoint != nil {
d.checkpoint.Info.Data.DownloadInfo.Offset = tOffset
d.checkpoint.Info.Data.DownloadInfo.CRC64 = tCRC64
d.checkpoint.dump()
}
}
}
}
// Start the download workers
ch := make(chan downloaderChunk, d.options.ParallelNum)
for i := 0; i < d.options.ParallelNum; i++ {
wg.Add(1)
go writeChunkFn(ch)
}
// Start tracker worker if need track downloaded chunk
if tracker {
cpCh = make(chan downloadedChunk, maxInt(3, d.options.ParallelNum))
cpWg.Add(1)
go trackerFn(cpCh)
}
// Consume downloaded data
if d.request.ProgressFn != nil && d.written > 0 {
d.request.ProgressFn(d.written, d.written, d.sizeInBytes)
}
// Queue the next range of bytes to read.
for getErrFn() == nil {
if d.pos >= d.epos {
break
}
size := minInt64(d.epos-d.pos, d.options.PartSize)
ch <- downloaderChunk{w: d.w, start: d.pos, size: size, rstart: d.rstart}
d.pos += size
}
// Waiting for parts download finished
close(ch)
wg.Wait()
if tracker {
close(cpCh)
cpWg.Wait()
}
if err := getErrFn(); err != nil {
return nil, d.wrapErr(err)
}
if d.checkCRC {
if len(cpChunks) > 0 {
sort.Sort(cpChunks)
}
if derr := checkResponseHeaderCRC64(fmt.Sprint(d.combineCRC(tCRC64, cpChunks)), d.headers); derr != nil {
return nil, d.wrapErr(derr)
}
}
return &DownloadResult{
Written: d.written,
}, nil
}