// func() in oss/lib/probe.go [571:731]


// DetectBandWidth probes upload or download throughput against the configured
// bucket and prints a suggested degree of parallelism.
//
// The probe runs in up to two phases:
//
//  1. Ramp-up: runtime.NumCPU() workers are started. Once per second the
//     current, average and maximum speeds are printed. Every 30 seconds the
//     average speed for the current parallelism is recorded and numCpu/5 (at
//     least one) additional workers are started, until the parallelism exceeds
//     2*numCpu. The recorded parallelism with the highest average speed is
//     printed as the suggestion.
//  2. Timed run (only when the OptionRuntime option is greater than zero): the
//     suggested number of workers is started again and statistics are printed
//     once per second until the requested number of seconds has elapsed.
//
// An error is returned when the bucket name is empty, when the bucket handle
// cannot be obtained, or, for the "download-speed" probe item, when the object
// name is empty, the existence check fails, or the object does not exist.
func (pc *ProbeCommand) DetectBandWidth() error {
	if pc.pbOption.bucketName == "" {
		return fmt.Errorf("--bucketname is empty")
	}

	bucket, err := pc.command.ossBucket(pc.pbOption.bucketName)
	if err != nil {
		return err
	}

	if pc.pbOption.probeItem == "download-speed" {
		if pc.pbOption.objectName == "" {
			return fmt.Errorf("--object is empty when probe-item is download-speed")
		}

		bExist, err := bucket.IsObjectExist(pc.pbOption.objectName)
		if err != nil {
			return err
		}

		if !bExist {
			return fmt.Errorf("oss object is not exist,%s", pc.pbOption.objectName)
		}
	}

	numCpu := runtime.NumCPU()
	var statBandwidth StatBandWidth
	statBandwidth.Reset(numCpu)

	var appendReader TestAppendReader
	if pc.pbOption.probeItem == "upload-speed" {
		appendReader.RandText = []byte(strings.Repeat("1", 32*1024))
	}
	ctx, cancel := context.WithCancel(context.Background())

	// nowMs returns the wall-clock time in milliseconds, the unit in which
	// StartTick is compared against below.
	nowMs := func() int64 {
		return time.Now().UnixNano() / 1000 / 1000
	}

	// startWorker launches one transfer goroutine bound to workerCtx. A probe
	// item other than the two handled here starts nothing.
	startWorker := func(workerCtx context.Context, reader *TestAppendReader) {
		switch pc.pbOption.probeItem {
		case "upload-speed":
			go pc.PutObjectWithContext(bucket, &statBandwidth, reader, workerCtx)
		case "download-speed":
			go pc.GetObjectWithContext(bucket, pc.pbOption.objectName, &statBandwidth, workerCtx)
		}
	}

	// averageKBps returns the mean throughput in KB/s between stat.StartTick
	// and tickMs. The division is done entirely in floating point so that
	// neither the byte count nor the elapsed time is truncated, and a
	// non-positive interval yields 0 instead of +Inf/NaN.
	averageKBps := func(stat *StatBandWidth, tickMs int64) float64 {
		elapsedMs := tickMs - stat.StartTick
		if elapsedMs <= 0 {
			return 0
		}
		return float64(stat.TotalBytes) / 1024 / (float64(elapsedMs) / 1000)
	}

	// Stagger worker start-up by 50ms each.
	for i := 0; i < numCpu; i++ {
		time.Sleep(50 * time.Millisecond)
		startWorker(ctx, &appendReader)
	}

	time.Sleep(2 * time.Second)

	fmt.Printf("cpu core count:%d\n", numCpu)
	changeTick := nowMs()
	nowParallel := numCpu
	addParallel := numCpu / 5
	if addParallel == 0 {
		addParallel = 1
	}
	var averageList []AverageInfo

	// ignore the first max speed
	bDiscarded := false
	var oldStat *StatBandWidth
	var nowStat *StatBandWidth
	var nowTick int64

	oldStat = statBandwidth.GetStat()
	for nowParallel <= 2*numCpu {
		time.Sleep(time.Second)
		nowStat = statBandwidth.GetStat()
		nowTick = nowMs()

		// Samples are taken about one second apart, so the byte delta is
		// reported directly as KB/s.
		nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
		averSpeed := averageKBps(nowStat, nowTick)
		maxSpeed := nowStat.MaxSpeed
		if nowSpeed > maxSpeed {
			if !bDiscarded && maxSpeed < 0.0001 {
				// Discard the first max speed of each round because it is
				// not accurate; restart the counters and sample again.
				bDiscarded = true
				oldStat.Reset(nowParallel)
				statBandwidth.Reset(nowParallel)
				continue
			}
			maxSpeed = nowSpeed
			statBandwidth.SetMaxSpeed(maxSpeed)
		}
		fmt.Printf("\rparallel:%d,average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", nowStat.Parallel, averSpeed, nowSpeed, maxSpeed)
		oldStat = nowStat

		// After 30 seconds at this parallelism, record its average, add
		// workers and start a new measurement round.
		if nowTick-changeTick >= 30000 {
			nowParallel += addParallel
			for i := 0; i < addParallel; i++ {
				time.Sleep(50 * time.Millisecond)
				startWorker(ctx, &appendReader)
			}
			fmt.Printf("\n")
			bDiscarded = false
			averageList = append(averageList, AverageInfo{Parallel: nowStat.Parallel, AveSpeed: averSpeed})
			changeTick = nowTick
			oldStat.Reset(nowParallel)
			statBandwidth.Reset(nowParallel)
		}
	}

	cancel()
	appendReader.Close()

	// The loop above only ends after at least one 30-second round has been
	// recorded, so averageList is non-empty here.
	maxIndex := 0
	bestSpeed := 0.0
	for k, v := range averageList {
		if v.AveSpeed > bestSpeed {
			maxIndex = k
			bestSpeed = v.AveSpeed
		}
	}

	fmt.Printf("\nsuggest parallel is %d, max average speed is %.2f(KB/s)\n", averageList[maxIndex].Parallel, averageList[maxIndex].AveSpeed)

	// NOTE(review): the error from GetInt is ignored; presumably an unset
	// option yields 0 and skips the timed run — confirm.
	maxRuntime, _ := GetInt(OptionRuntime, pc.command.options)

	if maxRuntime > 0 {
		time.Sleep(5 * time.Second)
		runCtx, runCancel := context.WithCancel(context.Background())
		runParallel := averageList[maxIndex].Parallel
		statBandwidth.Reset(runParallel)
		// Take the baseline for the per-second delta from the freshly reset
		// counters rather than relying on state left by the ramp-up loop.
		oldStat = statBandwidth.GetStat()

		// NOTE(review): appendReader was closed at the end of the ramp-up
		// phase. A fresh reader is used so the timed upload does not depend
		// on a closed reader still producing data — confirm against
		// TestAppendReader.Close.
		var runReader TestAppendReader
		if pc.pbOption.probeItem == "upload-speed" {
			runReader.RandText = []byte(strings.Repeat("1", 32*1024))
		}

		fmt.Printf("\nrun %s  %d seconds with parallel %d\n", pc.pbOption.probeItem, maxRuntime, runParallel)
		for i := 0; i < runParallel; i++ {
			startWorker(runCtx, &runReader)
		}

		startT := time.Now().UnixNano() / 1000 / 1000 / 1000
		for {
			time.Sleep(time.Second)
			nowStat = statBandwidth.GetStat()
			nowTick = nowMs()

			nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
			averSpeed := averageKBps(nowStat, nowTick)
			maxSpeed := nowStat.MaxSpeed
			if nowSpeed > maxSpeed {
				maxSpeed = nowSpeed
				statBandwidth.SetMaxSpeed(maxSpeed)
			}
			fmt.Printf("\rparallel:%d,average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", runParallel, averSpeed, nowSpeed, maxSpeed)
			oldStat = nowStat
			currT := time.Now().UnixNano() / 1000 / 1000 / 1000
			if startT+maxRuntime < currT {
				// Tear down the same way the ramp-up phase does.
				runCancel()
				runReader.Close()
				break
			}
		}
	}

	return nil
}