func()

in oss/lib/probe.go [1433:1554]


func (pc *ProbeCommand) DetectDownloadTime() error {
	if pc.pbOption.bucketName == "" {
		return fmt.Errorf("--bucketname is empty")
	}

	bucket, err := pc.command.ossBucket(pc.pbOption.bucketName)
	if err != nil {
		return err
	}

	//if pc.pbOption.probeItem == "download-time" {
	if pc.pbOption.objectName == "" {
		return fmt.Errorf("--object is empty when probe-item is download-time")
	}

	bExist, err := bucket.IsObjectExist(pc.pbOption.objectName)
	if err != nil {
		return err
	}

	if !bExist {
		return fmt.Errorf("oss object is not exist,%s", pc.pbOption.objectName)
	}

	meta, err := bucket.GetObjectDetailedMeta(pc.pbOption.objectName)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}
	var offset int64
	//var partSize int64
	parts := []downloadPart{}
	partSize, _ := GetInt(OptionPartSize, pc.command.options)
	parallel, _ := GetInt(OptionParallel, pc.command.options)
	if parallel <= 0 {
		parallel = 1
	}

	if partSize > 0 {
		i := 0
		for offset = 0; offset < objectSize; offset += partSize {
			part := downloadPart{}
			part.Index = i
			part.Start = offset
			part.End = getPartEnd(offset, objectSize, partSize)
			parts = append(parts, part)
			i++
		}
	} else {
		part := downloadPart{}
		part.Index = 0
		part.Start = 0
		part.End = objectSize - 1
		parts = append(parts, part)
	}

	//}
	jobs := make(chan downloadPart, len(parts))
	results := make(chan downloadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)
	routines := int(parallel)
	var statBandwidth StatBandWidth
	statBandwidth.Reset(int(parallel))

	//fmt.Printf("\nDetectDownloadTime, partSize :%v, objectSize:%v, parallel:%v\n", partSize, objectSize, parallel)

	arg := downloadWorkerArg{bucket, pc.pbOption.objectName}
	for w := 1; w <= routines; w++ {
		go downloadWorker(w, arg, jobs, results, failed, die, &statBandwidth)
	}

	// Download parts concurrently
	go downloadScheduler(jobs, parts)

	go func() {
		oldStat := statBandwidth.GetStat()
		for {
			time.Sleep(time.Duration(2) * time.Second)
			nowStat := statBandwidth.GetStat()
			nowTick := time.Now().UnixNano() / 1000 / 1000

			nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
			averSpeed := float64(nowStat.TotalBytes/1024) / float64((nowTick-nowStat.StartTick)/1000)
			maxSpeed := nowStat.MaxSpeed
			if nowSpeed > maxSpeed {
				maxSpeed = nowSpeed
				statBandwidth.SetMaxSpeed(maxSpeed)
			}
			oldStat = nowStat
			fmt.Printf("\rdownloading average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", averSpeed, nowSpeed, maxSpeed)
		}
	}()

	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			_ = (part.End - part.Start + 1)
		case err := <-failed:
			close(die)
			return err
		}
		if completed >= len(parts) {
			break
		}
	}

	nowTick := time.Now().UnixNano() / 1000 / 1000
	nowStat := statBandwidth.GetStat()
	averSpeed := float64(nowStat.TotalBytes/1024) / float64((nowTick-nowStat.StartTick)/1000)
	//total := float64(objectSize)

	fmt.Printf("\ndownload-speed part-size:%v, parallel:%v total bytes:%v, cost:%.3f s, avg speed:%.2f(kB/s)\n", partSize, parallel, nowStat.TotalBytes, float64(nowTick-nowStat.StartTick)/1000, averSpeed)

	return nil
}