in oss/lib/probe.go [571:731]
// DetectBandWidth measures upload or download throughput against the bucket
// named by --bucketname and prints a suggested parallelism.
//
// The probe runs in two phases:
//
//  1. Ramp-up: start runtime.NumCPU() workers, then every 30 seconds record
//     the average speed of the window and add NumCPU/5 (at least 1) more
//     workers, until the worker count exceeds 2*NumCPU. The parallelism with
//     the highest recorded average speed is printed as the suggestion.
//  2. Optional sustained run: if the runtime option is greater than zero,
//     run again with the suggested parallelism for that many seconds.
//
// Speeds are printed in KB/s. It returns an error if the bucket name is empty,
// the bucket cannot be obtained, or (for download-speed) the object name is
// empty, the existence check fails, or the object does not exist.
func (pc *ProbeCommand) DetectBandWidth() error {
	if pc.pbOption.bucketName == "" {
		return fmt.Errorf("--bucketname is empty")
	}

	bucket, err := pc.command.ossBucket(pc.pbOption.bucketName)
	if err != nil {
		return err
	}

	if pc.pbOption.probeItem == "download-speed" {
		if pc.pbOption.objectName == "" {
			return fmt.Errorf("--object is empty when probe-item is download-speed")
		}
		bExist, err := bucket.IsObjectExist(pc.pbOption.objectName)
		if err != nil {
			return err
		}
		if !bExist {
			return fmt.Errorf("oss object is not exist,%s", pc.pbOption.objectName)
		}
	}

	numCpu := runtime.NumCPU()
	var statBandwidth StatBandWidth
	statBandwidth.Reset(numCpu)

	var appendReader TestAppendReader
	if pc.pbOption.probeItem == "upload-speed" {
		appendReader.RandText = []byte(strings.Repeat("1", 32*1024))
	}

	// nowMillis returns the current wall-clock time in milliseconds.
	nowMillis := func() int64 {
		return time.Now().UnixNano() / int64(time.Millisecond)
	}

	// startWorkers launches count transfer goroutines bound to ctx, pausing
	// for stagger before each launch when stagger is positive.
	startWorkers := func(ctx context.Context, count int, stagger time.Duration) {
		for i := 0; i < count; i++ {
			if stagger > 0 {
				time.Sleep(stagger)
			}
			switch pc.pbOption.probeItem {
			case "upload-speed":
				go pc.PutObjectWithContext(bucket, &statBandwidth, &appendReader, ctx)
			case "download-speed":
				go pc.GetObjectWithContext(bucket, pc.pbOption.objectName, &statBandwidth, ctx)
			}
		}
	}

	// averageKBps returns the mean throughput in KB/s between stat.StartTick
	// and nowMs (both in milliseconds). The arithmetic is done in floating
	// point: integer division would truncate the elapsed time to whole
	// seconds, inflating the result and dividing by zero for sub-second
	// windows. A non-positive elapsed time yields 0.
	averageKBps := func(stat *StatBandWidth, nowMs int64) float64 {
		elapsedMs := nowMs - stat.StartTick
		if elapsedMs <= 0 {
			return 0
		}
		return float64(stat.TotalBytes) / 1024 / (float64(elapsedMs) / 1000)
	}

	// Phase 1: ramp up the number of workers and record per-window averages.
	ctx, cancel := context.WithCancel(context.Background())
	startWorkers(ctx, numCpu, 50*time.Millisecond)

	time.Sleep(2 * time.Second)
	fmt.Printf("cpu core count:%d\n", numCpu)

	var nowTick int64
	changeTick := nowMillis()
	nowParallel := numCpu
	addParallel := numCpu / 5
	if addParallel == 0 {
		addParallel = 1
	}

	var averageList []AverageInfo

	// The first max-speed sample of every window is ignored.
	bDiscarded := false
	var oldStat *StatBandWidth
	var nowStat *StatBandWidth
	oldStat = statBandwidth.GetStat()

	for nowParallel <= 2*numCpu {
		time.Sleep(time.Second)
		nowStat = statBandwidth.GetStat()
		nowTick = nowMillis()

		// Samples are taken about one second apart, so the byte delta is
		// reported directly as the current KB/s.
		nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
		averSpeed := averageKBps(nowStat, nowTick)
		maxSpeed := nowStat.MaxSpeed
		if nowSpeed > maxSpeed {
			if !bDiscarded && maxSpeed < 0.0001 {
				// Discard the first max speed because it is not accurate,
				// and restart the window's statistics.
				bDiscarded = true
				oldStat.Reset(nowParallel)
				statBandwidth.Reset(nowParallel)
				continue
			}
			maxSpeed = nowSpeed
			statBandwidth.SetMaxSpeed(maxSpeed)
		}

		fmt.Printf("\rparallel:%d,average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", nowStat.Parallel, averSpeed, nowSpeed, maxSpeed)
		oldStat = nowStat

		// Close the window after 30 seconds: add workers, record the
		// window's average and reset the statistics for the next window.
		if nowTick-changeTick >= 30000 {
			nowParallel += addParallel
			startWorkers(ctx, addParallel, 50*time.Millisecond)
			fmt.Printf("\n")
			bDiscarded = false
			averageList = append(averageList, AverageInfo{Parallel: nowStat.Parallel, AveSpeed: averSpeed})
			changeTick = nowTick
			oldStat.Reset(nowParallel)
			statBandwidth.Reset(nowParallel)
		}
	}

	cancel()
	// NOTE(review): appendReader is closed here but handed to new upload
	// workers below when the runtime option is set; confirm that
	// TestAppendReader remains usable after Close.
	appendReader.Close()

	// The loop above only exits after at least one window has been recorded,
	// so averageList is never empty here.
	maxIndex := 0
	maxSpeed := 0.0
	for k, v := range averageList {
		if v.AveSpeed > maxSpeed {
			maxIndex = k
			maxSpeed = v.AveSpeed
		}
	}
	fmt.Printf("\nsuggest parallel is %d, max average speed is %.2f(KB/s)\n", averageList[maxIndex].Parallel, averageList[maxIndex].AveSpeed)

	// Phase 2 (optional): sustained run at the suggested parallelism. The
	// error is ignored so that a missing or invalid runtime option simply
	// skips this phase.
	maxRuntime, _ := GetInt(OptionRuntime, pc.command.options)
	if maxRuntime > 0 {
		time.Sleep(5 * time.Second)
		ctx, cancel = context.WithCancel(context.Background())
		addParallel = averageList[maxIndex].Parallel
		statBandwidth.Reset(addParallel)
		fmt.Printf("\nrun %s %d seconds with parallel %d\n", pc.pbOption.probeItem, maxRuntime, addParallel)
		startWorkers(ctx, addParallel, 0)

		startT := time.Now().UnixNano() / int64(time.Second)
		for {
			time.Sleep(time.Second)
			nowStat = statBandwidth.GetStat()
			nowTick = nowMillis()
			nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
			averSpeed := averageKBps(nowStat, nowTick)
			maxSpeed := nowStat.MaxSpeed
			if nowSpeed > maxSpeed {
				maxSpeed = nowSpeed
				statBandwidth.SetMaxSpeed(maxSpeed)
			}
			fmt.Printf("\rparallel:%d,average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", addParallel, averSpeed, nowSpeed, maxSpeed)
			oldStat = nowStat

			currT := time.Now().UnixNano() / int64(time.Second)
			if startT+maxRuntime < currT {
				cancel()
				break
			}
		}
	}
	return nil
}