func()

in oss/downloader.go [364:518]


// download drives a parallel ranged download of [d.pos, d.epos).
// It fans chunk descriptors out to ParallelNum worker goroutines, and —
// when CRC verification or checkpointing is enabled — funnels completed
// chunks into a single tracker goroutine that merges contiguous chunks,
// advances the checkpoint offset, and folds the running CRC64.
// Returns the number of bytes written, or the (wrapped) first/last
// recorded worker error.
func (d *downloaderDelegate) download() (*DownloadResult, error) {
	var (
		wg       sync.WaitGroup   // waits for the chunk-download workers
		errValue atomic.Value     // latches a worker error (a later Store overwrites an earlier one)
		cpCh     chan downloadedChunk
		cpWg     sync.WaitGroup   // waits for the single tracker goroutine
		cpChunks downloadedChunks // chunks completed but not yet contiguous with the tracked offset; written only by trackerFn, read after cpWg.Wait()
		tracker  bool   = d.calcCRC || d.checkpoint != nil
		tCRC64   uint64 = 0 // running CRC of the contiguous prefix; written only by trackerFn, read after cpWg.Wait()
	)

	// saveErrFn records an error for the producer/finalizer to observe.
	// Note: atomic.Value.Store overwrites, so the most recent error wins.
	saveErrFn := func(e error) {
		errValue.Store(e)
	}

	// getErrFn returns the recorded error, or nil if none was stored.
	getErrFn := func() error {
		v := errValue.Load()
		if v == nil {
			return nil
		}
		e, _ := v.(error)
		return e
	}

	// writeChunkFn runs in worker goroutines to pull chunks off of the ch channel
	writeChunkFn := func(ch chan downloaderChunk) {
		defer wg.Done()
		// Per-worker CRC hasher (only allocated when CRC is requested);
		// reused across this worker's chunks via downloadChunk.
		var hash hash.Hash64
		if d.calcCRC {
			hash = NewCRC64(0)
		}

		for {
			chunk, ok := <-ch
			if !ok {
				break
			}

			// After an error is latched, keep draining the channel without
			// doing work so the producer/close can proceed without blocking.
			if getErrFn() != nil {
				continue
			}

			dchunk, derr := d.downloadChunk(chunk, hash)

			// io.EOF is not treated as a failure here — presumably it marks
			// a normal end-of-range read; TODO confirm against downloadChunk.
			if derr != nil && derr != io.EOF {
				saveErrFn(derr)
			} else {
				// update tracker info
				if tracker {
					cpCh <- dchunk
				}
			}
		}
	}

	// trackerFn runs in worker goroutines to update checkpoint info or calc downloaded crc
	// (exactly one instance is started, so cpChunks/tCRC64 need no locking).
	trackerFn := func(ch chan downloadedChunk) {
		defer cpWg.Done()
		var (
			tOffset int64 = 0 // end of the contiguous, fully-downloaded prefix
		)

		// Resume tracking state from a prior checkpoint, if any.
		if d.checkpoint != nil {
			tOffset = d.checkpoint.Info.Data.DownloadInfo.Offset
			tCRC64 = d.checkpoint.Info.Data.DownloadInfo.CRC64
		}

		for {
			chunk, ok := <-ch
			if !ok {
				break
			}
			// Chunks may arrive out of order; keep them sorted (by start,
			// per downloadedChunks' sort.Interface) and advance tOffset over
			// every chunk that is contiguous with the current prefix.
			cpChunks = append(cpChunks, chunk)
			sort.Sort(cpChunks)
			newOffset := tOffset
			i := 0
			for ii := range cpChunks {
				if cpChunks[ii].start == newOffset {
					newOffset += cpChunks[ii].size
					i++
				} else {
					break
				}
			}
			if newOffset != tOffset {
				//remove updated chunk in cpChunks
				if d.calcCRC {
					// Fold the newly-contiguous chunks into the running CRC.
					tCRC64 = d.combineCRC(tCRC64, cpChunks[0:i])
				}
				tOffset = newOffset
				cpChunks = cpChunks[i:]
				// Persist progress so an interrupted download can resume.
				if d.checkpoint != nil {
					d.checkpoint.Info.Data.DownloadInfo.Offset = tOffset
					d.checkpoint.Info.Data.DownloadInfo.CRC64 = tCRC64
					d.checkpoint.dump()
				}
			}
		}
	}

	// Start the download workers
	ch := make(chan downloaderChunk, d.options.ParallelNum)
	for i := 0; i < d.options.ParallelNum; i++ {
		wg.Add(1)
		go writeChunkFn(ch)
	}

	// Start tracker worker if need track downloaded chunk
	if tracker {
		cpCh = make(chan downloadedChunk, maxInt(3, d.options.ParallelNum))
		cpWg.Add(1)
		go trackerFn(cpCh)
	}

	// Consume downloaded data
	// Report any bytes already written (e.g. a resumed download) before
	// queuing new work — TODO confirm d.written is pre-populated on resume.
	if d.request.ProgressFn != nil && d.written > 0 {
		d.request.ProgressFn(d.written, d.written, d.sizeInBytes)
	}

	// Queue the next range of bytes to read.
	// Stops early if a worker has latched an error.
	for getErrFn() == nil {
		if d.pos >= d.epos {
			break
		}
		size := minInt64(d.epos-d.pos, d.options.PartSize)
		ch <- downloaderChunk{w: d.w, start: d.pos, size: size, rstart: d.rstart}
		d.pos += size
	}

	// Waiting for parts download finished
	// (sender closes ch; workers exit when it is drained).
	close(ch)
	wg.Wait()

	// All workers are done, so no more sends on cpCh can occur; close it
	// and wait for the tracker to flush its remaining state.
	if tracker {
		close(cpCh)
		cpWg.Wait()
	}

	if err := getErrFn(); err != nil {
		return nil, d.wrapErr(err)
	}

	// Final CRC verification: combine the contiguous-prefix CRC with any
	// leftover (non-contiguous) chunks and compare against the value the
	// server reported in the response headers.
	if d.checkCRC {
		if len(cpChunks) > 0 {
			sort.Sort(cpChunks)
		}
		if derr := checkResponseHeaderCRC64(fmt.Sprint(d.combineCRC(tCRC64, cpChunks)), d.headers); derr != nil {
			return nil, d.wrapErr(derr)
		}
	}

	return &DownloadResult{
		Written: d.written,
	}, nil
}